Skip to content

Commit

Permalink
fix(gms): Fixes delete references for single relationship aspects (da…
Browse files Browse the repository at this point in the history
…tahub-project#7211)

Deleting Container references were broken. This PR fixes that.

Root cause is datahub-project#5248 which changed the MCP change type to DELETE for cases where an aspect after getting the reference removed was no longer valid. We do not support DELETE operations for MCPs.

The solution is then to execute EntityService#deleteAspect rather than emit a sync MCP.

A regression test was made to prevent this from happening again.

Co-authored-by: John Joyce <[email protected]>
  • Loading branch information
2 people authored and Eric Yomi committed Feb 8, 2023
1 parent 3dcd7f7 commit 8c79e9f
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.linkedin.mxe.MetadataChangeProposal;

import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -241,21 +242,14 @@ private void deleteReference(final Urn urn, final RelatedEntity relatedEntity) {
* @param prevAspect the old value for the aspect
*/
private void deleteAspect(Urn urn, String aspectName, RecordTemplate prevAspect) {
final MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
proposal.setChangeType(ChangeType.DELETE);
proposal.setEntityType(urn.getEntityType());
proposal.setAspectName(aspectName);

final AuditStamp auditStamp = new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp, false);

if (!ingestProposalResult.isDidUpdate()) {
log.error("Failed to ingest aspect with references removed. Before {}, after: null, please check MCP processor"
final RollbackResult rollbackResult = _entityService.deleteAspect(urn.toString(), aspectName,
new HashMap<>(), true);
if (rollbackResult == null || rollbackResult.getNewValue() != null) {
log.error("Failed to delete aspect with references. Before {}, after: null, please check GMS logs"
+ " logs for more information", prevAspect);
handleError(new DeleteEntityServiceError("Failed to ingest new aspect",
DeleteEntityServiceErrorReason.MCP_PROCESSOR_FAILED,
ImmutableMap.of("proposal", proposal)));
DeleteEntityServiceErrorReason.ASPECT_DELETE_FAILED,
ImmutableMap.of("urn", urn, "aspectName", aspectName)));
}
}

Expand Down Expand Up @@ -380,7 +374,6 @@ private Stream<RelationshipFieldSpec> findRelationshipFor(final AspectSpec spec,
return spec.getRelationshipFieldSpecs().stream()
.filter(relationship -> relationship.getRelationshipName().equals(relationshipType)
&& relationship.getValidDestinationTypes().contains(entityType));
//.collect(Collectors.toList());
}

/**
Expand All @@ -403,6 +396,7 @@ private enum DeleteEntityServiceErrorReason {
ENTITY_SERVICE_ASPECT_NOT_FOUND,
ENTITY_REGISTRY_SPEC_NOT_FOUND,
MCP_PROCESSOR_FAILED,
ASPECT_DELETE_FAILED,
CLONE_FAILED,
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.linkedin.metadata.entity;

import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.container.Container;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.ebean.EbeanAspectDao;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.graph.RelatedEntity;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.run.DeleteReferencesResponse;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import java.sql.Timestamp;
import java.util.Map;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import static com.linkedin.metadata.search.utils.QueryUtils.*;
import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.*;


public class DeleteEntityServiceTest {

protected EbeanAspectDao _aspectDao;

protected EntityService _entityService;
protected GraphService _graphService = Mockito.mock(GraphService.class);

protected DeleteEntityService _deleteEntityService;

protected EntityRegistry _entityRegistry;

public DeleteEntityServiceTest() {
_entityRegistry = new ConfigEntityRegistry(Snapshot.class.getClassLoader()
.getResourceAsStream("entity-registry.yml"));
_aspectDao = mock(EbeanAspectDao.class);
_entityService = new EntityService(_aspectDao, mock(EventProducer.class), _entityRegistry);
_deleteEntityService = new DeleteEntityService(_entityService, _graphService);
}

/**
* This test checks whether deleting non array references in PDL aspects generates a valid MCP.
*/
@Test
public void testDeleteUniqueRefGeneratesValidMCP() {
final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV");
final Urn container = UrnUtils.getUrn("urn:li:container:d1006cf3-3ff9-48e3-85cd-26eb23775ab2");

final RelatedEntitiesResult mockRelatedEntities =
new RelatedEntitiesResult(0, 1, 1, ImmutableList.of(new RelatedEntity("IsPartOf", dataset.toString())));

Mockito.when(_graphService.findRelatedEntities(null, newFilter("urn", container.toString()),
null, EMPTY_FILTER, ImmutableList.of(),
newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING), 0, 10000))
.thenReturn(mockRelatedEntities);

final EntityResponse entityResponse = new EntityResponse();
entityResponse.setUrn(dataset);
entityResponse.setEntityName(dataset.getEntityType());
final Container containerAspect = new Container();
containerAspect.setContainer(container);
final EntityAspectIdentifier dbKey = new EntityAspectIdentifier(dataset.toString(), Constants.CONTAINER_ASPECT_NAME, 0);

final EntityAspect dbValue = new EntityAspect();
dbValue.setUrn(dataset.toString());
dbValue.setVersion(0);
dbValue.setAspect(Constants.CONTAINER_ASPECT_NAME);
dbValue.setMetadata(RecordUtils.toJsonString(containerAspect));
dbValue.setSystemMetadata(RecordUtils.toJsonString(SystemMetadataUtils.createDefaultSystemMetadata()));
final AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();
dbValue.setCreatedBy(auditStamp.getActor().toString());
dbValue.setCreatedOn(new Timestamp(auditStamp.getTime()));

final Map<EntityAspectIdentifier, EntityAspect> dbEntries = Map.of(dbKey, dbValue);
Mockito.when(_aspectDao.batchGet(Mockito.any())).thenReturn(dbEntries);

RollbackResult result = new RollbackResult(container, Constants.DATASET_ENTITY_NAME,
Constants.CONTAINER_ASPECT_NAME, containerAspect, null, null, null,
ChangeType.DELETE, false, 1);

Mockito.when(_aspectDao.runInTransactionWithRetry(Mockito.any(), Mockito.anyInt()))
.thenReturn(result);

final DeleteReferencesResponse response = _deleteEntityService.deleteReferencesTo(container, false);
assertEquals(1, (int) response.getTotal());
assertFalse(response.getRelatedAspects().isEmpty());
}
}

0 comments on commit 8c79e9f

Please sign in to comment.