Skip to content

Commit 506d023

Browse files
authored
Expose selector type as config option (#764)
1 parent 43bbdf1 commit 506d023

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

kafka/client_async.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class KafkaClient(object):
6464
'ssl_crlfile': None,
6565
'api_version': None,
6666
'api_version_auto_timeout_ms': 2000,
67+
'selector': selectors.DefaultSelector,
6768
}
6869
API_VERSIONS = [
6970
(0, 10),
@@ -135,6 +136,9 @@ def __init__(self, **configs):
135136
api_version_auto_timeout_ms (int): number of milliseconds to throw a
136137
timeout exception from the constructor when checking the broker
137138
api version. Only applies if api_version is None
139+
selector (selectors.BaseSelector): Provide a specific selector
140+
implementation to use for I/O multiplexing.
141+
Default: selectors.DefaultSelector
138142
"""
139143
self.config = copy.copy(self.DEFAULT_CONFIG)
140144
for key in self.config:
@@ -150,7 +154,7 @@ def __init__(self, **configs):
150154
self._topics = set() # empty set will fetch all topic metadata
151155
self._metadata_refresh_in_progress = False
152156
self._last_no_node_available_ms = 0
153-
self._selector = selectors.DefaultSelector()
157+
self._selector = self.config['selector']()
154158
self._conns = {}
155159
self._connecting = set()
156160
self._refresh_on_disconnects = True

kafka/consumer/group.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import six
88

9-
from kafka.client_async import KafkaClient
9+
from kafka.client_async import KafkaClient, selectors
1010
from kafka.consumer.fetcher import Fetcher
1111
from kafka.consumer.subscription_state import SubscriptionState
1212
from kafka.coordinator.consumer import ConsumerCoordinator
@@ -173,6 +173,9 @@ class KafkaConsumer(six.Iterator):
173173
metrics. Default: 2
174174
metrics_sample_window_ms (int): The maximum age in milliseconds of
175175
samples used to compute metrics. Default: 30000
176+
selector (selectors.BaseSelector): Provide a specific selector
177+
implementation to use for I/O multiplexing.
178+
Default: selectors.DefaultSelector
176179
177180
Note:
178181
Configuration parameters are described in more detail at
@@ -218,6 +221,7 @@ class KafkaConsumer(six.Iterator):
218221
'metric_reporters': [],
219222
'metrics_num_samples': 2,
220223
'metrics_sample_window_ms': 30000,
224+
'selector': selectors.DefaultSelector,
221225
}
222226

223227
def __init__(self, *topics, **configs):

kafka/producer/kafka.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import weakref
99

1010
from .. import errors as Errors
11-
from ..client_async import KafkaClient
11+
from ..client_async import KafkaClient, selectors
1212
from ..metrics import MetricConfig, Metrics
1313
from ..partitioner.default import DefaultPartitioner
1414
from ..protocol.message import Message, MessageSet
@@ -228,6 +228,9 @@ class KafkaProducer(object):
228228
metrics. Default: 2
229229
metrics_sample_window_ms (int): The maximum age in milliseconds of
230230
samples used to compute metrics. Default: 30000
231+
selector (selectors.BaseSelector): Provide a specific selector
232+
implementation to use for I/O multiplexing.
233+
Default: selectors.DefaultSelector
231234
232235
Note:
233236
Configuration parameters are described in more detail at
@@ -267,6 +270,7 @@ class KafkaProducer(object):
267270
'metric_reporters': [],
268271
'metrics_num_samples': 2,
269272
'metrics_sample_window_ms': 30000,
273+
'selector': selectors.DefaultSelector,
270274
}
271275

272276
def __init__(self, **configs):

0 commit comments

Comments
 (0)