From 3d2dfc72bf9a1360a13e2e6baf49c3de1f6d2a86 Mon Sep 17 00:00:00 2001 From: dlpzx <71252798+dlpzx@users.noreply.github.com> Date: Wed, 19 Jun 2024 15:45:28 +0200 Subject: [PATCH] Generic shares_base module and specific s3_datasets_shares module - part 7 (share_object_service) (#1340) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Feature or Bugfix - Refactoring ### Detail As explained in the design for #1123 and #1283 we are trying to implement generic `datasets_base` and `shares_base` modules that can be used by any type of datasets and by any type of shareable object in a generic way. The goal of this PR is to move the `share_object_service` to `shares_base` and refactor away any dependency on S3 in the service. - Move file and fix imports of ShareObjectService - Use DatasetsBase and DatasetsBaseRepository instead of the S3 equivalents - ⚠️ Avoid Dashboard check logic in `ShareObjectService.submit_share_object` see below - ⚠️ Avoid SharePolicyService logic in `ShareObjectService.create_share_object` see below - Create ShareLogsService for logs - Remove unused methods - I also copied share_item_service to shares_base (it will be used in the next PR) #### Avoid Dashboard check logic in `ShareObjectService.submit_share_object` Currently, whenever a share request is submitted, we check if the REQUESTER environment has dashboards enabled and if there are shared tables we verify that the Quicksight subscription is active. Alternative: perform this check in the share processor of tables. It solves the issue, but it gives a poorer user experience as it is difficult for the requester to figure out why the share failed. This can be solved holistically as requested in https://github.com/data-dot-all/dataall/issues/1168. Decision: move the logic to the processor and make the table share fail. 
#### Avoid SharePolicyService logic in `ShareObjectService.create_share_object` When a share request is first created, we perform a series of operations to ensure that an IAM policy for the share requester principal IAM role is created. Alternative 1: move this logic inside the share processor. Not sure if it is possible. It would be the ideal solution, but the SharePolicyService throws errors in the create share object API if the policy is not attached. Alternative 2: implement an interface to define share_policies (similar to the dataset-actions that use share logic in `backend/dataall/modules/s3_datasets_shares/services/dataset_sharing_service.py`). Decision: we want to preserve the user experience of having the IAM policy created before the share request is processed. Plus, it is not an uncommon pattern and could be extended by other dataset types; for example, Redshift sharing might need additional share policies. For this reason this PR implements alternative 2. ### Relates - #1283 - #1123 - #955 ### Security Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/). - Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)? - Is the input sanitized? - What precautions are you taking before deserializing the data you consume? - Is injection prevented by parametrizing queries? - Have you ensured no `eval` or similar functions are used? - Does this PR introduce any functionality or component that requires authorization? - How have you ensured it respects the existing AuthN/AuthZ mechanisms? - Are you logging failed auth attempts? - Are you using or adding any cryptographic features? - Do you use standard, proven implementations? - Are the used keys controlled by the customer? Where are they stored? - Are you introducing any new policies/roles/users? - Have you used the least-privilege principle? 
How? By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --- backend/dataall/base/aws/quicksight.py | 1 - .../services/managed_iam_policies.py | 14 +- .../db/share_object_repositories.py | 2 +- .../glue_table_share_processor.py | 9 +- .../s3_access_point_share_processor.py | 2 +- .../s3_bucket_share_processor.py | 2 +- .../modules/shares_base/api/resolvers.py | 8 +- .../services/share_item_service.py | 334 ++++++++++++++++++ .../services/share_logs_service.py | 75 ++++ .../services/share_object_service.py | 160 ++------- .../shares_base/services/sharing_service.py | 4 +- .../modules/s3_datasets_shares/test_share.py | 1 - 12 files changed, 467 insertions(+), 145 deletions(-) create mode 100644 backend/dataall/modules/shares_base/services/share_item_service.py create mode 100644 backend/dataall/modules/shares_base/services/share_logs_service.py rename backend/dataall/modules/{s3_datasets_shares => shares_base}/services/share_object_service.py (75%) diff --git a/backend/dataall/base/aws/quicksight.py b/backend/dataall/base/aws/quicksight.py index 5f73fbbd0..b9eda86d7 100644 --- a/backend/dataall/base/aws/quicksight.py +++ b/backend/dataall/base/aws/quicksight.py @@ -133,7 +133,6 @@ def check_quicksight_enterprise_subscription(AwsAccountId, region=None): except client.exceptions.AccessDeniedException: raise Exception('Access denied to Quicksight for selected role') - return False @staticmethod def create_quicksight_group(AwsAccountId, region, GroupName=DEFAULT_GROUP_NAME): diff --git a/backend/dataall/core/environment/services/managed_iam_policies.py b/backend/dataall/core/environment/services/managed_iam_policies.py index e55505267..5962f58fd 100644 --- a/backend/dataall/core/environment/services/managed_iam_policies.py +++ b/backend/dataall/core/environment/services/managed_iam_policies.py @@ -26,21 +26,29 @@ def policy_type(self): """ Returns string and needs to be implemented in the ManagedPolicies 
inherited classes """ - raise NotImplementedError + ... @abstractmethod def generate_policy_name(self) -> str: """ Returns string and needs to be implemented in the ManagedPolicies inherited classes """ - raise NotImplementedError + ... @abstractmethod def generate_empty_policy(self) -> dict: """ Returns dict and needs to be implemented in the ManagedPolicies inherited classes """ - raise NotImplementedError + ... + + @abstractmethod + def create_managed_policy_from_inline_and_delete_inline(self) -> str: + """ + Returns policy arn and needs to be implemented in the ManagedPolicies inherited classes + It is used for backwards compatibility. It should be deprecated and removed in future releases. + """ + ... def check_if_policy_exists(self) -> bool: policy_name = self.generate_policy_name() diff --git a/backend/dataall/modules/s3_datasets_shares/db/share_object_repositories.py b/backend/dataall/modules/s3_datasets_shares/db/share_object_repositories.py index d695644f6..d5776f548 100644 --- a/backend/dataall/modules/s3_datasets_shares/db/share_object_repositories.py +++ b/backend/dataall/modules/s3_datasets_shares/db/share_object_repositories.py @@ -53,7 +53,7 @@ def list_all_active_share_objects(session) -> [ShareObject]: ## TODO: Already i return session.query(ShareObject).filter(ShareObject.deleted.is_(None)).all() @staticmethod - def find_share(session, dataset: S3Dataset, env, principal_id, group_uri) -> ShareObject: + def find_share(session, dataset: DatasetBase, env, principal_id, group_uri) -> ShareObject: return ( session.query(ShareObject) .filter( diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py index a4879d98f..4acd831aa 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py +++ 
b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py @@ -2,6 +2,8 @@ from typing import List from warnings import warn from datetime import datetime +from dataall.core.environment.services.environment_service import EnvironmentService +from dataall.base.aws.quicksight import QuicksightClient from dataall.modules.shares_base.services.shares_enums import ( ShareItemHealthStatus, ShareItemStatus, @@ -13,7 +15,7 @@ from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound from dataall.modules.s3_datasets_shares.services.share_managers import LFShareManager from dataall.modules.s3_datasets_shares.aws.ram_client import RamClient -from dataall.modules.s3_datasets_shares.services.share_object_service import ShareObjectService +from dataall.modules.shares_base.services.share_object_service import ShareObjectService from dataall.modules.s3_datasets_shares.services.share_item_service import ShareItemService from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_object_state_machines import ShareItemSM @@ -78,6 +80,11 @@ def process_approved_shares(self) -> bool: raise Exception( 'Source account details not initialized properly. 
Please check if the catalog account is properly onboarded on data.all' ) + env = EnvironmentService.get_environment_by_uri(self.session, self.share_data.share.environmentUri) + if EnvironmentService.get_boolean_env_param(self.session, env, 'dashboardsEnabled'): + QuicksightClient.check_quicksight_enterprise_subscription( + AwsAccountId=env.AwsAccountId, region=env.region + ) manager.initialize_clients() manager.grant_pivot_role_all_database_permissions_to_source_database() manager.check_if_exists_and_create_shared_database_in_target() diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py index 76cab06f4..efab0bd4d 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py +++ b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_access_point_share_processor.py @@ -4,7 +4,7 @@ from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound from dataall.modules.s3_datasets_shares.services.share_managers import S3AccessPointShareManager -from dataall.modules.s3_datasets_shares.services.share_object_service import ShareObjectService +from dataall.modules.shares_base.services.share_object_service import ShareObjectService from dataall.modules.s3_datasets_shares.services.share_item_service import ShareItemService from dataall.modules.shares_base.services.shares_enums import ( ShareItemHealthStatus, diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py index df5f292fa..751e027c2 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py +++ 
b/backend/dataall/modules/s3_datasets_shares/services/share_processors/s3_bucket_share_processor.py @@ -4,7 +4,7 @@ from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound from dataall.modules.s3_datasets_shares.services.share_managers import S3BucketShareManager -from dataall.modules.s3_datasets_shares.services.share_object_service import ShareObjectService +from dataall.modules.shares_base.services.share_object_service import ShareObjectService from dataall.modules.shares_base.services.shares_enums import ( ShareItemHealthStatus, ShareItemStatus, diff --git a/backend/dataall/modules/shares_base/api/resolvers.py b/backend/dataall/modules/shares_base/api/resolvers.py index 7845fae9c..aa202b28b 100644 --- a/backend/dataall/modules/shares_base/api/resolvers.py +++ b/backend/dataall/modules/shares_base/api/resolvers.py @@ -8,8 +8,8 @@ from dataall.modules.shares_base.services.shares_enums import ShareObjectPermission from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject from dataall.modules.s3_datasets_shares.services.share_item_service import ShareItemService -from dataall.modules.s3_datasets_shares.services.share_object_service import ShareObjectService -from dataall.modules.s3_datasets_shares.aws.glue_client import GlueClient +from dataall.modules.shares_base.services.share_object_service import ShareObjectService +from dataall.modules.shares_base.services.share_logs_service import ShareLogsService from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset @@ -132,7 +132,7 @@ def get_share_object(context, source, shareUri: str = None): def get_share_logs(context, source, shareUri: str): - return ShareObjectService.get_share_logs(shareUri) + return ShareLogsService.get_share_logs(shareUri) def resolve_user_role(context: Context, source: ShareObject, **kwargs): @@ -168,7 
+168,7 @@ def resolve_user_role(context: Context, source: ShareObject, **kwargs): def resolve_can_view_logs(context: Context, source: ShareObject): - return ShareObjectService.check_view_log_permissions(context.username, context.groups, source.shareUri) + return ShareLogsService.check_view_log_permissions(context.username, context.groups, source.shareUri) def resolve_dataset(context: Context, source: ShareObject, **kwargs): diff --git a/backend/dataall/modules/shares_base/services/share_item_service.py b/backend/dataall/modules/shares_base/services/share_item_service.py new file mode 100644 index 000000000..49827ea78 --- /dev/null +++ b/backend/dataall/modules/shares_base/services/share_item_service.py @@ -0,0 +1,334 @@ +import logging + +from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService +from dataall.core.tasks.service_handlers import Worker +from dataall.base.context import get_context +from dataall.core.environment.services.environment_service import EnvironmentService +from dataall.core.tasks.db.task_models import Task +from dataall.base.db import utils +from dataall.base.db.exceptions import ObjectNotFound, UnauthorizedOperation +from dataall.modules.shares_base.services.shares_enums import ( + ShareObjectActions, + ShareableType, + ShareItemStatus, + ShareItemActions, + ShareItemHealthStatus, +) +from dataall.modules.s3_datasets_shares.aws.glue_client import GlueClient +from dataall.modules.shares_base.db.share_object_models import ShareObjectItem +from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareObjectRepository +from dataall.modules.shares_base.db.share_object_state_machines import ( + ShareObjectSM, + ShareItemSM, +) +from dataall.modules.shares_base.services.share_exceptions import ShareItemsFound +from dataall.modules.shares_base.services.share_notification_service import ShareNotificationService +from dataall.modules.shares_base.services.share_permissions import ( + 
GET_SHARE_OBJECT, + ADD_ITEM, + REMOVE_ITEM, + LIST_ENVIRONMENT_SHARED_WITH_OBJECTS, + APPROVE_SHARE_OBJECT, +) +from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository +from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset, DatasetStorageLocation +from dataall.modules.s3_datasets.services.dataset_permissions import DATASET_TABLE_READ, DATASET_FOLDER_READ + +log = logging.getLogger(__name__) + + +class ShareItemService: + @staticmethod + def _get_share_uri(session, uri): + share_item = ShareObjectRepository.get_share_item_by_uri(session, uri) + share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri) + return share.shareUri + + @staticmethod + @ResourcePolicyService.has_resource_permission(GET_SHARE_OBJECT) + def verify_items_share_object(uri, item_uris): + context = get_context() + with context.db_engine.scoped_session() as session: + verify_items = [ShareObjectRepository.get_share_item_by_uri(session, uri) for uri in item_uris] + for item in verify_items: + setattr(item, 'healthStatus', ShareItemHealthStatus.PendingVerify.value) + + verify_share_items_task: Task = Task(action='ecs.share.verify', targetUri=uri) + session.add(verify_share_items_task) + + Worker.queue(engine=context.db_engine, task_ids=[verify_share_items_task.taskUri]) + return True + + @staticmethod + @ResourcePolicyService.has_resource_permission(APPROVE_SHARE_OBJECT) + def reapply_items_share_object(uri, item_uris): + context = get_context() + with context.db_engine.scoped_session() as session: + verify_items = [ShareObjectRepository.get_share_item_by_uri(session, uri) for uri in item_uris] + for item in verify_items: + setattr(item, 'healthStatus', ShareItemHealthStatus.PendingReApply.value) + + reapply_share_items_task: Task = Task(action='ecs.share.reapply', targetUri=uri) + session.add(reapply_share_items_task) + + Worker.queue(engine=context.db_engine, task_ids=[reapply_share_items_task.taskUri]) + return True + + 
@staticmethod + @ResourcePolicyService.has_resource_permission(GET_SHARE_OBJECT) + def revoke_items_share_object(uri, revoked_uris): + context = get_context() + with context.db_engine.scoped_session() as session: + share = ShareObjectRepository.get_share_by_uri(session, uri) + dataset = DatasetRepository.get_dataset_by_uri(session, share.datasetUri) + revoked_items_states = ShareObjectRepository.get_share_items_states(session, uri, revoked_uris) + revoked_items = [ShareObjectRepository.get_share_item_by_uri(session, uri) for uri in revoked_uris] + + if not revoked_items_states: + raise ShareItemsFound( + action='Revoke Items from Share Object', + message='Nothing to be revoked.', + ) + + share_sm = ShareObjectSM(share.status) + new_share_state = share_sm.run_transition(ShareObjectActions.RevokeItems.value) + + for item_state in revoked_items_states: + item_sm = ShareItemSM(item_state) + new_state = item_sm.run_transition(ShareObjectActions.RevokeItems.value) + for item in revoked_items: + if item.status == item_state: + item_sm.update_state_single_item(session, item, new_state) + + share_sm.update_state(session, share, new_share_state) + + ShareNotificationService(session=session, dataset=dataset, share=share).notify_share_object_rejection( + email_id=context.username + ) + + revoke_share_task: Task = Task( + action='ecs.share.revoke', + targetUri=uri, + payload={'environmentUri': share.environmentUri}, + ) + session.add(revoke_share_task) + + Worker.queue(engine=context.db_engine, task_ids=[revoke_share_task.taskUri]) + + return share + + @staticmethod + @ResourcePolicyService.has_resource_permission(ADD_ITEM) + def add_shared_item(uri: str, data: dict = None): + context = get_context() + with context.db_engine.scoped_session() as session: + item_type = data.get('itemType') + item_uri = data.get('itemUri') + share = ShareObjectRepository.get_share_by_uri(session, uri) + dataset: S3Dataset = DatasetRepository.get_dataset_by_uri(session, share.datasetUri) + 
target_environment = EnvironmentService.get_environment_by_uri(session, share.environmentUri) + + share_sm = ShareObjectSM(share.status) + new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value) + share_sm.update_state(session, share, new_share_state) + + item = ShareObjectRepository.get_share_item(session, item_type, item_uri) + if not item: + raise ObjectNotFound('ShareObjectItem', item_uri) + + if item_type == ShareableType.Table.value and item.region != target_environment.region: + raise UnauthorizedOperation( + action=ADD_ITEM, + message=f'Lake Formation cross region sharing is not supported. ' + f'Table {item.itemUri} is in {item.region} and target environment ' + f'{target_environment.name} is in {target_environment.region} ', + ) + + share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri) + + s3_access_point_name = utils.slugify( + share.datasetUri + '-' + share.principalId, + max_length=50, + lowercase=True, + regex_pattern='[^a-zA-Z0-9-]', + separator='-', + ) + log.info(f'S3AccessPointName={s3_access_point_name}') + + if not share_item: + share_item = ShareObjectItem( + shareUri=uri, + itemUri=item_uri, + itemType=item_type, + itemName=item.name, + status=ShareItemStatus.PendingApproval.value, + owner=context.username, + ) + session.add(share_item) + return share_item + + @staticmethod + @ResourcePolicyService.has_resource_permission(REMOVE_ITEM, parent_resource=_get_share_uri) + def remove_shared_item(uri: str): + with get_context().db_engine.scoped_session() as session: + share_item = ShareObjectRepository.get_share_item_by_uri(session, uri) + if ( + share_item.itemType == ShareableType.Table.value + and share_item.status == ShareItemStatus.Share_Failed.value + ): + share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri) + ResourcePolicyService.delete_resource_policy( + session=session, + group=share.groupUri, + resource_uri=share_item.itemUri, + ) + + item_sm = 
ShareItemSM(share_item.status) + item_sm.run_transition(ShareItemActions.RemoveItem.value) + ShareObjectRepository.remove_share_object_item(session, share_item) + return True + + @staticmethod + @ResourcePolicyService.has_resource_permission(GET_SHARE_OBJECT) + def resolve_shared_item(uri, item: ShareObjectItem): + with get_context().db_engine.scoped_session() as session: + return ShareObjectRepository.get_share_item(session, item.itemType, item.itemUri) + + @staticmethod + def check_existing_shared_items(share): + with get_context().db_engine.scoped_session() as session: + return ShareObjectRepository.check_existing_shared_items(session, share.shareUri) + + @staticmethod + def list_shareable_objects(share, filter, is_revokable=False): + states = None + if is_revokable: + states = ShareItemSM.get_share_item_revokable_states() + + with get_context().db_engine.scoped_session() as session: + return ShareObjectRepository.list_shareable_items(session, share, states, filter) + + @staticmethod + @ResourcePolicyService.has_resource_permission(LIST_ENVIRONMENT_SHARED_WITH_OBJECTS) + def paginated_shared_with_environment_datasets(session, uri, data) -> dict: + return ShareObjectRepository.paginate_shared_datasets(session, uri, data) + + @staticmethod + def _get_glue_database_for_share(glueDatabase, account_id, region): + # Check if a catalog account exists and return database accordingly + try: + catalog_dict = GlueClient( + account_id=account_id, + region=region, + database=glueDatabase, + ).get_source_catalog() + + if catalog_dict is not None: + return catalog_dict.get('database_name') + else: + return glueDatabase + except Exception as e: + raise e + + @staticmethod + def delete_dataset_table_read_permission(session, share, tableUri): + """ + Delete Table permissions to share groups + """ + other_shares = ShareObjectRepository.find_all_other_share_items( + session, + not_this_share_uri=share.shareUri, + item_uri=tableUri, + share_type=ShareableType.Table.value, + 
principal_type='GROUP', + principal_uri=share.groupUri, + item_status=[ShareItemStatus.Share_Succeeded.value], + ) + log.info(f'Table {tableUri} has been shared with group {share.groupUri} in {len(other_shares)} more shares') + if len(other_shares) == 0: + log.info('Delete permissions...') + ResourcePolicyService.delete_resource_policy(session=session, group=share.groupUri, resource_uri=tableUri) + + @staticmethod + def delete_dataset_folder_read_permission(session, share, locationUri): + """ + Delete Folder permissions to share groups + """ + other_shares = ShareObjectRepository.find_all_other_share_items( + session, + not_this_share_uri=share.shareUri, + item_uri=locationUri, + share_type=ShareableType.StorageLocation.value, + principal_type='GROUP', + principal_uri=share.groupUri, + item_status=[ShareItemStatus.Share_Succeeded.value], + ) + log.info( + f'Location {locationUri} has been shared with group {share.groupUri} in {len(other_shares)} more shares' + ) + if len(other_shares) == 0: + log.info('Delete permissions...') + ResourcePolicyService.delete_resource_policy( + session=session, + group=share.groupUri, + resource_uri=locationUri, + ) + + @staticmethod + def attach_dataset_table_read_permission(session, share, tableUri): + """ + Attach Table permissions to share groups + """ + existing_policy = ResourcePolicyService.find_resource_policies( + session, + group=share.groupUri, + resource_uri=tableUri, + resource_type=DatasetTable.__name__, + permissions=DATASET_TABLE_READ, + ) + # toDo: separate policies from list DATASET_TABLE_READ, because in future only one of them can be granted (Now they are always granted together) + if len(existing_policy) == 0: + log.info( + f'Attaching new resource permission policy {DATASET_TABLE_READ} to table {tableUri} for group {share.groupUri}' + ) + ResourcePolicyService.attach_resource_policy( + session=session, + group=share.groupUri, + permissions=DATASET_TABLE_READ, + resource_uri=tableUri, + 
resource_type=DatasetTable.__name__, + ) + else: + log.info( + f'Resource permission policy {DATASET_TABLE_READ} to table {tableUri} for group {share.groupUri} already exists. Skip... ' + ) + + @staticmethod + def attach_dataset_folder_read_permission(session, share, locationUri): + """ + Attach Folder permissions to share groups + """ + existing_policy = ResourcePolicyService.find_resource_policies( + session, + group=share.groupUri, + resource_uri=locationUri, + resource_type=DatasetStorageLocation.__name__, + permissions=DATASET_FOLDER_READ, + ) + # toDo: separate policies from list DATASET_TABLE_READ, because in future only one of them can be granted (Now they are always granted together) + if len(existing_policy) == 0: + log.info( + f'Attaching new resource permission policy {DATASET_FOLDER_READ} to folder {locationUri} for group {share.groupUri}' + ) + + ResourcePolicyService.attach_resource_policy( + session=session, + group=share.groupUri, + permissions=DATASET_FOLDER_READ, + resource_uri=locationUri, + resource_type=DatasetStorageLocation.__name__, + ) + else: + log.info( + f'Resource permission policy {DATASET_FOLDER_READ} to table {locationUri} for group {share.groupUri} already exists. Skip... 
' + ) diff --git a/backend/dataall/modules/shares_base/services/share_logs_service.py b/backend/dataall/modules/shares_base/services/share_logs_service.py new file mode 100644 index 000000000..93592eb31 --- /dev/null +++ b/backend/dataall/modules/shares_base/services/share_logs_service.py @@ -0,0 +1,75 @@ +import os +import logging + +from dataall.base.context import get_context +from dataall.base.utils import Parameter +from dataall.base.db import exceptions +from dataall.core.stacks.aws.cloudwatch import CloudWatch + +from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository +from dataall.modules.s3_datasets_shares.db.share_object_repositories import ( + ShareObjectRepository, +) # TODO: REPOSITORY TO SHARES_BASE + +log = logging.getLogger(__name__) + + +class ShareLogsService: + @staticmethod + def check_view_log_permissions(username, groups, shareUri): + with get_context().db_engine.scoped_session() as session: + share = ShareObjectRepository.get_share_by_uri(session, shareUri) + ds = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri) + return ds.stewards in groups or ds.SamlAdminGroupName in groups or username == ds.owner + + @staticmethod + def get_share_logs_name_query(shareUri): + log.info(f'Get share Logs stream name for share {shareUri}') + + query = f"""fields @logStream + |filter @message like '{shareUri}' + | sort @timestamp desc + | limit 1 + """ + return query + + @staticmethod + def get_share_logs_query(log_stream_name): + query = f"""fields @timestamp, @message, @logStream, @log as @logGroup + | sort @timestamp asc + | filter @logStream like "{log_stream_name}" + """ + return query + + @staticmethod + def get_share_logs(shareUri): + context = get_context() + if not ShareLogsService.check_view_log_permissions(context.username, context.groups, shareUri): + raise exceptions.ResourceUnauthorized( + username=context.username, + action='View Share Logs', + resource_uri=shareUri, + ) + + envname = 
os.getenv('envname', 'local') + log_group_name = f"/{Parameter().get_parameter(env=envname, path='resourcePrefix')}/{envname}/ecs/share-manager" + + query_for_name = ShareLogsService.get_share_logs_name_query(shareUri=shareUri) + name_query_result = CloudWatch.run_query( + query=query_for_name, + log_group_name=log_group_name, + days=1, + ) + if len(name_query_result) == 0: + return [] + + name = name_query_result[0]['logStream'] + + query = ShareLogsService.get_share_logs_query(log_stream_name=name) + results = CloudWatch.run_query( + query=query, + log_group_name=log_group_name, + days=1, + ) + log.info(f'Running Logs query {query} for log_group_name={log_group_name}') + return results diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_object_service.py b/backend/dataall/modules/shares_base/services/share_object_service.py similarity index 75% rename from backend/dataall/modules/s3_datasets_shares/services/share_object_service.py rename to backend/dataall/modules/shares_base/services/share_object_service.py index 985e55787..4f649f84e 100644 --- a/backend/dataall/modules/s3_datasets_shares/services/share_object_service.py +++ b/backend/dataall/modules/shares_base/services/share_object_service.py @@ -1,5 +1,4 @@ import os -from datetime import datetime from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService from dataall.core.tasks.service_handlers import Worker @@ -7,10 +6,8 @@ from dataall.core.activity.db.activity_models import Activity from dataall.core.environment.db.environment_models import EnvironmentGroup, ConsumptionRole from dataall.core.environment.services.environment_service import EnvironmentService -from dataall.core.permissions.services.environment_permissions import GET_ENVIRONMENT +from dataall.core.environment.services.managed_iam_policies import PolicyManager from dataall.core.tasks.db.task_models import Task -from dataall.base.db import utils -from dataall.base.aws.quicksight import 
QuicksightClient from dataall.base.db.exceptions import UnauthorizedOperation from dataall.modules.shares_base.services.shares_enums import ( ShareObjectActions, @@ -20,14 +17,15 @@ PrincipalType, ) from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject -from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareObjectRepository +from dataall.modules.s3_datasets_shares.db.share_object_repositories import ( + ShareObjectRepository, +) # TODO: REPOSITORY TO SHARES_BASE from dataall.modules.shares_base.db.share_object_state_machines import ( ShareObjectSM, ShareItemSM, ) from dataall.modules.shares_base.services.share_exceptions import ShareItemsFound, PrincipalRoleNotFound from dataall.modules.shares_base.services.share_notification_service import ShareNotificationService -from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService from dataall.modules.shares_base.services.share_permissions import ( REJECT_SHARE_OBJECT, APPROVE_SHARE_OBJECT, @@ -38,28 +36,16 @@ DELETE_SHARE_OBJECT, GET_SHARE_OBJECT, ) -from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository -from dataall.modules.s3_datasets.db.dataset_models import S3Dataset +from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository +from dataall.modules.datasets_base.db.dataset_models import DatasetBase from dataall.base.aws.iam import IAM -from dataall.base.utils import Parameter -from dataall.base.db import exceptions -from dataall.core.stacks.aws.cloudwatch import CloudWatch - - import logging log = logging.getLogger(__name__) class ShareObjectService: - @staticmethod - def check_view_log_permissions(username, groups, shareUri): - with get_context().db_engine.scoped_session() as session: - share = ShareObjectRepository.get_share_by_uri(session, shareUri) - ds = DatasetRepository.get_dataset_by_uri(session, share.datasetUri) - return ds.stewards in groups or 
ds.SamlAdminGroupName in groups or username == ds.owner - @staticmethod def verify_principal_role(session, share: ShareObject) -> bool: log.info('Verifying principal IAM role...') @@ -68,27 +54,6 @@ def verify_principal_role(session, share: ShareObject) -> bool: principal_role = IAM.get_role_arn_by_name(account_id=env.AwsAccountId, region=env.region, role_name=role_name) return principal_role is not None - @staticmethod - def update_all_share_items_status( # TODO: moved to ShareObject #Test removed for the moment - session, shareUri, new_health_status: str, message, previous_health_status: str = None - ): - for item in ShareObjectRepository.get_all_shareable_items( - session, shareUri, healthStatus=previous_health_status - ): - ShareObjectRepository.update_share_item_health_status( - session, - share_item=item, - healthStatus=new_health_status, - healthMessage=message, - timestamp=datetime.now(), - ) - - @staticmethod - @ResourcePolicyService.has_resource_permission(GET_ENVIRONMENT) - def get_share_object_in_environment(uri, shareUri): - with get_context().db_engine.scoped_session() as session: - return ShareObjectRepository.get_share_by_uri(session, shareUri) - @staticmethod @ResourcePolicyService.has_resource_permission(GET_SHARE_OBJECT) def get_share_object(uri): @@ -111,7 +76,7 @@ def create_share_object( ): context = get_context() with context.db_engine.scoped_session() as session: - dataset: S3Dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri) + dataset: DatasetBase = DatasetBaseRepository.get_dataset_by_uri(session, dataset_uri) environment = EnvironmentService.get_environment_by_uri(session, uri) if environment.region != dataset.region: @@ -147,28 +112,32 @@ def create_share_object( cls._validate_group_membership(session, group_uri, environment.environmentUri) - share_policy_service = SharePolicyService( - account=environment.AwsAccountId, - region=environment.region, + share_policy_manager = PolicyManager( 
role_name=principal_iam_role_name, environmentUri=environment.environmentUri, + account=environment.AwsAccountId, + region=environment.region, resource_prefix=environment.resourcePrefix, ) - # Backwards compatibility - # we check if a managed share policy exists. If False, the role was introduced to data.all before this update - # We create the policy from the inline statements - # In this case it could also happen that the role is the Admin of the environment - if not share_policy_service.check_if_policy_exists(): - share_policy_service.create_managed_policy_from_inline_and_delete_inline() - # End of backwards compatibility - - attached = share_policy_service.check_if_policy_attached() - if not attached and not managed and not attachMissingPolicies: - raise Exception( - f'Required customer managed policy {share_policy_service.generate_policy_name()} is not attached to role {principal_iam_role_name}' - ) - elif not attached: - share_policy_service.attach_policy() + for Policy in [ + Policy for Policy in share_policy_manager.initializedPolicies if Policy.policy_type == 'SharePolicy' + ]: + # Backwards compatibility + # we check if a managed share policy exists. 
If False, the role was introduced to data.all before this update + # We create the policy from the inline statements + # In this case it could also happen that the role is the Admin of the environment + if not Policy.check_if_policy_exists(): + Policy.create_managed_policy_from_inline_and_delete_inline() + # End of backwards compatibility + + attached = Policy.check_if_policy_attached() + if not attached and not managed and not attachMissingPolicies: + raise Exception( + f'Required customer managed policy {Policy.generate_policy_name()} is not attached to role {principal_iam_role_name}' + ) + elif not attached: + Policy.attach_policy() + share = ShareObjectRepository.find_share(session, dataset, environment, principal_id, group_uri) already_existed = share is not None if not share: @@ -189,14 +158,6 @@ def create_share_object( item = ShareObjectRepository.get_share_item(session, item_type, item_uri) share_item = ShareObjectRepository.find_sharable_item(session, share.shareUri, item_uri) - s3_access_point_name = utils.slugify( - share.datasetUri + '-' + share.principalId, - max_length=50, - lowercase=True, - regex_pattern='[^a-zA-Z0-9-]', - separator='-', - ) - if not share_item and item: new_share_item: ShareObjectItem = ShareObjectItem( shareUri=share.shareUri, @@ -271,15 +232,6 @@ def submit_share_object(cls, uri: str): message='The request is empty of pending items. 
Add items to share request.', ) - env = EnvironmentService.get_environment_by_uri(session, share.environmentUri) - dashboard_enabled = EnvironmentService.get_boolean_env_param(session, env, 'dashboardsEnabled') - if dashboard_enabled: - share_table_items = ShareObjectRepository.find_all_share_items(session, uri, ShareableType.Table.value) - if share_table_items: - QuicksightClient.check_quicksight_enterprise_subscription( - AwsAccountId=env.AwsAccountId, region=env.region - ) - cls._run_transitions(session, share, states, ShareObjectActions.Submit) ShareNotificationService(session=session, dataset=dataset, share=share).notify_share_object_submission( @@ -468,7 +420,7 @@ def _run_transitions(session, share, share_items_states, action): @staticmethod def _get_share_data(session, uri): share = ShareObjectRepository.get_share_by_uri(session, uri) - dataset = DatasetRepository.get_dataset_by_uri(session, share.datasetUri) + dataset = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri) share_items_states = ShareObjectRepository.get_share_items_states(session, uri) return share, dataset, share_items_states @@ -488,55 +440,3 @@ def _validate_group_membership(session, share_object_group, environment_uri): action=CREATE_SHARE_OBJECT, message=f'Team: {share_object_group} is not a member of the environment {environment_uri}', ) - - @staticmethod - def get_share_logs_name_query(shareUri): - log.info(f'Get share Logs stream name for share {shareUri}') - - query = f"""fields @logStream - |filter @message like '{shareUri}' - | sort @timestamp desc - | limit 1 - """ - return query - - @staticmethod - def get_share_logs_query(log_stream_name): - query = f"""fields @timestamp, @message, @logStream, @log as @logGroup - | sort @timestamp asc - | filter @logStream like "{log_stream_name}" - """ - return query - - @staticmethod - def get_share_logs(shareUri): - context = get_context() - if not ShareObjectService.check_view_log_permissions(context.username, context.groups, 
shareUri): - raise exceptions.ResourceUnauthorized( - username=context.username, - action='View Share Logs', - resource_uri=shareUri, - ) - - envname = os.getenv('envname', 'local') - log_group_name = f"/{Parameter().get_parameter(env=envname, path='resourcePrefix')}/{envname}/ecs/share-manager" - - query_for_name = ShareObjectService.get_share_logs_name_query(shareUri=shareUri) - name_query_result = CloudWatch.run_query( - query=query_for_name, - log_group_name=log_group_name, - days=1, - ) - if len(name_query_result) == 0: - return [] - - name = name_query_result[0]['logStream'] - - query = ShareObjectService.get_share_logs_query(log_stream_name=name) - results = CloudWatch.run_query( - query=query, - log_group_name=log_group_name, - days=1, - ) - log.info(f'Running Logs query {query} for log_group_name={log_group_name}') - return results diff --git a/backend/dataall/modules/shares_base/services/sharing_service.py b/backend/dataall/modules/shares_base/services/sharing_service.py index 7f81fca05..c6fe47c2f 100644 --- a/backend/dataall/modules/shares_base/services/sharing_service.py +++ b/backend/dataall/modules/shares_base/services/sharing_service.py @@ -19,9 +19,9 @@ ) from dataall.modules.shares_base.db.share_object_models import ShareObject from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository -from dataall.modules.s3_datasets_shares.services.share_object_service import ( +from dataall.modules.shares_base.services.share_object_service import ( ShareObjectService, -) # TODO move to shares_base in following PR +) from dataall.modules.shares_base.services.share_exceptions import ( PrincipalRoleNotFound, DatasetLockTimeout, diff --git a/tests/modules/s3_datasets_shares/test_share.py b/tests/modules/s3_datasets_shares/test_share.py index 6c25ed244..6334f9229 100644 --- a/tests/modules/s3_datasets_shares/test_share.py +++ b/tests/modules/s3_datasets_shares/test_share.py @@ -18,7 +18,6 @@ from 
dataall.modules.shares_base.db.share_object_models import ShareObject, ShareObjectItem from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_object_state_machines import ShareItemSM, ShareObjectSM -from dataall.modules.s3_datasets_shares.services.share_object_service import ShareObjectService from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset