Commit 75af887

KAFKA-3117: handle metadata updates during consumer rebalance

1 parent 506d023

kafka/coordinator/consumer.py

Lines changed: 24 additions & 10 deletions
@@ -81,7 +81,8 @@ def __init__(self, client, subscription, metrics, metric_group_prefix,
         assert self.config['assignors'], 'Coordinator requires assignors'
 
         self._subscription = subscription
-        self._partitions_per_topic = {}
+        self._metadata_snapshot = {}
+        self._assignment_snapshot = None
         self._cluster = client.cluster
         self._cluster.request_update()
         self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
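
The two fields above are the core of the fix: `_metadata_snapshot` follows live cluster metadata, while `_assignment_snapshot` pins the view the leader used to compute an assignment. A minimal sketch of the pair, using a stand-in class rather than the real ConsumerCoordinator:

# Sketch of the two snapshots introduced above (stand-in class, not the
# real coordinator): one follows live cluster metadata, the other is
# frozen at the moment the leader computes an assignment.
class SnapshotPair(object):
    def __init__(self):
        self.metadata_snapshot = {}      # {topic: set(partition_ids)}, tracks metadata updates
        self.assignment_snapshot = None  # pinned copy taken when an assignment is computed

    def assignment_is_stale(self):
        # mirrors the truthiness check in the diff: a pinned snapshot that
        # no longer matches current metadata means the assignment is stale
        return (bool(self.assignment_snapshot)
                and self.assignment_snapshot != self.metadata_snapshot)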
@@ -146,7 +147,7 @@ def _handle_metadata_update(self, cluster):
 
         # check if there are any changes to the metadata which should trigger
         # a rebalance
-        if self._subscription_metadata_changed():
+        if self._subscription_metadata_changed(cluster):
 
             if (self.config['api_version'] >= (0, 9)
                 and self.config['group_id'] is not None):
@@ -159,20 +160,20 @@ def _handle_metadata_update(self, cluster):
                 self._subscription.assign_from_subscribed([
                     TopicPartition(topic, partition)
                     for topic in self._subscription.subscription
-                    for partition in self._partitions_per_topic[topic]
+                    for partition in self._metadata_snapshot[topic]
                 ])
 
-    def _subscription_metadata_changed(self):
+    def _subscription_metadata_changed(self, cluster):
         if not self._subscription.partitions_auto_assigned():
             return False
 
-        old_partitions_per_topic = self._partitions_per_topic
-        self._partitions_per_topic = {}
+        metadata_snapshot = {}
         for topic in self._subscription.group_subscription():
-            partitions = self._cluster.partitions_for_topic(topic) or []
-            self._partitions_per_topic[topic] = set(partitions)
+            partitions = cluster.partitions_for_topic(topic) or []
+            metadata_snapshot[topic] = set(partitions)
 
-        if self._partitions_per_topic != old_partitions_per_topic:
+        if self._metadata_snapshot != metadata_snapshot:
+            self._metadata_snapshot = metadata_snapshot
             return True
         return False
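
Passing the listener's `cluster` argument through and rebuilding the snapshot from scratch on every update is what makes partition-count changes observable: the fresh dict of sets either equals the stored one or it doesn't. A standalone sketch of that comparison, with toy data standing in for the ClusterMetadata API:

# Standalone sketch of the snapshot comparison above; partitions_by_topic
# is a toy stand-in for cluster.partitions_for_topic().
def build_snapshot(partitions_by_topic, subscribed_topics):
    # topics with no metadata yet map to an empty set, mirroring the `or []`
    return {topic: set(partitions_by_topic.get(topic) or [])
            for topic in subscribed_topics}

old = build_snapshot({'events': [0, 1, 2]}, ['events'])
new = build_snapshot({'events': [0, 1, 2, 3]}, ['events'])  # a partition was added
assert old != new  # dict-of-sets inequality is the rebalance trigger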

@@ -184,8 +185,15 @@ def _lookup_assignor(self, name):
 
     def _on_join_complete(self, generation, member_id, protocol,
                           member_assignment_bytes):
+        # if we were the assignor, then we need to make sure that there have
+        # been no metadata updates since the rebalance began. Otherwise, we
+        # won't rebalance again until the next metadata change
+        if self._assignment_snapshot and self._assignment_snapshot != self._metadata_snapshot:
+            self._subscription.mark_for_reassignment()
+            return
+
         assignor = self._lookup_assignor(protocol)
-        assert assignor, 'invalid assignment protocol: %s' % protocol
+        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
 
         assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
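
The new guard closes a race: the leader computes an assignment from one metadata view, but an update can land before the join round completes, and without the check the group would not rebalance again until the next metadata change. A toy replay of the condition (values are illustrative):

# Toy replay of the race the guard above closes (illustrative values).
assignment_snapshot = {'events': {0, 1, 2}}   # metadata the assignment was based on
metadata_snapshot = {'events': {0, 1, 2, 3}}  # metadata at join completion

if assignment_snapshot and assignment_snapshot != metadata_snapshot:
    # the real code calls self._subscription.mark_for_reassignment() and
    # returns early, so the stale assignment is never applied
    print('metadata moved during rebalance; rejoin instead of applying')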

@@ -235,6 +243,11 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
         self._subscription.group_subscribe(all_subscribed_topics)
         self._client.set_topics(self._subscription.group_subscription())
 
+        # keep track of the metadata used for assignment so that we can check
+        # after rebalance completion whether anything has changed
+        self._cluster.request_update()
+        self._assignment_snapshot = self._metadata_snapshot
+
         log.debug("Performing assignment for group %s using strategy %s"
                   " with subscriptions %s", self.group_id, assignor.name,
                   member_metadata)
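
Requesting a refresh immediately before pinning the snapshot is deliberate: if the refresh lands while the rebalance is still in flight, `_metadata_snapshot` moves and the guard in `_on_join_complete` fires. A hedged end-to-end replay of that lifecycle, with plain dicts standing in for coordinator state:

# End-to-end replay of the detection lifecycle (plain dicts as stand-ins).
state = {'metadata': {'events': {0, 1}}, 'assignment_basis': None}

# leader side (_perform_assignment): pin the metadata the assignment uses
state['assignment_basis'] = state['metadata']

# a metadata update lands while the rebalance is still in flight
state['metadata'] = {'events': {0, 1, 2}}

# join completion (_on_join_complete): compare basis vs. current
stale = (state['assignment_basis'] is not None
         and state['assignment_basis'] != state['metadata'])
assert stale  # -> mark_for_reassignment() rather than keeping a stale assignment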
@@ -264,6 +277,7 @@ def _on_join_prepare(self, generation, member_id):
                       " for group %s failed on_partitions_revoked",
                       self._subscription.listener, self.group_id)
 
+        self._assignment_snapshot = None
         self._subscription.mark_for_reassignment()
 
     def need_rejoin(self):
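
Clearing the pinned snapshot when a rebalance starts scopes the staleness check to the member that actually acted as assignor: only `_perform_assignment` repopulates it, so on followers the guard in `_on_join_complete` stays inert. A small sketch under that reading (helper names are assumed):

# Sketch of why the reset above matters (helper names are assumed).
def on_join_prepare(state):
    state['assignment_basis'] = None  # forget the previous generation's basis

def needs_rejoin_after_join(state):
    basis = state['assignment_basis']
    return bool(basis) and basis != state['metadata']

follower = {'metadata': {'events': {0, 1}}, 'assignment_basis': None}
on_join_prepare(follower)
assert not needs_rejoin_after_join(follower)  # followers never trip the guard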
