Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue1248: Persistent Email Reminders #1354

Merged
merged 25 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5cd5119
Feature:1221 - Make visibility of auto-approval toggle configurable b…
anushka-singh Apr 25, 2024
b752ba3
Merge conflicts resolve
anushka-singh Jun 10, 2024
ce3b56d
Merge branch 'main' of https://github.com/anushka-singh/aws-dataall
anushka-singh Jun 14, 2024
3f4340e
Merge branch 'main' of https://github.com/anushka-singh/aws-dataall
anushka-singh Jun 20, 2024
32bea9d
Data712 y branch 2 5 (#375)
Jun 18, 2024
382a924
Data712 y branch 2 5 (#377)
Jun 18, 2024
653a480
Data712 y branch 2 5 (#381)
Jun 18, 2024
93b7f9b
Persistent emails
anushka-singh Jun 20, 2024
b73d12a
Persistent emails
anushka-singh Jun 20, 2024
e8dd359
Persistent emails
anushka-singh Jun 20, 2024
04994b5
Persistent emails
anushka-singh Jun 20, 2024
fa5e7df
Persistent emails
anushka-singh Jun 20, 2024
be83075
Persistent emails
anushka-singh Jun 20, 2024
b5379aa
lint fix
anushka-singh Jun 20, 2024
79c5c5b
Merge branch 'main' into issue1248
anushka-singh Jun 24, 2024
3a669ba
Merge branch 'data-dot-all:main' into issue1248
anushka-singh Jun 25, 2024
e567009
Merge branch 'main' of https://github.com/data-dot-all/dataall into i…
anushka-singh Jun 25, 2024
0468acd
Merge branch 'issue1248' of https://github.com/anushka-singh/aws-data…
anushka-singh Jun 25, 2024
5e0d8e4
Addressed comments
anushka-singh Jun 25, 2024
2b9bdf9
Addressed comments
anushka-singh Jun 25, 2024
9aaeb13
Addressed comments
anushka-singh Jun 25, 2024
efa27ce
Addressed comments
anushka-singh Jun 25, 2024
6eeaf93
lint fix
anushka-singh Jun 25, 2024
950ae28
Addressed comments
anushka-singh Jun 26, 2024
87981e7
Addressed comments
anushka-singh Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ def verify_share(engine, task: Task):
def reapply_share(engine, task: Task):
return EcsShareHandler._manage_share(engine, task, SharingService.reapply_share, 'reapply_share')

anushka-singh marked this conversation as resolved.
Show resolved Hide resolved
@staticmethod
@Worker.handler(path='ecs.share.persistent_email_reminder')
def persistent_email_reminder(engine, task: Task):
    """Worker entry point that runs the persistent email reminder as an ECS task.

    Mirrors the other share handlers in this class by delegating to the
    shared `_manage_share` dispatch path.
    """
    handler_name = 'persistent_email_reminder'
    return EcsShareHandler._manage_share(engine, task, SharingService.persistent_email_reminder, handler_name)

@staticmethod
def _manage_share(engine, task: Task, local_handler, ecs_handler: str):
envname = os.environ.get('envname', 'local')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,50 @@ def notify_share_object_submission(self, email_id: str):
self._create_notification_task(subject=subject, msg=email_notification_msg)
return notifications

def notify_persistent_email_reminder(self, email_id: str, engine):
    """
    Build and queue the persistent reminder email for a still-pending share request.

    Registers an in-app notification (intro text only) and queues a Worker task
    that emails the full message (intro + optional share link + closing).

    :param email_id: identity of the user who submitted the share request
    :param engine: db engine forwarded to the Worker task queue
    :return: the registered notification records
    """
    frontend_url = os.environ.get('frontend_domain_url')
    share_link_text = ''
    if frontend_url:
        share_link_text = (
            f'<br><br>Please visit data.all <a href="{frontend_url}'
            f'/console/shares/{self.share.shareUri}">share link</a> '
            f'to review and take appropriate action or view more details.'
        )

    msg_intro = f"""Dear User,

This is a reminder that a share request for the dataset "{self.dataset.label}" submitted by {email_id}
on behalf of principal "{self.share.principalId}" is still pending and has not been addressed.

"""

    closing_text = """Your prompt attention to this matter is greatly appreciated.

Best regards,
The Data.all Team
"""

    subject = f'URGENT REMINDER: Data.all | Action Required on Pending Share Request for {self.dataset.label}'
    full_message = ''.join((msg_intro, share_link_text, closing_text))

    # The in-app notification deliberately carries only the intro text;
    # the email task gets the full message.
    notifications = self._register_notifications(
        notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg_intro
    )
    self._create_persistent_reminder_notification_task(subject=subject, msg=full_message, engine=engine)
    return notifications

def notify_share_object_approval(self, email_id: str):
share_link_text = ''
if os.environ.get('frontend_domain_url'):
share_link_text = f'<br><br> Please visit data.all <a href="{os.environ.get("frontend_domain_url")}/console/shares/{self.share.shareUri}">share link </a> to take action or view more details'
msg = f'User {email_id} APPROVED share request for dataset {self.dataset.label} for principal {self.share.principalId}'
share_link_text = (
f'<br><br> Please visit data.all <a href="{os.environ.get("frontend_domain_url")}'
f'/console/shares/{self.share.shareUri}">share link </a> '
f'to take action or view more details'
)
msg = (
f'User {email_id} APPROVED share request for dataset {self.dataset.label} '
f'for principal {self.share.principalId}'
)
subject = f'Data.all | Share Request Approved for {self.dataset.label}'
email_notification_msg = msg + share_link_text

Expand Down Expand Up @@ -175,3 +214,40 @@ def _create_notification_task(self, subject, msg):
log.info(f'Notification type : {share_notification_config_type} is not active')
else:
log.info('Notifications are not active')

Copy link
Contributor

@dlpzx dlpzx Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to review the design a bit because this pattern is new to data.all:

  1. a scheduled ECS task gets triggered, reads some metadata from RDS, executes some light business logic and queues Worker tasks
  2. the Worker Lambda sends the emails

At first sight it seems like we can simplify the workflow and run everything from ECS. Instead of creating Worker tasks (lines 235-249) we could directly call

SESEmailNotificationService.send_email_task(
                subject, message, recipient_groups_list, recipient_email_list
            )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noah-paige @petrkalos @SofiaSazonova I would like your opinion on this one :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only benefit of sending task to queue for worker lambda is if the worker lambda has code bundled in its image that is not there for the ECS container. However this is not the case and both images should have all the backend code at their disposal for whatever data.all task

While separating out all of our backend code into a more microservice aligned design could be useful - it is not in scope for this PR and requires much more design consideration and careful thought

TLDR - I think let's just keep it all in ECS :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually had discussed the design with Noah before implementing it and we considered both options:

Option 1: Create a new ECS task that finds pending shares, creates tasks for them, and queues them up, then uses the existing worker Lambda to send emails.

Option 2: Use the send_email_task directly, as you suggested.

I chose Option 1 because I thought having a queue would be beneficial for managing a high volume of outgoing emails. It helps to queue up tasks and avoid potential clashes.

I am open to hearing what others have to say too.

Copy link
Contributor

@dlpzx dlpzx Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to vote for option 1: If we were implementing this from scratch, I would go for a queue + Lambda to process the emails. The part that I do not like from the current architecture is that we are using an ECS task to do very minor things. The new pattern that I would like to see is: scheduled Lambda ---> SQS queue ---> Worker Lambda. But that is out of scope for this PR. We can approve the current scheduled ECS ---> SQS queue ---> Worker Lambda and think about scheduled ECS tasks that could be handled in Lambda in a separate PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 after some more thought I think we maybe should just keep all compute for email reminders in ECS for now

I think it should only require a small change to L238 of share_notification_service to not use Worker.queue() and instead directly call

SESEmailNotificationService.send_email_task(
     subject, message, recipient_groups_list, recipient_email_list
 )

I think it could be worthwhile to keep Task creation nonetheless for auditability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noah-paige Added

SESEmailNotificationService.send_email_task(
     subject, message, recipient_groups_list, recipient_email_list
 )

directly to share_notification_service

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: I voted for option 1, but I am not strongly opinionated.

def _create_persistent_reminder_notification_task(self, subject, msg, engine):
    """
    Queue a Worker task that emails a persistent share reminder.

    At the moment just for notification_config_type = email, but designed for
    additional notification types.
    Emails sent to:
    - dataset.SamlAdminGroupName
    - dataset.stewards

    :param subject: email subject line
    :param msg: full email body (may contain HTML)
    :param engine: db engine handed to Worker.queue to queue the task
    """
    share_notification_config = config.get_property(
        'modules.datasets_base.features.share_notifications', default=None
    )
    if not share_notification_config:
        log.info('Notifications are not active')
        return
    # Iterate items() instead of keys() + lookup; guard clauses flatten the nesting.
    for share_notification_config_type, n_config in share_notification_config.items():
        # Fix: original compared `== True`; rely on truthiness directly.
        if not n_config.get('active', False):
            log.info(f'Notification type : {share_notification_config_type} is not active')
            continue
        notification_recipient_groups_list = [self.dataset.SamlAdminGroupName, self.dataset.stewards]

        if share_notification_config_type == 'email':
            notification_task: Task = Task(
                action='notification.service',
                targetUri=self.share.shareUri,
                payload={
                    'notificationType': share_notification_config_type,
                    'subject': subject,
                    'message': msg,
                    'recipientGroupsList': notification_recipient_groups_list,
                    'recipientEmailList': [],
                },
            )
            self.session.add(notification_task)
            self.session.commit()

            Worker.queue(engine=engine, task_ids=[notification_task.taskUri])
52 changes: 50 additions & 2 deletions backend/dataall/modules/shares_base/services/sharing_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from typing import Any
from dataclasses import dataclass
from sqlalchemy import and_
from abc import ABC, abstractmethod
from sqlalchemy import and_, func
from time import sleep
from dataall.base.db import Engine
from dataall.base.db import Engine, get_engine
from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup
from dataall.modules.shares_base.db.share_object_state_machines import (
ShareObjectSM,
Expand All @@ -17,8 +18,13 @@
)
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository

from dataall.modules.shares_base.services.share_notification_service import ShareNotificationService
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.notifications.db.notification_models import Notification
from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository
from dataall.modules.shares_base.services.share_processor_manager import ShareProcessorManager

from dataall.modules.shares_base.services.share_object_service import (
ShareObjectService,
)
Expand Down Expand Up @@ -356,6 +362,48 @@ def reapply_share(cls, engine: Engine, share_uri: str) -> bool:
if not lock_released:
log.error(f'Failed to release lock for dataset {share_data.dataset.datasetUri}.')

@staticmethod
def fetch_pending_shares(engine):
    """
    Return every ShareObject still in 'Submitted' status that has a
    SHARE_OBJECT_SUBMITTED notification. Used by the scheduled ECS reminder
    task to sweep ALL active share objects within data.all.

    NOTE(review): Notification.target_uri appears to encode
    '<shareUri>|<datasetUri>' — the join splits on '|'; confirm against the
    notification writer.
    """
    with engine.scoped_session() as session:
        share_uri_part = func.split_part(Notification.target_uri, '|', 1)
        dataset_uri_part = func.split_part(Notification.target_uri, '|', 2)
        pending_query = (
            session.query(ShareObject)
            .join(
                Notification,
                and_(
                    ShareObject.shareUri == share_uri_part,
                    ShareObject.datasetUri == dataset_uri_part,
                ),
            )
            .filter(and_(Notification.type == 'SHARE_OBJECT_SUBMITTED', ShareObject.status == 'Submitted'))
        )
        return pending_query.all()

anushka-singh marked this conversation as resolved.
Show resolved Hide resolved
@staticmethod
def _get_share_data(session, uri):
    """Load the share object, its dataset, and its share-item states for *uri*."""
    share_object = ShareObjectRepository.get_share_by_uri(session, uri)
    related_dataset = DatasetRepository.get_dataset_by_uri(session, share_object.datasetUri)
    item_states = ShareObjectRepository.get_share_items_states(session, uri)
    return share_object, related_dataset, item_states

@classmethod
def persistent_email_reminder(cls, uri: str, envname):
    """
    Send an email reminder to the requestor of a pending share object.
    Invoked per-share by the scheduled ECS reminder task.

    :param uri: shareUri of the pending share
    :param envname: deployment environment name used to build the db engine
    """
    engine = get_engine(envname=envname)

    with engine.scoped_session() as session:
        share, dataset, _states = cls._get_share_data(session, uri)
        notifier = ShareNotificationService(session=session, dataset=dataset, share=share)
        notifier.notify_persistent_email_reminder(email_id=share.owner, engine=engine)
        log.info(f'Email reminder sent for share {share.shareUri}')

@staticmethod
def _get_share_data_and_items(session, share_uri, status, healthStatus=None):
data = ShareObjectRepository.get_share_data(session, share_uri)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
import os
import sys
from dataall.modules.dataset_sharing.api.types import ShareObject
anushka-singh marked this conversation as resolved.
Show resolved Hide resolved
from dataall.modules.shares_base.services.sharing_service import SharingService
from dataall.base.db import get_engine
from dataall.base.aws.sqs import SqsQueue
from dataall.core.tasks.service_handlers import Worker

root = logging.getLogger()
root.setLevel(logging.INFO)
if not root.hasHandlers():
root.addHandler(logging.StreamHandler(sys.stdout))
log = logging.getLogger(__name__)
Worker.queue = SqsQueue.send


def persistent_email_reminders(engine, envname):
    """
    Scheduled ECS task entry point: run persistent_email_reminder() against ALL
    active share objects within data.all and send emails for all pending shares.

    :param engine: SQLAlchemy engine used to look up pending shares
    :param envname: deployment environment name, forwarded so each reminder
        builds its own engine/session downstream
    """
    # Fix: the original opened `engine.scoped_session()` here but never used
    # the session — both calls below manage their own sessions from `engine`.
    log.info('Running Persistent Email Reminders Task')
    pending_shares = SharingService.fetch_pending_shares(engine=engine)
    log.info(f'Found {len(pending_shares)} pending shares')
    pending_share: ShareObject
    for pending_share in pending_shares:
        log.info(f'Sending Email Reminder for Share: {pending_share.shareUri}')
        SharingService.persistent_email_reminder(uri=pending_share.shareUri, envname=envname)
    log.info('Completed Persistent Email Reminders Task')


# Script entry point for the scheduled ECS task: build an engine for the
# current environment (defaults to 'local') and run the reminder sweep.
if __name__ == '__main__':
    ENVNAME = os.environ.get('envname', 'local')
    ENGINE = get_engine(envname=ENVNAME)
    persistent_email_reminders(engine=ENGINE, envname=ENVNAME)
1 change: 1 addition & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"share_notifications": {
"email": {
"active": false,
"persistent_reminders": false,
"parameters": {
"group_notifications": true
}
Expand Down
3 changes: 3 additions & 0 deletions deploy/stacks/backend_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ def __init__(
self.lambda_api_stack.api_handler,
self.lambda_api_stack.elasticsearch_proxy_handler,
],
email_custom_domain=ses_stack.ses_identity.email_identity_name if ses_stack is not None else None,
ses_configuration_set=ses_stack.configuration_set.configuration_set_name if ses_stack is not None else None,
custom_domain=custom_domain,
**kwargs,
)

Expand Down
61 changes: 59 additions & 2 deletions deploy/stacks/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def __init__(
tooling_account_id=None,
s3_prefix_list=None,
lambdas=None,
email_custom_domain=None,
ses_configuration_set=None,
custom_domain=None,
**kwargs,
):
super().__init__(scope, id, **kwargs)
Expand All @@ -44,11 +47,20 @@ def __init__(
self._ecr_repository = ecr_repository
self._vpc = vpc
self._prod_sizing = prod_sizing
self._email_custom_domain = email_custom_domain
self._ses_configuration_set = ses_configuration_set
anushka-singh marked this conversation as resolved.
Show resolved Hide resolved

(self.scheduled_tasks_sg, self.share_manager_sg) = self.create_ecs_security_groups(
envname, resource_prefix, vpc, vpce_connection, s3_prefix_list, lambdas
)
self.ecs_security_groups: [aws_ec2.SecurityGroup] = [self.scheduled_tasks_sg, self.share_manager_sg]
self.env_vars = self._create_env('INFO')
dlpzx marked this conversation as resolved.
Show resolved Hide resolved

# Check if custom domain exists and if it exists email notifications could be enabled.
# Create an env variable which stores the domain URL.
# This is used for sending data.all share weblinks in the email notifications.
if custom_domain and custom_domain.get('hosted_zone_name'):
self.env_vars.update({'frontend_domain_url': f'https://{custom_domain["hosted_zone_name"]}'})

cluster = ecs.Cluster(
self,
Expand All @@ -58,7 +70,10 @@ def __init__(
container_insights=True,
)

self.task_role = self.create_task_role(envname, resource_prefix, pivot_role_name)
self.task_role = self.create_task_role(
envname, resource_prefix, pivot_role_name, email_custom_domain, ses_configuration_set
)

self.cicd_stacks_updater_role = self.create_cicd_stacks_updater_role(
envname, resource_prefix, tooling_account_id
)
Expand Down Expand Up @@ -178,6 +193,7 @@ def __init__(
self.add_share_verifier_task()
self.add_share_reapplier_task()
self.add_omics_fetch_workflows_task()
self.add_persistent_email_reminders_task()

@run_if(['modules.s3_datasets.active', 'modules.dashboards.active'])
def add_catalog_indexer_task(self):
Expand Down Expand Up @@ -286,6 +302,32 @@ def add_share_reapplier_task(self):
)
self.ecs_task_definitions_families.append(share_reapplier_task_definition.family)

@run_if(['modules.datasets_base.features.share_notifications.email.persistent_reminders'])
def add_persistent_email_reminders_task(self):
    """Register the scheduled ECS task that emails reminders for pending shares.

    Runs every Monday at 09:00 UTC. Fix: the run_if key previously read
    'modules.dataset_base...' while the backend reads this toggle under
    'modules.datasets_base.features.share_notifications' (see
    share_notification_service), so the gate key was inconsistent.
    """
    # The returned task definition is unused; the family is read off the task.
    persistent_email_reminders_task, _ = self.set_scheduled_task(
        cluster=self.ecs_cluster,
        command=[
            'python3.9',
            '-m',
            'dataall.modules.shares_base.tasks.persistent_email_reminders_task',
        ],
        container_id='container',
        ecr_repository=self._ecr_repository,
        environment=self.env_vars,
        image_tag=self._cdkproxy_image_tag,
        log_group=self.create_log_group(
            self._envname, self._resource_prefix, log_group_name='persistent-email-reminders'
        ),
        schedule_expression=Schedule.expression('cron(0 9 ? * 2 *)'),  # Run at 9:00 AM UTC every Monday
        scheduled_task_id=f'{self._resource_prefix}-{self._envname}-persistent-email-reminders-schedule',
        task_id=f'{self._resource_prefix}-{self._envname}-persistent-email-reminders',
        task_role=self.task_role,
        vpc=self._vpc,
        security_group=self.scheduled_tasks_sg,
        prod_sizing=self._prod_sizing,
    )
    self.ecs_task_definitions_families.append(persistent_email_reminders_task.task_definition.family)

@run_if(['modules.s3_datasets.active'])
def add_subscription_task(self):
subscriptions_task, subscription_task_def = self.set_scheduled_task(
Expand Down Expand Up @@ -453,7 +495,9 @@ def create_cicd_stacks_updater_role(self, envname, resource_prefix, tooling_acco
)
return cicd_stacks_updater_role

def create_task_role(self, envname, resource_prefix, pivot_role_name):
def create_task_role(
self, envname, resource_prefix, pivot_role_name, email_custom_domain=None, ses_configuration_set=None
):
role_inline_policy = iam.Policy(
self,
f'ECSRolePolicy{envname}',
Expand Down Expand Up @@ -542,13 +586,26 @@ def create_task_role(self, envname, resource_prefix, pivot_role_name):
),
],
)

if email_custom_domain and ses_configuration_set:
role_inline_policy.document.add_statements(
iam.PolicyStatement(
actions=['ses:SendEmail'],
resources=[
f'arn:aws:ses:{self.region}:{self.account}:identity/{email_custom_domain}',
f'arn:aws:ses:{self.region}:{self.account}:configuration-set/{ses_configuration_set}',
],
)
)

task_role = iam.Role(
self,
f'ECSTaskRole{envname}',
role_name=f'{resource_prefix}-{envname}-ecs-tasks-role',
inline_policies={f'ECSRoleInlinePolicy{envname}': role_inline_policy.document},
assumed_by=iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
)

task_role.grant_pass_role(task_role)
return task_role

Expand Down
Loading