Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aspects): support fetching of versioned aspects #2677

Merged
merged 9 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql;

import com.linkedin.dataplatform.client.DataPlatforms;
import com.linkedin.entity.client.AspectClient;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.lineage.client.Lineages;
import com.linkedin.lineage.client.Relationships;
Expand Down Expand Up @@ -34,6 +35,7 @@ public class GmsClientFactory {
private static Lineages _lineages;
private static Relationships _relationships;
private static EntityClient _entities;
private static AspectClient _aspects;


private GmsClientFactory() { }
Expand Down Expand Up @@ -81,4 +83,15 @@ public static EntityClient getEntitiesClient() {
}
return _entities;
}

public static AspectClient getAspectsClient() {
if (_aspects == null) {
synchronized (GmsClientFactory.class) {
if (_aspects == null) {
_aspects = new AspectClient(REST_CLIENT);
}
}
}
return _aspects;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.generated.Aspect;
import com.linkedin.datahub.graphql.generated.Chart;
import com.linkedin.datahub.graphql.generated.ChartInfo;
import com.linkedin.datahub.graphql.generated.DashboardInfo;
Expand All @@ -12,15 +13,18 @@
import com.linkedin.datahub.graphql.generated.RelatedDataset;
import com.linkedin.datahub.graphql.generated.SearchResult;
import com.linkedin.datahub.graphql.generated.InstitutionalMemoryMetadata;
import com.linkedin.datahub.graphql.resolvers.load.AspectResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityTypeResolver;
import com.linkedin.datahub.graphql.resolvers.load.LoadableTypeBatchResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.MutableTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.AspectInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.HyperParameterValueTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.ResultsTypeResolver;
import com.linkedin.datahub.graphql.types.BrowsableEntityType;
import com.linkedin.datahub.graphql.types.EntityType;
import com.linkedin.datahub.graphql.types.LoadableType;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.datahub.graphql.types.aspect.AspectType;
import com.linkedin.datahub.graphql.types.chart.ChartType;
import com.linkedin.datahub.graphql.types.corpuser.CorpUserType;
import com.linkedin.datahub.graphql.types.corpgroup.CorpGroupType;
Expand Down Expand Up @@ -50,6 +54,7 @@
import com.linkedin.datahub.graphql.types.lineage.DataFlowDataJobsRelationshipsType;
import com.linkedin.datahub.graphql.types.glossary.GlossaryTermType;

import graphql.execution.DataFetcherResult;
import graphql.schema.idl.RuntimeWiring;
import org.apache.commons.io.IOUtils;
import org.dataloader.BatchLoaderContextProvider;
Expand Down Expand Up @@ -97,6 +102,7 @@ public class GmsGraphQLEngine {
GmsClientFactory.getRelationshipsClient()
);
public static final GlossaryTermType GLOSSARY_TERM_TYPE = new GlossaryTermType(GmsClientFactory.getEntitiesClient());
public static final AspectType ASPECT_TYPE = new AspectType(GmsClientFactory.getAspectsClient());

/**
* Configures the graph objects that can be fetched primary key.
Expand Down Expand Up @@ -196,6 +202,7 @@ public static GraphQLEngine.Builder builder() {
return GraphQLEngine.builder()
.addSchema(schema())
.addDataLoaders(loaderSuppliers(LOADABLE_TYPES))
.addDataLoader("Aspect", (context) -> createAspectLoader(context))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. We'll have to keep an eye on this. Adding an aspect specific fetcher kind of breaks the existing abstractions huh?

.configureRuntimeWiring(GmsGraphQLEngine::configureRuntimeWiring);
}

Expand Down Expand Up @@ -293,6 +300,9 @@ private static void configureDatasetResolvers(final RuntimeWiring.Builder builde
UPSTREAM_LINEAGE_TYPE,
(env) -> ((Entity) env.getSource()).getUrn()))
)
.dataFetcher("schemaMetadata", new AuthenticatedResolver<>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess for every aspect we want to support versioned aspect fetching on we'll need this.... Okay

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, yes. However, when we autogenerate the graph we can also autogenerate these.

new AspectResolver())
)
)
.type("Owner", typeWiring -> typeWiring
.dataFetcher("owner", new AuthenticatedResolver<>(
Expand Down Expand Up @@ -459,6 +469,7 @@ private static void configureTypeResolvers(final RuntimeWiring.Builder builder)
.type("HyperParameterValueType", typeWiring -> typeWiring
.typeResolver(new HyperParameterValueTypeResolver())
)
.type("Aspect", typeWiring -> typeWiring.typeResolver(new AspectInterfaceTypeResolver()))
.type("ResultsType", typeWiring -> typeWiring
.typeResolver(new ResultsTypeResolver()));
}
Expand Down Expand Up @@ -519,7 +530,7 @@ private static void configureDataJobResolvers(final RuntimeWiring.Builder builde
}


private static <T> DataLoader<String, T> createDataLoader(final LoadableType<T> graphType, final QueryContext queryContext) {
private static <T> DataLoader<String, DataFetcherResult<T>> createDataLoader(final LoadableType<T> graphType, final QueryContext queryContext) {
BatchLoaderContextProvider contextProvider = () -> queryContext;
DataLoaderOptions loaderOptions = DataLoaderOptions.newOptions().setBatchLoaderContextProvider(contextProvider);
return DataLoader.newDataLoader((keys, context) -> CompletableFuture.supplyAsync(() -> {
Expand All @@ -531,6 +542,18 @@ private static <T> DataLoader<String, T> createDataLoader(final LoadableType<T>
}), loaderOptions);
}

private static DataLoader<VersionedAspectKey, DataFetcherResult<Aspect>> createAspectLoader(final QueryContext queryContext) {
BatchLoaderContextProvider contextProvider = () -> queryContext;
DataLoaderOptions loaderOptions = DataLoaderOptions.newOptions().setBatchLoaderContextProvider(contextProvider);
return DataLoader.newDataLoader((keys, context) -> CompletableFuture.supplyAsync(() -> {
try {
return ASPECT_TYPE.batchLoad(keys, context.getContext());
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to retrieve entities of type Aspect", e));
}
}), loaderOptions);
}

private GmsGraphQLEngine() { }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.datahub.graphql;

import lombok.Data;

@Data
public class VersionedAspectKey {
private String aspectName;
private String urn;
private Long version;

public VersionedAspectKey(String urn, String aspectName, Long version) {
this.urn = urn;
this.version = version;
this.aspectName = aspectName;
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package com.linkedin.datahub.graphql.resolvers;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.DataMap;
import com.linkedin.data.element.DataElement;
import com.linkedin.datahub.graphql.exception.ValidationException;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;

import com.linkedin.metadata.aspect.VersionedAspect;
import graphql.schema.DataFetchingEnvironment;
import java.lang.reflect.InvocationTargetException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ResolverUtils {

private static final ObjectMapper MAPPER = new ObjectMapper();

private static final Logger _logger = LoggerFactory.getLogger(ResolverUtils.class.getName());

private ResolverUtils() { }

@Nonnull
Expand Down Expand Up @@ -55,4 +65,67 @@ public static Map<String, String> buildFacetFilters(@Nullable List<FacetFilterIn

return facetFilters;
}

private static Object constructAspectFromDataElement(DataElement aspectDataElement)
throws ClassNotFoundException, IllegalAccessException, InvocationTargetException, InstantiationException {
String restliAspectClassName = aspectDataElement.getSchema().getUnionMemberKey();
// construct the restli aspect class from the aspect's DataMap stored in local context
Object constructedAspect = Class.forName(restliAspectClassName)
.cast((
ConstructorUtils.getMatchingAccessibleConstructor(
Class.forName(restliAspectClassName),
new Class[]{DataMap.class}
).newInstance(aspectDataElement.getValue())
));

return constructedAspect;
}

private static com.linkedin.metadata.aspect.Aspect constructAspectUnionInstanceFromAspect(Object constructedAspect)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
return (com.linkedin.metadata.aspect.Aspect)
com.linkedin.metadata.aspect.Aspect.class.getMethod("create", constructedAspect.getClass())
.invoke(com.linkedin.metadata.aspect.Aspect.class, constructedAspect);
}

@Nonnull
public static VersionedAspect getAspectFromLocalContext(DataFetchingEnvironment environment) {
String fieldName = environment.getField().getName();
Long version = environment.getArgument("version");

Object localContext = environment.getLocalContext();
// if we have context & the version is 0, we should try to retrieve it from the fetched entity
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting ... Do we need this is we are only using for schemaMetadata case, where we are sure we need a specific version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand this question- this resolver is for all versioned aspect fetchers. If someone fetches version 0, we don't actually need to use the aspect resource since version 0 has already been fetched earlier by the Entity type's batchGet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. Does it make sense to have a versioned aspect fetcher everywhere? I guess that's my main question

// otherwise, we should just fetch the entity from the aspect resource
if (localContext == null && version == 0 || version == null) {
if (localContext instanceof Map) {
// de-register the prefetched aspect from local context. Since aspects will only
// ever be first-class properties of an entity type, local context will always
// contain a map of { aspectName: DataMap }
DataElement prefetchedAspect = ((Map<String, DataElement>) localContext).getOrDefault(fieldName, null);

if (prefetchedAspect != null) {
try {
Object constructedAspect = constructAspectFromDataElement(prefetchedAspect);

VersionedAspect resultWithMetadata = new VersionedAspect();

resultWithMetadata.setAspect(constructAspectUnionInstanceFromAspect(constructedAspect));

resultWithMetadata.setVersion(0);

return resultWithMetadata;
} catch (IllegalAccessException | InstantiationException | InvocationTargetException | ClassNotFoundException | NoSuchMethodException e) {
_logger.error(
"Error fetch aspect from local context. field: {} version: {}. Error: {}",
fieldName,
version,
e.toString()
);
e.printStackTrace();
}
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.linkedin.datahub.graphql.resolvers.load;

import com.linkedin.datahub.graphql.VersionedAspectKey;
import com.linkedin.datahub.graphql.generated.Aspect;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.aspect.AspectMapper;
import com.linkedin.metadata.aspect.VersionedAspect;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;
import org.dataloader.DataLoader;


/**
* Generic GraphQL resolver responsible for
*
* 1. Generating a single input AspectLoadKey.
* 2. Resolving a single {@link Aspect}.
*
*/
public class AspectResolver implements DataFetcher<CompletableFuture<Aspect>> {

public AspectResolver() {
}

@Override
public CompletableFuture<Aspect> get(DataFetchingEnvironment environment) {
final DataLoader<VersionedAspectKey, Aspect> loader = environment.getDataLoaderRegistry().getDataLoader("Aspect");

String fieldName = environment.getField().getName();
Long version = environment.getArgument("version");
String urn = ((Entity) environment.getSource()).getUrn();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this always be invoked in the context of a broader entity fetch? I guess it must?

We could also have a dedicated "getVersionedAspect" method that requires version, urn, etc to do this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getVersionedAspect is something we could consider- although the type safety of that will be a bit off- the return type will need to be Aspect and we will always need to type check it on the client.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - similar to our entity resource


// first, we try fetching the aspect from the local cache
// we need to convert it into a VersionedAspect so we can make use of existing mappers
VersionedAspect aspectFromContext = ResolverUtils.getAspectFromLocalContext(environment);
if (aspectFromContext != null) {
return CompletableFuture.completedFuture(AspectMapper.map(aspectFromContext));
}

// if the aspect is not in the cache, we need to fetch it from GMS Aspect Resource
return loader.load(new VersionedAspectKey(urn, fieldName, version));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.linkedin.datahub.graphql.resolvers.type;

import graphql.TypeResolutionEnvironment;
import graphql.schema.GraphQLObjectType;
import graphql.schema.TypeResolver;

/**
* Responsible for resolving the {@link com.linkedin.datahub.graphql.generated.Aspect} interface type.
*/
public class AspectInterfaceTypeResolver implements TypeResolver {

public AspectInterfaceTypeResolver() { }
@Override
public GraphQLObjectType getType(TypeResolutionEnvironment env) {
// TODO(Gabe): Fill this out. This method is not called today. We will need to fill this
// out in the case we ever want to return fields of type Aspect in graphql. Right now
// we just use Aspect to define the shared `version` field.
return null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fit this out with something in this PR to set the stage for future extension?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure- there are a few ways this could be implemented. Since this method is never called at the moment, I'd rather wait to implement it when we are more clear on requirements.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.QueryContext;

import graphql.execution.DataFetcherResult;
import javax.annotation.Nonnull;
import java.util.List;

Expand Down Expand Up @@ -31,7 +32,7 @@ default String name() {
* @param urn to retrieve
* @param context the {@link QueryContext} corresponding to the request.
*/
default T load(@Nonnull final String urn, @Nonnull final QueryContext context) throws Exception {
default DataFetcherResult<T> load(@Nonnull final String urn, @Nonnull final QueryContext context) throws Exception {
return batchLoad(ImmutableList.of(urn), context).get(0);
};

Expand All @@ -42,6 +43,6 @@ default T load(@Nonnull final String urn, @Nonnull final QueryContext context) t
* @param urns to retrieve
* @param context the {@link QueryContext} corresponding to the request.
*/
List<T> batchLoad(@Nonnull final List<String> urns, @Nonnull final QueryContext context) throws Exception;
List<DataFetcherResult<T>> batchLoad(@Nonnull final List<String> urns, @Nonnull final QueryContext context) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.datahub.graphql.types.aspect;

import com.linkedin.datahub.graphql.generated.Aspect;
import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaMetadataMapper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.metadata.aspect.VersionedAspect;
import javax.annotation.Nonnull;


public class AspectMapper implements ModelMapper<VersionedAspect, Aspect> {

public static final AspectMapper INSTANCE = new AspectMapper();

public static Aspect map(@Nonnull final VersionedAspect restliAspect) {
return INSTANCE.apply(restliAspect);
}

@Override
public Aspect apply(@Nonnull final VersionedAspect restliAspect) {
if (restliAspect.getAspect().isSchemaMetadata()) {
return SchemaMetadataMapper.map(restliAspect);
}
return null;
}
}
Loading