Skip to content

Commit

Permalink
Introduce sharing State Machines (data-dot-all#267)
Browse files Browse the repository at this point in the history
- Feature
- Refactoring

- Embed sharing actions into code defined State machines
- Redesign the sharing workflow to keep track of access revocations
- Adjust and simplify sharing managers and tasks - separation of approval and revoke process
- Revoke items in batch
- Add information to the share statistics shown in the inbox and outbox
- Add tests for the S3 and LakeFormation share managers and for the sharing APIs

(See the conversation in the issue below for more details.)

- data-dot-all#255

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Co-authored-by: Dennis Goldner <[email protected]>
  • Loading branch information
2 people authored and noah-paige committed Mar 2, 2023
1 parent 75d97b8 commit a9c1829
Show file tree
Hide file tree
Showing 17 changed files with 723 additions and 141 deletions.
38 changes: 22 additions & 16 deletions backend/dataall/api/Objects/ShareObject/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def approve_share_object(context: Context, source, shareUri: str = None):

def reject_share_object(context: Context, source, shareUri: str = None):
with context.engine.scoped_session() as session:
return db.api.ShareObject.reject_share_object(
return db.api.ShareObject.reject_share_object(
session=session,
username=context.username,
Expand All @@ -97,6 +98,22 @@ def reject_share_object(context: Context, source, shareUri: str = None):
)


def revoke_items_share_object(context: Context, source, input):
with context.engine.scoped_session() as session:
share = db.api.ShareObject.revoke_items_share_object(
session=session,
username=context.username,
groups=context.groups,
uri=input.get("shareUri"),
data=input,
check_perm=True,
)

revoke_share_task: models.Task = models.Task(
action='ecs.share.revoke',
targetUri=input.get("shareUri"),


def revoke_items_share_object(context: Context, source, input):
with context.engine.scoped_session() as session:
share = db.api.ShareObject.revoke_items_share_object(
Expand All @@ -114,8 +131,10 @@ def revoke_items_share_object(context: Context, source, input):
payload={'environmentUri': share.environmentUri},
)
session.add(revoke_share_task)
session.add(revoke_share_task)

Worker.queue(engine=context.engine, task_ids=[revoke_share_task.taskUri])
Worker.queue(engine=context.engine, task_ids=[revoke_share_task.taskUri])

return share

Expand All @@ -126,6 +145,7 @@ def delete_share_object(context: Context, source, shareUri: str = None):
if not share:
raise db.exceptions.ObjectNotFound('ShareObject', shareUri)


db.api.ShareObject.delete_share_object(
session=session,
username=context.username,
Expand All @@ -136,6 +156,8 @@ def delete_share_object(context: Context, source, shareUri: str = None):

return True

return True


def add_shared_item(context, source, shareUri: str = None, input: dict = None):
with context.engine.scoped_session() as session:
Expand Down Expand Up @@ -297,22 +319,6 @@ def resolve_group(context: Context, source: models.ShareObject, **kwargs):
return source.groupUri


def resolve_consumption_data(context: Context, source: models.ShareObject, **kwargs):
    """Resolve the ``consumptionData`` field of a share object.

    Args:
        context: Request context; ``context.engine.scoped_session()`` is used
            to open the database session.
        source: The share object being resolved.
        **kwargs: Accepted but not used by this resolver.

    Returns:
        ``None`` when ``source`` is falsy or when the dataset lookup for
        ``source.datasetUri`` yields a falsy value; otherwise a dict with the
        keys ``s3AccessPointName`` and ``sharedGlueDatabase``.
    """
    if not source:
        return None
    with context.engine.scoped_session() as session:
        ds: models.Dataset = db.api.Dataset.get_dataset_by_uri(session, source.datasetUri)
        if not ds:
            # The previous ``... if ds else 'Not created'`` fallback lived
            # inside ``if ds:`` and could never be selected; the effective
            # behaviour for a missing dataset has always been to return None.
            return None
        s3_access_point_name = utils.slugify(
            source.datasetUri + '-' + source.principalId,
            max_length=50, lowercase=True, regex_pattern='[^a-zA-Z0-9-]', separator='-'
        )
        return {
            's3AccessPointName': s3_access_point_name,
            # Truncated to 254 characters, as in the original expression.
            'sharedGlueDatabase': (ds.GlueDatabaseName + '_shared_' + source.shareUri)[:254],
        }


def resolve_share_object_statistics(context: Context, source: models.ShareObject, **kwargs):
if not source:
return None
Expand Down
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/ShareObject/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
gql.Field(name='shareItemUri', type=gql.ID),
gql.Field('itemUri', gql.String),
gql.Field(name='status', type=gql.Ref('ShareItemStatus')),
gql.Field(name='status', type=gql.Ref('ShareItemStatus')),
gql.Field(name='action', type=gql.String),
gql.Field('itemType', ShareableType.toGraphQLEnum()),
gql.Field('itemName', gql.String),
Expand Down Expand Up @@ -116,7 +117,6 @@
gql.Field(name='updated', type=gql.String),
gql.Field(name='datasetUri', type=gql.String),
gql.Field(name='dataset', type=DatasetLink, resolver=resolve_dataset),
gql.Field(name='consumptionData', type=gql.Ref('ConsumptionData'), resolver=resolve_consumption_data),
gql.Field(name='existingSharedItems', type=gql.Boolean, resolver=resolve_existing_shared_items),
gql.Field(
name='statistics',
Expand Down
1 change: 1 addition & 0 deletions backend/dataall/db/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .notification import Notification
from .redshift_cluster import RedshiftCluster
from .vpc import Vpc
from .share_object import ShareObject, ShareObjectSM, ShareItemSM
from .notebook import Notebook
from .sgm_studio_notebook import SgmStudioNotebook
from .dashboard import Dashboard
Expand Down
18 changes: 15 additions & 3 deletions backend/dataall/db/api/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
)
from . import Organization
from .. import models, api, exceptions, permissions, paginate
from ..models.Enums import Language, ConfidentialityClassification
from ..models.Enums import Language, ConfidentialityClassification, ShareObjectStatus, ShareItemStatus
from ...utils.naming_convention import (
NamingConventionService,
NamingConventionPattern,
)

logger = logging.getLogger(__name__)

# Share item statuses matched by the ``ShareObjectItem.status.in_(...)``
# filters in this module. Stored as the enum members' ``.value``.
SHARE_ITEM_SHARED_STATES = [
    status.value
    for status in (
        ShareItemStatus.Share_Succeeded,
        ShareItemStatus.Share_In_Progress,
        ShareItemStatus.Revoke_In_Progress,
        ShareItemStatus.Revoke_Approved,
        ShareItemStatus.Revoke_Failed,
    )
]


class Dataset:
@staticmethod
Expand Down Expand Up @@ -227,18 +235,22 @@ def query_user_datasets(session, username, groups, filter) -> Query:
models.ShareObjectItem,
models.ShareObjectItem.shareUri == models.ShareObject.shareUri
)
.outerjoin(
models.ShareObjectItem,
models.ShareObjectItem.shareUri == models.ShareObject.shareUri
)
.filter(
or_(
models.Dataset.owner == username,
models.Dataset.SamlAdminGroupName.in_(groups),
models.Dataset.stewards.in_(groups),
and_(
models.ShareObject.principalId.in_(groups),
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
),
and_(
models.ShareObject.owner == username,
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
),
)
)
Expand Down
14 changes: 13 additions & 1 deletion backend/dataall/db/api/dataset_location.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
import logging
from typing import List

from sqlalchemy import and_, or_
from sqlalchemy import and_, or_

from . import has_tenant_perm, has_resource_perm, Glossary
from .. import models, api, paginate, permissions, exceptions
from .dataset import Dataset
from ..models.Enums import ShareItemStatus

logger = logging.getLogger(__name__)


# Share item statuses matched by the ``ShareObjectItem.status.in_(...)``
# filters in this module. Each status is listed exactly once; the earlier
# list repeated ``Revoke_Failed``, which added nothing to an IN test.
SHARE_ITEM_SHARED_STATES = [
    ShareItemStatus.Share_Succeeded.value,
    ShareItemStatus.Share_In_Progress.value,
    ShareItemStatus.Revoke_In_Progress.value,
    ShareItemStatus.Revoke_Approved.value,
    ShareItemStatus.Revoke_Failed.value,
]


class DatasetStorageLocation:
@staticmethod
@has_tenant_perm(permissions.MANAGE_DATASETS)
Expand Down Expand Up @@ -153,7 +165,7 @@ def delete_dataset_location(
.filter(
and_(
models.ShareObjectItem.itemUri == location.locationUri,
models.ShareObjectItem.status.in_(share_item_shared_states)
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES)
)
)
.first()
Expand Down
14 changes: 12 additions & 2 deletions backend/dataall/db/api/dataset_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@
from .. import models, api, permissions, exceptions, paginate
from . import has_tenant_perm, has_resource_perm, Glossary, ResourcePolicy, Environment
from ..models import Dataset
from ..models.Enums import ShareItemStatus
from ...utils import json_utils

logger = logging.getLogger(__name__)

# Share item statuses matched by the ``ShareObjectItem.status.in_(...)``
# filters in this module. Each status is listed exactly once; the earlier
# list repeated ``Revoke_Failed``, which added nothing to an IN test.
SHARE_ITEM_SHARED_STATES = [
    ShareItemStatus.Share_Succeeded.value,
    ShareItemStatus.Share_In_Progress.value,
    ShareItemStatus.Revoke_In_Progress.value,
    ShareItemStatus.Revoke_Approved.value,
    ShareItemStatus.Revoke_Failed.value,
]


class DatasetTable:
@staticmethod
Expand Down Expand Up @@ -154,7 +164,7 @@ def delete_dataset_table(
.filter(
and_(
models.ShareObjectItem.itemUri == table.tableUri,
models.ShareObjectItem.status.in_(share_item_shared_states)
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES)
)
)
.first()
Expand Down Expand Up @@ -197,7 +207,7 @@ def query_dataset_tables_shared_with_env(
models.ShareObject.datasetUri == dataset_uri, # for this dataset
models.ShareObject.environmentUri
== environment_uri, # for this environment
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
)
)
.all()
Expand Down
17 changes: 14 additions & 3 deletions backend/dataall/db/api/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from ..api.organization import Organization
from ..models import EnvironmentGroup
from ..models.Enums import (
ShareObjectStatus,
ShareItemStatus,
ShareableType,
EnvironmentType,
EnvironmentPermission,
Expand All @@ -29,6 +31,15 @@

log = logging.getLogger(__name__)

# Share item statuses matched by the ``ShareObjectItem.status.in_(...)``
# filters in this module. Each status is listed exactly once; the earlier
# list repeated ``Revoke_Failed``, which added nothing to an IN test.
SHARE_ITEM_SHARED_STATES = [
    ShareItemStatus.Share_Succeeded.value,
    ShareItemStatus.Share_In_Progress.value,
    ShareItemStatus.Revoke_In_Progress.value,
    ShareItemStatus.Revoke_Approved.value,
    ShareItemStatus.Revoke_Failed.value,
]


class Environment:
@staticmethod
Expand Down Expand Up @@ -925,7 +936,7 @@ def paginated_shared_with_environment_datasets(
)
.filter(
and_(
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
models.ShareObject.environmentUri == uri,
)
)
Expand Down Expand Up @@ -1021,7 +1032,7 @@ def paginated_shared_with_environment_group_datasets(
)
.filter(
and_(
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
models.ShareObject.environmentUri == envUri,
models.ShareObject.principalId == groupUri,
)
Expand Down Expand Up @@ -1142,7 +1153,7 @@ def paginated_environment_data_items(
)
.filter(
and_(
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
models.ShareObject.environmentUri == uri,
)
)
Expand Down
16 changes: 13 additions & 3 deletions backend/dataall/db/api/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .. import models, api, exceptions, paginate, permissions
from . import has_resource_perm, ResourcePolicy, DatasetTable, Environment, Dataset
from ..models.Enums import ShareItemStatus
from ..models.Enums import ShareItemStatus
from ...utils.naming_convention import (
NamingConventionService,
NamingConventionPattern,
Expand All @@ -14,6 +15,16 @@
log = logging.getLogger(__name__)


# Share item statuses matched by the ``ShareObjectItem.status.in_(...)``
# filters in this module. Each status is listed exactly once; the earlier
# list repeated ``Revoke_Failed``, which added nothing to an IN test.
SHARE_ITEM_SHARED_STATES = [
    ShareItemStatus.Share_Succeeded.value,
    ShareItemStatus.Share_In_Progress.value,
    ShareItemStatus.Revoke_In_Progress.value,
    ShareItemStatus.Revoke_Approved.value,
    ShareItemStatus.Revoke_Failed.value,
]


class RedshiftCluster:
def __init__(self):
pass
Expand Down Expand Up @@ -184,7 +195,6 @@ def list_available_datasets(
cluster: models.RedshiftCluster = RedshiftCluster.get_redshift_cluster_by_uri(
session, uri
)
share_item_shared_states = api.ShareItemSM.get_share_item_shared_states()

shared = (
session.query(
Expand All @@ -199,7 +209,7 @@ def list_available_datasets(
.filter(
and_(
models.RedshiftCluster.clusterUri == cluster.clusterUri,
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
or_(
models.ShareObject.owner == username,
models.ShareObject.principalId.in_(groups),
Expand Down Expand Up @@ -320,7 +330,7 @@ def list_available_cluster_tables(
.filter(
and_(
models.RedshiftCluster.clusterUri == cluster.clusterUri,
models.ShareObjectItem.status.in_(share_item_shared_states),
models.ShareObjectItem.status.in_(SHARE_ITEM_SHARED_STATES),
or_(
models.ShareObject.owner == username,
models.ShareObject.principalId.in_(groups),
Expand Down
Loading

0 comments on commit a9c1829

Please sign in to comment.