Skip to content

Commit

Permalink
feat(graphql): add MutableTypeBatchResolver (#4976)
Browse files Browse the repository at this point in the history
  • Loading branch information
noahfrn authored Aug 5, 2022
1 parent da77752 commit cfcc1cf
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
import com.linkedin.datahub.graphql.resolvers.mutate.AddTagsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.AddTermResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.AddTermsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.MutableTypeBatchResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddOwnersResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTagsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTermsResolver;
Expand Down Expand Up @@ -681,6 +682,7 @@ private String getUrnField(DataFetchingEnvironment env) {
private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
builder.type("Mutation", typeWiring -> typeWiring
.dataFetcher("updateDataset", new MutableTypeResolver<>(datasetType))
.dataFetcher("updateDatasets", new MutableTypeBatchResolver<>(datasetType))
.dataFetcher("createTag", new CreateTagResolver(this.entityClient))
.dataFetcher("updateTag", new MutableTypeResolver<>(tagType))
.dataFetcher("setTagColor", new SetTagColorResolver(entityClient, entityService))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.linkedin.datahub.graphql.resolvers.mutate;

import com.codahale.metrics.Timer;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.types.BatchMutableType;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;


/**
* Generic GraphQL resolver responsible for performing updates against particular types.
*
* @param <I> the generated GraphQL POJO corresponding to the input type.
* @param <T> the generated GraphQL POJO corresponding to the return type.
*/
public class MutableTypeBatchResolver<I, B, T> implements DataFetcher<CompletableFuture<List<T>>> {

private static final Logger _logger = LoggerFactory.getLogger(MutableTypeBatchResolver.class.getName());

private final BatchMutableType<I, B, T> _batchMutableType;

public MutableTypeBatchResolver(final BatchMutableType<I, B, T> batchMutableType) {
_batchMutableType = batchMutableType;
}

@Override
public CompletableFuture<List<T>> get(DataFetchingEnvironment environment) throws Exception {
final B[] input = bindArgument(environment.getArgument("input"), _batchMutableType.batchInputClass());

return CompletableFuture.supplyAsync(() -> {
Timer.Context timer = MetricUtils.timer(this.getClass(), "batchMutate").time();

try {
return _batchMutableType.batchUpdate(input, environment.getContext());
} catch (AuthorizationException e) {
throw e;
} catch (Exception e) {
_logger.error("Failed to perform batchUpdate", e);
throw new IllegalArgumentException(e);
} finally {
timer.stop();
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.datahub.graphql.types;

import com.linkedin.datahub.graphql.QueryContext;

import javax.annotation.Nonnull;
import java.util.List;

public interface BatchMutableType<I, B, T> extends MutableType<I, T> {
default Class<B[]> batchInputClass() throws UnsupportedOperationException {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement batchInputClass method");
}

default List<T> batchUpdate(@Nonnull final B[] updateInput, QueryContext context) throws Exception {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement batchUpdate method");
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql.types;

import com.linkedin.datahub.graphql.QueryContext;

import javax.annotation.Nonnull;

/**
Expand All @@ -9,12 +10,11 @@
* @param <I>: The input type corresponding to the write.
*/
public interface MutableType<I, T> {

/**
* Returns generated GraphQL class associated with the input type
*/
Class<I> inputClass();

Class<I> inputClass();

/**
* Update an entity by urn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@
import com.linkedin.datahub.graphql.authorization.ConjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.authorization.DisjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.DatasetUpdateInput;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
import com.linkedin.datahub.graphql.generated.BrowsePath;
import com.linkedin.datahub.graphql.generated.BrowseResults;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetUpdateInput;
import com.linkedin.datahub.graphql.generated.BrowsePath;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.generated.BatchDatasetUpdateInput;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.BatchMutableType;
import com.linkedin.datahub.graphql.types.BrowsableEntityType;
import com.linkedin.datahub.graphql.types.MutableType;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetMapper;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetUpdateInputMapper;
Expand All @@ -40,7 +41,9 @@
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException;
import graphql.execution.DataFetcherResult;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand All @@ -56,7 +59,7 @@


public class DatasetType implements SearchableEntityType<Dataset, String>, BrowsableEntityType<Dataset, String>,
MutableType<DatasetUpdateInput, Dataset> {
BatchMutableType<DatasetUpdateInput, BatchDatasetUpdateInput, Dataset> {

private static final Set<String> ASPECTS_TO_RESOLVE = ImmutableSet.of(
DATASET_KEY_ASPECT_NAME,
Expand Down Expand Up @@ -99,6 +102,11 @@ public Class<DatasetUpdateInput> inputClass() {
return DatasetUpdateInput.class;
}

@Override
public Class<BatchDatasetUpdateInput[]> batchInputClass() {
return BatchDatasetUpdateInput[].class;
}

@Override
public EntityType type() {
return EntityType.DATASET;
Expand Down Expand Up @@ -184,6 +192,30 @@ public List<BrowsePath> browsePaths(@Nonnull String urn, @Nonnull final QueryCon
return BrowsePathsMapper.map(result);
}

@Override
public List<Dataset> batchUpdate(@Nonnull BatchDatasetUpdateInput[] input, @Nonnull QueryContext context) throws Exception {
final Urn actor = Urn.createFromString(context.getAuthentication().getActor().toUrnStr());

final Collection<MetadataChangeProposal> proposals = Arrays.stream(input).map(updateInput -> {
if (isAuthorized(updateInput.getUrn(), updateInput.getUpdate(), context)) {
Collection<MetadataChangeProposal> datasetProposals = DatasetUpdateInputMapper.map(updateInput.getUpdate(), actor);
datasetProposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(updateInput.getUrn())));
return datasetProposals;
}
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}).flatMap(Collection::stream).collect(Collectors.toList());

final List<String> urns = Arrays.stream(input).map(BatchDatasetUpdateInput::getUrn).collect(Collectors.toList());

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urns), e);
}

return batchLoad(urns, context).stream().map(DataFetcherResult::getData).collect(Collectors.toList());
}

@Override
public Dataset update(@Nonnull String urn, @Nonnull DatasetUpdateInput input, @Nonnull QueryContext context) throws Exception {
if (isAuthorized(urn, input, context)) {
Expand Down
22 changes: 22 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ type Mutation {
"""
updateDataset(urn: String!, input: DatasetUpdateInput!): Dataset

"""
Update the metadata about a batch of Datasets
"""
updateDatasets(input: [BatchDatasetUpdateInput!]!): [Dataset]

"""
Update the metadata about a particular Chart
"""
Expand Down Expand Up @@ -3596,6 +3601,23 @@ input DatasetUpdateInput {
editableProperties: DatasetEditablePropertiesUpdate
}

"""
Arguments provided to batch update Dataset entities
"""
input BatchDatasetUpdateInput {

"""
Primary key of the Dataset to which the update will be applied
"""
urn: String!

"""
Arguments provided to update the Dataset
"""
update: DatasetUpdateInput!
}


"""
Update to editable schema metadata of the dataset
"""
Expand Down
Loading

0 comments on commit cfcc1cf

Please sign in to comment.