-
Notifications
You must be signed in to change notification settings - Fork 5
/
celery_app.py
79 lines (61 loc) · 2.01 KB
/
celery_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import datetime
import logging
import time
import celery
from db import SessionLocal, User
from task_lock import no_parallel_processing_of_task
# Module-level logger shared by every task defined in this file.
LOGGER = logging.getLogger(__name__)
# The Celery application; the app's main name is this module's name.
# NOTE(review): no broker/backend URL is passed here -- presumably supplied
# through configuration elsewhere; confirm.
celery_app = celery.Celery(
__name__,
)
# 15 minutes, expressed in seconds (900.0). Per Celery's documentation, for
# transports that support it (e.g. Redis, SQS) this is how long the broker
# waits for an acknowledgement before redelivering a message to another worker.
celery_app.conf.broker_transport_options = {
"visibility_timeout": datetime.timedelta(minutes=15).total_seconds()
}
# Acknowledge a message after the task has run instead of just before, so a
# task interrupted mid-run is not lost (per Celery docs for task_acks_late).
celery_app.conf.task_acks_late = True
# Re-queue the message when the worker process executing it exits abruptly
# (per Celery docs for task_reject_on_worker_lost).
celery_app.conf.task_reject_on_worker_lost = True
# Prefetch one message per worker process at a time (Celery multiplies this
# value by the number of concurrent processes).
celery_app.conf.worker_prefetch_multiplier = 1
@celery_app.task(name="send_newsletter")
def send_newsletter():
    """Send the newsletter to every user from within a single task.

    Loads all ``User`` rows in one query and processes them sequentially,
    logging one line per recipient. The one-second sleep is presumably a
    placeholder for the real delivery call.
    """
    session = SessionLocal()
    try:
        for user in session.query(User).all():
            LOGGER.info(f"Sending newsletter to {user.email}")
            time.sleep(1)
    finally:
        # Workers are long-lived processes: always hand the connection back,
        # even when the loop raises, so sessions do not pile up across runs.
        session.close()
@celery_app.task(name="send_newsletter_to_user")
def send_newsletter_to_user(user_email):
    """Send the newsletter to a single recipient.

    Args:
        user_email: Address of the recipient; here it is only interpolated
            into the log line.
    """
    message = f"Sending newsletter to {user_email}"
    LOGGER.info(message)
    # One-second pause; presumably a stand-in for the real delivery call.
    time.sleep(1)
@celery_app.task(name="send_newsletter_fan_out")
def send_newsletter_fan_out():
    """Enqueue one ``send_newsletter_to_user`` task per user.

    This task only reads the users and publishes a child task for each one,
    passing the recipient's email as the ``user_email`` keyword argument; the
    per-user work happens in those child tasks.
    """
    session = SessionLocal()
    try:
        for user in session.query(User).all():
            send_newsletter_to_user.apply_async(
                kwargs={"user_email": user.email}
            )
    finally:
        # Release the connection even if publishing a child task fails.
        session.close()
# Maximum number of users handled by one send_newsletter_batching invocation.
BATCH_SIZE = 2
@celery_app.task(
    name="send_newsletter_batching",
)
def send_newsletter_batching(last_evaluated_key=None):
    """Send the newsletter in batches of ``BATCH_SIZE``, chaining tasks.

    Each invocation handles the next ``BATCH_SIZE`` users ordered by
    ``User.id`` (keyset pagination) and, when the batch came back full,
    enqueues itself again with the last processed id as the new cursor.

    Args:
        last_evaluated_key: ``User.id`` of the last user handled by the
            previous batch, or ``None`` to start from the beginning.
    """
    # Compare against None explicitly: a legitimate cursor of 0 is falsy, and
    # `last_evaluated_key or -1` would silently reset it to the start.
    if last_evaluated_key is None:
        # Assumes ids are non-negative, so -1 sorts before every row.
        last_evaluated_key = -1

    session = SessionLocal()
    try:
        users = (
            session.query(User)
            .filter(User.id > last_evaluated_key)
            .order_by(User.id)
            .limit(BATCH_SIZE)
            .all()
        )
        for user in users:
            LOGGER.info(f"Sending newsletter to {user.email}")
            time.sleep(1)

        # A short batch means the table is exhausted: stop the chain.
        if len(users) < BATCH_SIZE:
            return

        # Read the cursor while the rows are still attached to the session.
        new_last_evaluated_key = users[-1].id
    finally:
        # Always return the connection, on success, early return, or error.
        session.close()

    send_newsletter_batching.apply_async(
        kwargs={"last_evaluated_key": new_last_evaluated_key}
    )
@celery_app.task(name="send_newsletter_locking", bind=True)
@no_parallel_processing_of_task
def send_newsletter_locking(self):
    """Send the newsletter to every user, wrapped by the task-lock decorator.

    The task is bound (``bind=True``), so ``self`` is the task instance, and
    it is wrapped by ``no_parallel_processing_of_task`` from ``task_lock``.
    NOTE(review): judging by its name, that decorator prevents concurrent
    runs of this task -- confirm against ``task_lock``.

    Args:
        self: The bound Celery task instance; not used directly in the body.
    """
    session = SessionLocal()
    try:
        for user in session.query(User).all():
            LOGGER.info(f"Sending newsletter to {user.email}")
            time.sleep(1)
    finally:
        # Release the connection even when the loop raises.
        session.close()