-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcelery.py
96 lines (82 loc) · 3.56 KB
/
celery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
As described in
http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""
import logging
import os
import time
from celery import Celery
from celery.signals import before_task_publish, task_postrun
from main.constants import DEFAULT_PRIORITY, PRIORITY_STEPS
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
log = logging.getLogger(__name__)
app = Celery("ocw_studio")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.conf.task_default_queue = "default"
app.conf.task_default_priority = DEFAULT_PRIORITY
app.conf.broker_transport_options = {
"priority_steps": list(range(PRIORITY_STEPS)),
"sep": ":",
"queue_order_strategy": "priority",
}
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
app.conf.task_routes = {
"content_sync.tasks.sync_website_content": {"queue": "publish"},
"content_sync.tasks.create_website_backend": {"queue": "publish"},
"content_sync.tasks.publish_website_backend_draft": {"queue": "publish"},
"content_sync.tasks.publish_website_backend_live": {"queue": "publish"},
"content_sync.tasks.check_incomplete_publish_build_statuses": {"queue": "publish"},
"content_sync.tasks.upsert_website_publishing_pipeline": {"queue": "publish"},
"content_sync.tasks.sync_unsynced_websites": {"queue": "batch"},
"content_sync.tasks.upsert_pipelines": {"queue": "batch"},
"content_sync.tasks.trigger_mass_build": {"queue": "batch"},
"content_sync.tasks.publish_website_batch": {"queue": "batch"},
"content_sync.tasks.publish_websites": {"queue": "batch"},
"external_resources.tasks.check_external_resources": {"queue": "batch"},
"external_resources.tasks.submit_url_to_wayback_task": {"queue": "batch"},
"external_resources.tasks.update_wayback_jobs_status_batch": {"queue": "batch"},
}
@before_task_publish.connect
def timestamp_task_send(
    headers=None,
    **kwargs,  # noqa: ARG001
):  # pylint: disable=unused-argument
    """
    Before a task is sent, timestamp the task with the current time.

    Connected to Celery's ``before_task_publish`` signal.  Stamps the
    outgoing message headers with ``task_sent_timestamp`` (epoch seconds),
    which ``log_task_deltatime`` reads back after the task runs.

    Args:
        headers (dict | None): mutable message headers supplied by the signal.
    """
    # Defensive: the signal's default for `headers` is None; without this
    # guard, .setdefault would raise AttributeError and break publishing.
    if headers is None:
        return
    # setdefault so an already-present timestamp (e.g. on retry) is preserved
    headers.setdefault("task_sent_timestamp", time.time())
@task_postrun.connect
def log_task_deltatime(
    task=None,
    state=None,
    **kwargs,  # noqa: ARG001
):  # pylint: disable=unused-argument
    """
    If the task provided a timestamp for which it was sent, log timing information.

    Connected to Celery's ``task_postrun`` signal.  Reads the
    ``task_sent_timestamp`` header stamped by ``timestamp_task_send`` and
    emits a structured ``task_event=log_task_deltatime`` log line with the
    publish-to-completion delta.  Logging failures must never affect task
    execution, so the whole body is wrapped in a broad handler.

    Args:
        task: the task instance just executed (timestamp read off task.request).
        state (str | None): the task's terminal state, logged as task_state.
    """
    # Note: you'd think headers would come in on `task.request.headers` but you'd be wrong # noqa: E501
    try:
        task_sent_timestamp = getattr(task.request, "task_sent_timestamp", None)
        task_id = task.request.id
        task_name = task.request.task
        if task_sent_timestamp:
            task_postrun_timestamp = time.time()
            task_deltatime = task_postrun_timestamp - task_sent_timestamp
            # ignore deltas below zero in case of clock drift
            task_deltatime = max(task_deltatime, 0)
            log.info(
                "task_event=log_task_deltatime "
                "task_name=%s task_id=%s task_state=%s "
                # fix: key previously misspelled "task_postrun_timstamp",
                # which broke downstream parsing of this structured field
                "task_sent_timestamp=%s task_postrun_timestamp=%s "
                "task_deltatime=%s",
                task_name,
                task_id,
                state,
                task_sent_timestamp,
                task_postrun_timestamp,
                task_deltatime,
            )
        else:
            log.error(
                "Task had no task_sent_timestamp: name=%s id=%s ", task_name, task_id
            )
    # Exception (not bare except) so SystemExit/KeyboardInterrupt still
    # propagate; best-effort logging is otherwise preserved.
    except Exception:  # pylint: disable=broad-except
        log.exception("Unexpected error trying to log task deltatime")