Skip to content

Commit 2328cbd

Browse files
committed
Dont return empty record lists from fetcher.fetched_records
1 parent 89c8dd3 commit 2328cbd

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

kafka/consumer/fetcher.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -378,10 +378,13 @@ def _append(self, drained, part, max_records, update_offsets):
378378
# as long as the partition is still assigned
379379
position = self._subscriptions.assignment[tp].position
380380
if part.next_fetch_offset == position.offset:
381-
part_records = part.take(max_records)
382381
log.debug("Returning fetched records at offset %d for assigned"
383382
" partition %s", position.offset, tp)
384-
drained[tp].extend(part_records)
383+
part_records = part.take(max_records)
384+
# list.extend([]) is a noop, but because drained is a defaultdict
385+
# we should avoid initializing the default list unless there are records
386+
if part_records:
387+
drained[tp].extend(part_records)
385388
# We want to increment subscription position if (1) we're using consumer.poll(),
386389
# or (2) we didn't return any records (consumer iterator will update position
387390
# when each message is yielded). There may be edge cases where we re-fetch records

0 commit comments

Comments
 (0)