Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DATASET_READ_TABLE read permissions #1237

Merged
merged 20 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
from typing import Optional
from typing import Optional, List

from sqlalchemy.sql import and_


from dataall.core.permissions.db.permission.permission_models import Permission
from dataall.core.permissions.db.resource_policy.resource_policy_models import ResourcePolicy, ResourcePolicyPermission

Expand Down Expand Up @@ -71,16 +70,34 @@ def has_group_resource_permission(
else:
return policy

@staticmethod
def find_resource_policy(session, group_uri: str, resource_uri: str) -> ResourcePolicy:
resource_policy = (
session.query(ResourcePolicy)
.filter(
and_(
ResourcePolicy.principalId == group_uri,
ResourcePolicy.resourceUri == resource_uri,
)
)
.first()
def query_all_resource_policies(session, group_uri: str, resource_uri: str, resource_type: str = None):
    """
    Build a query over ResourcePolicy rows attached to a resource.

    :param session: session used to build the query
    :param group_uri: principal to restrict to; when None the principal filter is skipped
    :param resource_uri: resource the policies must belong to (always applied)
    :param resource_type: resource type to restrict to; when None the type filter is skipped
    :return: the un-executed query, so the caller decides between ``.first()`` and ``.all()``
    """
    query = session.query(ResourcePolicy).filter(ResourcePolicy.resourceUri == resource_uri)

    # Optional narrowing: each filter is only added when the caller supplied a value.
    if group_uri is not None:
        query = query.filter(ResourcePolicy.principalId == group_uri)
    if resource_type is not None:
        query = query.filter(ResourcePolicy.resourceType == resource_type)

    return query

@staticmethod
def find_resource_policy(session, group_uri: str, resource_uri: str, resource_type: str = None) -> ResourcePolicy:
    """
    Fetch a single resource policy matching the given filters.

    :param session: session used to run the query
    :param group_uri: principal to restrict to (filter skipped when None)
    :param resource_uri: resource the policy must belong to
    :param resource_type: resource type to restrict to (filter skipped when None)
    :return: the first matching ResourcePolicy; ``.first()`` yields None when nothing matches
    """
    return ResourcePolicyRepository.query_all_resource_policies(
        session, group_uri, resource_uri, resource_type
    ).first()

@staticmethod
def find_all_resource_policies(
    session, group_uri: str, resource_uri: str, resource_type: str = None
) -> List[ResourcePolicy]:
    """
    Fetch every resource policy matching the given filters.

    :param session: session used to run the query
    :param group_uri: principal to restrict to (filter skipped when None)
    :param resource_uri: resource the policies must belong to
    :param resource_type: resource type to restrict to (filter skipped when None)
    :return: list of all matching ResourcePolicy rows
    """
    return ResourcePolicyRepository.query_all_resource_policies(
        session, group_uri, resource_uri, resource_type
    ).all()
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ def get_resource_uri(self) -> str: ...

class ResourcePolicyRequestValidationService:
@staticmethod
def validate_find_or_delete_resource_policy_params(group_uri, resource_uri):
def validate_find_or_delete_resource_policy_params(group_uri, resource_uri, resource_type):
if not group_uri:
raise exceptions.RequiredParameter(param_name='group')
if not resource_type:
raise exceptions.RequiredParameter(param_name='group | resource_type')
if not resource_uri:
raise exceptions.RequiredParameter(param_name='resource_uri')

Expand Down Expand Up @@ -75,21 +76,49 @@ def check_user_resource_permission(session, username: str, groups: [str], resour
else:
return resource_policy

@staticmethod
def find_resource_policies(session, group, resource_uri, resource_type):
    """
    Validate the lookup parameters, then return all matching resource policies.

    :param session: session used to run the query
    :param group: group URI to look policies up for; if None, policies for all groups are returned
    :param resource_uri: URI of the resource whose policies are looked up
    :param resource_type: type of the resource -- required if the group is None
    :return: list of ResourcePolicy objects matching the given criteria
    """
    # Reject incomplete parameter combinations before touching the database.
    ResourcePolicyRequestValidationService.validate_find_or_delete_resource_policy_params(
        group, resource_uri, resource_type
    )
    return ResourcePolicyRepository.find_all_resource_policies(
        session, group_uri=group, resource_uri=resource_uri, resource_type=resource_type
    )

@staticmethod
def delete_resource_policy(
session,
group: str,
resource_uri: str,
resource_type: str = None,
) -> bool:
ResourcePolicyRequestValidationService.validate_find_or_delete_resource_policy_params(group, resource_uri)
policy = ResourcePolicyRepository.find_resource_policy(session, group_uri=group, resource_uri=resource_uri)
if policy:
for permission in policy.permissions:
session.delete(permission)
session.delete(policy)
"""
Deletes all resource policies for the given group, resource_uri and resource_type
:param session:
:param group:
:param resource_uri:
:param resource_type:
:return:
"""
policies = ResourcePolicyService.find_resource_policies(session, group, resource_uri, resource_type)
noah-paige marked this conversation as resolved.
Show resolved Hide resolved
try:
for policy in policies:
for permission in policy.permissions:
session.delete(permission)
session.delete(policy)
session.commit()

except Exception as e:
session.rollback()
raise e
return True

@staticmethod
Expand Down Expand Up @@ -193,6 +222,7 @@ def get_resource_policy_permissions(session, group_uri, resource_uri) -> List[Re
permissions.append(p.permission)
return permissions

@staticmethod
def has_resource_permission(
permission: str, param_name: str = None, resource_name: str = None, parent_resource: Callable = None
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ShareObjectRepository,
ShareItemSM,
)
from dataall.modules.dataset_sharing.services.share_item_service import ShareItemService
from dataall.modules.dataset_sharing.services.share_object_service import ShareObjectService
from dataall.modules.dataset_sharing.services.share_processors.lakeformation_process_share import (
ProcessLakeFormationShare,
Expand All @@ -23,7 +24,7 @@
ShareItemHealthStatus,
ShareObjectActions,
ShareItemStatus,
ShareableType,
PrincipalType,
)
from dataall.modules.datasets.db.dataset_models import DatasetLock

Expand Down Expand Up @@ -72,6 +73,10 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
new_share_state = share_sm.run_transition(ShareObjectActions.Start.value)
share_sm.update_state(session, share, new_share_state)

need_grant_permissions = (
share.groupUri != dataset.SamlAdminGroupName and share.principalType == PrincipalType.Group.value
)

(shared_tables, shared_folders, shared_buckets) = ShareObjectRepository.get_share_data_items(
session, share_uri, ShareItemStatus.Share_Approved.value
)
Expand Down Expand Up @@ -176,6 +181,10 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value)
share_sm.update_state(session, share, new_share_state)

log.info('Attaching TABLE/FOLDER READ permissions to successfully shared items...')
ShareObjectService.attach_dataset_table_read_permission(session, share)
ShareObjectService.attach_dataset_folder_read_permission(session, share)

return share_successful

except Exception as e:
Expand Down Expand Up @@ -281,7 +290,9 @@ def revoke_share(cls, engine: Engine, share_uri: str):
env_group,
)
log.info(f'revoking folders succeeded = {revoked_folders_succeed}')

if share.groupUri != dataset.SamlAdminGroupName and share.principalType == PrincipalType.Group.value:
log.info('Deleting FOLDER READ permissions...')
ShareItemService.delete_dataset_folder_read_permission(session, share)
log.info('Revoking permissions to S3 buckets')

revoked_s3_buckets_succeed = ProcessS3BucketShare.process_revoked_shares(
Expand All @@ -300,7 +311,11 @@ def revoke_share(cls, engine: Engine, share_uri: str):
revoked_tables_succeed = ProcessLakeFormationShare(
session, dataset, share, revoked_tables, source_environment, target_environment, env_group
).process_revoked_shares()

log.info(f'revoking tables succeeded = {revoked_tables_succeed}')
if share.groupUri != dataset.SamlAdminGroupName and share.principalType == PrincipalType.Group.value:
log.info('Deleting TABLE READ permissions...')
ShareItemService.delete_dataset_table_read_permission(session, share)

existing_pending_items = ShareObjectRepository.check_pending_share_items(session, share_uri)
if existing_pending_items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
ShareItemStatus,
ShareItemActions,
ShareItemHealthStatus,
PrincipalType,
)
from dataall.modules.dataset_sharing.aws.glue_client import GlueClient
from dataall.modules.dataset_sharing.db.share_object_models import ShareObjectItem
Expand Down Expand Up @@ -102,11 +101,6 @@ def revoke_items_share_object(uri, revoked_uris):

share_sm.update_state(session, share, new_share_state)

if share.groupUri != dataset.SamlAdminGroupName and share.principalType == PrincipalType.Group.value:
log.info('Deleting TABLE/FOLDER READ permissions...')
ShareItemService._delete_dataset_table_read_permission(session, share)
ShareItemService._delete_dataset_folder_read_permission(session, share)

ShareNotificationService(session=session, dataset=dataset, share=share).notify_share_object_rejection(
email_id=context.username
)
Expand Down Expand Up @@ -243,25 +237,25 @@ def _get_glue_database_for_share(glueDatabase, account_id, region):
raise e

@staticmethod
def _delete_dataset_table_read_permission(session, share):
def delete_dataset_table_read_permission(session, share):
"""
Delete Table permissions to share groups
"""
share_table_items = ShareObjectRepository.find_all_share_items(
session, share.shareUri, ShareableType.Table.value, [ShareItemStatus.Revoke_Approved.value]
session, share.shareUri, ShareableType.Table.value, [ShareItemStatus.Revoke_Succeeded.value]
)
for table in share_table_items:
ResourcePolicyService.delete_resource_policy(
session=session, group=share.groupUri, resource_uri=table.itemUri
)

@staticmethod
def _delete_dataset_folder_read_permission(session, share):
def delete_dataset_folder_read_permission(session, share):
"""
Delete Folder permissions to share groups
"""
share_folder_items = ShareObjectRepository.find_all_share_items(
session, share.shareUri, ShareableType.StorageLocation.value, [ShareItemStatus.Revoke_Approved.value]
session, share.shareUri, ShareableType.StorageLocation.value, [ShareItemStatus.Revoke_Succeeded.value]
)
for location in share_folder_items:
ResourcePolicyService.delete_resource_policy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ShareItemStatus,
ShareObjectStatus,
PrincipalType,
ShareItemHealthStatus,
)
from dataall.modules.dataset_sharing.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.dataset_sharing.db.share_object_repositories import (
Expand Down Expand Up @@ -312,11 +311,6 @@ def approve_share_object(cls, uri: str):

cls._run_transitions(session, share, states, ShareObjectActions.Approve)

if share.groupUri != dataset.SamlAdminGroupName and share.principalType == PrincipalType.Group.value:
log.info('Attaching TABLE/FOLDER READ permissions...')
ShareObjectService._attach_dataset_table_read_permission(session, share)
ShareObjectService._attach_dataset_folder_read_permission(session, share)

share.rejectPurpose = ''
session.commit()

Expand Down Expand Up @@ -531,12 +525,12 @@ def _validate_group_membership(session, share_object_group, environment_uri):
)

@staticmethod
def _attach_dataset_table_read_permission(session, share):
def attach_dataset_table_read_permission(session, share):
"""
Attach Table permissions to share groups
"""
share_table_items = ShareObjectRepository.find_all_share_items(
session, share.shareUri, ShareableType.Table.value, [ShareItemStatus.Share_Approved.value]
session, share.shareUri, ShareableType.Table.value, [ShareItemStatus.Share_Succeeded.value]
)
for table in share_table_items:
ResourcePolicyService.attach_resource_policy(
Expand All @@ -548,12 +542,12 @@ def _attach_dataset_table_read_permission(session, share):
)

@staticmethod
def _attach_dataset_folder_read_permission(session, share):
def attach_dataset_folder_read_permission(session, share):
"""
Attach Folder permissions to share groups
"""
share_folder_items = ShareObjectRepository.find_all_share_items(
session, share.shareUri, ShareableType.StorageLocation.value, [ShareItemStatus.Share_Approved.value]
session, share.shareUri, ShareableType.StorageLocation.value, [ShareItemStatus.Share_Succeeded.value]
)
for location in share_folder_items:
ResourcePolicyService.attach_resource_policy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def process_revoked_shares(self) -> bool:

new_state = revoked_item_SM.run_transition(ShareItemActions.Success.value)
revoked_item_SM.update_state_single_item(self.session, share_item, new_state)

ShareObjectRepository.update_share_item_health_status(
self.session, share_item, None, None, share_item.lastVerificationTime
)
Expand Down
2 changes: 1 addition & 1 deletion backend/dataall/modules/datasets/api/table/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def delete_table(context, source, tableUri: str = None):
def preview(context, source, tableUri: str = None):
if not tableUri:
return None
return DatasetTableService.preview(table_uri=tableUri)
return DatasetTableService.preview(uri=tableUri)


def get_glue_table_properties(context: Context, source: DatasetTable, **kwargs):
Expand Down
22 changes: 16 additions & 6 deletions backend/dataall/modules/datasets/services/dataset_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def delete_table(uri: str):
DatasetService.check_before_delete(session, table.tableUri, action=DELETE_DATASET_TABLE)
DatasetService.execute_on_delete(session, table.tableUri, action=DELETE_DATASET_TABLE)
DatasetTableRepository.delete(session, table)
DatasetTableService._delete_dataset_table_read_permission(session, uri)
noah-paige marked this conversation as resolved.
Show resolved Hide resolved

GlossaryRepository.delete_glossary_terms_links(
session, target_uri=table.tableUri, target_type='DatasetTable'
Expand All @@ -77,10 +78,10 @@ def delete_table(uri: str):
return True

@staticmethod
def preview(table_uri: str):
def preview(uri: str):
context = get_context()
with context.db_engine.scoped_session() as session:
table: DatasetTable = DatasetTableRepository.get_dataset_table_by_uri(session, table_uri)
table: DatasetTable = DatasetTableRepository.get_dataset_table_by_uri(session, uri)
dataset = DatasetRepository.get_dataset_by_uri(session, table.datasetUri)
if (
ConfidentialityClassification.get_confidentiality_level(dataset.confidentiality)
Expand Down Expand Up @@ -111,7 +112,7 @@ def sync_tables_for_dataset(cls, uri):
dataset = DatasetRepository.get_dataset_by_uri(session, uri)
S3Prefix = dataset.S3BucketName
tables = DatasetCrawler(dataset).list_glue_database_tables(S3Prefix)
cls.sync_existing_tables(session, dataset.datasetUri, glue_tables=tables)
cls.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables)
DatasetTableIndexer.upsert_all(session=session, dataset_uri=dataset.datasetUri)
DatasetTableIndexer.remove_all_deleted(session=session, dataset_uri=dataset.datasetUri)
return DatasetRepository.paginated_dataset_tables(
Expand All @@ -121,10 +122,10 @@ def sync_tables_for_dataset(cls, uri):
)

@staticmethod
def sync_existing_tables(session, dataset_uri, glue_tables=None):
dataset: Dataset = DatasetRepository.get_dataset_by_uri(session, dataset_uri)
def sync_existing_tables(session, uri, glue_tables=None):
dataset: Dataset = DatasetRepository.get_dataset_by_uri(session, uri)
if dataset:
existing_tables = DatasetTableRepository.find_dataset_tables(session, dataset_uri)
existing_tables = DatasetTableRepository.find_dataset_tables(session, uri)
existing_table_names = [e.GlueTableName for e in existing_tables]
existing_dataset_tables_map = {t.GlueTableName: t for t in existing_tables}

Expand Down Expand Up @@ -161,3 +162,12 @@ def _attach_dataset_table_permission(session, dataset: Dataset, table_uri):
resource_uri=table_uri,
resource_type=DatasetTable.__name__,
)

@staticmethod
def _delete_dataset_table_read_permission(session, table_uri):
    """
    Delete the DatasetTable resource policies attached to a table.

    No group is passed, so the deletion is not restricted to a single group;
    it is scoped by the table URI and the DatasetTable resource type.

    :param session: session the deletion runs in
    :param table_uri: URI of the table whose policies are removed
    """
    table_resource_type = DatasetTable.__name__
    ResourcePolicyService.delete_resource_policy(
        session=session,
        group=None,
        resource_uri=table_uri,
        resource_type=table_resource_type,
    )
2 changes: 1 addition & 1 deletion backend/dataall/modules/datasets/tasks/tables_syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def sync_tables(engine):

log.info(f'Found {len(tables)} tables on Glue database {dataset.GlueDatabaseName}')

DatasetTableService.sync_existing_tables(session, dataset.datasetUri, glue_tables=tables)
DatasetTableService.sync_existing_tables(session, uri=dataset.datasetUri, glue_tables=tables)

tables = session.query(DatasetTable).filter(DatasetTable.datasetUri == dataset.datasetUri).all()

Expand Down
Loading
Loading