infinite loop in ensure_coordinator_ready when coordinator is unknown #2373

Open
pe55a opened this issue Jul 12, 2023 · 3 comments
pe55a commented Jul 12, 2023

We're facing an issue with the Kafka poll functionality; in particular, we suspect the culprit is the ensure_coordinator_ready function called by _coordinator.poll().

We're using Robot Framework, so unfortunately we can't capture many logs, but we got these messages printed in an infinite loop:

10:36:19.658 INFO <BrokerConnection node_id=**** host=**** [IPv4 ('****', ****)]>: connecting to **** [('****', ****) IPv4]
10:36:19.765 INFO <BrokerConnection node_id=**** host=**** [IPv4 ('****', ****)]>: Connection complete.
10:36:19.886 ERROR <BrokerConnection node_id=**** host=**** [IPv4 ('****', ****)]>: socket disconnected
10:36:19.900 INFO <BrokerConnection node_id=**** host=**** [IPv4 ('****', ****)]>: Closing connection. KafkaConnectionError: socket disconnected
10:36:19.905 ERROR Error sending GroupCoordinatorRequest_v0 to node **** [KafkaConnectionError: socket disconnected]

After checking the kafka-python code, we noticed that the function here
https://github.com/dpkp/kafka-python/blob/master/kafka/coordinator/base.py#L241C9-L241C33
has no exit point from its while loop and no option to pass a timeout parameter.

Can this be improved/fixed?
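
For illustration, here is a minimal sketch of what a bounded version of that loop could look like. It reuses the helpers that already exist on BaseCoordinator (coordinator_unknown(), lookup_coordinator(), _client.poll(), config['retry_backoff_ms']); the timeout_ms parameter and the deadline check are the suggested addition, not the library's current API:

```python
import time

from kafka.errors import KafkaTimeoutError

def ensure_coordinator_ready(self, timeout_ms=None):
    # Sketch only: the same retry loop as in kafka/coordinator/base.py,
    # but bounded by an optional deadline instead of looping forever.
    deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000.0
    while self.coordinator_unknown():
        if deadline is not None and time.monotonic() >= deadline:
            raise KafkaTimeoutError(
                'Coordinator lookup did not complete within %s ms' % timeout_ms)
        future = self.lookup_coordinator()
        self._client.poll(future=future)
        if future.failed():
            if not future.retriable():
                raise future.exception
            # Back off before retrying instead of spinning on a dead broker.
            time.sleep(self.config['retry_backoff_ms'] / 1000.0)
```

Callers such as _coordinator.poll() could then pass whatever time budget they have left, so an unreachable coordinator would surface as a KafkaTimeoutError instead of a silent hang.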

@mro-rhansen2

I've traced an intermittent hang in our processes to this exact same section of code. We have processes where the only thing keeping them alive is that we're consuming messages using the KafkaConsumer as an iterator. I came upon the code in question after confirming that we had properly configured the consumer to time out if it has not received any data after a period of time.

The order of operations for us is as follows:

  1. We get the last message from the consumer.
  2. We handle the message.
  3. Some network error occurs.
  4. We try to manually commit the last message.
  5. Main thread hangs.
  6. After a little more than 9 minutes, a worker thread within Kafka indicates that idle broker connections are being destroyed.
  7. HeartbeatThread indicates that the heartbeat poll has expired (in accordance with our config settings) and then proceeds to try and exit the consumer group.

Nothing happens after that until we restart the processes. It is worth mentioning that we have around 20 processes using the exact same boilerplate code for connecting to Kafka, but only a small handful of them fail to recover, and it is almost always a different subset of the overall population that exhibits this odd behavior when a network failure occurs. (A sketch of that boilerplate is below.)
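
For context, this is roughly the pattern in question (topic, brokers, group, and the handler are placeholders, not our actual settings). Note that consumer_timeout_ms only bounds the iterator; nothing bounds the commit() call, which is where we hang:

```python
from kafka import KafkaConsumer

def handle(message):
    """Placeholder for our message-processing logic."""
    print(message.topic, message.partition, message.offset)

consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers=['broker:9092'],
    group_id='example-group',
    enable_auto_commit=False,   # we commit manually (step 4)
    consumer_timeout_ms=60000,  # iterator gives up after 60s with no data
)

for message in consumer:   # step 1: get the next message
    handle(message)        # step 2: handle it
    consumer.commit()      # step 4: this call can block forever if a network
                           # error (step 3) leaves the coordinator unknown
```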

@ethiebautgeorge-nasuni

We hit the same issue and have implemented a fix; I have a branch with the fix that I'd like to push.
Can an admin give me access?

ethiebautgeorge-nasuni commented Feb 27, 2024

I believe this should also fix #1322
