Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(graph-edge): fix graph edge delete exception #12025

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ jobs:
path: |
~/.cache/uv
key: ${{ runner.os }}-uv-${{ hashFiles('**/requirements.txt') }}
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */
public class FieldExtractor {
Expand All @@ -30,15 +30,34 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH);
@Nullable RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, false);
}

public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
@Nullable RecordTemplate record, List<T> fieldSpecs, boolean requiredFieldExtract) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract);
}

public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nullable RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
return extractFields(record, fieldSpecs, maxValueLength, false);
}

public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nullable RecordTemplate record,
List<T> fieldSpecs,
int maxValueLength,
boolean requiredFieldExtract) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
if (requiredFieldExtract && record == null) {
throw new IllegalArgumentException(
"Field extraction is required and the RecordTemplate is null");
}
Optional<Object> value =
Optional.ofNullable(record)
.flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath()));
if (!value.isPresent()) {
extractedFields.put(fieldSpec, Collections.emptyList());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ private void handleDeleteChangeEvent(
urn.getEntityType(), event.getAspectName()));
}

RecordTemplate aspect = event.getRecordTemplate();
final RecordTemplate aspect =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can we end up with a null aspect? Seems that some of the implementations from RecordTemplate can be either Nullable and Nonnull Just curious if might be just better to return an empty record template that handling a mix of Nullable and Nonnull that could end up in NPEs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MCL usually contains both a current aspect and a previous aspect, the previous aspect is null when the aspect is being first created. For a delete, it doesn't make sense that the current aspect would be populated necessarily. I would expect a previous aspect unless we hit a third case which is a delete of the key aspect of an entity which should technically be added separately but is complete redundant with the entity urn.

In any case, we need to respect that the data model defined both current and previous as optional here and here. Changing the model is a backwards incompatible change.

event.getPreviousRecordTemplate() != null
? event.getPreviousRecordTemplate()
: event.getRecordTemplate();
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());

if (!aspectSpec.isTimeseries()) {
Expand Down Expand Up @@ -280,45 +283,37 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
@Nonnull final RecordTemplate aspect,
@Nonnull final MetadataChangeLog event,
final boolean isNewAspectVersion) {
final List<Edge> edgesToAdd = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
final List<Edge> edges = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypes = new HashMap<>();

// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and
// inputFields
// since @Relationship only links between the parent entity urn and something else.
if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn,
upstreamLineage.getFineGrainedLineages(),
edgesToAdd,
urnToRelationshipTypesBeingAdded);
urn, upstreamLineage.getFineGrainedLineages(), edges, urnToRelationshipTypes);
} else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
final InputFields inputFields = new InputFields(aspect.data());
updateInputFieldEdgesAndRelationships(
urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded);
updateInputFieldEdgesAndRelationships(urn, inputFields, edges, urnToRelationshipTypes);
} else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) {
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn,
dataJobInputOutput.getFineGrainedLineages(),
edgesToAdd,
urnToRelationshipTypesBeingAdded);
urn, dataJobInputOutput.getFineGrainedLineages(), edges, urnToRelationshipTypes);
}

Map<RelationshipFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true);

for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
Set<String> relationshipTypes =
urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>());
Set<String> relationshipTypes = urnToRelationshipTypes.getOrDefault(urn, new HashSet<>());
relationshipTypes.add(entry.getKey().getRelationshipName());
urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes);
urnToRelationshipTypes.put(urn, relationshipTypes);
final List<Edge> newEdges =
GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion);
edgesToAdd.addAll(newEdges);
edges.addAll(newEdges);
}
return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded);
return Pair.of(edges, urnToRelationshipTypes);
}

/** Process snapshot and update graph index */
Expand Down Expand Up @@ -433,29 +428,36 @@ private void deleteGraphData(
@Nonnull final OperationContext opContext,
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nullable final RecordTemplate aspect,
@Nonnull final Boolean isKeyAspect,
@Nonnull final MetadataChangeLog event) {
if (isKeyAspect) {
graphService.removeNode(opContext, urn);
return;
}

Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);

final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
if (aspect != null) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);

final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingRemoved =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingRemoved.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingRemoved.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
}
}
} else {
log.warn(
"Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}",
aspectSpec.getName(),
urn);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package com.linkedin.metadata.service;

import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.testng.Assert.assertEquals;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.container.Container;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
Expand All @@ -21,6 +28,14 @@
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.GenericRecordUtils;
Expand All @@ -29,6 +44,8 @@
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.List;
import javax.annotation.Nonnull;
import org.mockito.ArgumentCaptor;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.script.Script;
Expand Down Expand Up @@ -180,4 +197,109 @@ public void testStatusNoOpEvent() {

verifyNoInteractions(mockWriteDAO);
}

@Test
public void testMissingAspectGraphDelete() {
// Test deleting a null aspect
test.handleChangeEvent(
TEST_OP_CONTEXT,
new MetadataChangeLog()
.setChangeType(ChangeType.DELETE)
.setEntityType(TEST_URN.getEntityType())
.setEntityUrn(TEST_URN)
.setAspectName(Constants.CONTAINER_ASPECT_NAME));

// For missing aspects, verify no writes
verifyNoInteractions(mockWriteDAO);
}

@Test
public void testNodeGraphDelete() {
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");

// Test deleting container entity
test.handleChangeEvent(
TEST_OP_CONTEXT,
new MetadataChangeLog()
.setChangeType(ChangeType.DELETE)
.setEntityType(CONTAINER_ENTITY_NAME)
.setEntityUrn(containerUrn)
.setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME));

// Delete all outgoing edges of this entity
verify(mockWriteDAO, times(1))
.deleteByQuery(
eq(TEST_OP_CONTEXT),
nullable(String.class),
eq(createUrnFilter(containerUrn)),
nullable(String.class),
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
eq(List.of()),
eq(new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING)));

// Delete all incoming edges of this entity
verify(mockWriteDAO, times(1))
.deleteByQuery(
eq(TEST_OP_CONTEXT),
nullable(String.class),
eq(createUrnFilter(containerUrn)),
nullable(String.class),
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
eq(List.of()),
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)));

// Delete all edges where this entity is a lifecycle owner
verify(mockWriteDAO, times(1))
.deleteByQuery(
eq(TEST_OP_CONTEXT),
nullable(String.class),
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
nullable(String.class),
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
eq(List.of()),
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)),
eq(containerUrn.toString()));
}

@Test
public void testContainerDelete() {
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");

// Test deleting a container aspect
test.handleChangeEvent(
TEST_OP_CONTEXT,
new MetadataChangeLog()
.setChangeType(ChangeType.DELETE)
.setEntityType(TEST_URN.getEntityType())
.setEntityUrn(TEST_URN)
.setAspectName(Constants.CONTAINER_ASPECT_NAME)
.setPreviousAspectValue(
GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn))));

// For container aspects, verify that only edges are removed in both cases
verify(mockWriteDAO, times(1))
.deleteByQuery(
eq(TEST_OP_CONTEXT),
nullable(String.class),
eq(createUrnFilter(TEST_URN)),
nullable(String.class),
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
eq(List.of("IsPartOf")),
eq(
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING)));
}

private static Filter createUrnFilter(@Nonnull final Urn urn) {
Filter filter = new Filter();
CriterionArray criterionArray = new CriterionArray();
Criterion criterion = buildCriterion("urn", Condition.EQUAL, urn.toString());
criterionArray.add(criterion);
filter.setOr(
new ConjunctiveCriterionArray(
ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray))));

return filter;
}
}
Loading
Loading