Skip to content

Commit

Permalink
feat(platform): add support for via nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Jan 29, 2024
1 parent f3cc4e0 commit e7be03b
Show file tree
Hide file tree
Showing 60 changed files with 2,374 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,8 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
"scrollAcrossEntities",
new ScrollAcrossEntitiesResolver(this.entityClient, this.viewService))
.dataFetcher(
"searchAcrossLineage", new SearchAcrossLineageResolver(this.entityClient))
"searchAcrossLineage",
new SearchAcrossLineageResolver(this.entityClient, this.entityRegistry))
.dataFetcher(
"scrollAcrossLineage", new ScrollAcrossLineageResolver(this.entityClient))
.dataFetcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public static CompletableFuture<List<Entity>> batchLoadEntitiesOfSameType(
.filter(entity -> entities.get(0).getClass().isAssignableFrom(entity.objectClass()))
.collect(Collectors.toList()));

final DataLoader loader = dataLoaderRegistry.getDataLoader(filteredEntity.name());
List keyList = new ArrayList();
final DataLoader<Object, Entity> loader =
dataLoaderRegistry.getDataLoader(filteredEntity.name());
List<Object> keyList = new ArrayList();
for (Entity entity : entities) {
keyList.add(filteredEntity.getKeyProvider().apply(entity));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.*;
import static com.linkedin.metadata.Constants.QUERY_ENTITY_NAME;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
Expand All @@ -14,31 +16,63 @@
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnSearchAcrossLineageResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.LineageSearchResult;
import com.linkedin.r2.RemoteInvocationException;
import graphql.VisibleForTesting;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/** Resolver responsible for resolving 'searchAcrossLineage' field of the Query type */
@Slf4j
@RequiredArgsConstructor
public class SearchAcrossLineageResolver
implements DataFetcher<CompletableFuture<SearchAcrossLineageResults>> {

private static final int DEFAULT_START = 0;
private static final int DEFAULT_COUNT = 10;

private static final Set<String> TRANSIENT_ENTITIES = ImmutableSet.of(QUERY_ENTITY_NAME);

private final EntityClient _entityClient;

private final EntityRegistry _entityRegistry;

@VisibleForTesting final Set<String> _allEntities;
private final List<String> _allowedEntities;

public SearchAcrossLineageResolver(EntityClient entityClient, EntityRegistry entityRegistry) {
this._entityClient = entityClient;
this._entityRegistry = entityRegistry;
this._allEntities =
entityRegistry.getEntitySpecs().values().stream()
.map(EntitySpec::getName)
.collect(Collectors.toSet());

this._allowedEntities =
this._allEntities.stream()
.filter(e -> !TRANSIENT_ENTITIES.contains(e))
.collect(Collectors.toList());
}

/**
 * Resolves the entity names a lineage search should cover.
 *
 * @param inputTypes entity types supplied in the request; may be null or empty
 * @return the names mapped from {@code inputTypes}, or the precomputed allowed entity names when
 *     no types were supplied
 */
private List<String> getEntityNamesFromInput(List<EntityType> inputTypes) {
  final boolean noTypesRequested = inputTypes == null || inputTypes.isEmpty();
  if (noTypesRequested) {
    return this._allowedEntities;
  }
  return inputTypes.stream().map(EntityTypeMapper::getName).collect(Collectors.toList());
}

@Override
public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment environment)
throws URISyntaxException {
Expand All @@ -50,12 +84,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment

final LineageDirection lineageDirection = input.getDirection();

List<EntityType> entityTypes =
(input.getTypes() == null || input.getTypes().isEmpty())
? SEARCHABLE_ENTITY_TYPES
: input.getTypes();
List<String> entityNames =
entityTypes.stream().map(EntityTypeMapper::getName).collect(Collectors.toList());
List<String> entityNames = getEntityNamesFromInput(input.getTypes());

// escape forward slash since it is a reserved character in Elasticsearch
final String sanitizedQuery =
Expand Down Expand Up @@ -99,8 +128,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
} else {
searchFlags = new SearchFlags().setFulltext(true).setSkipHighlighting(true);
}

return UrnSearchAcrossLineageResultsMapper.map(
LineageSearchResult salResults =
_entityClient.searchAcrossLineage(
urn,
resolvedDirection,
Expand All @@ -114,7 +142,9 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
startTimeMillis,
endTimeMillis,
searchFlags,
ResolverUtils.getAuthentication(environment)));
getAuthentication(environment));

