diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 412c962cb6e36f..a5889b2d2f92de 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -75,6 +75,8 @@ jobs: path: | ~/.cache/uv key: ${{ runner.os }}-uv-${{ hashFiles('**/requirements.txt') }} + - name: Install dependencies + run: ./metadata-ingestion/scripts/install_deps.sh - name: Set up JDK 17 uses: actions/setup-java@v4 with: diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java b/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java index bef7782d8f7c9a..f4dc2ec2f0cd56 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java @@ -14,7 +14,7 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */ public class FieldExtractor { @@ -30,15 +30,34 @@ private static long getNumArrayWildcards(PathSpec pathSpec) { // Extract the value of each field in the field specs from the input record public static Map> extractFields( - @Nonnull RecordTemplate record, List fieldSpecs) { - return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH); + @Nullable RecordTemplate record, List fieldSpecs) { + return extractFields(record, fieldSpecs, false); } public static Map> extractFields( - @Nonnull RecordTemplate record, List fieldSpecs, int maxValueLength) { + @Nullable RecordTemplate record, List fieldSpecs, boolean requiredFieldExtract) { + return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract); + } + + public static Map> extractFields( + @Nullable RecordTemplate record, List fieldSpecs, int maxValueLength) { + return extractFields(record, fieldSpecs, maxValueLength, false); + } + + public static Map> extractFields( + @Nullable RecordTemplate record, + List fieldSpecs, + int maxValueLength, + boolean requiredFieldExtract) { final Map> extractedFields = new HashMap<>(); for (T fieldSpec : fieldSpecs) { - Optional value = RecordUtils.getFieldValue(record, fieldSpec.getPath()); + if (requiredFieldExtract && record == null) { + throw new IllegalArgumentException( + "Field extraction is required and the RecordTemplate is null"); + } + Optional value = + Optional.ofNullable(record) + .flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath())); if (!value.isPresent()) { extractedFields.put(fieldSpec, Collections.emptyList()); } else { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java index ef7f681a81539d..efe073fc00dfdc 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java @@ -190,7 +190,10 @@ private void handleDeleteChangeEvent( urn.getEntityType(), event.getAspectName())); } - RecordTemplate aspect = event.getRecordTemplate(); + final RecordTemplate aspect = + event.getPreviousRecordTemplate() != null + ? event.getPreviousRecordTemplate() + : event.getRecordTemplate(); Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName()); if (!aspectSpec.isTimeseries()) { @@ -280,8 +283,8 @@ private Pair, HashMap>> getEdgesAndRelationshipTypes @Nonnull final RecordTemplate aspect, @Nonnull final MetadataChangeLog event, final boolean isNewAspectVersion) { - final List edgesToAdd = new ArrayList<>(); - final HashMap> urnToRelationshipTypesBeingAdded = new HashMap<>(); + final List edges = new ArrayList<>(); + final HashMap> urnToRelationshipTypes = new HashMap<>(); // we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and // inputFields @@ -289,36 +292,28 @@ private Pair, HashMap>> getEdgesAndRelationshipTypes if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) { UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data()); updateFineGrainedEdgesAndRelationships( - urn, - upstreamLineage.getFineGrainedLineages(), - edgesToAdd, - urnToRelationshipTypesBeingAdded); + urn, upstreamLineage.getFineGrainedLineages(), edges, urnToRelationshipTypes); } else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) { final InputFields inputFields = new InputFields(aspect.data()); - updateInputFieldEdgesAndRelationships( - urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded); + updateInputFieldEdgesAndRelationships(urn, inputFields, edges, urnToRelationshipTypes); } else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) { DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data()); updateFineGrainedEdgesAndRelationships( - urn, - dataJobInputOutput.getFineGrainedLineages(), - edgesToAdd, - urnToRelationshipTypesBeingAdded); + urn, dataJobInputOutput.getFineGrainedLineages(), edges, urnToRelationshipTypes); } Map> extractedFields = - FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs()); + FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true); for (Map.Entry> entry : extractedFields.entrySet()) { - Set relationshipTypes = - urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>()); + Set relationshipTypes = urnToRelationshipTypes.getOrDefault(urn, new HashSet<>()); relationshipTypes.add(entry.getKey().getRelationshipName()); - urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes); + urnToRelationshipTypes.put(urn, relationshipTypes); final List newEdges = GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion); - edgesToAdd.addAll(newEdges); + edges.addAll(newEdges); } - return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded); + return Pair.of(edges, urnToRelationshipTypes); } /** Process snapshot and update graph index */ @@ -433,7 +428,7 @@ private void deleteGraphData( @Nonnull final OperationContext opContext, @Nonnull final Urn urn, @Nonnull final AspectSpec aspectSpec, - @Nonnull final RecordTemplate aspect, + @Nullable final RecordTemplate aspect, @Nonnull final Boolean isKeyAspect, @Nonnull final MetadataChangeLog event) { if (isKeyAspect) { @@ -441,21 +436,28 @@ private void deleteGraphData( return; } - Pair, HashMap>> edgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); - - final HashMap> urnToRelationshipTypesBeingAdded = - edgeAndRelationTypes.getSecond(); - if (!urnToRelationshipTypesBeingAdded.isEmpty()) { - for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { - graphService.removeEdgesFromNode( - opContext, - entry.getKey(), - new ArrayList<>(entry.getValue()), - createRelationshipFilter( - new Filter().setOr(new ConjunctiveCriterionArray()), - RelationshipDirection.OUTGOING)); + if (aspect != null) { + Pair, HashMap>> edgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); + + final HashMap> urnToRelationshipTypesBeingRemoved = + edgeAndRelationTypes.getSecond(); + if (!urnToRelationshipTypesBeingRemoved.isEmpty()) { + for (Map.Entry> entry : urnToRelationshipTypesBeingRemoved.entrySet()) { + graphService.removeEdgesFromNode( + opContext, + entry.getKey(), + new ArrayList<>(entry.getValue()), + createRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING)); + } } + } else { + log.warn( + "Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}", + aspectSpec.getName(), + urn); } } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java index 03e381a9059da6..dd02b1fdc9d78d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java @@ -1,6 +1,11 @@ package com.linkedin.metadata.service; +import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME; +import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter; +import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -8,9 +13,11 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.testng.Assert.assertEquals; +import com.google.common.collect.ImmutableList; import com.linkedin.common.Status; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; +import com.linkedin.container.Container; import com.linkedin.dataset.DatasetProperties; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; @@ -21,6 +28,14 @@ import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.query.filter.ConjunctiveCriterion; +import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; +import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.query.filter.CriterionArray; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.query.filter.RelationshipFilter; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.utils.GenericRecordUtils; @@ -29,6 +44,8 @@ import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.List; +import javax.annotation.Nonnull; import org.mockito.ArgumentCaptor; import org.opensearch.index.query.QueryBuilder; import org.opensearch.script.Script; @@ -180,4 +197,109 @@ public void testStatusNoOpEvent() { verifyNoInteractions(mockWriteDAO); } + + @Test + public void testMissingAspectGraphDelete() { + // Test deleting a null aspect + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(TEST_URN.getEntityType()) + .setEntityUrn(TEST_URN) + .setAspectName(Constants.CONTAINER_ASPECT_NAME)); + + // For missing aspects, verify no writes + verifyNoInteractions(mockWriteDAO); + } + + @Test + public void testNodeGraphDelete() { + Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo"); + + // Test deleting container entity + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(CONTAINER_ENTITY_NAME) + .setEntityUrn(containerUrn) + .setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME)); + + // Delete all outgoing edges of this entity + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(containerUrn)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING))); + + // Delete all incoming edges of this entity + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(containerUrn)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING))); + + // Delete all edges where this entity is a lifecycle owner + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)), + eq(containerUrn.toString())); + } + + @Test + public void testContainerDelete() { + Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo"); + + // Test deleting a container aspect + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(TEST_URN.getEntityType()) + .setEntityUrn(TEST_URN) + .setAspectName(Constants.CONTAINER_ASPECT_NAME) + .setPreviousAspectValue( + GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn)))); + + // For container aspects, verify that only edges are removed in both cases + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(TEST_URN)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of("IsPartOf")), + eq( + createRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING))); + } + + private static Filter createUrnFilter(@Nonnull final Urn urn) { + Filter filter = new Filter(); + CriterionArray criterionArray = new CriterionArray(); + Criterion criterion = buildCriterion("urn", Condition.EQUAL, urn.toString()); + criterionArray.add(criterion); + filter.setOr( + new ConjunctiveCriterionArray( + ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray)))); + + return filter; + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java new file mode 100644 index 00000000000000..43f8cc0ef191d6 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java @@ -0,0 +1,84 @@ +package com.linkedin.metadata.service; + +import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME; +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; +import com.linkedin.metadata.search.transformer.SearchDocumentTransformer; +import com.linkedin.metadata.systemmetadata.SystemMetadataService; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.utils.SystemMetadataUtils; +import com.linkedin.mxe.MetadataChangeLog; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class UpdateIndicesServiceTest { + + @Mock private UpdateGraphIndicesService updateGraphIndicesService; + @Mock private EntitySearchService entitySearchService; + @Mock private TimeseriesAspectService timeseriesAspectService; + @Mock private SystemMetadataService systemMetadataService; + @Mock private SearchDocumentTransformer searchDocumentTransformer; + @Mock private EntityIndexBuilders entityIndexBuilders; + + private OperationContext operationContext; + private UpdateIndicesService updateIndicesService; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + operationContext = TestOperationContexts.systemContextNoSearchAuthorization(); + updateIndicesService = + new UpdateIndicesService( + updateGraphIndicesService, + entitySearchService, + timeseriesAspectService, + systemMetadataService, + searchDocumentTransformer, + entityIndexBuilders, + "MD5"); + } + + @Test + public void testContainerHandleDeleteEvent() throws Exception { + Urn urn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)"); + EntitySpec entitySpec = operationContext.getEntityRegistry().getEntitySpec(DATASET_ENTITY_NAME); + AspectSpec aspectSpec = entitySpec.getAspectSpec(CONTAINER_ASPECT_NAME); + + // Create test data + MetadataChangeLog event = new MetadataChangeLog(); + event.setChangeType(ChangeType.DELETE); + event.setEntityUrn(urn); + event.setAspectName(CONTAINER_ASPECT_NAME); + event.setEntityType(urn.getEntityType()); + event.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata()); + + // Execute Delete + updateIndicesService.handleChangeEvent(operationContext, event); + + // Verify + verify(systemMetadataService).deleteAspect(urn.toString(), CONTAINER_ASPECT_NAME); + verify(searchDocumentTransformer) + .transformAspect( + eq(operationContext), + eq(urn), + nullable(RecordTemplate.class), + eq(aspectSpec), + eq(true)); + verify(updateGraphIndicesService).handleChangeEvent(operationContext, event); + } +}