Skip to content

Commit

Permalink
Introduce sharing State Machines (#267)
Browse files Browse the repository at this point in the history
### Feature or Bugfix
- Feature
- Refactoring

### Detail
- Embed sharing actions into code defined State machines
- Redesign the sharing workflow to keep track of revoke of access
- Adjust and simplify sharing managers and tasks - separation of approval and revoke process
- Revoke items in batch
- Added info in share statistics in inbox and outbox
- Added testing for S3 and LakeFormation share managers and for sharing APIs


(Check out the conversation in the below issue for more details)

### Relates
- #255 

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Co-authored-by: Dennis Goldner <[email protected]>
  • Loading branch information
dlpzx and degoldner authored Feb 1, 2023
1 parent bde5494 commit 15aea8c
Show file tree
Hide file tree
Showing 90 changed files with 6,826 additions and 3,391 deletions.
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/Dataset/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def delete_dataset(
env: models.Environment = db.api.Environment.get_environment_by_uri(
session, dataset.environmentUri
)
shares = db.api.Dataset.list_dataset_approved_shares(session, datasetUri)
shares = db.api.Dataset.list_dataset_shares_with_existing_shared_items(session, datasetUri)
if shares:
raise exceptions.UnauthorizedOperation(
action=permissions.DELETE_DATASET,
Expand Down
3 changes: 1 addition & 2 deletions backend/dataall/api/Objects/DatasetTable/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,5 @@ def list_shared_tables_by_env_dataset(context: Context, source, datasetUri: str,
return db.api.DatasetTable.get_dataset_tables_shared_with_env(
session,
envUri,
datasetUri,
["Approved"]
datasetUri
)
10 changes: 10 additions & 0 deletions backend/dataall/api/Objects/ShareObject/input_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@
)


RevokeItemsInput = gql.InputType(
name='RevokeItemsInput',
arguments=[
gql.Argument(name='shareUri', type=gql.NonNullableType(gql.String)),
gql.Argument(name='revokedItemUris', type=gql.NonNullableType(gql.ArrayType(gql.String))),
],
)


class ShareSortField(GraphQLEnumMapper):
created = 'created'
updated = 'updated'
Expand Down Expand Up @@ -60,6 +69,7 @@ class ShareSortField(GraphQLEnumMapper):
gql.Argument(name='term', type=gql.String),
gql.Argument('tags', gql.ArrayType(gql.String)),
gql.Argument(name='isShared', type=gql.Boolean),
gql.Argument(name='isRevokable', type=gql.Boolean),
gql.Argument('page', gql.Integer),
gql.Argument('pageSize', gql.Integer),
],
Expand Down
8 changes: 7 additions & 1 deletion backend/dataall/api/Objects/ShareObject/mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
type=gql.Boolean,
)


submitShareObject = gql.MutationField(
name='submitShareObject',
args=[gql.Argument(name='shareUri', type=gql.NonNullableType(gql.String))],
Expand All @@ -61,3 +60,10 @@
type=gql.Ref('ShareObject'),
resolver=reject_share_object,
)

revokeItemsShareObject = gql.MutationField(
name='revokeItemsShareObject',
args=[gql.Argument(name='input', type=gql.Ref('RevokeItemsInput'))],
type=gql.Ref('ShareObject'),
resolver=revoke_items_share_object,
)
70 changes: 33 additions & 37 deletions backend/dataall/api/Objects/ShareObject/resolvers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import logging
import os

from sqlalchemy import and_, or_

from ..Stack import stack_helper
from .... import db
from ....api.constants import *
from ....api.context import Context
Expand Down Expand Up @@ -75,43 +72,49 @@ def approve_share_object(context: Context, source, shareUri: str = None):
check_perm=True,
)

# Create task for lake formation updates
approve_share_task: models.Task = models.Task(
action='ecs.share.approve',
targetUri=shareUri,
payload={'environmentUri': share.environmentUri},
)
session.add(approve_share_task)

# call cdk to update bucket policy of the dataset for folder shares
# stack_helper.deploy_stack(context, share.datasetUri)

