Skip to content

Commit 0e1a5bc

Browse files
committed
Fix coordinator lock contention during close() (#2652)
1 parent f6eb0b4 commit 0e1a5bc

File tree

1 file changed

+18
-17
lines changed

1 file changed

+18
-17
lines changed

kafka/coordinator/base.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -857,14 +857,12 @@ def _disable_heartbeat_thread(self):
857857
self._heartbeat_thread.disable()
858858

859859
def _close_heartbeat_thread(self, timeout_ms=None):
860-
with self._lock:
861-
if self._heartbeat_thread is not None:
862-
heartbeat_log.info('Stopping heartbeat thread')
863-
try:
864-
self._heartbeat_thread.close(timeout_ms=timeout_ms)
865-
except ReferenceError:
866-
pass
867-
self._heartbeat_thread = None
860+
if self._heartbeat_thread is not None:
861+
try:
862+
self._heartbeat_thread.close(timeout_ms=timeout_ms)
863+
except ReferenceError:
864+
pass
865+
self._heartbeat_thread = None
868866

869867
def __del__(self):
870868
try:
@@ -1047,17 +1045,20 @@ def disable(self):
10471045
self.enabled = False
10481046

10491047
def close(self, timeout_ms=None):
1050-
if self.closed:
1051-
return
1052-
self.closed = True
1048+
with self.coordinator._lock:
1049+
if self.closed:
1050+
return
10531051

1054-
# Generally this should not happen - close() is triggered
1055-
# by the coordinator. But in some cases GC may close the coordinator
1056-
# from within the heartbeat thread.
1057-
if threading.current_thread() == self:
1058-
return
1052+
heartbeat_log.info('Stopping heartbeat thread')
1053+
self.closed = True
10591054

1060-
with self.coordinator._lock:
1055+
# Generally this should not happen - close() is triggered
1056+
# by the coordinator. But in some cases GC may close the coordinator
1057+
# from within the heartbeat thread.
1058+
if threading.current_thread() == self:
1059+
return
1060+
1061+
# Notify coordinator lock to wake thread from sleep/lock.wait
10611062
self.coordinator._lock.notify()
10621063

10631064
if self.is_alive():

0 commit comments

Comments
 (0)