Skip to content

Commit

Permalink
Redshift data sharing - Added methods from sharing back to redshift d…
Browse files Browse the repository at this point in the history
…atasets (check_on_delete, list_shared_datasets...) (#1511)

### Feature or Bugfix
- Feature

### Detail
Complete design in #955.
This particular PR is focused on adding the missing sharing-related
functionalities that need to be implemented inside the redshift_datasets module.

For example, when we delete a redshift dataset we would want to first
check if there are any active share requests for that dataset. To avoid
circular dependencies it is required to use an interface in the same way
it was implemented for S3.

In this PR:
- Add a `RedshiftShareDatasetService(DatasetServiceInterface)` class and
implement the required abstract methods (`check_on_delete`,
`resolve_user_shared_datasets`, ...).
- Use this class in the redshift_datasets module in resolvers, on dataset
deletion...
- Some of the code was very similar to the db queries implemented by S3
datasets; for this reason in this PR some of the queries are moved to
the generic ShareObjectRepository to be reused by both types of dataset

### Relates
- #955 

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
- How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard, proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
  • Loading branch information
dlpzx authored Sep 4, 2024
1 parent ae16cc8 commit 9840995
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ def resolve_dataset_stewards_group(
return source.stewards


def resolve_user_role(context: Context, source: RedshiftDataset, **kwargs):
def resolve_user_role(
context: Context, source: RedshiftDataset, **kwargs
): # TODO- duplicated with S3 datasets - follow-up PR
if not source:
return None
if source.owner == context.username:
Expand All @@ -116,6 +118,13 @@ def resolve_user_role(context: Context, source: RedshiftDataset, **kwargs):
return DatasetRole.Admin.value
elif source.stewards in context.groups:
return DatasetRole.DataSteward.value
else:
with context.engine.scoped_session() as session:
other_modules_user_role = RedshiftDatasetService.get_other_modules_dataset_user_role(
session, source.datasetUri, context.username, context.groups
)
if other_modules_user_role is not None:
return other_modules_user_role
return DatasetRole.NoPermission.value


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging

from typing import List
from dataall.base.context import get_context
from dataall.base.db.paginator import paginate_list
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
Expand All @@ -12,6 +12,7 @@

from dataall.modules.datasets_base.services.datasets_enums import DatasetRole
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
from dataall.modules.datasets_base.services.dataset_service_interface import DatasetServiceInterface

from dataall.modules.redshift_datasets.services.redshift_dataset_permissions import (
MANAGE_REDSHIFT_DATASETS,
Expand Down Expand Up @@ -45,6 +46,46 @@


class RedshiftDatasetService:
_interfaces: List[DatasetServiceInterface] = []

@classmethod
def register(cls, interface: DatasetServiceInterface):
    """Register a DatasetServiceInterface implementation supplied by another module.

    Registered interfaces are consulted by the class-level hook methods
    (check_before_delete, execute_on_delete, steward-permission helpers, ...)
    so other modules can plug into redshift dataset lifecycle events.
    """
    cls._interfaces.append(interface)

@classmethod
def get_other_modules_dataset_user_role(cls, session, uri, username, groups) -> str:
    """All other user role types that might come from other modules.

    Returns the first non-None role produced by a registered interface,
    or None when no interface resolves a role for this user.
    """
    return next(
        (
            resolved
            for resolved in (
                interface.resolve_additional_dataset_user_role(session, uri, username, groups)
                for interface in cls._interfaces
            )
            if resolved is not None
        ),
        None,
    )

@classmethod
def check_before_delete(cls, session, uri, **kwargs) -> bool:
    """All actions from other modules that need to be executed before deletion.

    Every registered interface is consulted (none are skipped, matching the
    original full-evaluation semantics); deletion is allowed only when all
    of them return True.
    """
    results = []
    for interface in cls._interfaces:
        results.append(interface.check_before_delete(session, uri, **kwargs))
    return all(results)

@classmethod
def execute_on_delete(cls, session, uri, **kwargs) -> bool:
    """All actions from other modules that need to be executed during deletion"""
    # Hooks run in registration order; an exception from any hook propagates
    # and aborts the surrounding deletion transaction.
    for interface in cls._interfaces:
        interface.execute_on_delete(session, uri, **kwargs)
    return True

@classmethod
def _attach_additional_steward_permissions(cls, session, dataset, new_stewards):
    """All permissions from other modules that need to be granted to stewards"""
    # Delegates to each registered module so e.g. share approval permissions
    # follow the stewardship change.
    for interface in cls._interfaces:
        interface.extend_attach_steward_permissions(session, dataset, new_stewards)

@classmethod
def _delete_additional_steward_permissions(cls, session, dataset):
    """All permissions from other modules that need to be deleted to stewards"""
    # Counterpart of _attach_additional_steward_permissions: each registered
    # module revokes whatever it granted to the outgoing stewards.
    for interface in cls._interfaces:
        interface.extend_delete_steward_permissions(session, dataset)

@staticmethod
@TenantPolicyService.has_tenant_permission(MANAGE_REDSHIFT_DATASETS)
@ResourcePolicyService.has_resource_permission(IMPORT_REDSHIFT_DATASET)
Expand Down Expand Up @@ -120,14 +161,16 @@ def delete_redshift_dataset(uri):
with context.db_engine.scoped_session() as session:
dataset: RedshiftDataset = RedshiftDatasetRepository.get_redshift_dataset_by_uri(session, uri)

# TODO: when adding sharing, add check_on_delete for shared items
RedshiftDatasetService.check_before_delete(session, uri, action=DELETE_REDSHIFT_DATASET)
tables: [RedshiftTable] = RedshiftDatasetRepository.list_redshift_dataset_tables(
session, dataset.datasetUri
)
for table in tables:
DatasetTableIndexer.delete_doc(doc_id=table.rsTableUri)
session.delete(table)

RedshiftDatasetService.execute_on_delete(session, uri, action=DELETE_REDSHIFT_DATASET)

ResourcePolicyService.delete_resource_policy(
session=session, resource_uri=uri, group=dataset.SamlAdminGroupName
)
Expand Down Expand Up @@ -350,7 +393,7 @@ def _transfer_stewardship_to_owners(session, dataset):
group=dataset.stewards,
resource_uri=dataset.datasetUri,
)

RedshiftDatasetService._delete_additional_steward_permissions(session, dataset)
return dataset

@staticmethod
Expand All @@ -368,4 +411,5 @@ def _transfer_stewardship_to_new_stewards(session, dataset, new_stewards):
resource_uri=dataset.datasetUri,
resource_type=RedshiftDataset.__name__,
)
RedshiftDatasetService._attach_additional_steward_permissions(session, dataset, new_stewards)
return dataset
7 changes: 7 additions & 0 deletions backend/dataall/modules/redshift_datasets_shares/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ def __init__(self):
from dataall.modules.shares_base.services.shares_enums import ShareableType
from dataall.modules.shares_base.services.share_object_service import ShareObjectService
from dataall.modules.redshift_datasets.db.redshift_models import RedshiftTable
from dataall.modules.redshift_datasets.services.redshift_dataset_service import RedshiftDatasetService
from dataall.modules.redshift_datasets_shares.services.redshift_table_share_processor import (
ProcessRedshiftShare,
)
from dataall.modules.redshift_datasets_shares.services.redshift_table_share_validator import (
RedshiftTableValidator,
)
from dataall.modules.datasets_base.services.dataset_list_service import DatasetListService
from dataall.modules.redshift_datasets_shares.services.redshift_share_dataset_service import (
RedshiftShareDatasetService,
)

EnvironmentResourceManager.register(RedshiftShareEnvironmentResource())

Expand All @@ -49,6 +54,8 @@ def __init__(self):
ShareableType.RedshiftTable, ProcessRedshiftShare, RedshiftTable, RedshiftTable.rsTableUri
)
)
RedshiftDatasetService.register(RedshiftShareDatasetService())
DatasetListService.register(RedshiftShareDatasetService())

