Skip to content

Commit

Permalink
fix cache key and bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien committed Mar 23, 2024
1 parent 52dcbf2 commit 5669ff8
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import com.linkedin.datahub.graphql.generated.LineageFlags;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -56,8 +58,7 @@ private static UrnArrayMap mapIgnoreAsHops(List<EntityTypeToPlatforms> ignoreAsH
ignoreAsHop ->
result.put(
EntityTypeMapper.getName(ignoreAsHop.getEntityType()),
new UrnArray(
ignoreAsHop.getPlatforms().stream()
new UrnArray(Optional.ofNullable(ignoreAsHop.getPlatforms()).orElse(Collections.emptyList()).stream()
.map(UrnUtils::getUrn)
.collect(Collectors.toList()))));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,21 +389,12 @@ private Stream<Urn> processOneHopLineage(
ignoreAsHops.get(entityType)))))
.forEach(
lineageRelationship -> additionalCurrentLevel.add(lineageRelationship.getEntity()));
Stream<Urn> ignoreAsHopUrns =
processOneHopLineage(
additionalCurrentLevel,
remainingTime,
direction,
maxHops,
graphFilters,
visitedEntities,
viaEntities,
existingPaths,
exploreMultiplePaths,
result,
lineageFlags,
i);
intermediateStream = Stream.concat(intermediateStream, ignoreAsHopUrns);
if (!additionalCurrentLevel.isEmpty()) {
Stream<Urn> ignoreAsHopUrns =
processOneHopLineage(additionalCurrentLevel, remainingTime, direction, maxHops, graphFilters,
visitedEntities, viaEntities, existingPaths, exploreMultiplePaths, result, lineageFlags, i);
intermediateStream = Stream.concat(intermediateStream, ignoreAsHopUrns);
}
}
// We limit after adding all the relationships at the previous level so each hop is fully
// returned,
Expand All @@ -424,7 +415,7 @@ private boolean platformMatches(Urn urn, UrnArray platforms) {
.anyMatch(
platform ->
DataPlatformInstanceUtils.getDataPlatform(urn)
.getPlatformNameEntity()
.toString()
.equals(platform.toString()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ public class EntityLineageResultCacheKey {
private final Urn sourceUrn;
private final LineageDirection direction;
private final Integer maxHops;
private final Integer entitiesExploredPerHopLimit;

public EntityLineageResultCacheKey(
@Nonnull String contextId, Urn sourceUrn, LineageDirection direction, Integer maxHops) {
@Nonnull String contextId, Urn sourceUrn, LineageDirection direction, Integer maxHops,
Integer entitiesExploredPerHopLimit) {
this.contextId = contextId;
this.sourceUrn = sourceUrn;
this.direction = direction;
this.maxHops = maxHops;
this.entitiesExploredPerHopLimit = entitiesExploredPerHopLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public LineageSearchResult searchAcrossLineage(
// Cache multihop result for faster performance
final EntityLineageResultCacheKey cacheKey =
new EntityLineageResultCacheKey(
finalOpContext.getSearchContextId(), sourceUrn, direction, maxHops);
finalOpContext.getSearchContextId(), sourceUrn, direction, maxHops,
opContext.getSearchContext().getLineageFlags().getEntitiesExploredPerHopLimit());
CachedEntityLineageResult cachedLineageResult = null;

if (cacheEnabled) {
Expand Down Expand Up @@ -745,7 +746,8 @@ public LineageScrollResult scrollAcrossLineage(
// Cache multihop result for faster performance
final EntityLineageResultCacheKey cacheKey =
new EntityLineageResultCacheKey(
opContext.getSearchContextId(), sourceUrn, direction, maxHops);
opContext.getSearchContextId(), sourceUrn, direction, maxHops,
opContext.getSearchContext().getLineageFlags().getEntitiesExploredPerHopLimit());
CachedEntityLineageResult cachedLineageResult =
cacheEnabled ? cache.get(cacheKey, CachedEntityLineageResult.class) : null;
EntityLineageResult lineageResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ public class LineageSearchResultCacheKeyTest extends AbstractTestNGSpringContext
public void testNulls() {
// ensure no NPE
assertEquals(
new EntityLineageResultCacheKey("", null, null, null),
new EntityLineageResultCacheKey("", null, null, null));
new EntityLineageResultCacheKey("", null, null, null, null),
new EntityLineageResultCacheKey("", null, null, null, null));
}

@Test
public void testDateTruncation() {
// expect start of day milli
assertEquals(
new EntityLineageResultCacheKey("", null, null, null),
new EntityLineageResultCacheKey("", null, null, null));
new EntityLineageResultCacheKey("", null, null, null, null),
new EntityLineageResultCacheKey("", null, null, null, null));
assertNotSame(
new EntityLineageResultCacheKey("", null, null, null),
new EntityLineageResultCacheKey("", null, null, null));
new EntityLineageResultCacheKey("", null, null, null, null),
new EntityLineageResultCacheKey("", null, null, null, null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void testLineageCaching() {

EntityLineageResultCacheKey key =
new EntityLineageResultCacheKey(
"", corpuserUrn, LineageDirection.DOWNSTREAM, 0L, 1L, 1, ChronoUnit.DAYS);
"", corpuserUrn, LineageDirection.DOWNSTREAM, 1, null);

cache1.put(key, cachedEntityLineageResult);

Expand Down

0 comments on commit 5669ff8

Please sign in to comment.