Skip to content

Commit

Permalink
refactor(API): Add "Filter" support for Assertion Run Events, Dataset Profiles, Dataset Operations (datahub-project#4869)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjoyce0510 authored and justinas-marozas committed May 17, 2022
1 parent 0ac433f commit b0cbf4a
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.linkedin.datahub.graphql.resolvers.assertion;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.AssertionResultType;
import com.linkedin.datahub.graphql.generated.AssertionRunEvent;
import com.linkedin.datahub.graphql.generated.AssertionRunEventsResult;
import com.linkedin.datahub.graphql.generated.AssertionRunStatus;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.FilterInput;
import com.linkedin.datahub.graphql.types.dataset.mappers.AssertionRunEventMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
Expand All @@ -19,11 +20,14 @@
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;


/**
* GraphQL Resolver used for fetching AssertionRunEvents.
Expand All @@ -47,6 +51,9 @@ public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment e
final Long maybeStartTimeMillis = environment.getArgumentOrDefault("startTimeMillis", null);
final Long maybeEndTimeMillis = environment.getArgumentOrDefault("endTimeMillis", null);
final Integer maybeLimit = environment.getArgumentOrDefault("limit", null);
final FilterInput maybeFilters = environment.getArgument("filter") != null
? bindArgument(environment.getArgument("filter"), FilterInput.class)
: null;

try {
// Step 1: Fetch aspects from GMS
Expand All @@ -58,7 +65,7 @@ public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment e
maybeEndTimeMillis,
maybeLimit,
false,
buildStatusFilter(maybeStatus),
buildFilter(maybeFilters, maybeStatus),
context.getAuthentication());

// Step 2: Bind profiles into GraphQL strong types.
Expand Down Expand Up @@ -87,16 +94,19 @@ public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment e
}

@Nullable
private Filter buildStatusFilter(@Nullable final String status) {
if (status == null) {
private Filter buildFilter(@Nullable FilterInput filtersInput, @Nullable final String status) {
if (filtersInput == null && status == null) {
return null;
}
return new Filter().setOr(new ConjunctiveCriterionArray(ImmutableList.of(
new ConjunctiveCriterion().setAnd(new CriterionArray(ImmutableList.of(
new Criterion()
.setField("status")
.setValue(status)
)))
)));
List<FacetFilterInput> facetFilters = new ArrayList<>();
if (status != null) {
facetFilters.add(new FacetFilterInput("status", status));
}
if (filtersInput != null) {
facetFilters.addAll(filtersInput.getAnd());
}
return new Filter().setOr(new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(new CriterionArray(facetFilters.stream()
.map(filter -> new Criterion().setField(filter.getField()).setValue(filter.getValue()))
.collect(Collectors.toList())))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.FilterInput;
import com.linkedin.datahub.graphql.generated.TimeSeriesAspect;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand All @@ -18,6 +24,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;


/**
Expand All @@ -33,6 +43,7 @@
* be invoked for each {@link EnvelopedAspect} received from the GMS getTimeSeriesAspectValues API.
*
*/
@Slf4j
public class TimeSeriesAspectResolver implements DataFetcher<CompletableFuture<List<TimeSeriesAspect>>> {

private final EntityClient _client;
Expand Down Expand Up @@ -77,12 +88,15 @@ public CompletableFuture<List<TimeSeriesAspect>> get(DataFetchingEnvironment env
final Long maybeEndTimeMillis = environment.getArgumentOrDefault("endTimeMillis", null);
// Max number of aspects to return.
final Integer maybeLimit = environment.getArgumentOrDefault("limit", null);
final FilterInput maybeFilters = environment.getArgument("filter") != null
? bindArgument(environment.getArgument("filter"), FilterInput.class)
: null;

try {
// Step 1: Get aspects.
List<EnvelopedAspect> aspects =
_client.getTimeseriesAspectValues(urn, _entityName, _aspectName, maybeStartTimeMillis, maybeEndTimeMillis,
maybeLimit, null, null, context.getAuthentication());
maybeLimit, null, buildFilters(maybeFilters), context.getAuthentication());

// Step 2: Bind profiles into GraphQL strong types.
return aspects.stream().map(_aspectMapper::apply).collect(Collectors.toList());
Expand All @@ -91,4 +105,13 @@ public CompletableFuture<List<TimeSeriesAspect>> get(DataFetchingEnvironment env
}
});
}

