Skip to content

Commit

Permalink
fix(graph-edge): fix graph edge delete exception
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Dec 4, 2024
1 parent eef9759 commit 6e74538
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */
public class FieldExtractor {
Expand All @@ -30,16 +30,35 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH);
@Nullable RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, false);
}

public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
@Nullable RecordTemplate record, List<T> fieldSpecs, boolean requiredFieldExtract) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract);
}

/**
 * Extracts field values with a caller-supplied {@code maxValueLength}, without requiring the
 * record to be present.
 *
 * <p>Delegates to the four-argument overload with {@code requiredFieldExtract} set to {@code
 * false}.
 *
 * @param record the record to read from; annotated {@code @Nullable}
 * @param fieldSpecs the field specs whose values are extracted
 * @param maxValueLength forwarded unchanged to the four-argument overload
 * @return whatever the four-argument overload returns for these arguments
 */
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nullable RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
// Optional extraction: pass requiredFieldExtract = false
return extractFields(record, fieldSpecs, maxValueLength, false);
}

public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nullable RecordTemplate record,
List<T> fieldSpecs,
int maxValueLength,
boolean requiredFieldExtract) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
if (!value.isPresent()) {
if (requiredFieldExtract && record == null) {
throw new IllegalArgumentException(
"Field extraction is required and the RecordTemplate is null");
}
Optional<Object> value =
Optional.ofNullable(record)
.flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath()));
if (value.isEmpty()) {
extractedFields.put(fieldSpec, Collections.emptyList());
} else {
long numArrayWildcards = getNumArrayWildcards(fieldSpec.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ private void handleDeleteChangeEvent(
urn.getEntityType(), event.getAspectName()));
}

RecordTemplate aspect = event.getRecordTemplate();
final RecordTemplate aspect =
event.getPreviousRecordTemplate() != null
? event.getPreviousRecordTemplate()
: event.getRecordTemplate();
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());

if (!aspectSpec.isTimeseries()) {
Expand Down Expand Up @@ -307,7 +310,7 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
}

Map<RelationshipFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true);

for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
Set<String> relationshipTypes =
Expand Down Expand Up @@ -433,29 +436,36 @@ private void deleteGraphData(
@Nonnull final OperationContext opContext,
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nullable final RecordTemplate aspect,
@Nonnull final Boolean isKeyAspect,
@Nonnull final MetadataChangeLog event) {
if (isKeyAspect) {
graphService.removeNode(opContext, urn);
return;
}

Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);

final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
if (aspect != null) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);

final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
}
}
} else {
log.warn(
"Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}",
aspectSpec.getName(),
urn);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package com.linkedin.metadata.service;

import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.testng.Assert.assertEquals;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.container.Container;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
Expand All @@ -21,6 +28,14 @@
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.GenericRecordUtils;
Expand All @@ -29,6 +44,8 @@
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.List;
import javax.annotation.Nonnull;
import org.mockito.ArgumentCaptor;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.script.Script;
Expand Down Expand Up @@ -180,4 +197,109 @@ public void testStatusNoOpEvent() {

verifyNoInteractions(mockWriteDAO);
}

/**
 * A DELETE change log for the container aspect that carries no aspect value must not cause any
 * interaction with the graph write DAO.
 */
@Test
public void testMissingAspectGraphDelete() {
  // Arrange: a delete event with no aspect payload attached
  final MetadataChangeLog deleteWithoutAspect =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(TEST_URN.getEntityType())
          .setEntityUrn(TEST_URN)
          .setAspectName(Constants.CONTAINER_ASPECT_NAME);

  // Act
  test.handleChangeEvent(TEST_OP_CONTEXT, deleteWithoutAspect);

  // Assert: the write DAO was never touched
  verifyNoInteractions(mockWriteDAO);
}

/**
 * Deleting the key aspect of a container entity is expected to issue three delete-by-query
 * calls: one for outgoing edges, one for incoming edges, and one keyed by the entity's URN
 * string as lifecycle owner.
 */
@Test
public void testNodeGraphDelete() {
  final Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");

  // Expected argument values shared by the verifications below
  final Filter matchContainerUrn = createUrnFilter(containerUrn);
  final Filter emptyFilter = new Filter().setOr(new ConjunctiveCriterionArray());
  final RelationshipFilter outgoing =
      new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING);
  final RelationshipFilter incoming =
      new RelationshipFilter().setDirection(RelationshipDirection.INCOMING);

  // Delete the container entity by sending a DELETE for its key aspect
  final MetadataChangeLog deleteKeyAspect =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(CONTAINER_ENTITY_NAME)
          .setEntityUrn(containerUrn)
          .setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME);
  test.handleChangeEvent(TEST_OP_CONTEXT, deleteKeyAspect);

  // Outgoing edges of the entity are deleted exactly once
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(matchContainerUrn),
          nullable(String.class),
          eq(emptyFilter),
          eq(List.of()),
          eq(outgoing));

  // Incoming edges of the entity are deleted exactly once
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(matchContainerUrn),
          nullable(String.class),
          eq(emptyFilter),
          eq(List.of()),
          eq(incoming));

  // Edges for which this entity is the lifecycle owner are deleted exactly once
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(emptyFilter),
          nullable(String.class),
          eq(emptyFilter),
          eq(List.of()),
          eq(incoming),
          eq(containerUrn.toString()));
}

