Skip to content

Commit

Permalink
feat(graphql) data contract resolvers (#10632)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayacryl authored Jun 7, 2024
1 parent 2c3943d commit 54a2d2a
Show file tree
Hide file tree
Showing 16 changed files with 1,844 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ private Constants() {}
public static final String FORMS_SCHEMA_FILE = "forms.graphql";
public static final String ASSERTIONS_SCHEMA_FILE = "assertions.graphql";
public static final String INCIDENTS_SCHEMA_FILE = "incident.graphql";
public static final String CONTRACTS_SCHEMA_FILE = "contract.graphql";
public static final String CONNECTIONS_SCHEMA_FILE = "connection.graphql";
public static final String BROWSE_PATH_DELIMITER = "/";
public static final String BROWSE_PATH_V2_DELIMITER = "␟";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataQualityContract;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
import com.linkedin.datahub.graphql.generated.Domain;
Expand All @@ -64,6 +65,7 @@
import com.linkedin.datahub.graphql.generated.EntityRelationshipLegacy;
import com.linkedin.datahub.graphql.generated.ForeignKeyConstraint;
import com.linkedin.datahub.graphql.generated.FormActorAssignment;
import com.linkedin.datahub.graphql.generated.FreshnessContract;
import com.linkedin.datahub.graphql.generated.GetRootGlossaryNodesResult;
import com.linkedin.datahub.graphql.generated.GetRootGlossaryTermsResult;
import com.linkedin.datahub.graphql.generated.GlossaryNode;
Expand Down Expand Up @@ -102,6 +104,7 @@
import com.linkedin.datahub.graphql.generated.QuickFilter;
import com.linkedin.datahub.graphql.generated.RecommendationContent;
import com.linkedin.datahub.graphql.generated.ResolvedAuditStamp;
import com.linkedin.datahub.graphql.generated.SchemaContract;
import com.linkedin.datahub.graphql.generated.SchemaField;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResult;
Expand Down Expand Up @@ -141,6 +144,8 @@
import com.linkedin.datahub.graphql.resolvers.container.ParentContainersResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardStatsSummaryResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardUsageStatsResolver;
import com.linkedin.datahub.graphql.resolvers.datacontract.EntityDataContractResolver;
import com.linkedin.datahub.graphql.resolvers.datacontract.UpsertDataContractResolver;
import com.linkedin.datahub.graphql.resolvers.dataproduct.BatchSetDataProductResolver;
import com.linkedin.datahub.graphql.resolvers.dataproduct.CreateDataProductResolver;
import com.linkedin.datahub.graphql.resolvers.dataproduct.DeleteDataProductResolver;
Expand Down Expand Up @@ -746,6 +751,7 @@ public void configureRuntimeWiring(final RuntimeWiring.Builder builder) {
configureDomainResolvers(builder);
configureDataProductResolvers(builder);
configureAssertionResolvers(builder);
configureContractResolvers(builder);
configurePolicyResolvers(builder);
configureDataProcessInstanceResolvers(builder);
configureVersionedDatasetResolvers(builder);
Expand Down Expand Up @@ -820,7 +826,8 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(ASSERTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE));
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONTRACTS_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
List<String> pluginSchemaFiles = plugin.getSchemaFiles();
Expand Down Expand Up @@ -2715,6 +2722,59 @@ private void configureAssertionResolvers(final RuntimeWiring.Builder builder) {
"aspects", new WeaklyTypedAspectsResolver(entityClient, entityRegistry)));
}

private void configureContractResolvers(final RuntimeWiring.Builder builder) {
builder.type(
"Dataset",
typeWiring ->
typeWiring.dataFetcher(
"contract", new EntityDataContractResolver(this.entityClient, this.graphClient)));
builder.type(
"FreshnessContract",
typeWiring ->
typeWiring.dataFetcher(
"assertion",
new LoadableTypeResolver<>(
getAssertionType(),
(env) -> {
final FreshnessContract contract = env.getSource();
return contract.getAssertion() != null
? contract.getAssertion().getUrn()
: null;
})));
builder.type(
"DataQualityContract",
typeWiring ->
typeWiring.dataFetcher(
"assertion",
new LoadableTypeResolver<>(
getAssertionType(),
(env) -> {
final DataQualityContract contract = env.getSource();
return contract.getAssertion() != null
? contract.getAssertion().getUrn()
: null;
})));
builder.type(
"SchemaContract",
typeWiring ->
typeWiring.dataFetcher(
"assertion",
new LoadableTypeResolver<>(
getAssertionType(),
(env) -> {
final SchemaContract contract = env.getSource();
return contract.getAssertion() != null
? contract.getAssertion().getUrn()
: null;
})));
builder.type(
"Mutation",
typeWiring ->
typeWiring.dataFetcher(
"upsertDataContract",
new UpsertDataContractResolver(this.entityClient, this.graphClient)));
}

