-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Investigate no comms in docker #1113
base: master
Are you sure you want to change the base?
Changes from 5 commits
d5bc3c0
8abfc0a
9c7beee
71ed48c
411309d
a478338
621d23d
0b2571c
d380383
3c4d047
c73db38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,4 +29,4 @@ USER flower | |
|
||
VOLUME $FLOWER_DATA_DIR | ||
|
||
ENTRYPOINT ["flower"] | ||
CMD ["celery flower"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ | |
from logging import NullHandler | ||
|
||
import click | ||
from celery.utils.time import humanize_seconds | ||
from kombu.exceptions import OperationalError | ||
from tornado.options import options | ||
from tornado.options import parse_command_line, parse_config_file | ||
from tornado.log import enable_pretty_logging | ||
|
@@ -48,7 +50,12 @@ def sigterm_handler(signal, frame): | |
sys.exit(0) | ||
|
||
signal.signal(signal.SIGTERM, sigterm_handler) | ||
|
||
if not is_broker_connected(celery_app=app): | ||
return | ||
|
||
print_banner(app, 'ssl_options' in settings) | ||
|
||
try: | ||
flower.start() | ||
except (KeyboardInterrupt, SystemExit): | ||
|
@@ -103,6 +110,33 @@ def warn_about_celery_args_used_in_flower_command(ctx, flower_args): | |
) | ||
|
||
|
||
def is_broker_connected(celery_app): | ||
is_connected = False | ||
max_retries = celery_app.conf.broker_connection_max_retries | ||
|
||
if not celery_app.conf.broker_connection_retry: | ||
max_retries = 0 | ||
|
||
with celery_app.connection() as conn: | ||
broker_url = conn.as_uri() | ||
|
||
def _error_handler(exc, interval): | ||
next_step = f"Trying again {humanize_seconds(interval, 'in', ' ')}... ({int(interval / 2)}/{max_retries})" | ||
logger.error(f'Cannot connect to broker: {broker_url}. Error: {exc}. {next_step}') | ||
|
||
try: | ||
conn.ensure_connection(errback=_error_handler, max_retries=max_retries) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken from the way celery does it so all the credit goes to them :): https://github.com/celery/celery/blob/master/celery/worker/consumer/consumer.py#L435 Kombu code for reference: https://docs.celeryproject.org/projects/kombu/en/stable/_modules/kombu/connection.html#Connection.ensure_connection |
||
logger.info(f'Established connection to broker: {broker_url}. Starting Flower...') | ||
is_connected = True | ||
except OperationalError as e: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to find if kombu can raise other errors and add them here. |
||
logger.error( | ||
f'Unable to establish connection to broker: : {broker_url}. Error: {e}. ' | ||
f'Please make sure the broker is running when using Flower. Aborting Flower...' | ||
) | ||
|
||
return is_connected | ||
|
||
|
||
def setup_logging(): | ||
if options.debug and options.logging == 'info': | ||
options.logging = 'debug' | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here in theory the connection is checked but can drop before we call flower.start().
The chances are super small though.
I will think about better way of doing this but for now it is good enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at kombu code:
and they follow similar pattern (check connection and then return value) so I think we should be fine with making a check for connection and then calling
flower.start()