// Converts the GraphQL-level FilterInput (a conjunctive list of field/value pairs)
// into the GMS metadata Filter model used by getTimeseriesAspectValues.
// Returns null when no filter was provided, which the GMS client treats as "no filtering".
// NOTE(review): this method can return null but is not annotated @Nullable, unlike the
// analogous buildFilter in AssertionRunEventResolver — consider annotating for consistency.
private Filter buildFilters(@Nullable FilterInput maybeFilters) {
if (maybeFilters == null) {
return null;
}
// Wraps all criteria in a single ConjunctiveCriterion: every entry of FilterInput.and
// must match (AND semantics). NOTE(review): each Criterion is built with only field and
// value — presumably the Criterion model's default condition is exact match; confirm.
return new Filter().setOr(new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(new CriterionArray(maybeFilters.getAnd().stream()
.map(filter -> new Criterion().setField(filter.getField()).setValue(filter.getValue()))
.collect(Collectors.toList())))));
}
}
6 changes: 3 additions & 3 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,12 @@ type Dataset implements EntityWithRelationships & Entity {
Profile Stats resource that retrieves the events in a previous unit of time in descending order
If no start or end time are provided, the most recent events will be returned
"""
datasetProfiles(startTimeMillis: Long, endTimeMillis: Long, limit: Int): [DatasetProfile!]
datasetProfiles(startTimeMillis: Long, endTimeMillis: Long, filter: FilterInput, limit: Int): [DatasetProfile!]

"""
Operational events for an entity.
"""
operations(startTimeMillis: Long, endTimeMillis: Long, limit: Int): [Operation!]
operations(startTimeMillis: Long, endTimeMillis: Long, filter: FilterInput, limit: Int): [Operation!]

"""
Assertions associated with the Dataset
Expand Down Expand Up @@ -5168,7 +5168,7 @@ type Assertion implements EntityWithRelationships & Entity {
Lifecycle events detailing individual runs of this assertion. If startTimeMillis & endTimeMillis are not provided, the most
recent events will be returned.
"""
runEvents(status: AssertionRunStatus, startTimeMillis: Long, endTimeMillis: Long, limit: Int): AssertionRunEventsResult
runEvents(status: AssertionRunStatus, startTimeMillis: Long, endTimeMillis: Long, filter: FilterInput, limit: Int): AssertionRunEventsResult

"""
Edges extending from this entity
Expand Down
10 changes: 10 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,13 @@ type BrowsePath {
"""
path: [String!]!
}

"""
A set of filter criteria
"""
input FilterInput {
"""
A list of conjunctive filters
"""
and: [FacetFilterInput!]!
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ record SchemaField {
*/
isPartOfKey: boolean = false

/**
* For Datasets which are partitioned, this determines the partitioning key.
*/
isPartitioningKey: optional boolean

/**
* For schema fields that have other properties that are not modeled explicitly,
* use this field to serialize those properties into a JSON string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ record PartitionSpec {
/**
* String representation of the partition
*/
@TimeseriesField = {}
partition: string

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,8 @@
"addToFilters" : true,
"fieldName" : "glossaryTerms",
"fieldType" : "URN",
"filterNameOverride" : "Glossary Term"
"filterNameOverride" : "Glossary Term",
"hasValuesFieldName" : "hasGlossaryTerms"
}
} ]
}, "com.linkedin.common.GlossaryTermUrn", {
Expand Down Expand Up @@ -2515,6 +2516,11 @@
"type" : "boolean",
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
"default" : false
}, {
"name" : "isPartitioningKey",
"type" : "boolean",
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
"optional" : true
}, {
"name" : "jsonProps",
"type" : "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@
"addToFilters" : true,
"fieldName" : "glossaryTerms",
"fieldType" : "URN",
"filterNameOverride" : "Glossary Term"
"filterNameOverride" : "Glossary Term",
"hasValuesFieldName" : "hasGlossaryTerms"
}
} ]
}, "com.linkedin.common.GlossaryTermUrn", {
Expand Down Expand Up @@ -2908,6 +2909,11 @@
"type" : "boolean",
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
"default" : false
}, {
"name" : "isPartitioningKey",
"type" : "boolean",
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
"optional" : true
}, {
"name" : "jsonProps",
"type" : "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,8 @@
"addToFilters" : true,
"fieldName" : "glossaryTerms",
"fieldType" : "URN",
"filterNameOverride" : "Glossary Term"
"filterNameOverride" : "Glossary Term",
"hasValuesFieldName" : "hasGlossaryTerms"
}
} ]
}, "com.linkedin.common.GlossaryTermUrn", {
Expand Down Expand Up @@ -2262,6 +2263,11 @@
"type" : "boolean",
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
"default" : false
}, {
"name" : "isPartitioningKey",
"type" : "boolean",
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
"optional" : true
}, {
"name" : "jsonProps",
"type" : "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@
"addToFilters" : true,
"fieldName" : "glossaryTerms",
"fieldType" : "URN",
"filterNameOverride" : "Glossary Term"
"filterNameOverride" : "Glossary Term",
"hasValuesFieldName" : "hasGlossaryTerms"
}
} ]
}, "com.linkedin.common.GlossaryTermUrn", {
Expand Down Expand Up @@ -2908,6 +2909,11 @@
"type" : "boolean",
"doc" : "For schema fields that are part of complex keys, set this field to true\nWe do this to easily distinguish between value and key fields",
"default" : false
}, {
"name" : "isPartitioningKey",
"type" : "boolean",
"doc" : "For Datasets which are partitioned, this determines the partitioning key.",
"optional" : true
}, {
"name" : "jsonProps",
"type" : "string",
Expand Down

0 comments on commit b0cbf4a

Please sign in to comment.