diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/hooks/FieldPathMutator.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/hooks/FieldPathMutator.java new file mode 100644 index 00000000000000..cc6a338d41c22b --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/hooks/FieldPathMutator.java @@ -0,0 +1,142 @@ +package com.linkedin.metadata.aspect.hooks; + +import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; + +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.ReadItem; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import com.linkedin.schema.EditableSchemaFieldInfo; +import com.linkedin.schema.EditableSchemaFieldInfoArray; +import com.linkedin.schema.EditableSchemaMetadata; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaFieldArray; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.util.Pair; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Setter +@Getter +@Accessors(chain = true) +public class FieldPathMutator extends MutationHook { + @Nonnull private AspectPluginConfig config; + + @Override + protected Stream> writeMutation( + @Nonnull Collection changeMCPS, @Nonnull RetrieverContext retrieverContext) { + + List> results = new LinkedList<>(); + + for (ChangeMCP item : changeMCPS) { + if (changeTypeFilter(item) && aspectFilter(item)) { + if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { + results.add(Pair.of(item, processSchemaMetadataAspect(item))); + } else { + results.add(Pair.of(item, processEditableSchemaMetadataAspect(item))); + } + } else { + // no op + results.add(Pair.of(item, false)); + } + } + + return results.stream(); + } + + /* + TODO: After some time, this should no longer be required. Assuming at least 1 write has + occurred for all schema aspects. + */ + @Override + protected Stream> readMutation( + @Nonnull Collection items, @Nonnull RetrieverContext retrieverContext) { + List> results = new LinkedList<>(); + + for (ReadItem item : items) { + if (aspectFilter(item)) { + if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { + results.add(Pair.of(item, processSchemaMetadataAspect(item))); + } else { + results.add(Pair.of(item, processEditableSchemaMetadataAspect(item))); + } + } else { + // no op + results.add(Pair.of(item, false)); + } + } + + return results.stream(); + } + + private static boolean changeTypeFilter(BatchItem item) { + return !ChangeType.DELETE.equals(item.getChangeType()) + && !ChangeType.PATCH.equals(item.getChangeType()); + } + + private static boolean aspectFilter(ReadItem item) { + return item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME) + || item.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME); + } + + private static boolean processEditableSchemaMetadataAspect(ReadItem item) { + boolean mutated = false; + final EditableSchemaMetadata schemaMetadata = item.getAspect(EditableSchemaMetadata.class); + EditableSchemaFieldInfoArray fields = schemaMetadata.getEditableSchemaFieldInfo(); + List replaceFields = + deduplicateFieldPaths(fields, EditableSchemaFieldInfo::getFieldPath); + if (!replaceFields.isEmpty()) { + schemaMetadata.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(replaceFields)); + mutated = true; + } + return mutated; + } + + private static boolean processSchemaMetadataAspect(ReadItem item) { + boolean mutated = false; + final SchemaMetadata schemaMetadata = item.getAspect(SchemaMetadata.class); + SchemaFieldArray fields = schemaMetadata.getFields(); + List replaceFields = deduplicateFieldPaths(fields, SchemaField::getFieldPath); + if (!replaceFields.isEmpty()) { + schemaMetadata.setFields(new SchemaFieldArray(replaceFields)); + mutated = true; + } + return mutated; + } + + private static List deduplicateFieldPaths( + Collection fields, Function fieldPathExtractor) { + + // preserve order + final LinkedHashMap> grouped = + fields.stream() + .collect( + Collectors.groupingBy(fieldPathExtractor, LinkedHashMap::new, Collectors.toList())); + + if (grouped.values().stream().anyMatch(v -> v.size() > 1)) { + log.warn( + "Duplicate field path(s) detected. Dropping duplicates: {}", + grouped.values().stream().filter(v -> v.size() > 1).collect(Collectors.toList())); + // return first + return grouped.values().stream().map(l -> l.get(0)).collect(Collectors.toList()); + } + + return Collections.emptyList(); + } +} diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/hooks/FieldPathMutatorTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/hooks/FieldPathMutatorTest.java new file mode 100644 index 00000000000000..131d5f9a3d6079 --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/hooks/FieldPathMutatorTest.java @@ -0,0 +1,249 @@ +package com.linkedin.metadata.aspect.hooks; + +import static com.linkedin.metadata.Constants.DOMAINS_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.UrnArray; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.domain.Domains; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.schema.EditableSchemaFieldInfo; +import com.linkedin.schema.EditableSchemaFieldInfoArray; +import com.linkedin.schema.EditableSchemaMetadata; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaFieldArray; +import com.linkedin.schema.SchemaFieldDataType; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.schema.StringType; +import com.linkedin.test.metadata.aspect.TestEntityRegistry; +import com.linkedin.test.metadata.aspect.batch.TestMCP; +import com.linkedin.util.Pair; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class FieldPathMutatorTest { + + private EntityRegistry entityRegistry; + private RetrieverContext mockRetrieverContext; + private DatasetUrn testDatasetUrn; + private final FieldPathMutator test = + new FieldPathMutator().setConfig(mock(AspectPluginConfig.class)); + + @BeforeTest + public void init() throws URISyntaxException { + testDatasetUrn = + DatasetUrn.createFromUrn( + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); + + entityRegistry = new TestEntityRegistry(); + AspectRetriever mockAspectRetriever = mock(AspectRetriever.class); + when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry); + GraphRetriever mockGraphRetriever = mock(GraphRetriever.class); + mockRetrieverContext = mock(RetrieverContext.class); + when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever); + when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever); + } + + @Test + public void testValidateIncorrectAspect() { + final Domains domains = + new Domains() + .setDomains(new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:domain:123")))); + assertEquals( + test.writeMutation( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(testDatasetUrn) + .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(testDatasetUrn.getEntityType()) + .getAspectSpec(DOMAINS_ASPECT_NAME)) + .recordTemplate(domains) + .build()), + mockRetrieverContext) + .filter(Pair::getSecond) + .count(), + 0); + } + + @Test + public void testValidateNonDuplicatedSchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(false); + assertEquals( + test.writeMutation( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(testDatasetUrn) + .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(testDatasetUrn.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .filter(Pair::getSecond) + .count(), + 0); + } + + @Test + public void testValidateDuplicatedSchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(true); + + List> result = + test.writeMutation( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(testDatasetUrn) + .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(testDatasetUrn.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .collect(Collectors.toList()); + + assertEquals(result.stream().filter(Pair::getSecond).count(), 1); + assertEquals(result.get(0).getFirst().getAspect(SchemaMetadata.class).getFields().size(), 1); + } + + @Test + public void testValidateDeleteDuplicatedSchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(true); + + assertEquals( + test.writeMutation( + Set.of( + TestMCP.builder() + .changeType(ChangeType.DELETE) + .urn(testDatasetUrn) + .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(testDatasetUrn.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .filter(Pair::getSecond) + .count(), + 0); + } + + @Test + public void testValidateNonDuplicatedEditableSchemaFieldPath() { + final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false); + assertEquals( + test.writeMutation( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(testDatasetUrn) + .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(testDatasetUrn.getEntityType()) + .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .filter(Pair::getSecond) + .count(), + 0); + } + + @Test + public void testValidateDuplicatedEditableSchemaFieldPath() { + final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(true); + + List> result = + test.writeMutation( + Set.of( + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(testDatasetUrn) + .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(testDatasetUrn.getEntityType()) + .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build()), + mockRetrieverContext) + .collect(Collectors.toList()); + + assertEquals(result.stream().filter(Pair::getSecond).count(), 1); + assertEquals( + result + .get(0) + .getFirst() + .getAspect(EditableSchemaMetadata.class) + .getEditableSchemaFieldInfo() + .size(), + 1); + } + + private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { + List fields = new ArrayList<>(); + fields.add( + new SchemaField() + .setType( + new SchemaFieldDataType() + .setType(SchemaFieldDataType.Type.create(new StringType()))) + .setNullable(false) + .setNativeDataType("string") + .setFieldPath("test")); + + if (duplicateFields) { + fields.add( + new SchemaField() + .setType( + new SchemaFieldDataType() + .setType(SchemaFieldDataType.Type.create(new StringType()))) + .setNullable(false) + .setNativeDataType("string") + .setFieldPath("test")); + } + + return new SchemaMetadata() + .setPlatform(testDatasetUrn.getPlatformEntity()) + .setFields(new SchemaFieldArray(fields)); + } + + private EditableSchemaMetadata getMockEditableSchemaMetadataAspect(boolean duplicateFields) { + + List fields = new ArrayList<>(); + fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); + + if (duplicateFields) { + fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); + } + + return new EditableSchemaMetadata() + .setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(fields)); + } +} diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index fe6063b3cefc63..1e8044e3b5f86e 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -614,3 +614,16 @@ plugins: supportedEntityAspectNames: - entityName: '*' aspectName: structuredProperties + - className: 'com.linkedin.metadata.aspect.hooks.FieldPathMutator' + enabled: true + supportedOperations: + - CREATE + - UPSERT + - UPDATE + - RESTATE + - PATCH + supportedEntityAspectNames: + - entityName: '*' + aspectName: 'schemaMetadata' + - entityName: '*' + aspectName: 'editableSchemaMetadata'