Skip to content

Commit

Permalink
metadata-models 72.0.8 -> 80.0.0 (datahub-project#1756)
Browse files Browse the repository at this point in the history
  • Loading branch information
jywadhwani authored and arunvasudevan committed Sep 10, 2020
1 parent fbb0fb4 commit e4ebeaa
Show file tree
Hide file tree
Showing 37 changed files with 861 additions and 80 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# Pegasus & Avro
**/src/mainGenerated*
**/src/testGenerated*
metadata-events/mxe-registration/src/main/resources/**/*.avsc

# Java
.java-version
Expand Down
15 changes: 15 additions & 0 deletions metadata-dao-impl/ebean-dao/gma-create-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ create table metadata_aspect (
constraint pk_metadata_aspect primary key (urn,aspect,version)
);

create table metadata_index (
id bigint auto_increment not null,
urn varchar(500) not null,
aspect varchar(200) not null,
path varchar(200) not null,
longval bigint,
stringval varchar(500),
doubleval double,
constraint pk_metadata_index primary key (id)
);

create index idx_long_val on metadata_index (aspect,path,longval,urn);
create index idx_string_val on metadata_index (aspect,path,stringval,urn);
create index idx_double_val on metadata_index (aspect,path,doubleval,urn);
create index idx_urn on metadata_index (urn);
6 changes: 6 additions & 0 deletions metadata-dao-impl/ebean-dao/gma-drop-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,9 @@ drop table if exists metadata_id;

drop table if exists metadata_aspect;

drop table if exists metadata_index;

drop index if exists idx_long_val;
drop index if exists idx_string_val;
drop index if exists idx_double_val;
drop index if exists idx_urn;
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.linkedin.metadata.dao;

import io.ebean.annotation.Index;
import io.ebean.Model;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;


@Getter
@Setter
// define composite indexes
@Index(name = "idx_long_val", columnNames = {
EbeanMetadataIndex.ASPECT_COLUMN,
EbeanMetadataIndex.PATH_COLUMN,
EbeanMetadataIndex.LONG_COLUMN,
EbeanMetadataIndex.URN_COLUMN
})
@Index(name = "idx_string_val", columnNames = {
EbeanMetadataIndex.ASPECT_COLUMN,
EbeanMetadataIndex.PATH_COLUMN,
EbeanMetadataIndex.STRING_COLUMN,
EbeanMetadataIndex.URN_COLUMN
})
@Index(name = "idx_double_val", columnNames = {
EbeanMetadataIndex.ASPECT_COLUMN,
EbeanMetadataIndex.PATH_COLUMN,
EbeanMetadataIndex.DOUBLE_COLUMN,
EbeanMetadataIndex.URN_COLUMN
})
@Entity
@Table(name = "metadata_index")
public class EbeanMetadataIndex extends Model {

public static final long serialVersionUID = 1L;

private static final String ID_COLUMN = "id";
public static final String URN_COLUMN = "urn";
public static final String ASPECT_COLUMN = "aspect";
public static final String PATH_COLUMN = "path";
public static final String LONG_COLUMN = "longVal";
public static final String STRING_COLUMN = "stringVal";
public static final String DOUBLE_COLUMN = "doubleVal";

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = ID_COLUMN)
protected long id;

@NonNull
@Index(name = "idx_urn")
@Column(name = URN_COLUMN, length = 500, nullable = false)
protected String urn;

@NonNull
@Column(name = ASPECT_COLUMN, length = 200, nullable = false)
protected String aspect;

@NonNull
@Column(name = PATH_COLUMN, length = 200, nullable = false)
protected String path;

@Column(name = LONG_COLUMN)
protected Long longVal;

@Column(name = STRING_COLUMN, length = 500)
protected String stringVal;

@Column(name = DOUBLE_COLUMN)
protected Double doubleVal;

}
15 changes: 15 additions & 0 deletions metadata-dao-impl/ebean-dao/src/main/resources/gma-create-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ create table metadata_id (
constraint uq_metadata_id_namespace_id unique (namespace,id)
);

create table metadata_index (
id bigint auto_increment not null,
urn varchar(500) not null,
aspect varchar(200) not null,
path varchar(200) not null,
longval bigint,
stringval varchar(500),
doubleval double,
constraint pk_metadata_index primary key (id)
);

