Skip to content

Commit

Permalink
feat(data quality): custom assertions models, graphql, sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Jun 24, 2024
1 parent 710e605 commit 4047b14
Show file tree
Hide file tree
Showing 19 changed files with 2,000 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@
import com.linkedin.datahub.graphql.resolvers.assertion.AssertionRunEventResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.DeleteAssertionResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.EntityAssertionsResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.ReportAssertionResultResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.UpsertCustomAssertionResolver;
import com.linkedin.datahub.graphql.resolvers.auth.CreateAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.auth.DebugAccessResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenMetadataResolver;
Expand Down Expand Up @@ -377,6 +379,7 @@
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.recommendation.RecommendationsService;
import com.linkedin.metadata.service.AssertionService;
import com.linkedin.metadata.service.BusinessAttributeService;
import com.linkedin.metadata.service.DataProductService;
import com.linkedin.metadata.service.ERModelRelationshipService;
Expand Down Expand Up @@ -454,6 +457,7 @@ public class GmsGraphQLEngine {
private final FormService formService;
private final RestrictedService restrictedService;
private ConnectionService connectionService;
private AssertionService assertionService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -575,6 +579,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.formService = args.formService;
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;
this.assertionService = args.assertionService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -1220,6 +1225,10 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
"createTestConnectionRequest",
new CreateTestConnectionRequestResolver(
this.entityClient, this.ingestionConfiguration))
.dataFetcher(
"upsertCustomAssertion", new UpsertCustomAssertionResolver(assertionService))
.dataFetcher(
"reportAssertionResult", new ReportAssertionResultResolver(assertionService))
.dataFetcher(
"deleteAssertion",
new DeleteAssertionResolver(this.entityClient, this.entityService))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.recommendation.RecommendationsService;
import com.linkedin.metadata.service.AssertionService;
import com.linkedin.metadata.service.BusinessAttributeService;
import com.linkedin.metadata.service.DataProductService;
import com.linkedin.metadata.service.ERModelRelationshipService;
Expand Down Expand Up @@ -86,6 +87,7 @@ public class GmsGraphQLEngineArgs {
boolean graphQLQueryIntrospectionEnabled;
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;
AssertionService assertionService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.linkedin.datahub.graphql.resolvers.assertion;

import com.datahub.authorization.ConjunctivePrivilegeGroup;
import com.datahub.authorization.DisjunctivePrivilegeGroup;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.metadata.authorization.PoliciesConfig;

public class AssertionUtils {
public static boolean isAuthorizedToEditAssertionFromAssertee(
final QueryContext context, final Urn asserteeUrn) {
final DisjunctivePrivilegeGroup orPrivilegeGroups =
new DisjunctivePrivilegeGroup(
ImmutableList.of(
AuthorizationUtils.ALL_PRIVILEGES_GROUP,
new ConjunctivePrivilegeGroup(
ImmutableList.of(PoliciesConfig.EDIT_ENTITY_ASSERTIONS_PRIVILEGE.getType()))));
return AuthorizationUtils.isAuthorized(
context.getAuthorizer(),
context.getActorUrn(),
asserteeUrn.getEntityType(),
asserteeUrn.toString(),
orPrivilegeGroups);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.linkedin.datahub.graphql.resolvers.assertion;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;

import com.linkedin.assertion.AssertionResult;
import com.linkedin.assertion.AssertionResultError;
import com.linkedin.assertion.AssertionResultErrorType;
import com.linkedin.assertion.AssertionResultType;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.AssertionResultInput;
import com.linkedin.datahub.graphql.generated.StringMapEntryInput;
import com.linkedin.metadata.service.AssertionService;
import graphql.execution.DataFetcherExceptionHandler;
import graphql.execution.DataFetcherResult;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReportAssertionResultResolver implements DataFetcher<CompletableFuture<Boolean>> {

public static final String ERROR_MESSAGE_KEY = "message";
private final AssertionService _assertionService;

public ReportAssertionResultResolver(AssertionService assertionService) {
_assertionService = assertionService;
}

/**
* This is called by the graphql engine to fetch the value. The {@link DataFetchingEnvironment} is
* a composite context object that tells you all you need to know about how to fetch a data value
* in graphql type terms.
*
* @param environment this is the data fetching environment which contains all the context you
* need to fetch a value
* @return a value of type T. May be wrapped in a {@link DataFetcherResult}
* @throws Exception to relieve the implementations from having to wrap checked exceptions. Any
* exception thrown from a {@code DataFetcher} will eventually be handled by the registered
* {@link DataFetcherExceptionHandler} and the related field will have a value of {@code null}
* in the result.
*/
@Override
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();
final Urn assertionUrn = UrnUtils.getUrn(environment.getArgument("urn"));
final AssertionResultInput input =
bindArgument(environment.getArgument("result"), AssertionResultInput.class);

return CompletableFuture.supplyAsync(
() -> {
final Urn asserteeUrn =
_assertionService.getEntityUrnForAssertion(
context.getOperationContext(), assertionUrn);
if (asserteeUrn == null) {
throw new RuntimeException(
String.format(
"Failed to report Assertion Run Event. Assertion with urn %s does not exist or is not associated with any entity.",
assertionUrn));
}

// Check whether the current user is allowed to update the assertion.
if (AssertionUtils.isAuthorizedToEditAssertionFromAssertee(context, asserteeUrn)) {
AssertionResult assertionResult = mapAssertionResult(input);
_assertionService.addAssertionRunEvent(
context.getOperationContext(),
assertionUrn,
asserteeUrn,
input.getTimestampMillis(),
assertionResult,
mapContextParameters(input.getProperties()));
return true;
}
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
});
}

private static StringMap mapContextParameters(List<StringMapEntryInput> input) {

if (input == null || input.isEmpty()) {
return null;
}
StringMap entries = new StringMap();
input.forEach(entry -> entries.put(entry.getKey(), entry.getValue()));
return entries;
}

private AssertionResult mapAssertionResult(AssertionResultInput input) {
AssertionResult assertionResult = new AssertionResult();
assertionResult.setType(AssertionResultType.valueOf(input.getType().toString()));
assertionResult.setExternalUrl(input.getExternalUrl(), SetMode.IGNORE_NULL);
if (assertionResult.getType() == AssertionResultType.ERROR && input.getError() != null) {
assertionResult.setError(mapAssertionResultError(input));
}
return assertionResult;
}

private static AssertionResultError mapAssertionResultError(AssertionResultInput input) {
AssertionResultError error = new AssertionResultError();
error.setType(AssertionResultErrorType.valueOf(input.getError().getType().toString()));
StringMap errorProperties = new StringMap();
errorProperties.put(ERROR_MESSAGE_KEY, input.getError().getMessage());
error.setProperties(errorProperties);
return error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.linkedin.datahub.graphql.resolvers.assertion;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.metadata.Constants.*;

import com.linkedin.assertion.CustomAssertionInfo;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.PlatformInput;
import com.linkedin.datahub.graphql.generated.UpsertCustomAssertionInput;
import com.linkedin.datahub.graphql.types.assertion.AssertionMapper;
import com.linkedin.metadata.key.DataPlatformKey;
import com.linkedin.metadata.service.AssertionService;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.SchemaFieldUtils;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class UpsertCustomAssertionResolver implements DataFetcher<CompletableFuture<Assertion>> {

private final AssertionService _assertionService;

public UpsertCustomAssertionResolver(@Nonnull final AssertionService assertionService) {
_assertionService = Objects.requireNonNull(assertionService, "assertionService is required");
}

@Override
public CompletableFuture<Assertion> get(DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();
final String maybeAssertionUrn = environment.getArgument("urn");
final UpsertCustomAssertionInput input =
bindArgument(environment.getArgument("input"), UpsertCustomAssertionInput.class);

final Urn entityUrn = UrnUtils.getUrn(input.getEntityUrn());
final Urn assertionUrn;

if (maybeAssertionUrn == null) {
assertionUrn = _assertionService.generateAssertionUrn();
} else {
assertionUrn = UrnUtils.getUrn(maybeAssertionUrn);
}

return CompletableFuture.supplyAsync(
() -> {
// Check whether the current user is allowed to update the assertion.
if (AssertionUtils.isAuthorizedToEditAssertionFromAssertee(context, entityUrn)) {
_assertionService.upsertCustomAssertion(
context.getOperationContext(),
assertionUrn,
entityUrn,
input.getDescription(),
input.getExternalUrl(),
mapAssertionPlatform(input.getPlatform()),
createCustomAssertionInfo(input, entityUrn));

return AssertionMapper.map(
context,
_assertionService.getAssertionEntityResponse(
context.getOperationContext(), assertionUrn));
}
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
});
}

@SneakyThrows
private DataPlatformInstance mapAssertionPlatform(PlatformInput platformInput) {
DataPlatformInstance platform = new DataPlatformInstance();
if (platformInput.getUrn() != null) {
platform.setPlatform(Urn.createFromString(platformInput.getUrn()));
} else if (platformInput.getName() != null) {
platform.setPlatform(
EntityKeyUtils.convertEntityKeyToUrn(
new DataPlatformKey().setPlatformName(platformInput.getName()),
DATA_PLATFORM_ENTITY_NAME));
} else {
throw new IllegalArgumentException(
"Failed to upsert Custom Assertion. Platform Name or Platform Urn must be specified.");
}

return platform;
}

private CustomAssertionInfo createCustomAssertionInfo(
UpsertCustomAssertionInput input, Urn entityUrn) {
CustomAssertionInfo customAssertionInfo = new CustomAssertionInfo();
customAssertionInfo.setType(input.getType());
customAssertionInfo.setEntity(entityUrn);
customAssertionInfo.setLogic(input.getLogic(), SetMode.IGNORE_NULL);

if (input.getFieldPath() != null) {
customAssertionInfo.setField(
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn.toString(), input.getFieldPath()));
}
return customAssertionInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.datahub.graphql.generated.AssertionStdParameters;
import com.linkedin.datahub.graphql.generated.AssertionType;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import com.linkedin.datahub.graphql.generated.CustomAssertionInfo;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DatasetAssertionInfo;
import com.linkedin.datahub.graphql.generated.DatasetAssertionScope;
Expand Down Expand Up @@ -162,10 +163,20 @@ public static com.linkedin.datahub.graphql.generated.AssertionInfo mapAssertionI
mapSchemaAssertionInfo(context, gmsAssertionInfo.getSchemaAssertion());
assertionInfo.setSchemaAssertion(schemaAssertionInfo);
}
if (gmsAssertionInfo.hasCustomAssertion()) {
CustomAssertionInfo customAssertionInfo =
mapCustomAssertionInfo(context, gmsAssertionInfo.getCustomAssertion());
assertionInfo.setCustomAssertion(customAssertionInfo);
}

// Source Type
if (gmsAssertionInfo.hasSource()) {
assertionInfo.setSource(mapSource(gmsAssertionInfo.getSource()));
}

if (gmsAssertionInfo.hasExternalUrl()) {
assertionInfo.setExternalUrl(gmsAssertionInfo.getExternalUrl().toString());
}
return assertionInfo;
}

Expand Down Expand Up @@ -320,6 +331,22 @@ private static SchemaAssertionInfo mapSchemaAssertionInfo(
return result;
}

private static CustomAssertionInfo mapCustomAssertionInfo(
@Nullable final QueryContext context,
final com.linkedin.assertion.CustomAssertionInfo gmsCustomAssertionInfo) {
CustomAssertionInfo result = new CustomAssertionInfo();
result.setType(gmsCustomAssertionInfo.getType());
result.setEntityUrn(gmsCustomAssertionInfo.getEntity().toString());
if (gmsCustomAssertionInfo.hasField()) {
result.setField(AssertionMapper.mapDatasetSchemaField(gmsCustomAssertionInfo.getField()));
}
if (gmsCustomAssertionInfo.hasLogic()) {
result.setLogic(gmsCustomAssertionInfo.getLogic());
}

return result;
}

private static SchemaAssertionField mapSchemaField(final SchemaField gmsField) {
SchemaAssertionField result = new SchemaAssertionField();
result.setPath(gmsField.getFieldPath());
Expand Down
Loading

0 comments on commit 4047b14

Please sign in to comment.