Worker.queue(engine=context.engine, task_ids=[approve_share_task.taskUri])

return share


def reject_share_object(context: Context, source, shareUri: str = None):
with context.engine.scoped_session() as session:
share = db.api.ShareObject.reject_share_object(
return db.api.ShareObject.reject_share_object(
session=session,
username=context.username,
groups=context.groups,
uri=shareUri,
data=None,
check_perm=True,
)
# Create task for lake formation updates
reject_share_task: models.Task = models.Task(
action='ecs.share.reject',
targetUri=shareUri,
payload={'environmentUri': share.environmentUri},


def revoke_items_share_object(context: Context, source, input):
with context.engine.scoped_session() as session:
share = db.api.ShareObject.revoke_items_share_object(
session=session,
username=context.username,
groups=context.groups,
uri=input.get("shareUri"),
data=input,
check_perm=True,
)
session.add(reject_share_task)

# stack_helper.deploy_stack(context, share.datasetUri)
revoke_share_task: models.Task = models.Task(
action='ecs.share.revoke',
targetUri=input.get("shareUri"),
payload={'environmentUri': share.environmentUri},
)
session.add(revoke_share_task)

Worker.queue(engine=context.engine, task_ids=[reject_share_task.taskUri])
Worker.queue(engine=context.engine, task_ids=[revoke_share_task.taskUri])

return share

Expand All @@ -121,14 +124,16 @@ def delete_share_object(context: Context, source, shareUri: str = None):
share = db.api.ShareObject.get_share_by_uri(session, shareUri)
if not share:
raise db.exceptions.ObjectNotFound('ShareObject', shareUri)

db.api.ShareObject.delete_share_object(
session=session,
username=context.username,
groups=context.groups,
uri=shareUri,
check_perm=True,
)
return True

return True


def add_shared_item(context, source, shareUri: str = None, input: dict = None):
Expand Down Expand Up @@ -289,31 +294,22 @@ def resolve_group(context: Context, source: models.ShareObject, **kwargs):
return source.groupUri


def get_share_object_statistics(context: Context, source: models.ShareObject, **kwargs):
def resolve_share_object_statistics(context: Context, source: models.ShareObject, **kwargs):
if not source:
return None
with context.engine.scoped_session() as session:
tables = (
session.query(models.ShareObjectItem)
.filter(
and_(
models.ShareObjectItem.shareUri == source.shareUri,
models.ShareObjectItem.itemType == 'DatasetTable',
)
)
.count()
return db.api.ShareObject.resolve_share_object_statistics(
session, source.shareUri
)
locations = (
session.query(models.ShareObjectItem)
.filter(
and_(
models.ShareObjectItem.shareUri == source.shareUri,
models.ShareObjectItem.itemType == 'DatasetStorageLocation',
)
)
.count()


def resolve_existing_shared_items(context: Context, source: models.ShareObject, **kwargs):
if not source:
return None
with context.engine.scoped_session() as session:
return db.api.ShareObject.check_existing_shared_items(
session, source.shareUri
)
return {'tables': tables, 'locations': locations}


def list_shareable_objects(
Expand Down
13 changes: 11 additions & 2 deletions backend/dataall/api/Objects/ShareObject/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
gql.Field(name='shareUri', type=gql.String),
gql.Field(name='shareItemUri', type=gql.ID),
gql.Field('itemUri', gql.String),
gql.Field(name='status', type=gql.Ref('ShareObjectStatus')),
gql.Field(name='status', type=gql.Ref('ShareItemStatus')),
gql.Field(name='action', type=gql.String),
gql.Field('itemType', ShareableType.toGraphQLEnum()),
gql.Field('itemName', gql.String),
Expand Down Expand Up @@ -77,6 +77,10 @@
fields=[
gql.Field(name='locations', type=gql.Integer),
gql.Field(name='tables', type=gql.Integer),
gql.Field(name='sharedItems', type=gql.Integer),
gql.Field(name='revokedItems', type=gql.Integer),
gql.Field(name='failedItems', type=gql.Integer),
gql.Field(name='pendingItems', type=gql.Integer),
],
)

Expand All @@ -101,7 +105,12 @@
gql.Field(name='updated', type=gql.String),
gql.Field(name='datasetUri', type=gql.String),
gql.Field(name='dataset', type=DatasetLink, resolver=resolve_dataset),
gql.Field(name='statistics', type=gql.Ref('ShareObjectStatistic')),
gql.Field(name='existingSharedItems', type=gql.Boolean, resolver=resolve_existing_shared_items),
gql.Field(
name='statistics',
type=gql.Ref('ShareObjectStatistic'),
resolver=resolve_share_object_statistics,
),
gql.Field(
name='principal', resolver=resolve_principal, type=gql.Ref('Principal')
),
Expand Down
44 changes: 35 additions & 9 deletions backend/dataall/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,47 @@ class ShareObjectPermission(GraphQLEnumMapper):


class ShareObjectStatus(GraphQLEnumMapper):
Deleted = 'Deleted'
Approved = 'Approved'
Rejected = 'Rejected'
PendingApproval = 'PendingApproval'
Revoked = 'Revoked'
Draft = 'Draft'
Share_In_Progress = 'Share_In_Progress'
Share_Failed = 'Share_Failed'
Share_Succeeded = 'Share_Succeeded'
Submitted = 'Submitted'
Revoke_In_Progress = 'Revoke_In_Progress'
Revoke_Share_Failed = 'Revoke_Share_Failed'
Revoke_Share_Succeeded = 'Revoke_Share_Succeeded'
Share_In_Progress = 'Share_In_Progress'
Processed = 'Processed'


class ShareObjectItemAction(GraphQLEnumMapper):
New = 'New'
Removed = 'Removed'
class ShareItemStatus(GraphQLEnumMapper):
Deleted = 'Deleted'
PendingApproval = 'PendingApproval'
Share_Approved = 'Share_Approved'
Share_Rejected = 'Share_Rejected'
Share_In_Progress = 'Share_In_Progress'
Share_Succeeded = 'Share_Succeeded'
Share_Failed = 'Share_Failed'
Revoke_Approved = 'Revoke_Approved'
Revoke_In_Progress = 'Revoke_In_Progress'
Revoke_Failed = 'Revoke_Failed'
Revoke_Succeeded = 'Revoke_Succeeded'


class ShareObjectActions(GraphQLEnumMapper):
Submit = 'Submit'
Approve = 'Approve'
Reject = 'Reject'
RevokeItems = 'RevokeItems'
Start = 'Start'
Finish = 'Finish'
FinishPending = 'FinishPending'
Delete = 'Delete'


class ShareItemActions(GraphQLEnumMapper):
AddItem = 'AddItem'
RemoveItem = 'RemoveItem'
Failure = 'Failure'
Success = 'Success'


class ConfidentialityClassification(GraphQLEnumMapper):
Expand Down
8 changes: 4 additions & 4 deletions backend/dataall/aws/handlers/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ def approve_share(engine, task: models.Task):
)

@staticmethod
@Worker.handler(path='ecs.share.reject')
def reject_share(engine, task: models.Task):
@Worker.handler(path='ecs.share.revoke')
def revoke_share(engine, task: models.Task):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
return DataSharingService.reject_share(engine, task.targetUri)
return DataSharingService.revoke_share(engine, task.targetUri)
else:
return Ecs.run_share_management_ecs_task(
envname, task.targetUri, 'reject_share'
envname, task.targetUri, 'revoke_share'
)

@staticmethod
Expand Down
17 changes: 17 additions & 0 deletions backend/dataall/aws/handlers/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,23 @@ def _create_table(**data):
)
return response

@staticmethod
def delete_table(accountid, region, database, tablename):
session = SessionHelper.remote_session(accountid=accountid)
client = session.client('glue', region_name=region)
log.info(
'Deleting table {} in database {}'.format(
tablename, database
)
)
response = client.delete_table(
CatalogId=accountid,
DatabaseName=database,
Name=tablename
)

return response

@staticmethod
def create_resource_link(**data):
accountid = data['accountid']
Expand Down
Loading

0 comments on commit 15aea8c

Please sign in to comment.