create index idx_long_val on metadata_index (aspect,path,longval,urn);
create index idx_string_val on metadata_index (aspect,path,stringval,urn);
create index idx_double_val on metadata_index (aspect,path,doubleval,urn);
create index idx_urn on metadata_index (urn);
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public class ESUtils {
private static final String DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD = "urn";

/*
* TODO: we might need to extend this list if need be, below link has the complete list
* https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html
* */
private static final char[] ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS = {'*'};
* Refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html for list of reserved
* characters in an Elasticsearch regular expression.
*/
private static final String ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS = "?+*|{}[]()";

private ESUtils() {

Expand Down Expand Up @@ -81,7 +81,7 @@ public static void buildSortOrder(@Nonnull SearchSourceBuilder searchSourceBuild
*/
@Nonnull
public static String escapeReservedCharacters(@Nonnull String input) {
for (char reservedChar : ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS) {
for (char reservedChar : ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS.toCharArray()) {
input = input.replace(String.valueOf(reservedChar), "\\" + reservedChar);
}
return input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@ public void testBuildFilterQuery() throws Exception {
public void testEscapeReservedCharacters() {
assertEquals(escapeReservedCharacters("foobar"), "foobar");
assertEquals(escapeReservedCharacters("**"), "\\*\\*");
assertEquals(escapeReservedCharacters("()"), "\\(\\)");
assertEquals(escapeReservedCharacters("{}"), "\\{\\}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.Configs;
import com.linkedin.mxe.MetadataAuditEvent;
import com.linkedin.mxe.MetadataChangeEvent;
import com.linkedin.mxe.Topics;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -107,11 +111,57 @@ record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);
}
}

@Override
public <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue) {

validateAspectSpecificTopic(ModelUtils.getAspectSpecificMAETopicName(urn, newValue));

String topic;
Class<? extends SpecificRecord> maeAvroClass;
RecordTemplate metadataAuditEvent;
try {
topic = (String) Topics.class.getField(ModelUtils.getAspectSpecificMAETopicName(urn, newValue)).get(null);
maeAvroClass = Configs.TOPIC_SCHEMA_CLASS_MAP.get(topic);
metadataAuditEvent = (RecordTemplate) EventUtils.getPegasusClass(maeAvroClass).newInstance();

metadataAuditEvent.getClass().getMethod("setUrn", urn.getClass()).invoke(metadataAuditEvent, urn);
metadataAuditEvent.getClass().getMethod("setNewValue", newValue.getClass()).invoke(metadataAuditEvent, newValue);
if (oldValue != null) {
metadataAuditEvent.getClass()
.getMethod("setOldValue", oldValue.getClass())
.invoke(metadataAuditEvent, oldValue);
}
} catch (NoSuchFieldException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException
| InstantiationException | InvocationTargetException e) {
throw new IllegalArgumentException("Failed to compose the Pegasus aspect specific MAE", e);
}

GenericRecord record;
try {
record = EventUtils.pegasusToAvroAspectSpecificMXE(maeAvroClass, metadataAuditEvent);
} catch (NoSuchFieldException | IOException | IllegalAccessException e) {
throw new ModelConversionException("Failed to convert Pegasus aspect specific MAE to Avro", e);
}

if (_callback.isPresent()) {
_producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
} else {
_producer.send(new ProducerRecord(topic, urn.toString(), record));
}
}

@Nonnull
private Snapshot makeSnapshot(@Nonnull URN urn, @Nonnull RecordTemplate value) {
Snapshot snapshot = new Snapshot();
List<ASPECT_UNION> aspects = Collections.singletonList(ModelUtils.newAspectUnion(_aspectUnionClass, value));
RecordUtils.setSelectedRecordTemplateInUnion(snapshot, ModelUtils.newSnapshot(_snapshotClass, urn, aspects));
return snapshot;
}
}

static void validateAspectSpecificTopic(@Nonnull String topic) {
if (!Arrays.stream(Topics.class.getFields()).anyMatch(field -> field.getName().equals(topic))) {
throw new IllegalArgumentException(String.format("The aspect specific topic %s is not registered.", topic));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import com.linkedin.metadata.query.RelationshipFilter;
import com.linkedin.metadata.validator.EntityValidator;
import com.linkedin.metadata.validator.RelationshipValidator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -185,7 +185,7 @@ public List<RecordTemplate> findMixedTypesRelationships(@Nonnull Statement query

@Nonnull
public <SRC_ENTITY extends RecordTemplate, DEST_ENTITY extends RecordTemplate, RELATIONSHIP extends RecordTemplate>
List<List<RecordTemplate>> getPathsToAllNodesTraversed(
List<List<RecordTemplate>> getTraversedPaths(
@Nullable Class<SRC_ENTITY> sourceEntityClass, @Nonnull Filter sourceEntityFilter,
@Nullable Class<DEST_ENTITY> destinationEntityClass, @Nonnull Filter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull RelationshipFilter relationshipFilter,
Expand Down Expand Up @@ -221,7 +221,7 @@ List<List<RecordTemplate>> getPathsToAllNodesTraversed(

final Statement statement = buildStatement(statementString, "length(p), dest.urn", offset, count);

return runQuery(statement, this::pathRecordToEntityList);
return runQuery(statement, this::pathRecordToPathList);
}

/**
Expand Down Expand Up @@ -302,11 +302,21 @@ private <ENTITY extends RecordTemplate> ENTITY nodeRecordToEntity(@Nonnull Class
}

@Nonnull
private List<RecordTemplate> pathRecordToEntityList(@Nonnull Record pathRecord) {
private List<RecordTemplate> pathRecordToPathList(@Nonnull Record pathRecord) {
final Path path = pathRecord.values().get(0).asPath();
return StreamSupport.stream(path.nodes().spliterator(), false)
.map(Neo4jUtil::nodeToEntity)
.collect(Collectors.toList());
final List<RecordTemplate> pathList = new ArrayList<>();

StreamSupport.stream(path.spliterator(), false)
.map(Neo4jUtil::pathSegmentToRecordList)
.forEach(segment -> {
if (pathList.isEmpty()) {
pathList.add(segment.get(0));
}
pathList.add(segment.get(1));
pathList.add(segment.get(2));
});

return pathList;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
import com.linkedin.metadata.query.Filter;
import com.linkedin.metadata.query.RelationshipDirection;
import com.linkedin.metadata.query.RelationshipFilter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ClassUtils;
import org.neo4j.driver.types.Node;
import org.neo4j.driver.types.Path;
import org.neo4j.driver.types.Relationship;

import static com.linkedin.metadata.dao.utils.QueryUtils.*;
Expand Down Expand Up @@ -165,6 +168,25 @@ public static RecordTemplate nodeToEntity(@Nonnull Node node) {
return RecordUtils.toRecordTemplate(className, new DataMap(node.asMap()));
}

/**
* Converts path segment (field:value map) list of {@link RecordTemplate}s of nodes & edges
*
* @param segment The segment of a path containing nodes & edges
* @return List<RecordTemplate>
*/
@Nonnull
public static List<RecordTemplate> pathSegmentToRecordList(@Nonnull Path.Segment segment) {
final Node startNode = segment.start();
final Node endNode = segment.end();
final Relationship edge = segment.relationship();

return Arrays.asList(
nodeToEntity(startNode),
edgeToRelationship(startNode, endNode, edge),
nodeToEntity(endNode)
);
}

/**
* Converts edge (source-relationship->destination) to RELATIONSHIP
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,17 +511,17 @@ public void testFindNodesInPath() throws Exception {
// Get reports roll-up - 2 levels
Filter sourceFilter = newFilter("urn", urn1.toString());
RelationshipFilter relationshipFilter = createRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING);
List<List<RecordTemplate>> nodesInPath = _dao.getPathsToAllNodesTraversed(EntityFoo.class, sourceFilter, null,
List<List<RecordTemplate>> paths = _dao.getTraversedPaths(EntityFoo.class, sourceFilter, null,
EMPTY_FILTER, RelationshipFoo.class, relationshipFilter, 1, 2, -1, -1);
assertEquals(nodesInPath.size(), 5);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 2).collect(Collectors.toList()).size(), 2);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 3);
assertEquals(paths.size(), 5);
assertEquals(paths.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 2);
assertEquals(paths.stream().filter(l -> l.size() == 5).collect(Collectors.toList()).size(), 3);

// Get reports roll-up - 1 level
nodesInPath = _dao.getPathsToAllNodesTraversed(EntityFoo.class, sourceFilter, null,
paths = _dao.getTraversedPaths(EntityFoo.class, sourceFilter, null,
EMPTY_FILTER, RelationshipFoo.class, relationshipFilter, 1, 1, -1, -1);
assertEquals(nodesInPath.size(), 2);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 2).collect(Collectors.toList()).size(), 2);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 0);
assertEquals(paths.size(), 2);
assertEquals(paths.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 2);
assertEquals(paths.stream().filter(l -> l.size() == 5).collect(Collectors.toList()).size(), 0);
}
}
Loading

0 comments on commit e4ebeaa

Please sign in to comment.