Skip to content

Commit 43bbdf1

Browse files
authored
Protect writes to wakeup socket with threading lock (#763 / #709)
1 parent 5ab4d5c commit 43bbdf1

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

kafka/client_async.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import itertools
77
import logging
88
import random
9+
import threading
910

1011
# selectors in stdlib as of py3.4
1112
try:
@@ -158,6 +159,7 @@ def __init__(self, **configs):
158159
self._bootstrap_fails = 0
159160
self._wake_r, self._wake_w = socket.socketpair()
160161
self._wake_r.setblocking(False)
162+
self._wake_lock = threading.Lock()
161163
self._selector.register(self._wake_r, selectors.EVENT_READ)
162164
self._closed = False
163165
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
@@ -758,10 +760,12 @@ def check_version(self, node_id=None, timeout=2, strict=False):
758760
raise Errors.NoBrokersAvailable()
759761

760762
def wakeup(self):
761-
if self._wake_w.send(b'x') != 1:
762-
log.warning('Unable to send to wakeup socket!')
763+
with self._wake_lock:
764+
if self._wake_w.send(b'x') != 1:
765+
log.warning('Unable to send to wakeup socket!')
763766

764767
def _clear_wake_fd(self):
768+
# reading from wake socket should only happen in a single thread
765769
while True:
766770
try:
767771
self._wake_r.recv(1024)

0 commit comments

Comments
 (0)