return UrnSearchAcrossLineageResultsMapper.map(salResults);
} catch (RemoteInvocationException e) {
log.error(
"Failed to execute search across relationships: source urn {}, direction {}, entity types {}, query {}, filters: {}, start: {}, count: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.datahub.graphql.generated.GroupingCriterion;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import javax.annotation.Nonnull;

/**
 * Maps a GraphQL {@link GroupingCriterion} input onto its {@code com.linkedin.metadata.query}
 * counterpart.
 */
public class GroupingCriterionInputMapper
    implements ModelMapper<GroupingCriterion, com.linkedin.metadata.query.GroupingCriterion> {

  public static final GroupingCriterionInputMapper INSTANCE = new GroupingCriterionInputMapper();

  /**
   * Static convenience wrapper that delegates to {@link #apply} on the shared {@link #INSTANCE}.
   *
   * @param groupingCriterion the GraphQL input to convert
   * @return the converted criterion
   */
  public static com.linkedin.metadata.query.GroupingCriterion map(
      @Nonnull final GroupingCriterion groupingCriterion) {
    return INSTANCE.apply(groupingCriterion);
  }

  /**
   * Copies the raw entity type and the grouping entity type from the input into a new criterion.
   *
   * @param input the GraphQL input to convert
   * @return a new criterion carrying the same two values
   */
  @Override
  public com.linkedin.metadata.query.GroupingCriterion apply(GroupingCriterion input) {
    final com.linkedin.metadata.query.GroupingCriterion mapped =
        new com.linkedin.metadata.query.GroupingCriterion();
    mapped.setRawEntityType(input.getRawEntityType());
    mapped.setGroupingEntityType(input.getGroupingEntityType());
    return mapped;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.linkedin.datahub.graphql.generated.SearchFlags;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.metadata.query.GroupingCriterionArray;
import com.linkedin.metadata.query.GroupingSpec;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -42,6 +45,15 @@ public com.linkedin.metadata.query.SearchFlags apply(@Nonnull final SearchFlags
if (searchFlags.getGetSuggestions() != null) {
result.setGetSuggestions(searchFlags.getGetSuggestions());
}
if (searchFlags.getGroupingSpec() != null) {
result.setGroupingSpec(
new GroupingSpec()
.setGroupingCriteria(
new GroupingCriterionArray(
searchFlags.getGroupingSpec().getGroupingCriteria().stream()
.map(GroupingCriterionInputMapper::map)
.collect(Collectors.toList()))));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.linkedin.datahub.graphql.generated.MLPrimaryKey;
import com.linkedin.datahub.graphql.generated.Notebook;
import com.linkedin.datahub.graphql.generated.OwnershipTypeEntity;
import com.linkedin.datahub.graphql.generated.QueryEntity;
import com.linkedin.datahub.graphql.generated.Role;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.generated.StructuredPropertyEntity;
Expand Down Expand Up @@ -198,6 +199,11 @@ public Entity apply(Urn input) {
((StructuredPropertyEntity) partialEntity).setUrn(input.toString());
((StructuredPropertyEntity) partialEntity).setType(EntityType.STRUCTURED_PROPERTY);
}
if (input.getEntityType().equals(QUERY_ENTITY_NAME)) {
partialEntity = new QueryEntity();
((QueryEntity) partialEntity).setUrn(input.toString());
((QueryEntity) partialEntity).setType(EntityType.QUERY);
}
return partialEntity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private SearchAcrossLineageResult mapResult(LineageSearchEntity searchEntity) {
.setMatchedFields(getMatchedFieldEntry(searchEntity.getMatchedFields()))
.setPaths(searchEntity.getPaths().stream().map(this::mapPath).collect(Collectors.toList()))
.setDegree(searchEntity.getDegree())
.setDegrees(searchEntity.getDegrees().stream().collect(Collectors.toList()))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class QueryType
implements com.linkedin.datahub.graphql.types.EntityType<QueryEntity, String> {
Expand Down Expand Up @@ -50,6 +52,7 @@ public List<DataFetcherResult<QueryEntity>> batchLoad(
final List<Urn> viewUrns = urns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());

try {
log.debug("Fetching query entities: {}", viewUrns);
final Map<Urn, EntityResponse> entities =
_entityClient.batchGetV2(
QUERY_ENTITY_NAME,
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -10948,6 +10948,11 @@ enum QuerySource {
The query was provided manually, e.g. from the UI.
"""
MANUAL

"""
The query was extracted by the system, e.g. from a dashboard.
"""
SYSTEM
}

"""
Expand Down
38 changes: 38 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ input SearchFlags {
Whether to request for search suggestions on the _entityName virtualized field
"""
getSuggestions: Boolean

"""
Additional grouping specifications to apply to the search results
"""
groupingSpec: GroupingSpec
}

"""
Expand Down Expand Up @@ -278,6 +283,7 @@ input ScrollAcrossEntitiesInput {
searchFlags: SearchFlags
}


"""
Input arguments for a search query over the results of a multi-hop graph query
"""
Expand Down Expand Up @@ -669,6 +675,12 @@ type SearchAcrossLineageResult {
Degree of relationship (number of hops to get to entity)
"""
degree: Int!

"""
Degrees of relationship (for entities discoverable at multiple degrees)
"""
degrees: [Int!]

}

"""
Expand Down Expand Up @@ -1303,4 +1315,30 @@ input SortCriterion {
The order in which we will be sorting
"""
sortOrder: SortOrder!
}

"""
A grouping specification for search results
"""
input GroupingSpec {

"""
The grouping criteria to apply to the search results
"""
groupingCriteria: [GroupingCriterion!]!

}

"""
A single grouping criterion for grouping search results
"""
input GroupingCriterion {

# NOTE(review): both fields are free-form Strings rather than the EntityType enum;
# presumably they carry entity names as known to the backend entity registry. TODO confirm.

"""
The raw entity type that needs to be grouped
"""
rawEntityType: String!

"""
The grouping entity type
"""
groupingEntityType: String!

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResult;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResults;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.search.AggregationMetadataArray;
import com.linkedin.metadata.search.LineageSearchEntity;
Expand All @@ -22,6 +24,7 @@
import com.linkedin.metadata.search.MatchedFieldArray;
import com.linkedin.metadata.search.SearchResultMetadata;
import graphql.schema.DataFetchingEnvironment;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import org.testng.annotations.BeforeMethod;
Expand All @@ -43,13 +46,28 @@ public class SearchAcrossLineageResolverTest {
private Authentication _authentication;
private SearchAcrossLineageResolver _resolver;

private EntityRegistry _entityRegistry;

@BeforeMethod
public void setupTest() {
_entityClient = mock(EntityClient.class);
_dataFetchingEnvironment = mock(DataFetchingEnvironment.class);
_authentication = mock(Authentication.class);

_resolver = new SearchAcrossLineageResolver(_entityClient);
_entityRegistry = mock(EntityRegistry.class);
_resolver = new SearchAcrossLineageResolver(_entityClient, _entityRegistry);
}

/**
 * Verifies that the resolver collects entity names from a registry loaded from {@code
 * entity-registry.yml}, and that the collected names are case sensitive.
 *
 * @throws Exception if closing the registry config stream fails
 */
@Test
public void testAllEntitiesInitialization() throws Exception {
  // try-with-resources so the config stream is closed even when an assertion fails.
  try (InputStream inputStream = ClassLoader.getSystemResourceAsStream("entity-registry.yml")) {
    EntityRegistry entityRegistry = new ConfigEntityRegistry(inputStream);
    SearchAcrossLineageResolver resolver =
        new SearchAcrossLineageResolver(_entityClient, entityRegistry);
    assertTrue(resolver._allEntities.contains("dataset"));
    assertTrue(resolver._allEntities.contains("dataFlow"));
    // Test for case sensitivity
    assertFalse(resolver._allEntities.contains("dataflow"));
  }
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void testDefaultSearchFlags() throws Exception {
.setSkipAggregates(false)
.setSkipHighlighting(true) // empty/wildcard
.setMaxAggValues(20)
.setSkipCache(false));
.setSkipCache(false)
.setConvertSchemaFieldsToDatasets(true));
}

@Test
Expand Down Expand Up @@ -82,7 +83,8 @@ public void testOverrideSearchFlags() throws Exception {
.setSkipAggregates(true)
.setSkipHighlighting(true)
.setMaxAggValues(10)
.setSkipCache(true));
.setSkipCache(true)
.setConvertSchemaFieldsToDatasets(true));
}

@Test
Expand Down Expand Up @@ -112,7 +114,8 @@ public void testNonWildCardSearchFlags() throws Exception {
.setSkipAggregates(false)
.setSkipHighlighting(false) // empty/wildcard
.setMaxAggValues(20)
.setSkipCache(false));
.setSkipCache(false)
.setConvertSchemaFieldsToDatasets(true));
}

private EntityClient initMockSearchEntityClient() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.EntityService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Spring configuration that registers the {@link ReindexDataJobViaNodesCLL} bean. */
@Configuration
public class ReindexDataJobViaNodesCLLConfig {

  /**
   * Creates the {@link ReindexDataJobViaNodesCLL} bean.
   *
   * @param entityService entity service handed to the new instance
   * @return the constructed bean
   */
  @Bean
  public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(EntityService<?> entityService) {
    final ReindexDataJobViaNodesCLL bean = new ReindexDataJobViaNodesCLL(entityService);
    return bean;
  }
}
Loading

0 comments on commit e7be03b

Please sign in to comment.