Skip to content

Commit

Permalink
fix(entity-service): handle no-op system-metadata batches (#12055)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Dec 10, 2024
1 parent 638a0e3 commit 0a2ac70
Show file tree
Hide file tree
Showing 23 changed files with 927 additions and 485 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
public interface AspectsBatch {
Collection<? extends BatchItem> getItems();

Collection<? extends BatchItem> getInitialItems();

RetrieverContext getRetrieverContext();

/**
* Returns MCP items. Could be patch, upsert, etc.
* Returns MCP items. Could be one of patch, upsert, etc.
*
* @return batch items
*/
Expand Down Expand Up @@ -160,13 +162,24 @@ static Stream<MCLItem> applyMCLSideEffects(
}

default boolean containsDuplicateAspects() {
return getItems().stream()
.map(i -> String.format("%s_%s", i.getClass().getName(), i.hashCode()))
return getInitialItems().stream()
.map(i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode()))
.distinct()
.count()
!= getItems().size();
}

default Map<String, List<? extends BatchItem>> duplicateAspects() {
return getInitialItems().stream()
.collect(
Collectors.groupingBy(
i -> String.format("%s_%s", i.getClass().getSimpleName(), i.hashCode())))
.entrySet()
.stream()
.filter(entry -> entry.getValue() != null && entry.getValue().size() > 1)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

default Map<String, Set<String>> getUrnAspectsMap() {
return getItems().stream()
.map(aspect -> Pair.of(aspect.getUrn().toString(), aspect.getAspectName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,11 @@ public interface BatchItem extends ReadItem {
*/
@Nonnull
ChangeType getChangeType();

/**
* Determines if this item is a duplicate of another item in terms of the operation it represents
* to the database.Each implementation can define what constitutes a duplicate based on its
* specific fields which are persisted.
*/
boolean isDatabaseDuplicateOf(BatchItem other);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeLog;
import java.util.Objects;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -29,4 +31,23 @@ public class TestMCL implements MCLItem {
public String getAspectName() {
return getAspectSpec().getName();
}

@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

TestMCL testMCL = (TestMCL) o;
return Objects.equals(metadataChangeLog, testMCL.metadataChangeLog);
}

@Override
public int hashCode() {
return Objects.hashCode(metadataChangeLog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.ReadItem;
Expand All @@ -21,6 +22,7 @@
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -140,4 +142,40 @@ public Map<String, String> getHeaders() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(headers);
}

@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

TestMCP testMCP = (TestMCP) o;
return urn.equals(testMCP.urn)
&& DataTemplateUtil.areEqual(recordTemplate, testMCP.recordTemplate)
&& Objects.equals(systemAspect, testMCP.systemAspect)
&& Objects.equals(previousSystemAspect, testMCP.previousSystemAspect)
&& Objects.equals(auditStamp, testMCP.auditStamp)
&& Objects.equals(changeType, testMCP.changeType)
&& Objects.equals(metadataChangeProposal, testMCP.metadataChangeProposal);
}

@Override
public int hashCode() {
int result = urn.hashCode();
result = 31 * result + Objects.hashCode(recordTemplate);
result = 31 * result + Objects.hashCode(systemAspect);
result = 31 * result + Objects.hashCode(previousSystemAspect);
result = 31 * result + Objects.hashCode(auditStamp);
result = 31 * result + Objects.hashCode(changeType);
result = 31 * result + Objects.hashCode(metadataChangeProposal);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,26 @@ public class EntityAspect {

private String createdFor;

@Override
public String toString() {
return "EntityAspect{"
+ "urn='"
+ urn
+ '\''
+ ", aspect='"
+ aspect
+ '\''
+ ", version="
+ version
+ ", metadata='"
+ metadata
+ '\''
+ ", systemMetadata='"
+ systemMetadata
+ '\''
+ '}';
}

/**
* Provide a typed EntityAspect without breaking the existing public contract with generic types.
*/
Expand Down Expand Up @@ -144,6 +164,11 @@ public EnvelopedAspect toEnvelopedAspects() {
return envelopedAspect;
}

@Override
public String toString() {
return entityAspect.toString();
}

public static class EntitySystemAspectBuilder {

private EntityAspect.EntitySystemAspect build() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.entity.ebean.batch;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
Expand All @@ -15,7 +16,9 @@
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -29,12 +32,23 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Getter
@Builder(toBuilder = true)
public class AspectsBatchImpl implements AspectsBatch {

@Nonnull private final Collection<? extends BatchItem> items;
@Nonnull private final RetrieverContext retrieverContext;
@Nonnull private final Collection<? extends BatchItem> nonRepeatedItems;
@Getter @Nonnull private final RetrieverContext retrieverContext;

@Override
@Nonnull
public Collection<? extends BatchItem> getItems() {
return nonRepeatedItems;
}

@Override
public Collection<? extends BatchItem> getInitialItems() {
return items;
}

/**
* Convert patches to upserts, apply hooks at the aspect and batch level.
Expand Down Expand Up @@ -207,14 +221,32 @@ public AspectsBatchImplBuilder mcps(
return this;
}

private static <T extends BatchItem> List<T> filterRepeats(Collection<T> items) {
List<T> result = new ArrayList<>();
Map<Pair<Urn, String>, T> last = new HashMap<>();

for (T item : items) {
Pair<Urn, String> urnAspect = Pair.of(item.getUrn(), item.getAspectName());
// Check if this item is a duplicate of the previous
if (!last.containsKey(urnAspect) || !item.isDatabaseDuplicateOf(last.get(urnAspect))) {
result.add(item);
}
last.put(urnAspect, item);
}

return result;
}

public AspectsBatchImpl build() {
this.nonRepeatedItems = filterRepeats(this.items);

ValidationExceptionCollection exceptions =
AspectsBatch.validateProposed(this.items, this.retrieverContext);
AspectsBatch.validateProposed(this.nonRepeatedItems, this.retrieverContext);
if (!exceptions.isEmpty()) {
throw new IllegalArgumentException("Failed to validate MCP due to: " + exceptions);
}

return new AspectsBatchImpl(this.items, this.retrieverContext);
return new AspectsBatchImpl(this.items, this.nonRepeatedItems, this.retrieverContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import com.datahub.util.exception.ModelConversionException;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
Expand Down Expand Up @@ -269,6 +271,11 @@ private static RecordTemplate convertToRecordTemplate(
}
}

@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -280,13 +287,15 @@ public boolean equals(Object o) {
ChangeItemImpl that = (ChangeItemImpl) o;
return urn.equals(that.urn)
&& aspectName.equals(that.aspectName)
&& changeType.equals(that.changeType)
&& Objects.equals(systemMetadata, that.systemMetadata)
&& recordTemplate.equals(that.recordTemplate);
&& Objects.equals(auditStamp, that.auditStamp)
&& DataTemplateUtil.areEqual(recordTemplate, that.recordTemplate);
}

@Override
public int hashCode() {
return Objects.hash(urn, aspectName, systemMetadata, recordTemplate);
return Objects.hash(urn, aspectName, changeType, systemMetadata, auditStamp, recordTemplate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.EntityAspect;
Expand Down Expand Up @@ -115,6 +116,11 @@ public DeleteItemImpl build(AspectRetriever aspectRetriever) {
}
}

@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.entity.AspectUtils;
Expand Down Expand Up @@ -158,6 +159,11 @@ private static Pair<RecordTemplate, RecordTemplate> convertToRecordTemplate(
}
}

@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.batch.PatchMCP;
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
Expand Down Expand Up @@ -216,6 +217,11 @@ public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
}
}

@Override
public boolean isDatabaseDuplicateOf(BatchItem other) {
return equals(other);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -228,12 +234,13 @@ public boolean equals(Object o) {
return urn.equals(that.urn)
&& aspectName.equals(that.aspectName)
&& Objects.equals(systemMetadata, that.systemMetadata)
&& auditStamp.equals(that.auditStamp)
&& patch.equals(that.patch);
}

@Override
public int hashCode() {
return Objects.hash(urn, aspectName, systemMetadata, patch);
return Objects.hash(urn, aspectName, systemMetadata, auditStamp, patch);
}

@Override
Expand Down
Loading

0 comments on commit 0a2ac70

Please sign in to comment.