Skip to content

Commit 3463f59

Browse files
authored
Do not reset fetch positions if offset commit fetch times out (#2629)
1 parent bcbd1b7 commit 3463f59

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

kafka/consumer/fetcher.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ def reset_offsets_if_needed(self):
178178
Arguments:
179179
partitions ([TopicPartition]): the partitions that need offsets reset
180180
181+
Returns:
182+
bool: True if any partitions need reset; otherwise False (no reset pending)
183+
181184
Raises:
182185
NoOffsetForPartitionError: if no offset reset strategy is defined
183186
KafkaTimeoutError if timeout_ms provided
@@ -189,7 +192,8 @@ def reset_offsets_if_needed(self):
189192

190193
partitions = self._subscriptions.partitions_needing_reset()
191194
if not partitions:
192-
return
195+
return False
196+
log.debug('Resetting offsets for %s', partitions)
193197

194198
offset_resets = dict()
195199
for tp in partitions:
@@ -198,6 +202,7 @@ def reset_offsets_if_needed(self):
198202
offset_resets[tp] = ts
199203

200204
self._reset_offsets_async(offset_resets)
205+
return True
201206

202207
def offsets_by_times(self, timestamps, timeout_ms=None):
203208
"""Fetch offset for each partition passed in ``timestamps`` map.

kafka/consumer/group.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,7 @@ def _update_fetch_positions(self, timeout_ms=None):
11241124
partitions (List[TopicPartition]): The partitions that need
11251125
updating fetch positions.
11261126
1127-
Returns True if fetch positions updated, False if timeout
1127+
Returns True if fetch positions updated, False if timeout or async reset is pending
11281128
11291129
Raises:
11301130
NoOffsetForPartitionError: If no offset is stored for a given
@@ -1135,15 +1135,13 @@ def _update_fetch_positions(self, timeout_ms=None):
11351135

11361136
if (self.config['api_version'] >= (0, 8, 1) and
11371137
self.config['group_id'] is not None):
1138-
try:
1139-
# If there are any partitions which do not have a valid position and are not
1140-
# awaiting reset, then we need to fetch committed offsets. We will only do a
1141-
# coordinator lookup if there are partitions which have missing positions, so
1142-
# a consumer with manually assigned partitions can avoid a coordinator dependence
1143-
# by always ensuring that assigned partitions have an initial position.
1144-
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
1145-
except KafkaTimeoutError:
1146-
pass
1138+
# If there are any partitions which do not have a valid position and are not
1139+
# awaiting reset, then we need to fetch committed offsets. We will only do a
1140+
# coordinator lookup if there are partitions which have missing positions, so
1141+
# a consumer with manually assigned partitions can avoid a coordinator dependence
1142+
# by always ensuring that assigned partitions have an initial position.
1143+
if not self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms):
1144+
return False
11471145

11481146
# If there are partitions still needing a position and a reset policy is defined,
11491147
# request reset using the default policy. If no reset strategy is defined and there
@@ -1152,8 +1150,7 @@ def _update_fetch_positions(self, timeout_ms=None):
11521150

11531151
# Finally send an asynchronous request to lookup and update the positions of any
11541152
# partitions which are awaiting reset.
1155-
self._fetcher.reset_offsets_if_needed()
1156-
return False
1153+
return not self._fetcher.reset_offsets_if_needed()
11571154

11581155
def _message_generator_v2(self):
11591156
timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())

kafka/coordinator/consumer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,8 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
427427
future_key = frozenset(partitions)
428428
timer = Timer(timeout_ms)
429429
while True:
430-
self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms)
430+
if not self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms):
431+
timer.maybe_raise()
431432

432433
# contact coordinator to fetch committed offsets
433434
if future_key in self._offset_fetch_futures:

0 commit comments

Comments
 (0)