ShareObjectService.register_validator(dataset_type=DatasetTypes.Redshift, validator=RedshiftTableValidator)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.base.db import exceptions
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository
from dataall.modules.shares_base.services.share_permissions import SHARE_OBJECT_APPROVER
from dataall.modules.redshift_datasets.services.redshift_dataset_permissions import DELETE_REDSHIFT_DATASET
from dataall.modules.datasets_base.services.datasets_enums import DatasetRole, DatasetTypes
from dataall.modules.datasets_base.services.dataset_service_interface import DatasetServiceInterface


import logging

log = logging.getLogger(__name__)


class RedshiftShareDatasetService(DatasetServiceInterface):
    """Sharing-side hooks for Redshift datasets.

    Registered with the redshift_datasets module (via its DatasetServiceInterface
    registry) so dataset lifecycle actions — deletion checks, listings, and
    stewardship changes — take existing share objects into account.
    """

    @property
    def dataset_type(self):
        """Implemented as part of the DatasetServiceInterface"""
        return DatasetTypes.Redshift

    @staticmethod
    def resolve_additional_dataset_user_role(session, uri, username, groups):
        """Implemented as part of the DatasetServiceInterface"""
        existing_share = ShareObjectRepository.find_share_by_dataset_attributes(session, uri, username, groups)
        return DatasetRole.Shared.value if existing_share is not None else None

    @staticmethod
    def check_before_delete(session, uri, **kwargs):
        """Implemented as part of the DatasetServiceInterface"""
        # Only the dataset-deletion action is guarded; any other action passes.
        if kwargs.get('action') not in [DELETE_REDSHIFT_DATASET]:
            return True
        shared_states = ShareStatusRepository.get_share_item_shared_states()
        shares = ShareObjectRepository.list_dataset_shares_with_existing_shared_items(
            session=session, dataset_uri=uri, share_item_shared_states=shared_states
        )
        log.info(f'Found {len(shares)} shares for dataset {uri}')
        for share in shares:
            log.info(f'Share {share.shareUri} items, {share.status}')
        if shares:
            # Deletion is blocked while any share still has items in a shared state.
            raise exceptions.ResourceShared(
                action=DELETE_REDSHIFT_DATASET,
                message='Revoke all dataset shares before deletion.',
            )
        return True

    @staticmethod
    def execute_on_delete(session, uri, **kwargs):
        """Implemented as part of the DatasetServiceInterface"""
        # On dataset deletion, clean up shares that no longer hold shared items.
        if kwargs.get('action') in [DELETE_REDSHIFT_DATASET]:
            shared_states = ShareStatusRepository.get_share_item_shared_states()
            ShareObjectRepository.delete_dataset_shares_with_no_shared_items(session, uri, shared_states)
        return True

    @staticmethod
    def append_to_list_user_datasets(session, username, groups):
        """Implemented as part of the DatasetServiceInterface"""
        shared_states = ShareStatusRepository.get_share_item_shared_states()
        return ShareObjectRepository.list_user_shared_datasets(
            session, username, groups, shared_states, DatasetTypes.Redshift
        )

    @staticmethod
    def extend_attach_steward_permissions(session, dataset, new_stewards, **kwargs):
        """Implemented as part of the DatasetServiceInterface"""
        # Iterating an empty result is a no-op, so no explicit emptiness check is needed.
        for share in ShareObjectRepository.find_dataset_shares(session, dataset.datasetUri) or []:
            ResourcePolicyService.attach_resource_policy(
                session=session,
                group=new_stewards,
                permissions=SHARE_OBJECT_APPROVER,
                resource_uri=share.shareUri,
                resource_type=ShareObject.__name__,
            )
            # Revoke the previous stewards' policy unless they are also the dataset admins.
            if dataset.stewards != dataset.SamlAdminGroupName:
                ResourcePolicyService.delete_resource_policy(
                    session=session,
                    group=dataset.stewards,
                    resource_uri=share.shareUri,
                )

    @staticmethod
    def extend_delete_steward_permissions(session, dataset, **kwargs):
        """Implemented as part of the DatasetServiceInterface"""
        shares = ShareObjectRepository.find_dataset_shares(session, dataset.datasetUri)
        for share in shares or []:
            # Never strip the dataset admin group of its share policies.
            if dataset.stewards != dataset.SamlAdminGroupName:
                ResourcePolicyService.delete_resource_policy(
                    session=session,
                    group=dataset.stewards,
                    resource_uri=share.shareUri,
                )
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _attach_additional_steward_permissions(cls, session, dataset, new_stewards):
interface.extend_attach_steward_permissions(session, dataset, new_stewards)

