diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 0eb7f0eec..5e1f72621 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -857,14 +857,12 @@ def _disable_heartbeat_thread(self): self._heartbeat_thread.disable() def _close_heartbeat_thread(self, timeout_ms=None): - with self._lock: - if self._heartbeat_thread is not None: - heartbeat_log.info('Stopping heartbeat thread') - try: - self._heartbeat_thread.close(timeout_ms=timeout_ms) - except ReferenceError: - pass - self._heartbeat_thread = None + if self._heartbeat_thread is not None: + try: + self._heartbeat_thread.close(timeout_ms=timeout_ms) + except ReferenceError: + pass + self._heartbeat_thread = None def __del__(self): try: @@ -1047,17 +1045,20 @@ def disable(self): self.enabled = False def close(self, timeout_ms=None): - if self.closed: - return - self.closed = True + with self.coordinator._lock: + if self.closed: + return - # Generally this should not happen - close() is triggered - # by the coordinator. But in some cases GC may close the coordinator - # from within the heartbeat thread. - if threading.current_thread() == self: - return + heartbeat_log.info('Stopping heartbeat thread') + self.closed = True - with self.coordinator._lock: + # Generally this should not happen - close() is triggered + # by the coordinator. But in some cases GC may close the coordinator + # from within the heartbeat thread. + if threading.current_thread() == self: + return + + # Notify coordinator lock to wake thread from sleep/lock.wait self.coordinator._lock.notify() if self.is_alive():