private void configurePolicyResolvers(final RuntimeWiring.Builder builder) {
// Register resolvers for "resolvedUsers" and "resolvedGroups" field of the Policy type.
builder.type(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.datahub.graphql.resolvers.datacontract;

import com.datahub.authorization.ConjunctivePrivilegeGroup;
import com.datahub.authorization.DisjunctivePrivilegeGroup;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.metadata.authorization.PoliciesConfig;
import javax.annotation.Nonnull;

public class DataContractUtils {

public static boolean canEditDataContract(@Nonnull QueryContext context, Urn entityUrn) {
final DisjunctivePrivilegeGroup orPrivilegeGroups =
new DisjunctivePrivilegeGroup(
ImmutableList.of(
AuthorizationUtils.ALL_PRIVILEGES_GROUP,
new ConjunctivePrivilegeGroup(
ImmutableList.of(
PoliciesConfig.EDIT_ENTITY_DATA_CONTRACT_PRIVILEGE.getType()))));

return AuthorizationUtils.isAuthorized(
context.getAuthorizer(),
context.getActorUrn(),
entityUrn.getEntityType(),
entityUrn.toString(),
orPrivilegeGroups);
}

private DataContractUtils() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.linkedin.datahub.graphql.resolvers.datacontract;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.EntityRelationship;
import com.linkedin.common.EntityRelationships;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataContract;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.types.datacontract.DataContractMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EntityDataContractResolver implements DataFetcher<CompletableFuture<DataContract>> {
static final String CONTRACT_FOR_RELATIONSHIP = "ContractFor";

private final EntityClient _entityClient;
private final GraphClient _graphClient;

public EntityDataContractResolver(
final EntityClient entityClient, final GraphClient graphClient) {
_entityClient = Objects.requireNonNull(entityClient, "entityClient must not be null");
_graphClient = Objects.requireNonNull(graphClient, "graphClient must not be null");
}

@Override
public CompletableFuture<DataContract> get(DataFetchingEnvironment environment) {
return CompletableFuture.supplyAsync(
() -> {
final QueryContext context = environment.getContext();
final String entityUrn = ((Entity) environment.getSource()).getUrn();

try {
// Step 1: Fetch the contract associated with the dataset.
final EntityRelationships relationships =
_graphClient.getRelatedEntities(
entityUrn,
ImmutableList.of(CONTRACT_FOR_RELATIONSHIP),
RelationshipDirection.INCOMING,
0,
1,
context.getActorUrn());

// If we found multiple contracts for same entity, we have an invalid system state. Log
// a warning.
if (relationships.getTotal() > 1) {
// Someone created 2 contracts for the same entity. Currently, we do not handle this
// in the UI.
log.warn(
String.format(
"Unexpectedly found multiple contracts (%s) for entity with urn %s! This may lead to inconsistent behavior.",
relationships.getRelationships(), entityUrn));
}

final List<Urn> contractUrns =
relationships.getRelationships().stream()
.map(EntityRelationship::getEntity)
.collect(Collectors.toList());

if (!contractUrns.isEmpty()) {
final Urn contractUrn = contractUrns.get(0);

// Step 2: Hydrate the contract entities based on the urns from step 1
final EntityResponse entityResponse =
_entityClient.getV2(
context.getOperationContext(),
Constants.DATA_CONTRACT_ENTITY_NAME,
contractUrn,
null);

if (entityResponse != null) {
// Step 4: Package and return result
return DataContractMapper.mapContract(entityResponse);
}
}
// No contract found
return null;
} catch (URISyntaxException | RemoteInvocationException e) {
throw new RuntimeException("Failed to retrieve Data Contract from GMS", e);
}
});
}
}
Loading

0 comments on commit 54a2d2a

Please sign in to comment.