Skip to content

Commit

Permalink
feat: add deprecation task and common method for notifications mail (#…
Browse files Browse the repository at this point in the history
…1482)

* feat(task)/ Add deprecation task and common send notification mail

* fix: fix check email configuration calls

* fix: fix logger from warning to debug

* fix: run ruf formatter
  • Loading branch information
Pioupuch authored Feb 20, 2025
1 parent 4996c07 commit c93d901
Showing 1 changed file with 79 additions and 24 deletions.
103 changes: 79 additions & 24 deletions backend/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,36 @@ def check_controls_with_expired_eta():
owner_controls[owner.email].append(control)
# Send personalized email to each owner
for owner_email, controls in owner_controls.items():
send_notification_email(owner_email, controls)
send_notification_email_expired_eta(owner_email, controls)


@task()
def send_notification_email(owner_email, controls):
# TODO this will probably will move to a common section later on.
notifications_enable_mailing = GlobalSettings.objects.get(name="general").value.get(
"notifications_enable_mailing", False
# @db_periodic_task(crontab(minute='*/1'))# for testing
@db_periodic_task(crontab(hour="5", minute="30"))
def check_deprecated_controls():
deprecated_controls_list = AppliedControl.objects.filter(status="active").filter(
expiry_date__lte=date.today(), expiry_date__isnull=False
)
if not notifications_enable_mailing:
logger.warning(
"Email notification is disabled. You can enable it under Extra/Settings. Skipping for now."
)
return

# Check required email settings
required_settings = ["EMAIL_HOST", "EMAIL_PORT", "DEFAULT_FROM_EMAIL"]
missing_settings = [
setting
for setting in required_settings
if not hasattr(settings, setting) or not getattr(settings, setting)
]
deprecated_controls = deprecated_controls_list.prefetch_related("owner")

if missing_settings:
error_msg = f"Cannot send email notification: Missing email settings: {', '.join(missing_settings)}"
logger.error(error_msg)
return
# Group by individual owner
owner_controls = {}
for control in deprecated_controls:
for owner in control.owner.all():
if owner.email not in owner_controls:
owner_controls[owner.email] = []
owner_controls[owner.email].append(control)

if not owner_email:
logger.error("Cannot send email notification: No recipient email provided")
# Update the status of each expired control
deprecated_controls_list.update(status="deprecated")

for owner_email, controls in owner_controls.items():
send_notification_email_deprecated_control(owner_email, controls)


@task()
def send_notification_email_expired_eta(owner_email, controls):
if not check_email_configuration(owner_email, controls):
return

subject = f"CISO Assistant: You have {len(controls)} expired control(s)"
Expand All @@ -73,7 +73,31 @@ def send_notification_email(owner_email, controls):
message += "Log in to your CISO Assistant portal and check 'my assignments' section to get to your controls directly.\n\n"
message += "Thank you."

send_notification_email(subject, message, owner_email)


@task()
def send_notification_email_deprecated_control(owner_email, controls):
if not check_email_configuration(owner_email, controls):
return

subject = f"CISO Assistant: You have {len(controls)} deprecated control(s)"
message = (
"Hello,\n\nThe following controls have the expiracy date set to today:\n\n"
)
for control in controls:
message += f"- {control.name} (Set to: deprecated)\n"
message += "\nThis control(s) will be set to deprecated.\n"
message += "Log in to your CISO Assistant portal and check 'my assignments' section to get to your controls directly.\n\n"
message += "Thank you."

send_notification_email(subject, message, owner_email)


@task()
def send_notification_email(subject, message, owner_email):
try:
logger.debug("Sending notification email", subject=subject, message=message)
send_mail(
subject=subject,
message=message,
Expand All @@ -84,3 +108,34 @@ def send_notification_email(owner_email, controls):
logger.info(f"Successfully sent notification email to {owner_email}")
except Exception as e:
logger.error(f"Failed to send notification email to {owner_email}: {str(e)}")


@task()
def check_email_configuration(owner_email, controls):
notifications_enable_mailing = GlobalSettings.objects.get(name="general").value.get(
"notifications_enable_mailing", False
)
if not notifications_enable_mailing:
logger.warning(
"Email notification is disabled. You can enable it under Extra/Settings. Skipping for now."
)
return False

# Check required email settings
required_settings = ["EMAIL_HOST", "EMAIL_PORT", "DEFAULT_FROM_EMAIL"]
missing_settings = [
setting
for setting in required_settings
if not hasattr(settings, setting) or not getattr(settings, setting)
]

if missing_settings:
error_msg = f"Cannot send email notification: Missing email settings: {', '.join(missing_settings)}"
logger.error(error_msg)
return False

if not owner_email:
logger.error("Cannot send email notification: No recipient email provided")
return False

return True

0 comments on commit c93d901

Please sign in to comment.