Skip to content

Commit

Permalink
send emails using 3 new queues (#1997)
Browse files Browse the repository at this point in the history
  • Loading branch information
sastels authored Oct 25, 2023
1 parent 86be8bc commit e3ccb0c
Show file tree
Hide file tree
Showing 14 changed files with 79 additions and 52 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ file. Copy that file to `.env` and customize it to your needs.

## To run the queues
```
scripts/run_celery.sh
```

```
scripts/run_celery_sms.sh
scripts/run_celery_local.sh
```

```
Expand Down Expand Up @@ -179,7 +175,7 @@ To help debug full code paths of emails and SMS, we have a special email and pho
set in the application's configuration. As it stands at the moment these are the following:

| Notification Type | Test destination |
|-------------------|--------------------------|
| ----------------- | ------------------------ |
| Email | [email protected] |
| SMS | +16135550123 |

Expand Down
18 changes: 12 additions & 6 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from kombu import Exchange, Queue
from notifications_utils import logging

# from app.models import EMAIL_TYPE, SMS_TYPE, Priorities
from celery.schedules import crontab

env = Env()
Expand Down Expand Up @@ -95,8 +94,12 @@ class QueueNames(object):
# we have a limit to send per second and hence, needs to be throttled.
SEND_THROTTLED_SMS = "send-throttled-sms-tasks"

# The queue to send emails by default, normal priority.
# TODO: Deprecate to favor priority queues instead, i.e. bulk, normal, priority.
# Queues for sending all emails.
SEND_EMAIL_HIGH = "send-email-high"
SEND_EMAIL_MEDIUM = "send-email-medium"
SEND_EMAIL_LOW = "send-email-low"

# TODO: Delete this queue once we verify that it is not used anymore.
SEND_EMAIL = "send-email-tasks"

# The research mode queue for notifications that are tested by users trying
Expand Down Expand Up @@ -131,9 +134,9 @@ class QueueNames(object):
Priorities.HIGH: SEND_SMS_HIGH,
},
"email": {
Priorities.LOW: BULK,
Priorities.MEDIUM: SEND_EMAIL,
Priorities.HIGH: PRIORITY,
Priorities.LOW: SEND_EMAIL_LOW,
Priorities.MEDIUM: SEND_EMAIL_MEDIUM,
Priorities.HIGH: SEND_EMAIL_HIGH,
},
"letter": {
Priorities.LOW: BULK,
Expand All @@ -157,6 +160,9 @@ def all_queues():
QueueNames.SEND_SMS_LOW,
QueueNames.SEND_SMS,
QueueNames.SEND_THROTTLED_SMS,
QueueNames.SEND_EMAIL_HIGH,
QueueNames.SEND_EMAIL_MEDIUM,
QueueNames.SEND_EMAIL_LOW,
QueueNames.SEND_EMAIL,
QueueNames.RESEARCH_MODE,
QueueNames.REPORTING,
Expand Down
4 changes: 2 additions & 2 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def choose_queue(notification, research_mode, queue=None) -> QueueNames:
queue = QueueNames.SEND_SMS_MEDIUM
if notification.notification_type == EMAIL_TYPE:
if not queue:
queue = QueueNames.SEND_EMAIL
queue = QueueNames.SEND_EMAIL_MEDIUM
if notification.notification_type == LETTER_TYPE:
if not queue:
queue = QueueNames.CREATE_LETTERS_PDF
Expand Down Expand Up @@ -264,7 +264,7 @@ def send_notification_to_queue(notification, research_mode, queue=None):
queue = QueueNames.SEND_SMS_MEDIUM
if notification.notification_type == EMAIL_TYPE:
if not queue or queue == QueueNames.NORMAL:
queue = QueueNames.SEND_EMAIL
queue = QueueNames.SEND_EMAIL_MEDIUM
deliver_task = provider_tasks.deliver_email
if notification.notification_type == LETTER_TYPE:
if not queue or queue == QueueNames.NORMAL:
Expand Down
10 changes: 10 additions & 0 deletions scripts/run_celery_core_tasks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh

# runs celery with all celery queues except send-throttled-sms-tasks, send-sms-* and send-email-*

set -e

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,
service-callbacks,delivery-receipts
2 changes: 1 addition & 1 deletion scripts/run_celery_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ set -e

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-sms-tasks,send-sms-high,send-sms-medium,send-sms-low,send-throttled-sms-tasks,send-email-tasks,service-callbacks,delivery-receipts
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-sms-tasks,send-sms-high,send-sms-medium,send-sms-low,send-throttled-sms-tasks,send-email-high,send-email-medium,send-email-low,send-email-tasks,service-callbacks,delivery-receipts
2 changes: 1 addition & 1 deletion scripts/run_celery_no_sms_sending.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ fi

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-email-tasks,service-callbacks,delivery-receipts
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-email-tasks,send-email-high,send-email-medium,send-email-low,service-callbacks,delivery-receipts
10 changes: 10 additions & 0 deletions scripts/run_celery_send_email.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh

# runs celery with only the send-email-* queues

set -e

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

# TODO: we shouldn't be using the send-email-tasks queue anymore - once we verify this we can remove it
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q send-email-tasks,send-email-high,send-email-medium,send-email-low
2 changes: 1 addition & 1 deletion tests/app/celery/test_scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def test_replay_created_notifications(notify_db_session, sample_service, mocker)
save_notification(create_notification(template=email_template, created_at=datetime.utcnow(), status="created"))

replay_created_notifications()
email_delivery_queue.assert_called_once_with([str(old_email.id)], queue="send-email-tasks")
email_delivery_queue.assert_called_once_with([str(old_email.id)], queue=QueueNames.SEND_EMAIL_MEDIUM)
sms_delivery_queue.assert_called_once_with([str(old_sms.id)], queue=QueueNames.SEND_SMS_MEDIUM)


Expand Down
42 changes: 22 additions & 20 deletions tests/app/celery/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ def test_process_rows_sends_save_task(
"to": "recip",
"row_number": "row_num",
"personalisation": {"foo": "bar"},
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else "send-{}-tasks".format(template_type),
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else QueueNames.SEND_EMAIL_MEDIUM,
"client_reference": reference,
"sender_id": str(sender_id) if sender_id else None,
},
Expand All @@ -930,12 +930,12 @@ def test_process_rows_sends_save_task(
@pytest.mark.parametrize(
"csv_bulk_threshold, template_process_type, expected_queue",
[
(1_000, PRIORITY, "priority-tasks"), # keep priority when no thresholds are met
(1, PRIORITY, "bulk-tasks"), # autoswitch to bulk queue if bulk threshold is met, even if in priority.
(1, NORMAL, "bulk-tasks"), # autoswitch to bulk queue if bulk threshold is met.
(1_000, NORMAL, "send-email-tasks"), # keep normal priority
(1, BULK, "bulk-tasks"), # keep bulk priority
(1_000, BULK, "send-email-tasks"), # autoswitch to normal queue if normal threshold is met.
(1_000, PRIORITY, QueueNames.SEND_EMAIL_HIGH), # keep priority when no thresholds are met
(1, PRIORITY, QueueNames.SEND_EMAIL_LOW), # autoswitch to bulk queue if bulk threshold is met, even if in priority.
(1, NORMAL, QueueNames.SEND_EMAIL_LOW), # autoswitch to bulk queue if bulk threshold is met.
(1_000, NORMAL, QueueNames.SEND_EMAIL_MEDIUM), # keep normal priority
(1, BULK, QueueNames.SEND_EMAIL_LOW), # keep bulk priority
(1_000, BULK, QueueNames.SEND_EMAIL_MEDIUM), # autoswitch to normal queue if normal threshold is met.
],
)
def test_should_redirect_email_job_to_queue_depending_on_csv_threshold(
Expand Down Expand Up @@ -1103,7 +1103,7 @@ def test_process_rows_works_without_key_type(
"to": "recip",
"row_number": "row_num",
"personalisation": {"foo": "bar"},
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else "send-{}-tasks".format(template_type),
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else QueueNames.SEND_EMAIL_MEDIUM,
"sender_id": str(sender_id) if sender_id else None,
"client_reference": reference,
},
Expand Down Expand Up @@ -1582,7 +1582,7 @@ def test_save_emails_should_use_redis_cache_to_retrieve_service_and_template_whe
assert persisted_notification.personalisation == {"name": "Jo"}
assert persisted_notification._personalisation == signer_personalisation.sign({"name": "Jo"})
assert persisted_notification.notification_type == "email"
mocked_deliver_email.assert_called_once_with([str(persisted_notification.id)], queue="send-email-tasks")
mocked_deliver_email.assert_called_once_with([str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM)
if sender_id:
mocked_get_sender_id.assert_called_once_with(persisted_notification.service_id, sender_id)

Expand Down Expand Up @@ -1633,9 +1633,11 @@ def test_should_put_save_email_task_in_research_mode_queue_if_research_mode_serv
[str(persisted_notification.id)], queue="research-mode-tasks"
)

@pytest.mark.parametrize("process_type", ["priority", "bulk"])
@pytest.mark.parametrize(
"process_type,expected_queue", [("priority", QueueNames.SEND_EMAIL_HIGH), ("bulk", QueueNames.SEND_EMAIL_LOW)]
)
def test_should_route_save_email_task_to_appropriate_queue_according_to_template_process_type(
self, notify_db_session, mocker, process_type
self, notify_db_session, mocker, process_type, expected_queue
):
service = create_service()
template = create_template(service=service, template_type="email", process_type=process_type)
Expand All @@ -1648,14 +1650,12 @@ def test_should_route_save_email_task_to_appropriate_queue_according_to_template
save_emails(service.id, [signer_notification.sign(notification)], notification_id)

persisted_notification = Notification.query.one()
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue=f"{process_type}-tasks"
)
provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue=expected_queue)

def test_should_route_save_email_task_to_bulk_on_large_csv_file(self, notify_db_session, mocker):
service = create_service()
template = create_template(service=service, template_type="email", process_type="normal")
notification = _notification_json(template, to="[email protected]", queue="bulk-tasks")
notification = _notification_json(template, to="[email protected]", queue=QueueNames.SEND_EMAIL_LOW)

mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")

Expand All @@ -1664,7 +1664,9 @@ def test_should_route_save_email_task_to_bulk_on_large_csv_file(self, notify_db_
save_emails(service.id, [signer_notification.sign(notification)], notification_id)

persisted_notification = Notification.query.one()
provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue="bulk-tasks")
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_LOW
)

def test_should_use_email_template_and_persist(
self, notify_api, sample_email_template_with_placeholders, sample_api_key, mocker
Expand Down Expand Up @@ -1705,7 +1707,7 @@ def test_should_use_email_template_and_persist(
assert persisted_notification.notification_type == "email"

provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)
mock_over_daily_limit.assert_called_once_with("normal", sample_email_template_with_placeholders.service)

Expand Down Expand Up @@ -1734,7 +1736,7 @@ def test_save_email_should_use_template_version_from_job_not_latest(self, sample
assert not persisted_notification.sent_by
assert persisted_notification.notification_type == "email"
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)

def test_should_use_email_template_subject_placeholders(self, sample_email_template_with_placeholders, mocker):
Expand All @@ -1756,7 +1758,7 @@ def test_should_use_email_template_subject_placeholders(self, sample_email_templ
assert not persisted_notification.reference
assert persisted_notification.notification_type == "email"
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)

def test_save_email_uses_the_reply_to_text_when_provided(self, sample_email_template, mocker):
Expand Down Expand Up @@ -1813,7 +1815,7 @@ def test_should_use_email_template_and_persist_without_personalisation(self, sam
assert not persisted_notification.reference
assert persisted_notification.notification_type == "email"
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)

def test_save_email_should_go_to_retry_queue_if_database_errors(self, sample_email_template, mocker):
Expand Down
12 changes: 6 additions & 6 deletions tests/app/notifications/rest/test_send_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_send_notification_with_placeholders_replaced(notify_api, sample_email_t
notification_id = response_data["notification"]["id"]
data.update({"template_version": sample_email_template_with_placeholders.version})

mocked.assert_called_once_with([notification_id], queue="send-email-tasks")
mocked.assert_called_once_with([notification_id], queue=QueueNames.SEND_EMAIL_MEDIUM)
assert response.status_code == 201
assert response_data["body"] == "Hello Jo\nThis is an email from GOV.UK"
assert response_data["subject"] == "Jo"
Expand Down Expand Up @@ -370,7 +370,7 @@ def test_should_allow_valid_email_notification(notify_api, sample_email_template
response_data = json.loads(response.get_data(as_text=True))["data"]
notification_id = response_data["notification"]["id"]
app.celery.provider_tasks.deliver_email.apply_async.assert_called_once_with(
[notification_id], queue="send-email-tasks"
[notification_id], queue=QueueNames.SEND_EMAIL_MEDIUM
)

assert response.status_code == 201
Expand Down Expand Up @@ -563,7 +563,7 @@ def test_should_send_email_if_team_api_key_and_a_service_user(client, sample_ema
headers=[("Content-Type", "application/json"), auth_header],
)

app.celery.provider_tasks.deliver_email.apply_async.assert_called_once_with([fake_uuid], queue="send-email-tasks")
app.celery.provider_tasks.deliver_email.apply_async.assert_called_once_with([fake_uuid], queue=QueueNames.SEND_EMAIL_MEDIUM)
assert response.status_code == 201


Expand Down Expand Up @@ -660,7 +660,7 @@ def test_should_send_sms_if_team_api_key_and_a_service_user(client, sample_templ

@pytest.mark.parametrize(
"template_type,queue_name",
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, "send-email-tasks")],
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, QueueNames.SEND_EMAIL_MEDIUM)],
)
def test_should_persist_notification(
client,
Expand Down Expand Up @@ -710,7 +710,7 @@ def test_should_persist_notification(

@pytest.mark.parametrize(
"template_type,queue_name",
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, "send-email-tasks")],
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, QueueNames.SEND_EMAIL_MEDIUM)],
)
def test_should_delete_notification_and_return_error_if_sqs_fails(
client,
Expand Down Expand Up @@ -1028,7 +1028,7 @@ def test_send_notification_uses_appropriate_queue_when_template_has_process_type
if notification_type == SMS_TYPE:
expected_queue = QueueNames.SEND_SMS_HIGH if process_type == "priority" else QueueNames.SEND_SMS_LOW
else:
expected_queue = f"{process_type}-tasks"
expected_queue = QueueNames.SEND_EMAIL_HIGH if process_type == "priority" else QueueNames.SEND_EMAIL_LOW
mocked.assert_called_once_with([notification_id], queue=expected_queue)


Expand Down
6 changes: 3 additions & 3 deletions tests/app/notifications/test_process_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ class TestSendNotificationQueue:
"deliver_throttled_sms",
),
(False, None, "sms", "normal", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
(False, None, "email", "normal", None, "send-email-tasks", "deliver_email"),
(False, None, "email", "normal", None, QueueNames.SEND_EMAIL_MEDIUM, "deliver_email"),
(False, None, "sms", "team", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
(
False,
Expand Down Expand Up @@ -677,7 +677,7 @@ class TestChooseQueue:
"send-throttled-sms-tasks",
),
(False, None, "sms", "normal", None, QueueNames.SEND_SMS_MEDIUM),
(False, None, "email", "normal", None, "send-email-tasks"),
(False, None, "email", "normal", None, QueueNames.SEND_EMAIL_MEDIUM),
(False, None, "sms", "team", None, QueueNames.SEND_SMS_MEDIUM),
(
False,
Expand Down Expand Up @@ -975,7 +975,7 @@ def test_db_save_and_send_notification_saves_to_db(self, client, sample_template
"deliver_throttled_sms",
),
("sms", "normal", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
("email", "normal", None, "send-email-tasks", "deliver_email"),
("email", "normal", None, QueueNames.SEND_EMAIL_MEDIUM, "deliver_email"),
("sms", "team", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
("sms", "test", None, "research-mode-tasks", "deliver_sms"),
(
Expand Down
Loading

0 comments on commit e3ccb0c

Please sign in to comment.