Skip to content

Commit 04d68c5

Browse files
committed
Expose selector type as config option
1 parent 3666b66 commit 04d68c5

File tree

3 files changed

+16
-4
lines changed

3 files changed

+16
-4
lines changed

kafka/client_async.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class KafkaClient(object):
6363
'ssl_crlfile': None,
6464
'api_version': None,
6565
'api_version_auto_timeout_ms': 2000,
66+
'selector': selectors.DefaultSelector,
6667
}
6768
API_VERSIONS = [
6869
(0, 10),
@@ -134,6 +135,9 @@ def __init__(self, **configs):
134135
api_version_auto_timeout_ms (int): number of milliseconds to throw a
135136
timeout exception from the constructor when checking the broker
136137
api version. Only applies if api_version is None
138+
selector (selectors.BaseSelector): Provide a specific selector
139+
implementation to use for I/O multiplexing.
140+
Default: selectors.DefaultSelector
137141
"""
138142
self.config = copy.copy(self.DEFAULT_CONFIG)
139143
for key in self.config:
@@ -149,7 +153,7 @@ def __init__(self, **configs):
149153
self._topics = set() # empty set will fetch all topic metadata
150154
self._metadata_refresh_in_progress = False
151155
self._last_no_node_available_ms = 0
152-
self._selector = selectors.DefaultSelector()
156+
self._selector = self.config['selector']()
153157
self._conns = {}
154158
self._connecting = set()
155159
self._refresh_on_disconnects = True

kafka/consumer/group.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import six
88

9-
from kafka.client_async import KafkaClient
9+
from kafka.client_async import KafkaClient, selectors
1010
from kafka.consumer.fetcher import Fetcher
1111
from kafka.consumer.subscription_state import SubscriptionState
1212
from kafka.coordinator.consumer import ConsumerCoordinator
@@ -173,6 +173,9 @@ class KafkaConsumer(six.Iterator):
173173
metrics. Default: 2
174174
metrics_sample_window_ms (int): The number of samples maintained to
175175
compute metrics. Default: 30000
176+
selector (selectors.BaseSelector): Provide a specific selector
177+
implementation to use for I/O multiplexing.
178+
Default: selectors.DefaultSelector
176179
177180
Note:
178181
Configuration parameters are described in more detail at
@@ -218,6 +221,7 @@ class KafkaConsumer(six.Iterator):
218221
'metric_reporters': [],
219222
'metrics_num_samples': 2,
220223
'metrics_sample_window_ms': 30000,
224+
'selector': selectors.DefaultSelector,
221225
}
222226

223227
def __init__(self, *topics, **configs):

kafka/producer/kafka.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import weakref
99

1010
from .. import errors as Errors
11-
from ..client_async import KafkaClient
11+
from ..client_async import KafkaClient, selectors
1212
from ..partitioner.default import DefaultPartitioner
1313
from ..protocol.message import Message, MessageSet
1414
from ..structs import TopicPartition
@@ -220,6 +220,9 @@ class KafkaProducer(object):
220220
api_version_auto_timeout_ms (int): number of milliseconds to throw a
221221
timeout exception from the constructor when checking the broker
222222
api version. Only applies if api_version set to 'auto'
223+
selector (selectors.BaseSelector): Provide a specific selector
224+
implementation to use for I/O multiplexing.
225+
Default: selectors.DefaultSelector
223226
224227
Note:
225228
Configuration parameters are described in more detail at
@@ -255,7 +258,8 @@ class KafkaProducer(object):
255258
'ssl_keyfile': None,
256259
'ssl_crlfile': None,
257260
'api_version': None,
258-
'api_version_auto_timeout_ms': 2000
261+
'api_version_auto_timeout_ms': 2000,
262+
'selector': selectors.DefaultSelector,
259263
}
260264

261265
def __init__(self, **configs):

0 commit comments

Comments
 (0)