@classmethod
def _delete_additional_steward__permissions(cls, session, dataset):
def _delete_additional_steward_permissions(cls, session, dataset):
"""All permissions from other modules that need to be deleted to stewards"""
for interface in cls._interfaces:
interface.extend_delete_steward_permissions(session, dataset)
Expand Down Expand Up @@ -511,7 +511,7 @@ def _transfer_stewardship_to_owners(session, dataset):
resource_uri=tableUri,
)

DatasetService._delete_additional_steward__permissions(session, dataset)
DatasetService._delete_additional_steward_permissions(session, dataset)
return dataset

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,41 +126,6 @@ def find_all_other_share_items(
query = query.filter(ShareObjectItem.status.in_(item_status))
return query.all()

@staticmethod
def list_user_s3_shared_datasets(session, username, groups) -> Query:
    """Return a query of datasets shared with the user, directly or via groups.

    A dataset qualifies when it has at least one share item in a "shared"
    state of type Table, S3Bucket or StorageLocation, on a share that is
    owned by *username* or whose principal is one of *groups*.
    """
    share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
    query = (
        session.query(DatasetBase)
        .outerjoin(
            ShareObject,
            ShareObject.datasetUri == DatasetBase.datasetUri,
        )
        .outerjoin(ShareObjectItem, ShareObjectItem.shareUri == ShareObject.shareUri)
        .filter(
            and_(
                or_(
                    ShareObject.principalId.in_(groups),
                    ShareObject.owner == username,
                ),
                ShareObjectItem.status.in_(share_item_shared_states),
                ShareObjectItem.itemType.in_(
                    [ShareableType.Table.value, ShareableType.S3Bucket.value, ShareableType.StorageLocation.value]
                ),
            )
        )
    )
    # One row per dataset even when several shares/items match.
    return query.distinct(DatasetBase.datasetUri)

@staticmethod
def get_share_by_dataset_attributes(session, dataset_uri, dataset_owner, groups=None):
    """Return the first share on *dataset_uri* owned by *dataset_owner* or
    whose principal is one of *groups*, or None when no such share exists.

    :param session: SQLAlchemy session
    :param dataset_uri: uri of the dataset whose shares are searched
    :param dataset_owner: username matched against ShareObject.owner
    :param groups: optional group names matched against ShareObject.principalId
    """
    # Fix: the original signature used a mutable default (groups=[]), the
    # classic shared-default-argument pitfall; None-sentinel is equivalent
    # for callers (an empty in_() matches nothing, same as before).
    groups = groups if groups is not None else []
    share: ShareObject = (
        session.query(ShareObject)
        .filter(ShareObject.datasetUri == dataset_uri)
        .filter(or_(ShareObject.owner == dataset_owner, ShareObject.principalId.in_(groups)))
        .first()
    )
    return share

@staticmethod
def check_other_approved_share_item_table_exists(session, environment_uri, item_uri, share_item_uri):
share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
Expand Down Expand Up @@ -315,54 +280,6 @@ def check_existing_s3_shared_items(session, item_uri: str) -> int:
def delete_s3_share_item(session, item_uri: str):
    """Delete the share item row(s) whose itemUri equals *item_uri*."""
    matching_items = session.query(ShareObjectItem).filter(ShareObjectItem.itemUri == item_uri)
    matching_items.delete()

@staticmethod
def delete_s3_shares_with_no_shared_items(session, dataset_uri):
    """Delete shares of *dataset_uri* whose items are outside the "shared" states.

    For each matched share, its items are deleted first, then the share
    object itself.
    """
    # NOTE(review): the filter matches shares having at least one item NOT in
    # a shared state; a share mixing shared and non-shared items would also
    # match and be deleted — confirm this is the intended semantics.
    share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
    shares = (
        session.query(ShareObject)
        .outerjoin(ShareObjectItem, ShareObjectItem.shareUri == ShareObject.shareUri)
        .filter(
            and_(
                ShareObject.datasetUri == dataset_uri,
                ShareObjectItem.status.notin_(share_item_shared_states),
            )
        )
        .all()
    )
    for share in shares:
        # Remove child items before the parent share object.
        share_items = session.query(ShareObjectItem).filter(ShareObjectItem.shareUri == share.shareUri).all()
        for item in share_items:
            session.delete(item)

        share_obj = session.query(ShareObject).filter(ShareObject.shareUri == share.shareUri).first()
        session.delete(share_obj)

@staticmethod
def find_s3_dataset_shares(session, dataset_uri):
    """Return every share object created for the given dataset."""
    shares_query = session.query(ShareObject).filter(ShareObject.datasetUri == dataset_uri)
    return shares_query.all()

@staticmethod
def list_s3_dataset_shares_with_existing_shared_items(
    session, dataset_uri, environment_uri=None, item_type=None
) -> [ShareObject]:
    """Return non-deleted shares of *dataset_uri* with at least one item in a "shared" state.

    :param environment_uri: optional — restrict to shares in that environment
    :param item_type: optional — restrict to shares holding items of that type
    """
    share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
    query = (
        session.query(ShareObject)
        .outerjoin(ShareObjectItem, ShareObjectItem.shareUri == ShareObject.shareUri)
        .filter(
            and_(
                ShareObject.datasetUri == dataset_uri,
                ShareObject.deleted.is_(None),  # exclude soft-deleted shares
                ShareObjectItem.status.in_(share_item_shared_states),
            )
        )
    )
    if environment_uri:
        query = query.filter(ShareObject.environmentUri == environment_uri)
    if item_type:
        query = query.filter(ShareObjectItem.itemType == item_type)
    return query.all()

# the next 2 methods are used in subscription task
@staticmethod
def find_share_items_by_item_uri(session, item_uri):
Expand Down
Loading

0 comments on commit 9840995

Please sign in to comment.