From 6e745382a5aa5493148c8dbc0cae6b81a6ff8a13 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Tue, 3 Dec 2024 18:15:58 -0600 Subject: [PATCH] fix(graph-edge): fix graph edge delete exception --- .../models/extractor/FieldExtractor.java | 31 ++++- .../service/UpdateGraphIndicesService.java | 44 ++++--- .../UpdateGraphIndicesServiceTest.java | 122 ++++++++++++++++++ .../service/UpdateIndicesServiceTest.java | 84 ++++++++++++ 4 files changed, 258 insertions(+), 23 deletions(-) create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java b/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java index bef7782d8f7c9..376113cafff82 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java @@ -14,7 +14,7 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */ public class FieldExtractor { @@ -30,16 +30,35 @@ private static long getNumArrayWildcards(PathSpec pathSpec) { // Extract the value of each field in the field specs from the input record public static Map> extractFields( - @Nonnull RecordTemplate record, List fieldSpecs) { - return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH); + @Nullable RecordTemplate record, List fieldSpecs) { + return extractFields(record, fieldSpecs, false); } public static Map> extractFields( - @Nonnull RecordTemplate record, List fieldSpecs, int maxValueLength) { + @Nullable RecordTemplate record, List fieldSpecs, boolean requiredFieldExtract) { + return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract); + } + + public static Map> extractFields( + @Nullable RecordTemplate record, List fieldSpecs, int maxValueLength) { + return extractFields(record, fieldSpecs, maxValueLength, false); + } + + public static Map> extractFields( + @Nullable RecordTemplate record, + List fieldSpecs, + int maxValueLength, + boolean requiredFieldExtract) { final Map> extractedFields = new HashMap<>(); for (T fieldSpec : fieldSpecs) { - Optional value = RecordUtils.getFieldValue(record, fieldSpec.getPath()); - if (!value.isPresent()) { + if (requiredFieldExtract && record == null) { + throw new IllegalArgumentException( + "Field extraction is required and the RecordTemplate is null"); + } + Optional value = + Optional.ofNullable(record) + .flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath())); + if (value.isEmpty()) { extractedFields.put(fieldSpec, Collections.emptyList()); } else { long numArrayWildcards = getNumArrayWildcards(fieldSpec.getPath()); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java index ef7f681a81539..4d54416965416 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java @@ -190,7 +190,10 @@ private void handleDeleteChangeEvent( urn.getEntityType(), event.getAspectName())); } - RecordTemplate aspect = event.getRecordTemplate(); + final RecordTemplate aspect = + event.getPreviousRecordTemplate() != null + ? event.getPreviousRecordTemplate() + : event.getRecordTemplate(); Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName()); if (!aspectSpec.isTimeseries()) { @@ -307,7 +310,7 @@ private Pair, HashMap>> getEdgesAndRelationshipTypes } Map> extractedFields = - FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs()); + FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true); for (Map.Entry> entry : extractedFields.entrySet()) { Set relationshipTypes = @@ -433,7 +436,7 @@ private void deleteGraphData( @Nonnull final OperationContext opContext, @Nonnull final Urn urn, @Nonnull final AspectSpec aspectSpec, - @Nonnull final RecordTemplate aspect, + @Nullable final RecordTemplate aspect, @Nonnull final Boolean isKeyAspect, @Nonnull final MetadataChangeLog event) { if (isKeyAspect) { @@ -441,21 +444,28 @@ private void deleteGraphData( return; } - Pair, HashMap>> edgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); - - final HashMap> urnToRelationshipTypesBeingAdded = - edgeAndRelationTypes.getSecond(); - if (!urnToRelationshipTypesBeingAdded.isEmpty()) { - for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { - graphService.removeEdgesFromNode( - opContext, - entry.getKey(), - new ArrayList<>(entry.getValue()), - createRelationshipFilter( - new Filter().setOr(new ConjunctiveCriterionArray()), - RelationshipDirection.OUTGOING)); + if (aspect != null) { + Pair, HashMap>> edgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); + + final HashMap> urnToRelationshipTypesBeingAdded = + edgeAndRelationTypes.getSecond(); + if (!urnToRelationshipTypesBeingAdded.isEmpty()) { + for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { + graphService.removeEdgesFromNode( + opContext, + entry.getKey(), + new ArrayList<>(entry.getValue()), + createRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING)); + } } + } else { + log.warn( + "Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}", + aspectSpec.getName(), + urn); } } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java index 03e381a9059da..dd02b1fdc9d78 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java @@ -1,6 +1,11 @@ package com.linkedin.metadata.service; +import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME; +import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter; +import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -8,9 +13,11 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.testng.Assert.assertEquals; +import com.google.common.collect.ImmutableList; import com.linkedin.common.Status; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; +import com.linkedin.container.Container; import com.linkedin.dataset.DatasetProperties; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; @@ -21,6 +28,14 @@ import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.query.filter.ConjunctiveCriterion; +import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; +import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.query.filter.CriterionArray; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.query.filter.RelationshipFilter; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.utils.GenericRecordUtils; @@ -29,6 +44,8 @@ import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.List; +import javax.annotation.Nonnull; import org.mockito.ArgumentCaptor; import org.opensearch.index.query.QueryBuilder; import org.opensearch.script.Script; @@ -180,4 +197,109 @@ public void testStatusNoOpEvent() { verifyNoInteractions(mockWriteDAO); } + + @Test + public void testMissingAspectGraphDelete() { + // Test deleting a null aspect + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(TEST_URN.getEntityType()) + .setEntityUrn(TEST_URN) + .setAspectName(Constants.CONTAINER_ASPECT_NAME)); + + // For missing aspects, verify no writes + verifyNoInteractions(mockWriteDAO); + } + + @Test + public void testNodeGraphDelete() { + Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo"); + + // Test deleting container entity + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(CONTAINER_ENTITY_NAME) + .setEntityUrn(containerUrn) + .setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME)); + + // Delete all outgoing edges of this entity + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(containerUrn)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING))); + + // Delete all incoming edges of this entity + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(containerUrn)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING))); + + // Delete all edges where this entity is a lifecycle owner + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)), + eq(containerUrn.toString())); + } + + @Test + public void testContainerDelete() { + Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo"); + + // Test deleting a container aspect + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(TEST_URN.getEntityType()) + .setEntityUrn(TEST_URN) + .setAspectName(Constants.CONTAINER_ASPECT_NAME) + .setPreviousAspectValue( + GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn)))); + + // For container aspects, verify that only edges are removed in both cases + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(TEST_URN)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of("IsPartOf")), + eq( + createRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING))); + } + + private static Filter createUrnFilter(@Nonnull final Urn urn) { + Filter filter = new Filter(); + CriterionArray criterionArray = new CriterionArray(); + Criterion criterion = buildCriterion("urn", Condition.EQUAL, urn.toString()); + criterionArray.add(criterion); + filter.setOr( + new ConjunctiveCriterionArray( + ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray)))); + + return filter; + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java new file mode 100644 index 0000000000000..43f8cc0ef191d --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java @@ -0,0 +1,84 @@ +package com.linkedin.metadata.service; + +import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME; +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; +import com.linkedin.metadata.search.transformer.SearchDocumentTransformer; +import com.linkedin.metadata.systemmetadata.SystemMetadataService; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.utils.SystemMetadataUtils; +import com.linkedin.mxe.MetadataChangeLog; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class UpdateIndicesServiceTest { + + @Mock private UpdateGraphIndicesService updateGraphIndicesService; + @Mock private EntitySearchService entitySearchService; + @Mock private TimeseriesAspectService timeseriesAspectService; + @Mock private SystemMetadataService systemMetadataService; + @Mock private SearchDocumentTransformer searchDocumentTransformer; + @Mock private EntityIndexBuilders entityIndexBuilders; + + private OperationContext operationContext; + private UpdateIndicesService updateIndicesService; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + operationContext = TestOperationContexts.systemContextNoSearchAuthorization(); + updateIndicesService = + new UpdateIndicesService( + updateGraphIndicesService, + entitySearchService, + timeseriesAspectService, + systemMetadataService, + searchDocumentTransformer, + entityIndexBuilders, + "MD5"); + } + + @Test + public void testContainerHandleDeleteEvent() throws Exception { + Urn urn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)"); + EntitySpec entitySpec = operationContext.getEntityRegistry().getEntitySpec(DATASET_ENTITY_NAME); + AspectSpec aspectSpec = entitySpec.getAspectSpec(CONTAINER_ASPECT_NAME); + + // Create test data + MetadataChangeLog event = new MetadataChangeLog(); + event.setChangeType(ChangeType.DELETE); + event.setEntityUrn(urn); + event.setAspectName(CONTAINER_ASPECT_NAME); + event.setEntityType(urn.getEntityType()); + event.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata()); + + // Execute Delete + updateIndicesService.handleChangeEvent(operationContext, event); + + // Verify + verify(systemMetadataService).deleteAspect(urn.toString(), CONTAINER_ASPECT_NAME); + verify(searchDocumentTransformer) + .transformAspect( + eq(operationContext), + eq(urn), + nullable(RecordTemplate.class), + eq(aspectSpec), + eq(true)); + verify(updateGraphIndicesService).handleChangeEvent(operationContext, event); + } +}