Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add deprecation task and common send notification mail #1482

Merged
merged 4 commits into from
Feb 20, 2025
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 76 additions & 25 deletions backend/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,36 @@ def check_controls_with_expired_eta():
owner_controls[owner.email].append(control)
# Send personalized email to each owner
for owner_email, controls in owner_controls.items():
send_notification_email(owner_email, controls)
send_notification_email_expired_eta(owner_email, controls)


@task()
def send_notification_email(owner_email, controls):
# TODO this will probably will move to a common section later on.
notifications_enable_mailing = GlobalSettings.objects.get(name="general").value.get(
"notifications_enable_mailing", False
# @db_periodic_task(crontab(minute='*/1'))# for testing
@db_periodic_task(crontab(hour="5", minute="30"))
def check_deprecated_controls():
deprecated_controls_list = (
AppliedControl.objects
.filter(status="active")
.filter(expiry_date__lte=date.today(), expiry_date__isnull=False)
)
if not notifications_enable_mailing:
logger.warning(
"Email notification is disabled. You can enable it under Extra/Settings. Skipping for now."
)
return

# Check required email settings
required_settings = ["EMAIL_HOST", "EMAIL_PORT", "DEFAULT_FROM_EMAIL"]
missing_settings = [
setting
for setting in required_settings
if not hasattr(settings, setting) or not getattr(settings, setting)
]
deprecated_controls = deprecated_controls_list.prefetch_related("owner")

if missing_settings:
error_msg = f"Cannot send email notification: Missing email settings: {', '.join(missing_settings)}"
logger.error(error_msg)
return
# Group by individual owner
owner_controls = {}
for control in deprecated_controls:
for owner in control.owner.all():
if owner.email not in owner_controls:
owner_controls[owner.email] = []
owner_controls[owner.email].append(control)

if not owner_email:
logger.error("Cannot send email notification: No recipient email provided")
# Update the status of each expired control
deprecated_controls_list.update(status="deprecated")

for owner_email, controls in owner_controls.items():
send_notification_email_deprecated_control(owner_email, controls)

@task()
def send_notification_email_expired_eta(owner_email, controls):
if not check_email_configuration(owner_email, controls):
return

subject = f"CISO Assistant: You have {len(controls)} expired control(s)"
Expand All @@ -73,7 +73,27 @@ def send_notification_email(owner_email, controls):
message += "Log in to your CISO Assistant portal and check 'my assignments' section to get to your controls directly.\n\n"
message += "Thank you."

send_notification_email(subject, message, owner_email)

@task()
def send_notification_email_deprecated_control(owner_email, controls):
if not check_email_configuration(owner_email, controls):
return

subject = f"CISO Assistant: You have {len(controls)} deprecated control(s)"
message = "Hello,\n\nThe following controls have the expiracy date set to today:\n\n"
for control in controls:
message += f"- {control.name} (Set to: deprecated)\n"
message += "\nThis control(s) will be set to deprecated.\n"
message += "Log in to your CISO Assistant portal and check 'my assignments' section to get to your controls directly.\n\n"
message += "Thank you."

send_notification_email(subject, message, owner_email)

@task()
def send_notification_email(subject, message, owner_email):
try:
logger.debug("Sending notification email", subject=subject, message=message)
send_mail(
subject=subject,
message=message,
Expand All @@ -84,3 +104,34 @@ def send_notification_email(owner_email, controls):
logger.info(f"Successfully sent notification email to {owner_email}")
except Exception as e:
logger.error(f"Failed to send notification email to {owner_email}: {str(e)}")

@task()
def check_email_configuration(owner_email, controls):
# TODO this will probably will move to a common section later on.
notifications_enable_mailing = GlobalSettings.objects.get(name="general").value.get(
"notifications_enable_mailing", False
)
if not notifications_enable_mailing:
logger.warning(
"Email notification is disabled. You can enable it under Extra/Settings. Skipping for now."
)
return False

# Check required email settings
required_settings = ["EMAIL_HOST", "EMAIL_PORT", "DEFAULT_FROM_EMAIL"]
missing_settings = [
setting
for setting in required_settings
if not hasattr(settings, setting) or not getattr(settings, setting)
]

if missing_settings:
error_msg = f"Cannot send email notification: Missing email settings: {', '.join(missing_settings)}"
logger.error(error_msg)
return False

if not owner_email:
logger.error("Cannot send email notification: No recipient email provided")
return False

return True