/**
 * Deleting a container aspect whose previous value is attached to the event is expected to
 * remove the entity's outgoing {@code IsPartOf} edges with a single delete-by-query call.
 */
@Test
public void testContainerDelete() {
  final Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");

  // The deleted aspect travels in the "previous value" slot of the change log
  final MetadataChangeLog deleteContainerAspect =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(TEST_URN.getEntityType())
          .setEntityUrn(TEST_URN)
          .setAspectName(Constants.CONTAINER_ASPECT_NAME)
          .setPreviousAspectValue(
              GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn)));

  test.handleChangeEvent(TEST_OP_CONTEXT, deleteContainerAspect);

  final Filter emptyFilter = new Filter().setOr(new ConjunctiveCriterionArray());
  final RelationshipFilter outgoingEdges =
      createRelationshipFilter(
          new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING);

  // Only the IsPartOf edges leaving TEST_URN are removed
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(createUrnFilter(TEST_URN)),
          nullable(String.class),
          eq(emptyFilter),
          eq(List.of("IsPartOf")),
          eq(outgoingEdges));
}

/**
 * Builds a filter with a single OR-clause that contains one criterion: field {@code "urn"}
 * equal to the string form of the given URN.
 *
 * @param urn the URN to match
 * @return the newly built filter
 */
private static Filter createUrnFilter(@Nonnull final Urn urn) {
  final CriterionArray urnEquals = new CriterionArray();
  urnEquals.add(buildCriterion("urn", Condition.EQUAL, urn.toString()));

  final ConjunctiveCriterion conjunction = new ConjunctiveCriterion().setAnd(urnEquals);

  final Filter urnFilter = new Filter();
  urnFilter.setOr(new ConjunctiveCriterionArray(ImmutableList.of(conjunction)));
  return urnFilter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.linkedin.metadata.service;

import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link UpdateIndicesService} in which every collaborator passed to its
 * constructor is a Mockito mock.
 */
public class UpdateIndicesServiceTest {

  @Mock private UpdateGraphIndicesService updateGraphIndicesService;
  @Mock private EntitySearchService entitySearchService;
  @Mock private TimeseriesAspectService timeseriesAspectService;
  @Mock private SystemMetadataService systemMetadataService;
  @Mock private SearchDocumentTransformer searchDocumentTransformer;
  @Mock private EntityIndexBuilders entityIndexBuilders;

  private OperationContext operationContext;
  private UpdateIndicesService updateIndicesService;

  /** Initializes the mocks and builds a fresh service under test before each test method. */
  @BeforeMethod
  public void setup() {
    MockitoAnnotations.openMocks(this);

    operationContext = TestOperationContexts.systemContextNoSearchAuthorization();
    updateIndicesService =
        new UpdateIndicesService(
            updateGraphIndicesService,
            entitySearchService,
            timeseriesAspectService,
            systemMetadataService,
            searchDocumentTransformer,
            entityIndexBuilders,
            "MD5");
  }

  /**
   * A DELETE event for a dataset's container aspect, sent without any aspect payload, is
   * expected to delete the aspect's system metadata, invoke the search document transformer
   * (with a possibly-null record), and forward the event to the graph indices service.
   */
  @Test
  public void testContainerHandleDeleteEvent() throws Exception {
    final Urn datasetUrn =
        UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)");
    final EntitySpec datasetSpec =
        operationContext.getEntityRegistry().getEntitySpec(DATASET_ENTITY_NAME);
    final AspectSpec containerAspectSpec = datasetSpec.getAspectSpec(CONTAINER_ASPECT_NAME);

    // Arrange: delete event carrying only identifying fields and default system metadata
    final MetadataChangeLog deleteEvent =
        new MetadataChangeLog()
            .setChangeType(ChangeType.DELETE)
            .setEntityUrn(datasetUrn)
            .setAspectName(CONTAINER_ASPECT_NAME)
            .setEntityType(datasetUrn.getEntityType());
    deleteEvent.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata());

    // Act
    updateIndicesService.handleChangeEvent(operationContext, deleteEvent);

    // Assert: each downstream collaborator saw the delete
    verify(systemMetadataService).deleteAspect(datasetUrn.toString(), CONTAINER_ASPECT_NAME);
    verify(searchDocumentTransformer)
        .transformAspect(
            eq(operationContext),
            eq(datasetUrn),
            nullable(RecordTemplate.class),
            eq(containerAspectSpec),
            eq(true));
    verify(updateGraphIndicesService).handleChangeEvent(operationContext, deleteEvent);
  }
}

0 comments on commit 6e74538

Please sign in to comment.