From 7b9cbd4d736354c72bfd2e139284b881d3800dd5 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 09:18:47 -0700 Subject: [PATCH 01/15] elasticsearch as graph backend --- .../dashboard/mappers/DashboardMapper.java | 81 ------ datahub-kubernetes/datahub/README.md | 1 + .../datahub/charts/datahub-gms/README.md | 1 + .../datahub-gms/templates/deployment.yaml | 4 + .../datahub/charts/datahub-gms/values.yaml | 3 +- .../charts/datahub-mae-consumer/README.md | 1 + .../templates/deployment.yaml | 6 +- .../charts/datahub-mae-consumer/values.yaml | 3 +- datahub-kubernetes/datahub/values.yaml | 2 +- .../datahub-gms/env/docker-without-neo4j.env | 39 +++ docker/datahub-gms/env/docker.env | 1 + docker/datahub-gms/start.sh | 1 - .../docker-compose-without-neo4j.override.yml | 33 +++ docker/docker-compose-without-neo4j.yml | 136 +++++++++++ ...ocker-compose-without-neo4j.quickstart.yml | 146 +++++++++++ .../quickstart/docker-compose.quickstart.yml | 1 + .../quickstart/generate_docker_quickstart.sh | 2 +- .../ElasticSearchGraphServiceFactory.java | 55 +++++ .../factory/common/GraphServiceFactory.java | 35 ++- .../linkedin/metadata/graph/GraphService.java | 2 + .../metadata/graph/Neo4jGraphService.java | 5 + .../graph/elastic/ESGraphReadDAO.java | 112 +++++++++ .../graph/elastic/ESGraphWriteDAO.java | 60 +++++ .../elastic/ElasticSearchGraphService.java | 231 ++++++++++++++++++ .../GraphRelationshipMappingsBuilder.java | 36 +++ .../query/request/SearchRequestHandler.java | 3 + .../graph/ElasticSearchGraphServiceTest.java | 187 ++++++++++++++ .../kafka/MetadataAuditEventsProcessor.java | 10 +- 28 files changed, 1102 insertions(+), 95 deletions(-) delete mode 100644 datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java create mode 100644 docker/datahub-gms/env/docker-without-neo4j.env create mode 100644 docker/docker-compose-without-neo4j.override.yml create mode 100644 docker/docker-compose-without-neo4j.yml create mode 100644 docker/quickstart/docker-compose-without-neo4j.quickstart.yml create mode 100644 gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphReadDAO.java create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java deleted file mode 100644 index da3cca409a9c03..00000000000000 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.linkedin.datahub.graphql.types.dashboard.mappers; - -import com.linkedin.datahub.graphql.generated.AccessLevel; -import com.linkedin.datahub.graphql.generated.Chart; -import com.linkedin.datahub.graphql.generated.Dashboard; -import com.linkedin.datahub.graphql.generated.DashboardInfo; -import com.linkedin.datahub.graphql.generated.EntityType; -import com.linkedin.datahub.graphql.generated.DashboardEditableProperties; -import com.linkedin.datahub.graphql.types.common.mappers.AuditStampMapper; -import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper; -import com.linkedin.datahub.graphql.types.mappers.ModelMapper; -import com.linkedin.datahub.graphql.types.common.mappers.OwnershipMapper; -import com.linkedin.datahub.graphql.types.common.mappers.StatusMapper; -import com.linkedin.datahub.graphql.types.tag.mappers.GlobalTagsMapper; - -import javax.annotation.Nonnull; -import java.util.stream.Collectors; - -public class DashboardMapper implements ModelMapper { - - public static final DashboardMapper INSTANCE = new DashboardMapper(); - - public static Dashboard map(@Nonnull final com.linkedin.dashboard.Dashboard dashboard) { - return INSTANCE.apply(dashboard); - } - - @Override - public Dashboard apply(@Nonnull final com.linkedin.dashboard.Dashboard dashboard) { - final Dashboard result = new Dashboard(); - result.setUrn(dashboard.getUrn().toString()); - result.setType(EntityType.DASHBOARD); - result.setDashboardId(dashboard.getDashboardId()); - result.setTool(dashboard.getTool()); - if (dashboard.hasInfo()) { - result.setInfo(mapDashboardInfo(dashboard.getInfo())); - } - if (dashboard.hasOwnership()) { - result.setOwnership(OwnershipMapper.map(dashboard.getOwnership())); - } - if (dashboard.hasStatus()) { - result.setStatus(StatusMapper.map(dashboard.getStatus())); - } - if (dashboard.hasGlobalTags()) { - result.setGlobalTags(GlobalTagsMapper.map(dashboard.getGlobalTags())); - } - if (dashboard.hasEditableProperties()) { - final DashboardEditableProperties dashboardEditableProperties = new DashboardEditableProperties(); - dashboardEditableProperties.setDescription(dashboard.getEditableProperties().getDescription()); - result.setEditableProperties(dashboardEditableProperties); - } - return result; - } - - private DashboardInfo mapDashboardInfo(final com.linkedin.dashboard.DashboardInfo info) { - final DashboardInfo result = new DashboardInfo(); - result.setDescription(info.getDescription()); - result.setName(info.getTitle()); - result.setLastRefreshed(info.getLastRefreshed()); - result.setCharts(info.getCharts().stream().map(urn -> { - final Chart chart = new Chart(); - chart.setUrn(urn.toString()); - return chart; - }).collect(Collectors.toList())); - if (info.hasExternalUrl()) { - // TODO: Migrate to using the External URL field for consistency. - result.setExternalUrl(info.getDashboardUrl().toString()); - } - if (info.hasCustomProperties()) { - result.setCustomProperties(StringMapMapper.map(info.getCustomProperties())); - } - if (info.hasAccess()) { - result.setAccess(AccessLevel.valueOf(info.getAccess().toString())); - } - result.setLastModified(AuditStampMapper.map(info.getLastModified().getLastModified())); - result.setCreated(AuditStampMapper.map(info.getLastModified().getCreated())); - if (info.getLastModified().hasDeleted()) { - result.setDeleted(AuditStampMapper.map(info.getLastModified().getDeleted())); - } - return result; - } -} diff --git a/datahub-kubernetes/datahub/README.md b/datahub-kubernetes/datahub/README.md index 3a7b77b5d3fbe7..fe7260b250aa83 100644 --- a/datahub-kubernetes/datahub/README.md +++ b/datahub-kubernetes/datahub/README.md @@ -57,6 +57,7 @@ helm install datahub datahub/ | global.sql.datasource.username | string | `"root"` | SQL user name | | global.sql.datasource.password.secretRef | string | `"mysql-secrets"` | Secret that contains the MySQL password | | global.sql.datasource.password.secretKey | string | `"mysql-password"` | Secret key that contains the MySQL password | +| global.graphService | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. ## Optional Chart Values diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/README.md b/datahub-kubernetes/datahub/charts/datahub-gms/README.md index 7b544a8c3c20b3..ba855ba2ea2d2a 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/README.md +++ b/datahub-kubernetes/datahub/charts/datahub-gms/README.md @@ -36,6 +36,7 @@ Current chart version is `0.2.0` | global.sql.datasource.username | string | `"datahub"` | | | global.sql.datasource.password.secretRef | string | `"mysql-secrets"` | | | global.sql.datasource.password.secretKey | string | `"mysql-password"` | | +| global.graphService | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. | image.pullPolicy | string | `"IfNotPresent"` | | | image.repository | string | `"linkedin/datahub-gms"` | | | image.tag | string | `"v0.8.1"` | | diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml index e0ef91188c135f..9ae327e25a5459 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml @@ -115,6 +115,9 @@ spec: name: "{{ .password.secretRef }}" key: "{{ .password.secretKey }}" {{- end }} + - name: GRAPH_SERVICE + value: {{ .Values.global.graphService }} + {{- if eq .Values.global.graphService "neo4j" }} - name: NEO4J_HOST value: "{{ .Values.global.neo4j.host }}" - name: NEO4J_URI @@ -126,6 +129,7 @@ spec: secretKeyRef: name: "{{ .Values.global.neo4j.password.secretRef }}" key: "{{ .Values.global.neo4j.password.secretKey }}" + {{- end }} {{- if .Values.global.springKafkaConfigurationOverrides }} {{- range $configName, $configValue := .Values.global.springKafkaConfigurationOverrides }} - name: SPRING_KAFKA_PROPERTIES_{{ $configName | replace "." "_" | upper }} diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml b/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml index 9b23a76ef1b0f8..37b4cc1a7a270d 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml @@ -151,6 +151,7 @@ readinessProbe: # helm install datahub-gms datahub-gms/ global: datahub_analytics_enabled: true + graphService: neo4j elasticsearch: host: "elasticsearch" @@ -191,4 +192,4 @@ global: - "broker" - "mysql" - "elasticsearch" - - "neo4j" \ No newline at end of file + - "neo4j" diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md index 29696289f30d37..1421b0fdcd112b 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md @@ -29,6 +29,7 @@ Current chart version is `0.2.0` | global.hostAliases[0].hostnames[2] | string | `"elasticsearch"` | | | global.hostAliases[0].hostnames[3] | string | `"neo4j"` | | | global.hostAliases[0].ip | string | `"192.168.0.104"` | | +| global.graphService | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. | image.pullPolicy | string | `"IfNotPresent"` | | | image.repository | string | `"linkedin/datahub-mae-consumer"` | | | image.tag | string | `"v0.8.1"` | | diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml index 553397a62affc3..98cc6bdd6aeeaf 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml @@ -100,6 +100,9 @@ spec: name: "{{ .password.secretRef }}" key: "{{ .password.secretKey }}" {{- end }} + - name: GRAPH_SERVICE + value: {{ .Values.global.graphService }} + {{- if eq .Values.global.graphService "neo4j" }} - name: NEO4J_HOST value: "{{ .Values.global.neo4j.host }}" - name: NEO4J_URI @@ -111,8 +114,7 @@ spec: secretKeyRef: name: "{{ .Values.global.neo4j.password.secretRef }}" key: "{{ .Values.global.neo4j.password.secretKey }}" - - name: DATAHUB_ANALYTICS_ENABLED - value: "{{ .Values.global.datahub_analytics_enabled }}" + {{- end }} {{- if .Values.global.springKafkaConfigurationOverrides }} {{- range $configName, $configValue := .Values.global.springKafkaConfigurationOverrides }} - name: SPRING_KAFKA_PROPERTIES_{{ $configName | replace "." "_" | upper }} diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml index 3da74b68321730..c8fd4347598c01 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml @@ -145,6 +145,7 @@ readinessProbe: failureThreshold: 8 global: + graphService: neo4j datahub_analytics_enabled: true elasticsearch: @@ -175,4 +176,4 @@ global: - "broker" - "mysql" - "elasticsearch" - - "neo4j" \ No newline at end of file + - "neo4j" diff --git a/datahub-kubernetes/datahub/values.yaml b/datahub-kubernetes/datahub/values.yaml index e7cad25392f06c..9b532edbc4c230 100644 --- a/datahub-kubernetes/datahub/values.yaml +++ b/datahub-kubernetes/datahub/values.yaml @@ -58,7 +58,7 @@ datahubUpgrade: tag: "v0.8.1" global: - + graphService: neo4j datahub_analytics_enabled: true datahub_standalone_consumers_enabled: false diff --git a/docker/datahub-gms/env/docker-without-neo4j.env b/docker/datahub-gms/env/docker-without-neo4j.env new file mode 100644 index 00000000000000..96e8ccfc7b04e4 --- /dev/null +++ b/docker/datahub-gms/env/docker-without-neo4j.env @@ -0,0 +1,39 @@ +DATASET_ENABLE_SCSI=false +EBEAN_DATASOURCE_USERNAME=datahub +EBEAN_DATASOURCE_PASSWORD=datahub +EBEAN_DATASOURCE_HOST=mysql:3306 +EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8 +EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver +KAFKA_BOOTSTRAP_SERVER=broker:29092 +KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 +ELASTICSEARCH_HOST=elasticsearch +ELASTICSEARCH_PORT=9200 +GRAPH_SERVICE=elasticsearch + +MAE_CONSUMER_ENABLED=true +MCE_CONSUMER_ENABLED=true + +# Uncomment to disable persistence of client-side analytics events +# DATAHUB_ANALYTICS_ENABLED=false + +# Uncomment to configure kafka topic names +# Make sure these names are consistent across the whole deployment +# METADATA_AUDIT_EVENT_NAME=MetadataAuditEvent_v4 +# METADATA_CHANGE_EVENT_NAME=MetadataChangeEvent_v4 +# FAILED_METADATA_CHANGE_EVENT_NAME=FailedMetadataChangeEvent_v4 + +# Uncomment and set these to support SSL connection to Elasticsearch +# ELASTICSEARCH_USE_SSL=true +# ELASTICSEARCH_SSL_PROTOCOL=TLSv1.2 +# ELASTICSEARCH_SSL_SECURE_RANDOM_IMPL= +# ELASTICSEARCH_SSL_TRUSTSTORE_FILE= +# ELASTICSEARCH_SSL_TRUSTSTORE_TYPE= +# ELASTICSEARCH_SSL_TRUSTSTORE_PASSWORD= +# ELASTICSEARCH_SSL_KEYSTORE_FILE= +# ELASTICSEARCH_SSL_KEYSTORE_TYPE= +# ELASTICSEARCH_SSL_KEYSTORE_PASSWORD= + +# To use simple username/password authentication to Elasticsearch over HTTPS +# set ELASTICSEARCH_USE_SSL=true and uncomment: +# ELASTICSEARCH_USERNAME= +# ELASTICSEARCH_PASSWORD= diff --git a/docker/datahub-gms/env/docker.env b/docker/datahub-gms/env/docker.env index b56337df5e3dee..4b579bec3d50e6 100644 --- a/docker/datahub-gms/env/docker.env +++ b/docker/datahub-gms/env/docker.env @@ -12,6 +12,7 @@ NEO4J_HOST=http://neo4j:7474 NEO4J_URI=bolt://neo4j NEO4J_USERNAME=neo4j NEO4J_PASSWORD=datahub +GRAPH_SERVICE=neo4j MAE_CONSUMER_ENABLED=true MCE_CONSUMER_ENABLED=true diff --git a/docker/datahub-gms/start.sh b/docker/datahub-gms/start.sh index 0712aea550585b..0e4d18f6f71cb3 100755 --- a/docker/datahub-gms/start.sh +++ b/docker/datahub-gms/start.sh @@ -30,7 +30,6 @@ dockerize \ -wait tcp://$EBEAN_DATASOURCE_HOST \ -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') \ -wait $ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT -wait-http-header "$ELASTICSEARCH_AUTH_HEADER" \ - -wait $NEO4J_HOST \ -timeout 240s \ java $JAVA_OPTS $JMX_OPTS \ -jar /jetty-runner.jar \ diff --git a/docker/docker-compose-without-neo4j.override.yml b/docker/docker-compose-without-neo4j.override.yml new file mode 100644 index 00000000000000..f6f05a52d64898 --- /dev/null +++ b/docker/docker-compose-without-neo4j.override.yml @@ -0,0 +1,33 @@ +--- +version: '3.8' +services: + mysql: + container_name: mysql + hostname: mysql + image: mysql:5.7 + env_file: mysql/env/docker.env + command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci + ports: + - "3306:3306" + volumes: + - ./mysql/init.sql:/docker-entrypoint-initdb.d/init.sql + - mysqldata:/var/lib/mysql + + mysql-setup: + build: + context: ../ + dockerfile: docker/mysql-setup/Dockerfile + image: acryldata/datahub-mysql-setup:head + env_file: mysql-setup/env/docker.env + hostname: mysql-setup + container_name: mysql-setup + depends_on: + - mysql + + datahub-gms: + env_file: datahub-gms/env/docker-without-neo4j.env + depends_on: + - mysql + +volumes: + mysqldata: diff --git a/docker/docker-compose-without-neo4j.yml b/docker/docker-compose-without-neo4j.yml new file mode 100644 index 00000000000000..369ce37a04ea57 --- /dev/null +++ b/docker/docker-compose-without-neo4j.yml @@ -0,0 +1,136 @@ +# Docker compose file covering DataHub's default configuration, which is to run all containers on a single host. + +# Please see the README.md for instructions as to how to use and customize. + +# NOTE: This file will cannot build! No dockerfiles are set. See the README.md in this directory. +--- +version: '3.8' +services: + zookeeper: + image: confluentinc/cp-zookeeper:5.4.0 + env_file: zookeeper/env/docker.env + hostname: zookeeper + container_name: zookeeper + ports: + - "2181:2181" + volumes: + - zkdata:/var/opt/zookeeper + + broker: + image: confluentinc/cp-kafka:5.4.0 + env_file: broker/env/docker.env + hostname: broker + container_name: broker + depends_on: + - zookeeper + ports: + - "29092:29092" + - "9092:9092" + + kafka-rest-proxy: + image: confluentinc/cp-kafka-rest:5.4.0 + env_file: kafka-rest-proxy/env/docker.env + hostname: kafka-rest-proxy + container_name: kafka-rest-proxy + ports: + - "8082:8082" + depends_on: + - zookeeper + - broker + - schema-registry + + kafka-topics-ui: + image: landoop/kafka-topics-ui:0.9.4 + env_file: kafka-topics-ui/env/docker.env + hostname: kafka-topics-ui + container_name: kafka-topics-ui + ports: + - "18000:8000" + depends_on: + - zookeeper + - broker + - schema-registry + - kafka-rest-proxy + + # This "container" is a workaround to pre-create topics + kafka-setup: + build: + context: kafka-setup + image: linkedin/datahub-kafka-setup:${DATAHUB_VERSION:-latest} + env_file: kafka-setup/env/docker.env + hostname: kafka-setup + container_name: kafka-setup + depends_on: + - broker + - schema-registry + + schema-registry: + image: confluentinc/cp-schema-registry:5.4.0 + env_file: schema-registry/env/docker.env + hostname: schema-registry + container_name: schema-registry + depends_on: + - zookeeper + - broker + ports: + - "8081:8081" + + schema-registry-ui: + image: landoop/schema-registry-ui:latest + env_file: schema-registry-ui/env/docker.env + container_name: schema-registry-ui + hostname: schema-registry-ui + ports: + - "8000:8000" + depends_on: + - schema-registry + + elasticsearch: + image: elasticsearch:7.9.3 + env_file: elasticsearch/env/docker.env + container_name: elasticsearch + hostname: elasticsearch + ports: + - "9200:9200" + volumes: + - esdata:/usr/share/elasticsearch/data + healthcheck: + test: ["CMD-SHELL", "curl -sS --fail 'http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=0s' || exit 1"] + start_period: 2m + retries: 4 + + datahub-gms: + build: + context: ../ + dockerfile: docker/datahub-gms/Dockerfile + image: linkedin/datahub-gms:${DATAHUB_VERSION:-latest} + env_file: gms/env/docker-without-neo4j.env + hostname: datahub-gms + container_name: datahub-gms + ports: + - "8080:8080" + depends_on: + - elasticsearch-setup + - kafka-setup + - mysql + + datahub-frontend-react: + build: + context: ../ + dockerfile: docker/datahub-frontend/Dockerfile + image: linkedin/datahub-frontend-react:${DATAHUB_VERSION:-latest} + env_file: datahub-frontend/env/docker.env + hostname: datahub-frontend-react + container_name: datahub-frontend-react + ports: + - "9002:9002" + depends_on: + - datahub-gms + +networks: + default: + name: datahub_network + +volumes: + esdata: + zkdata: diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml new file mode 100644 index 00000000000000..b921577d12e23f --- /dev/null +++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml @@ -0,0 +1,146 @@ +networks: + default: + name: datahub_network +services: + broker: + container_name: broker + depends_on: + - zookeeper + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 + hostname: broker + image: confluentinc/cp-kafka:5.4.0 + ports: + - 29092:29092 + - 9092:9092 + datahub-frontend-react: + container_name: datahub-frontend-react + depends_on: + - datahub-gms + environment: + - DATAHUB_GMS_HOST=datahub-gms + - DATAHUB_GMS_PORT=8080 + - DATAHUB_SECRET=YouKnowNothing + - DATAHUB_APP_VERSION=1.0 + - DATAHUB_PLAY_MEM_BUFFER_SIZE=10MB + - KAFKA_BOOTSTRAP_SERVER=broker:29092 + - DATAHUB_TRACKING_TOPIC=DataHubUsageEvent_v1 + - ELASTIC_CLIENT_HOST=elasticsearch + - ELASTIC_CLIENT_PORT=9200 + hostname: datahub-frontend-react + image: linkedin/datahub-frontend-react:${DATAHUB_VERSION:-latest} + ports: + - 9002:9002 + datahub-gms: + container_name: datahub-gms + depends_on: + - mysql + environment: + - DATASET_ENABLE_SCSI=false + - EBEAN_DATASOURCE_USERNAME=datahub + - EBEAN_DATASOURCE_PASSWORD=datahub + - EBEAN_DATASOURCE_HOST=mysql:3306 + - EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8 + - EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver + - KAFKA_BOOTSTRAP_SERVER=broker:29092 + - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 + - ELASTICSEARCH_HOST=elasticsearch + - ELASTICSEARCH_PORT=9200 + - GRAPH_SERVICE=elasticsearch + - MAE_CONSUMER_ENABLED=true + - MCE_CONSUMER_ENABLED=true + hostname: datahub-gms + image: acryldata/datahub-gms:gabe-test + mem_limit: 850m + ports: + - 8080:8080 + elasticsearch: + container_name: elasticsearch + environment: + - discovery.type=single-node + - xpack.security.enabled=false + - ES_JAVA_OPTS=-Xms512m -Xmx512m + healthcheck: + retries: 4 + start_period: 2m + test: + - CMD-SHELL + - curl -sS --fail 'http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=0s' + || exit 1 + hostname: elasticsearch + image: elasticsearch:7.9.3 + mem_limit: 1g + ports: + - 9200:9200 + volumes: + - esdata:/usr/share/elasticsearch/data + kafka-setup: + container_name: kafka-setup + depends_on: + - broker + - schema-registry + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_BOOTSTRAP_SERVER=broker:29092 + hostname: kafka-setup + image: linkedin/datahub-kafka-setup:${DATAHUB_VERSION:-latest} + mysql: + command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci + container_name: mysql + environment: + - MYSQL_DATABASE=datahub + - MYSQL_USER=datahub + - MYSQL_PASSWORD=datahub + - MYSQL_ROOT_PASSWORD=datahub + hostname: mysql + image: mysql:5.7 + ports: + - 3306:3306 + volumes: + - ./mysql/init.sql:/docker-entrypoint-initdb.d/init.sql + - mysqldata:/var/lib/mysql + mysql-setup: + container_name: mysql-setup + depends_on: + - mysql + environment: + - MYSQL_HOST=mysql + - MYSQL_PORT=3306 + - MYSQL_USERNAME=datahub + - MYSQL_PASSWORD=datahub + - DATAHUB_DB_NAME=datahub + hostname: mysql-setup + image: acryldata/datahub-mysql-setup:head + schema-registry: + container_name: schema-registry + depends_on: + - zookeeper + - broker + environment: + - SCHEMA_REGISTRY_HOST_NAME=schemaregistry + - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 + hostname: schema-registry + image: confluentinc/cp-schema-registry:5.4.0 + ports: + - 8081:8081 + zookeeper: + container_name: zookeeper + environment: + - ZOOKEEPER_CLIENT_PORT=2181 + - ZOOKEEPER_TICK_TIME=2000 + hostname: zookeeper + image: confluentinc/cp-zookeeper:5.4.0 + ports: + - 2181:2181 + volumes: + - zkdata:/var/opt/zookeeper +version: '2' +volumes: + esdata: null + mysqldata: null + zkdata: null diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index 5f928233112697..be558a8a1d4e90 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -55,6 +55,7 @@ services: - NEO4J_URI=bolt://neo4j - NEO4J_USERNAME=neo4j - NEO4J_PASSWORD=datahub + - GRAPH_SERVICE=neo4j - MAE_CONSUMER_ENABLED=true - MCE_CONSUMER_ENABLED=true hostname: datahub-gms diff --git a/docker/quickstart/generate_docker_quickstart.sh b/docker/quickstart/generate_docker_quickstart.sh index bdef531f24a6d5..61e1bbc7bdcddf 100755 --- a/docker/quickstart/generate_docker_quickstart.sh +++ b/docker/quickstart/generate_docker_quickstart.sh @@ -10,4 +10,4 @@ source venv/bin/activate pip install -r requirements.txt python generate_docker_quickstart.py ../docker-compose.yml ../docker-compose.override.yml docker-compose.quickstart.yml - +python generate_docker_quickstart.py ../docker-compose-without-neo4j.yml ../docker-compose-without-neo4j.override.yml docker-compose-without-neo4j.quickstart.yml diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java new file mode 100644 index 00000000000000..1328a8cc87850a --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java @@ -0,0 +1,55 @@ +package com.linkedin.gms.factory.common; + +import com.linkedin.metadata.graph.elastic.ESGraphReadDAO; +import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import javax.annotation.Nonnull; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + + +@Configuration +@Import({RestHighLevelClientFactory.class, IndexConventionFactory.class}) +public class ElasticSearchGraphServiceFactory { + @Autowired + @Qualifier("elasticSearchRestHighLevelClient") + private RestHighLevelClient searchClient; + + @Autowired + @Qualifier(IndexConventionFactory.INDEX_CONVENTION_BEAN) + private IndexConvention indexConvention; + + @Value("${ES_BULK_REQUESTS_LIMIT:1}") + private Integer bulkRequestsLimit; + + @Value("${ES_BULK_FLUSH_PERIOD:1}") + private Integer bulkFlushPeriod; + + @Value("${ES_BULK_NUM_RETRIES:3}") + private Integer numRetries; + + @Value("${ES_BULK_RETRY_INTERVAL:1}") + private Long retryInterval; + + @Bean(name = "elasticSearchGraphService") + @Nonnull + protected ElasticSearchGraphService getInstance() { + return new ElasticSearchGraphService( + searchClient, + indexConvention, + new ESGraphWriteDAO( + searchClient, + indexConvention, + bulkRequestsLimit, + bulkFlushPeriod, + numRetries, + retryInterval), + new ESGraphReadDAO(searchClient, indexConvention)); + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java index af5a73dd9c12ea..9d77fa61b78849 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java @@ -2,27 +2,56 @@ import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.Neo4jGraphService; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import javax.annotation.Nonnull; +import org.elasticsearch.client.RestHighLevelClient; import org.neo4j.driver.Driver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; @Configuration -@Import({Neo4jDriverFactory.class}) +@Import({Neo4jDriverFactory.class, ElasticSearchGraphServiceFactory.class}) public class GraphServiceFactory { + @Autowired + @Qualifier("elasticSearchGraphService") + private ElasticSearchGraphService _elasticSearchGraphService; + @Autowired @Qualifier("neo4jDriver") private Driver neo4jDriver; + @Autowired + @Qualifier("elasticSearchRestHighLevelClient") + private RestHighLevelClient searchClient; + + @Autowired + @Qualifier(IndexConventionFactory.INDEX_CONVENTION_BEAN) + private IndexConvention indexConvention; + + @Value("${GRAPH_SERVICE:neo4j}") + private String graphService; + @Nonnull - @DependsOn({"neo4jDriver"}) + @DependsOn({"neo4jDriver", "elasticSearchGraphService"}) @Bean(name = "graphService") + @Primary protected GraphService createInstance() { - return new Neo4jGraphService(neo4jDriver); + if (graphService.equals("neo4j")) { + return new Neo4jGraphService(neo4jDriver); + } else if (graphService.equals("elastic")) { + return _elasticSearchGraphService; + } else { + throw new RuntimeException( + "Error: Failed to initialize graph service. Graph Service provided: " + graphService + + ". Valid options: [neo4j, elasticsearch]."); + } } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java index dd0f211127c229..a99b0861128fea 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java @@ -28,4 +28,6 @@ void removeEdgeTypesFromNode( @Nonnull final Urn urn, @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter); + + void configure(); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java index a1d75110821ded..f7f81dddb718e3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java @@ -146,6 +146,11 @@ public void removeEdgeTypesFromNode( runQuery(buildStatement(statement, params)).consume(); } + @Override + public void configure() { + // Do nothing + } + // visible for testing @Nonnull Statement buildStatement(@Nonnull String queryTemplate, @Nonnull Map params) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphReadDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphReadDAO.java new file mode 100644 index 00000000000000..6f988e32c8357a --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphReadDAO.java @@ -0,0 +1,112 @@ +package com.linkedin.metadata.graph.elastic; + +import com.linkedin.metadata.query.Condition; +import com.linkedin.metadata.query.CriterionArray; +import com.linkedin.metadata.query.Filter; +import com.linkedin.metadata.query.RelationshipDirection; +import com.linkedin.metadata.query.RelationshipFilter; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import java.io.IOException; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.*; + + +/** + * A search DAO for Elasticsearch backend. + */ +@Slf4j +@RequiredArgsConstructor +public class ESGraphReadDAO { + + private final RestHighLevelClient client; + private final IndexConvention indexConvention; + + /** + * Converts {@link CriterionArray} to neo4j query string. + * + * @param criterionArray CriterionArray in a Filter + * @return Neo4j criteria string + */ + @Nonnull + public static void addCriterionToQueryBuilder(@Nonnull CriterionArray criterionArray, String node, BoolQueryBuilder finalQuery) { + if (!criterionArray.stream().allMatch(criterion -> Condition.EQUAL.equals(criterion.getCondition()))) { + throw new RuntimeException("Currently Elastic query filter only supports EQUAL condition " + criterionArray); + } + + criterionArray.forEach( + criterion -> finalQuery.must( + QueryBuilders.termQuery(node + "." + criterion.getField(), criterion.getValue()) + ) + ); + } + + public SearchResponse getSearchResponse( + @Nullable final String sourceType, + @Nonnull final Filter sourceEntityFilter, + @Nullable final String destinationType, + @Nonnull final Filter destinationEntityFilter, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter, + final int offset, + final int count) { + // also delete any relationship going to or from it + final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); + + SearchRequest searchRequest = new SearchRequest(); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + + searchSourceBuilder.from(offset); + searchSourceBuilder.size(count); + + BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); + + // set source filter + String sourceNode = relationshipDirection == RelationshipDirection.OUTGOING ? "source" : "destination"; + if (sourceType != null && sourceType.length() > 0) { + finalQuery.must(QueryBuilders.termQuery(sourceNode + ".entityType", sourceType)); + } + addCriterionToQueryBuilder(sourceEntityFilter.getCriteria(), sourceNode, finalQuery); + + // set destination filter + String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? "destination" : "source"; + if (destinationType != null && destinationType.length() > 0) { + finalQuery.must(QueryBuilders.termQuery(destinationNode + ".entityType", destinationType)); + } + addCriterionToQueryBuilder(destinationEntityFilter.getCriteria(), destinationNode, finalQuery); + + // set relationship filter + if (relationshipTypes.size() > 0) { + BoolQueryBuilder relationshipQuery = QueryBuilders.boolQuery(); + relationshipTypes.forEach(relationshipType + -> relationshipQuery.should(QueryBuilders.termQuery("relationshipType", relationshipType))); + finalQuery.must(relationshipQuery); + } + + searchSourceBuilder.query(finalQuery); + + searchRequest.source(searchSourceBuilder); + + searchRequest.indices(indexConvention.getIndexName(INDEX_NAME)); + + try { + final SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + return searchResponse; + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java new file mode 100644 index 00000000000000..30c087b175c4ab --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java @@ -0,0 +1,60 @@ +package com.linkedin.metadata.graph.elastic; + +import com.linkedin.metadata.search.elasticsearch.update.BulkListener; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import java.io.IOException; +import javax.annotation.Nonnull; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; + +import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.*; + + +public class ESGraphWriteDAO { + private final BulkProcessor bulkProcessor; + private final IndexConvention indexConvention; + + public ESGraphWriteDAO(RestHighLevelClient searchClient, IndexConvention indexConvention, int bulkRequestsLimit, int bulkFlushPeriod, int numRetries, + long retryInterval) { + this.indexConvention = indexConvention; + this.bulkProcessor = BulkProcessor.builder( + (request, bulkListener) -> { + searchClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + }, + BulkListener.getInstance()) + .setBulkActions(bulkRequestsLimit) + .setFlushInterval(TimeValue.timeValueSeconds(bulkFlushPeriod)) + .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(retryInterval), numRetries)) + .build(); + } + + /** + * Updates or inserts the given search document. + * + * @param document the document to update / insert + * @param docId the ID of the document + */ + public void upsertDocument(@Nonnull String document, @Nonnull String docId) { + final IndexRequest indexRequest = new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON); + final UpdateRequest updateRequest = new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON) + .detectNoop(false) + .upsert(indexRequest); + bulkProcessor.add(updateRequest); + } + + /** + * Deletes the document with the given document ID from the index. + * + * @param docId the ID of the document to delete + */ + public void deleteDocument(@Nonnull String docId) { + bulkProcessor.add(new DeleteRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId)); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java new file mode 100644 index 00000000000000..3800a59137d4d6 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -0,0 +1,231 @@ +package com.linkedin.metadata.graph.elastic; + +import com.fasterxml.jackson.databind.node.JsonNodeFactory; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.graph.Edge; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.query.Condition; +import com.linkedin.metadata.query.Criterion; +import com.linkedin.metadata.query.CriterionArray; +import com.linkedin.metadata.query.Filter; +import com.linkedin.metadata.query.RelationshipDirection; +import com.linkedin.metadata.query.RelationshipFilter; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; + +@Slf4j +@RequiredArgsConstructor +public class ElasticSearchGraphService implements GraphService { + + private static final int MAX_ELASTIC_RESULT = 10000; + private final RestHighLevelClient searchClient; + private final IndexConvention _indexConvention; + private final ESGraphWriteDAO _graphWriteDAO; + private final ESGraphReadDAO _graphReadDAO; + + private static final String DOC_DELIMETER = "--"; + public static final String INDEX_NAME = "graph_service_v1"; + private static final Map EMPTY_HASH = new HashMap<>(); + + private String toDocument(@Nonnull final Edge edge) { + final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode(); + + final ObjectNode sourceObject = JsonNodeFactory.instance.objectNode(); + sourceObject.put("urn", edge.getSource().toString()); + sourceObject.put("entityType", edge.getSource().getEntityType()); + + final ObjectNode destinationObject = JsonNodeFactory.instance.objectNode(); + destinationObject.put("urn", edge.getDestination().toString()); + destinationObject.put("entityType", edge.getDestination().getEntityType()); + + searchDocument.set("source", sourceObject); + + searchDocument.set("destination", destinationObject); + + searchDocument.put("relationshipType", edge.getRelationshipType()); + + return searchDocument.toString(); + } + + private String toDocId(@Nonnull final Edge edge) { + String rawDocId = + edge.getSource().toString() + DOC_DELIMETER + edge.getRelationshipType() + DOC_DELIMETER + edge.getDestination().toString(); + + try { + byte[] bytesOfRawDocID = rawDocId.getBytes("UTF-8"); + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] thedigest = md.digest(bytesOfRawDocID); + return thedigest.toString(); + } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { + e.printStackTrace(); + return rawDocId; + } + } + + public void addEdge(@Nonnull final Edge edge) { + String docId = toDocId(edge); + String edgeDocument = toDocument(edge); + _graphWriteDAO.upsertDocument(edgeDocument, docId); + } + + @Nonnull + public List findRelatedUrns( + @Nullable final String sourceType, + @Nonnull final Filter sourceEntityFilter, + @Nullable final String destinationType, + @Nonnull final Filter destinationEntityFilter, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter, + final int offset, + final int count) { + + final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); + String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? "destination" : "source"; + + SearchResponse response = _graphReadDAO.getSearchResponse( + sourceType, + sourceEntityFilter, + destinationType, + destinationEntityFilter, + relationshipTypes, + relationshipFilter, + offset, + count + ); + + return Arrays.stream(response.getHits().getHits()) + .map(hit -> ((HashMap) hit.getSourceAsMap().getOrDefault(destinationNode, EMPTY_HASH)).getOrDefault("urn", null)) + .filter(urn -> urn != null) + .collect(Collectors.toList()); + } + + private Filter createUrnFilter(@Nonnull final Urn urn) { + Filter filter = new Filter(); + CriterionArray criterionArray = new CriterionArray(); + Criterion criterion = new Criterion(); + criterion.setCondition(Condition.EQUAL); + criterion.setField("urn"); + criterion.setValue(urn.toString()); + criterionArray.add(criterion); + filter.setCriteria(criterionArray); + + return filter; + } + + public void removeNode(@Nonnull final Urn urn) { + Filter urnFilter = createUrnFilter(urn); + Filter emptyFilter = new Filter().setCriteria(new CriterionArray()); + List relationshipTypes = new ArrayList<>(); + + RelationshipFilter outgoingFilter = new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING); + RelationshipFilter incomingFilter = new RelationshipFilter().setDirection(RelationshipDirection.INCOMING); + + SearchResponse outgoingEdges = _graphReadDAO.getSearchResponse( + null, + urnFilter, + null, + emptyFilter, + relationshipTypes, + outgoingFilter, + 0, + MAX_ELASTIC_RESULT + ); + + SearchResponse incomingEdges = _graphReadDAO.getSearchResponse( + null, + urnFilter, + null, + emptyFilter, + relationshipTypes, + incomingFilter, + 0, + MAX_ELASTIC_RESULT + ); + + outgoingEdges.getHits().forEach( + hit -> _graphWriteDAO.deleteDocument(hit.getId()) + ); + + incomingEdges.getHits().forEach( + hit -> _graphWriteDAO.deleteDocument(hit.getId()) + ); + + return; + } + + public void removeEdgeTypesFromNode( + @Nonnull final Urn urn, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter) { + + Filter urnFilter = createUrnFilter(urn); + Filter emptyFilter = new Filter().setCriteria(new CriterionArray()); + + SearchResponse edges = _graphReadDAO.getSearchResponse( + null, + urnFilter, + null, + emptyFilter, + relationshipTypes, + relationshipFilter, + 0, + MAX_ELASTIC_RESULT + ); + + edges.getHits().forEach( + hit -> _graphWriteDAO.deleteDocument(hit.getId()) + ); + } + + @Override + public void configure() { + log.info("Setting up elastic graph index"); + boolean exists = false; + try { + exists = searchClient.indices().exists( + new GetIndexRequest(_indexConvention.getIndexName(INDEX_NAME)), RequestOptions.DEFAULT); + } catch (IOException e) { + e.printStackTrace(); + } + + // If index doesn't exist, create index + if (!exists) { + log.info("Elastic Graph Index does not exist. Creating."); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(_indexConvention.getIndexName(INDEX_NAME)); + + createIndexRequest.mapping(GraphRelationshipMappingsBuilder.getMappings()); + createIndexRequest.settings(SettingsBuilder.getSettings()); + + try { + searchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + e.printStackTrace(); + } + + log.info("Created Elastic Graph Index"); + } + + return; + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java new file mode 100644 index 00000000000000..c034acee176e23 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java @@ -0,0 +1,36 @@ +package com.linkedin.metadata.graph.elastic; + +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class GraphRelationshipMappingsBuilder { + + private GraphRelationshipMappingsBuilder() { } + + public static Map getMappings() { + Map mappings = new HashMap<>(); + mappings.put("source", getMappingsForEntity()); + mappings.put("destination", getMappingsForEntity()); + mappings.put("relationshipType", getMappingsForKeyword()); + + return ImmutableMap.of("properties", mappings); + } + + private static Map getMappingsForKeyword() { + return ImmutableMap.builder().put("type", "keyword").build(); + } + + private static Map getMappingsForEntity() { + + Map mappings = ImmutableMap.builder() + .put("urn", getMappingsForKeyword()) + .put("entityType", getMappingsForKeyword()) + .build(); + + return ImmutableMap.of("properties", mappings); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java index 6bfe72b47a8cdc..658066b923c806 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java @@ -192,6 +192,9 @@ public HighlightBuilder getHighlights() { public SearchResult extractResult(@Nonnull SearchResponse searchResponse, int from, int size) { int totalCount = (int) searchResponse.getHits().getTotalHits().value; + + // searchResponse.getHits().getHits()[0].field("sourceUrn") + List resultList = getResults(searchResponse); SearchResultMetadata searchResultMetadata = extractSearchResultMetadata(searchResponse); searchResultMetadata.setUrns(new UrnArray(resultList)); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java new file mode 100644 index 00000000000000..a42106eb11b13a --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java @@ -0,0 +1,187 @@ +package com.linkedin.metadata.graph; + +import com.linkedin.common.urn.Urn; + +import com.linkedin.metadata.graph.elastic.ESGraphReadDAO; +import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; +import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.query.RelationshipDirection; +import com.linkedin.metadata.query.RelationshipFilter; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.apache.http.HttpHost; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import static com.linkedin.metadata.dao.utils.QueryUtils.*; +import static org.testng.Assert.*; + + +public class ElasticSearchGraphServiceTest { + + private ElasticsearchContainer _elasticsearchContainer; + private RestHighLevelClient _searchClient; + private IndexConvention _indexConvention; + private ElasticSearchGraphService _client; + + private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:7.9.3"; + private static final int HTTP_PORT = 9200; + + @BeforeMethod + public void wipe() throws URISyntaxException { + _client.removeNode(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)")); + _client.removeNode(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)")); + } + + @BeforeTest + public void setup() { + _indexConvention = new IndexConventionImpl(null); + _elasticsearchContainer = new ElasticsearchContainer(IMAGE_NAME); + _elasticsearchContainer.start(); + _searchClient = buildRestClient(); + _client = buildService(); + _client.configure(); + } + + @Nonnull + private RestHighLevelClient buildRestClient() { + final RestClientBuilder builder = + RestClient.builder(new HttpHost("localhost", _elasticsearchContainer.getMappedPort(HTTP_PORT), "http")) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultIOReactorConfig( + IOReactorConfig.custom().setIoThreadCount(1).build())); + + builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder. + setConnectionRequestTimeout(3000)); + + return new RestHighLevelClient(builder); + } + + @Nonnull + private ElasticSearchGraphService buildService() { + ESGraphReadDAO readDAO = new ESGraphReadDAO(_searchClient, _indexConvention); + ESGraphWriteDAO writeDAO = new ESGraphWriteDAO(_searchClient, _indexConvention, 1, 1, 1, 1); + return new ElasticSearchGraphService(_searchClient, _indexConvention, writeDAO, readDAO); + } + + @AfterTest + public void tearDown() { + _elasticsearchContainer.stop(); + } + + @Test + public void testAddEdge() throws Exception { + Edge edge1 = new Edge( + Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"), + "DownstreamOf"); + + _client.addEdge(edge1); + TimeUnit.SECONDS.sleep(5); + + List edgeTypes = new ArrayList<>(); + edgeTypes.add("DownstreamOf"); + RelationshipFilter relationshipFilter = new RelationshipFilter(); + relationshipFilter.setDirection(RelationshipDirection.OUTGOING); + relationshipFilter.setCriteria(EMPTY_FILTER.getCriteria()); + + List relatedUrns = _client.findRelatedUrns( + "", + newFilter("urn", "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + "", + EMPTY_FILTER, + edgeTypes, + relationshipFilter, + 0, + 10); + + assertEquals(relatedUrns.size(), 1); + } + + @Test + public void testAddEdgeReverse() throws Exception { + Edge edge1 = new Edge( + Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"), + Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + "DownstreamOf"); + + _client.addEdge(edge1); + TimeUnit.SECONDS.sleep(5); + + List edgeTypes = new ArrayList<>(); + edgeTypes.add("DownstreamOf"); + RelationshipFilter relationshipFilter = new RelationshipFilter(); + relationshipFilter.setDirection(RelationshipDirection.INCOMING); + relationshipFilter.setCriteria(EMPTY_FILTER.getCriteria()); + + List relatedUrns = _client.findRelatedUrns( + "", + newFilter("urn", "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + "", + EMPTY_FILTER, + edgeTypes, + relationshipFilter, + 0, + 10); + + assertEquals(relatedUrns.size(), 1); + } + + @Test + public void testRemoveEdgesFromNode() throws Exception { + Edge edge1 = new Edge( + Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"), + Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + "DownstreamOf"); + + _client.addEdge(edge1); + TimeUnit.SECONDS.sleep(5); + + List edgeTypes = new ArrayList<>(); + edgeTypes.add("DownstreamOf"); + RelationshipFilter relationshipFilter = new RelationshipFilter(); + relationshipFilter.setDirection(RelationshipDirection.INCOMING); + relationshipFilter.setCriteria(EMPTY_FILTER.getCriteria()); + + List relatedUrns = _client.findRelatedUrns( + "", + newFilter("urn", "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + "", + EMPTY_FILTER, + edgeTypes, + relationshipFilter, + 0, + 10); + + assertEquals(relatedUrns.size(), 1); + + _client.removeEdgeTypesFromNode(Urn.createFromString( + "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + edgeTypes, + relationshipFilter); + TimeUnit.SECONDS.sleep(5); + + List relatedUrnsPostDelete = _client.findRelatedUrns( + "", + newFilter("urn", "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), + "", + EMPTY_FILTER, + edgeTypes, + relationshipFilter, + 0, + 10); + + assertEquals(relatedUrnsPostDelete.size(), 0); + } +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java index d03a5da1376892..cc84e5c03ec2d7 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.kafka; import com.linkedin.common.urn.Urn; + import com.linkedin.data.template.RecordTemplate; import com.linkedin.gms.factory.common.GraphServiceFactory; import com.linkedin.gms.factory.search.SearchServiceFactory; @@ -59,6 +60,7 @@ public MetadataAuditEventsProcessor(GraphService graphService, SearchService sea _graphService = graphService; _searchService = searchService; _searchService.configure(); + _graphService.configure(); } @KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mae-consumer-job-client}", topics = "${KAFKA_TOPIC_NAME:" @@ -76,8 +78,8 @@ public void consume(final ConsumerRecord consumerRecord) final EntitySpec entitySpec = SnapshotEntityRegistry.getInstance().getEntitySpec(PegasusUtils.getEntityNameFromSchema(snapshot.schema())); - updateElasticsearch(snapshot, entitySpec); - updateNeo4j(snapshot, entitySpec); + updateSearchService(snapshot, entitySpec); + updateGraphService(snapshot, entitySpec); } } catch (Exception e) { log.error("Error deserializing message: {}", e.toString()); @@ -90,7 +92,7 @@ public void consume(final ConsumerRecord consumerRecord) * * @param snapshot Snapshot */ - private void updateNeo4j(final RecordTemplate snapshot, final EntitySpec entitySpec) { + private void updateGraphService(final RecordTemplate snapshot, final EntitySpec entitySpec) { final Set relationshipTypesBeingAdded = new HashSet<>(); final List edgesToAdd = new ArrayList<>(); final String sourceUrnStr = snapshot.data().get("urn").toString(); @@ -134,7 +136,7 @@ private void updateNeo4j(final RecordTemplate snapshot, final EntitySpec entityS * * @param snapshot Snapshot */ - private void updateElasticsearch(final RecordTemplate snapshot, final EntitySpec entitySpec) { + private void updateSearchService(final RecordTemplate snapshot, final EntitySpec entitySpec) { String urn = snapshot.data().get("urn").toString(); Optional searchDocument; try { From 679baa22c6d9a36fe6271b0b8b6a72fb8a6e9662 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 10:49:07 -0700 Subject: [PATCH 02/15] fixes --- .../charts/datahub-mae-consumer/templates/deployment.yaml | 2 ++ .../com/linkedin/gms/factory/common/GraphServiceFactory.java | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml index 98cc6bdd6aeeaf..0871f8a96fc62b 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml @@ -115,6 +115,8 @@ spec: name: "{{ .Values.global.neo4j.password.secretRef }}" key: "{{ .Values.global.neo4j.password.secretKey }}" {{- end }} + - name: DATAHUB_ANALYTICS_ENABLED + - value: "{{ .Values.global.datahub_analytics_enabled }}" {{- if .Values.global.springKafkaConfigurationOverrides }} {{- range $configName, $configValue := .Values.global.springKafkaConfigurationOverrides }} - name: SPRING_KAFKA_PROPERTIES_{{ $configName | replace "." "_" | upper }} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java index 9d77fa61b78849..318bd21a75204b 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java @@ -44,9 +44,9 @@ public class GraphServiceFactory { @Bean(name = "graphService") @Primary protected GraphService createInstance() { - if (graphService.equals("neo4j")) { + if (graphService.equalsIgnoreCase("neo4j")) { return new Neo4jGraphService(neo4jDriver); - } else if (graphService.equals("elastic")) { + } else if (graphService.equalsIgnoreCase("elasticsearch")) { return _elasticSearchGraphService; } else { throw new RuntimeException( From 0a83f508a25fc70d3ebdda54afc0c344a049a68b Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 10:50:15 -0700 Subject: [PATCH 03/15] adding deprecated mapper back in --- .../dashboard/mappers/DashboardMapper.java | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java new file mode 100644 index 00000000000000..da3cca409a9c03 --- /dev/null +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/dashboard/mappers/DashboardMapper.java @@ -0,0 +1,81 @@ +package com.linkedin.datahub.graphql.types.dashboard.mappers; + +import com.linkedin.datahub.graphql.generated.AccessLevel; +import com.linkedin.datahub.graphql.generated.Chart; +import com.linkedin.datahub.graphql.generated.Dashboard; +import com.linkedin.datahub.graphql.generated.DashboardInfo; +import com.linkedin.datahub.graphql.generated.EntityType; +import com.linkedin.datahub.graphql.generated.DashboardEditableProperties; +import com.linkedin.datahub.graphql.types.common.mappers.AuditStampMapper; +import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper; +import com.linkedin.datahub.graphql.types.mappers.ModelMapper; +import com.linkedin.datahub.graphql.types.common.mappers.OwnershipMapper; +import com.linkedin.datahub.graphql.types.common.mappers.StatusMapper; +import com.linkedin.datahub.graphql.types.tag.mappers.GlobalTagsMapper; + +import javax.annotation.Nonnull; +import java.util.stream.Collectors; + +public class DashboardMapper implements ModelMapper { + + public static final DashboardMapper INSTANCE = new DashboardMapper(); + + public static Dashboard map(@Nonnull final com.linkedin.dashboard.Dashboard dashboard) { + return INSTANCE.apply(dashboard); + } + + @Override + public Dashboard apply(@Nonnull final com.linkedin.dashboard.Dashboard dashboard) { + final Dashboard result = new Dashboard(); + result.setUrn(dashboard.getUrn().toString()); + result.setType(EntityType.DASHBOARD); + result.setDashboardId(dashboard.getDashboardId()); + result.setTool(dashboard.getTool()); + if (dashboard.hasInfo()) { + result.setInfo(mapDashboardInfo(dashboard.getInfo())); + } + if (dashboard.hasOwnership()) { + result.setOwnership(OwnershipMapper.map(dashboard.getOwnership())); + } + if (dashboard.hasStatus()) { + result.setStatus(StatusMapper.map(dashboard.getStatus())); + } + if (dashboard.hasGlobalTags()) { + result.setGlobalTags(GlobalTagsMapper.map(dashboard.getGlobalTags())); + } + if (dashboard.hasEditableProperties()) { + final DashboardEditableProperties dashboardEditableProperties = new DashboardEditableProperties(); + dashboardEditableProperties.setDescription(dashboard.getEditableProperties().getDescription()); + result.setEditableProperties(dashboardEditableProperties); + } + return result; + } + + private DashboardInfo mapDashboardInfo(final com.linkedin.dashboard.DashboardInfo info) { + final DashboardInfo result = new DashboardInfo(); + result.setDescription(info.getDescription()); + result.setName(info.getTitle()); + result.setLastRefreshed(info.getLastRefreshed()); + result.setCharts(info.getCharts().stream().map(urn -> { + final Chart chart = new Chart(); + chart.setUrn(urn.toString()); + return chart; + }).collect(Collectors.toList())); + if (info.hasExternalUrl()) { + // TODO: Migrate to using the External URL field for consistency. + result.setExternalUrl(info.getDashboardUrl().toString()); + } + if (info.hasCustomProperties()) { + result.setCustomProperties(StringMapMapper.map(info.getCustomProperties())); + } + if (info.hasAccess()) { + result.setAccess(AccessLevel.valueOf(info.getAccess().toString())); + } + result.setLastModified(AuditStampMapper.map(info.getLastModified().getLastModified())); + result.setCreated(AuditStampMapper.map(info.getLastModified().getCreated())); + if (info.getLastModified().hasDeleted()) { + result.setDeleted(AuditStampMapper.map(info.getLastModified().getDeleted())); + } + return result; + } +} From 496727f8aa20f21bc6c8cd061d92a855f4b30e76 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 10:59:31 -0700 Subject: [PATCH 04/15] search request handler revert --- .../elasticsearch/query/request/SearchRequestHandler.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java index 658066b923c806..6bfe72b47a8cdc 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java @@ -192,9 +192,6 @@ public HighlightBuilder getHighlights() { public SearchResult extractResult(@Nonnull SearchResponse searchResponse, int from, int size) { int totalCount = (int) searchResponse.getHits().getTotalHits().value; - - // searchResponse.getHits().getHits()[0].field("sourceUrn") - List resultList = getResults(searchResponse); SearchResultMetadata searchResultMetadata = extractSearchResultMetadata(searchResponse); searchResultMetadata.setUrns(new UrnArray(resultList)); From 7670230f22100165c6131cd782d8688ed22ae62e Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 11:24:07 -0700 Subject: [PATCH 05/15] using delete by query --- .../ElasticSearchGraphServiceFactory.java | 4 +- ...GraphReadDAO.java => ESGraphQueryDAO.java} | 53 ++++++++++++++++++- .../graph/elastic/ESGraphWriteDAO.java | 2 +- .../elastic/ElasticSearchGraphService.java | 32 +++-------- .../graph/ElasticSearchGraphServiceTest.java | 4 +- 5 files changed, 64 insertions(+), 31 deletions(-) rename metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/{ESGraphReadDAO.java => ESGraphQueryDAO.java} (64%) diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java index 1328a8cc87850a..2c554f37c4096c 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java @@ -1,6 +1,6 @@ package com.linkedin.gms.factory.common; -import com.linkedin.metadata.graph.elastic.ESGraphReadDAO; +import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; @@ -50,6 +50,6 @@ protected ElasticSearchGraphService getInstance() { bulkFlushPeriod, numRetries, retryInterval), - new ESGraphReadDAO(searchClient, indexConvention)); + new ESGraphQueryDAO(searchClient, indexConvention)); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphReadDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java similarity index 64% rename from metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphReadDAO.java rename to metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 6f988e32c8357a..c8c9874a685ef6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphReadDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -18,6 +18,8 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.*; @@ -28,7 +30,7 @@ */ @Slf4j @RequiredArgsConstructor -public class ESGraphReadDAO { +public class ESGraphQueryDAO { private final RestHighLevelClient client; private final IndexConvention indexConvention; @@ -109,4 +111,53 @@ public SearchResponse getSearchResponse( } return null; } + + public BulkByScrollResponse deleteByQuery( + @Nullable final String sourceType, + @Nonnull final Filter sourceEntityFilter, + @Nullable final String destinationType, + @Nonnull final Filter destinationEntityFilter, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter) { + // also delete any relationship going to or from it + final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + + BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); + + // set source filter + String sourceNode = relationshipDirection == RelationshipDirection.OUTGOING ? "source" : "destination"; + if (sourceType != null && sourceType.length() > 0) { + finalQuery.must(QueryBuilders.termQuery(sourceNode + ".entityType", sourceType)); + } + addCriterionToQueryBuilder(sourceEntityFilter.getCriteria(), sourceNode, finalQuery); + + // set destination filter + String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? "destination" : "source"; + if (destinationType != null && destinationType.length() > 0) { + finalQuery.must(QueryBuilders.termQuery(destinationNode + ".entityType", destinationType)); + } + addCriterionToQueryBuilder(destinationEntityFilter.getCriteria(), destinationNode, finalQuery); + + // set relationship filter + if (relationshipTypes.size() > 0) { + BoolQueryBuilder relationshipQuery = QueryBuilders.boolQuery(); + relationshipTypes.forEach(relationshipType + -> relationshipQuery.should(QueryBuilders.termQuery("relationshipType", relationshipType))); + finalQuery.must(relationshipQuery); + } + + deleteByQueryRequest.setQuery(finalQuery); + + deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME)); + + try { + final BulkByScrollResponse deleteResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + return deleteResponse; + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java index 30c087b175c4ab..a825112cf46208 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java @@ -1,8 +1,8 @@ package com.linkedin.metadata.graph.elastic; import com.linkedin.metadata.search.elasticsearch.update.BulkListener; + import com.linkedin.metadata.utils.elasticsearch.IndexConvention; -import java.io.IOException; import javax.annotation.Nonnull; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index 3800a59137d4d6..fa848585c61a64 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -42,7 +42,7 @@ public class ElasticSearchGraphService implements GraphService { private final RestHighLevelClient searchClient; private final IndexConvention _indexConvention; private final ESGraphWriteDAO _graphWriteDAO; - private final ESGraphReadDAO _graphReadDAO; + private final ESGraphQueryDAO _graphReadDAO; private static final String DOC_DELIMETER = "--"; public static final String INDEX_NAME = "graph_service_v1"; @@ -141,34 +141,22 @@ public void removeNode(@Nonnull final Urn urn) { RelationshipFilter outgoingFilter = new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING); RelationshipFilter incomingFilter = new RelationshipFilter().setDirection(RelationshipDirection.INCOMING); - SearchResponse outgoingEdges = _graphReadDAO.getSearchResponse( + _graphReadDAO.deleteByQuery( null, urnFilter, null, emptyFilter, relationshipTypes, - outgoingFilter, - 0, - MAX_ELASTIC_RESULT + outgoingFilter ); - SearchResponse incomingEdges = _graphReadDAO.getSearchResponse( + _graphReadDAO.deleteByQuery( null, urnFilter, null, emptyFilter, relationshipTypes, - incomingFilter, - 0, - MAX_ELASTIC_RESULT - ); - - outgoingEdges.getHits().forEach( - hit -> _graphWriteDAO.deleteDocument(hit.getId()) - ); - - incomingEdges.getHits().forEach( - hit -> _graphWriteDAO.deleteDocument(hit.getId()) + incomingFilter ); return; @@ -182,19 +170,13 @@ public void removeEdgeTypesFromNode( Filter urnFilter = createUrnFilter(urn); Filter emptyFilter = new Filter().setCriteria(new CriterionArray()); - SearchResponse edges = _graphReadDAO.getSearchResponse( + _graphReadDAO.deleteByQuery( null, urnFilter, null, emptyFilter, relationshipTypes, - relationshipFilter, - 0, - MAX_ELASTIC_RESULT - ); - - edges.getHits().forEach( - hit -> _graphWriteDAO.deleteDocument(hit.getId()) + relationshipFilter ); } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java index a42106eb11b13a..fe4cd251a8e9b7 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java @@ -2,7 +2,7 @@ import com.linkedin.common.urn.Urn; -import com.linkedin.metadata.graph.elastic.ESGraphReadDAO; +import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.query.RelationshipDirection; @@ -70,7 +70,7 @@ private RestHighLevelClient buildRestClient() { @Nonnull private ElasticSearchGraphService buildService() { - ESGraphReadDAO readDAO = new ESGraphReadDAO(_searchClient, _indexConvention); + ESGraphQueryDAO readDAO = new ESGraphQueryDAO(_searchClient, _indexConvention); ESGraphWriteDAO writeDAO = new ESGraphWriteDAO(_searchClient, _indexConvention, 1, 1, 1, 1); return new ElasticSearchGraphService(_searchClient, _indexConvention, writeDAO, readDAO); } From 8a40ccb853293951cabdc67cde1c98126adf0d69 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 12:38:19 -0700 Subject: [PATCH 06/15] cleanup --- .../charts/datahub-mae-consumer/templates/deployment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml index 0871f8a96fc62b..58e93be31a0f09 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml @@ -116,7 +116,7 @@ spec: key: "{{ .Values.global.neo4j.password.secretKey }}" {{- end }} - name: DATAHUB_ANALYTICS_ENABLED - - value: "{{ .Values.global.datahub_analytics_enabled }}" + value: "{{ .Values.global.datahub_analytics_enabled }}" {{- if .Values.global.springKafkaConfigurationOverrides }} {{- range $configName, $configValue := .Values.global.springKafkaConfigurationOverrides }} - name: SPRING_KAFKA_PROPERTIES_{{ $configName | replace "." "_" | upper }} From 01edd761a47a1a99dcf8a5d8b11a3ba2e0324c5d Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 13:36:31 -0700 Subject: [PATCH 07/15] adding new values.yaml --- .../quickstart-values-without-neo4j.yaml | 78 +++++++++++++++++++ .../datahub/quickstart-values.yaml | 2 + 2 files changed, 80 insertions(+) create mode 100644 datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml diff --git a/datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml b/datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml new file mode 100644 index 00000000000000..24f7aebb74a1f3 --- /dev/null +++ b/datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml @@ -0,0 +1,78 @@ +# Values to start up datahub after starting up the datahub-prerequisites chart with "prerequisites" release name +# Copy this chart and change configuration as needed. +datahub-gms: + enabled: true + image: + repository: linkedin/datahub-gms + tag: "v0.8.1" + +datahub-frontend: + enabled: true + image: + repository: linkedin/datahub-frontend-react + tag: "v0.8.1" + # Set up ingress to expose react front-end + ingress: + enabled: false + +elasticsearchSetupJob: + enabled: true + image: + repository: linkedin/datahub-elasticsearch-setup + tag: "v0.8.1" + +kafkaSetupJob: + enabled: true + image: + repository: linkedin/datahub-kafka-setup + tag: "v0.8.1" + +mysqlSetupJob: + enabled: true + image: + repository: acryldata/datahub-mysql-setup + tag: "v0.8.1" + +datahubUpgrade: + enabled: true + image: + repository: acryldata/datahub-upgrade + tag: "v0.8.1" + +datahub-ingestion-cron: + enabled: false + +global: + graphService: elasticsearch + + elasticsearch: + host: "elasticsearch-master" + port: "9200" + indexPrefix: demo + + kafka: + bootstrap: + server: "prerequisites-kafka:9092" + zookeeper: + server: "prerequisites-zookeeper:2181" + schemaregistry: + url: "http://prerequisites-cp-schema-registry:8081" + + sql: + datasource: + host: "prerequisites-mysql:3306" + hostForMysqlClient: "prerequisites-mysql" + port: "3306" + url: "jdbc:mysql://prerequisites-mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8" + driver: "com.mysql.jdbc.Driver" + username: "root" + password: + secretRef: mysql-secrets + secretKey: mysql-root-password + + datahub: + gms: + port: "8080" + mae_consumer: + port: "9091" + appVersion: "1.0" diff --git a/datahub-kubernetes/datahub/quickstart-values.yaml b/datahub-kubernetes/datahub/quickstart-values.yaml index 719598ffd8b284..9acee937ea7582 100644 --- a/datahub-kubernetes/datahub/quickstart-values.yaml +++ b/datahub-kubernetes/datahub/quickstart-values.yaml @@ -43,6 +43,8 @@ datahub-ingestion-cron: enabled: false global: + graphService: neo4j + elasticsearch: host: "elasticsearch-master" port: "9200" From 9df81ce0a8187266b514cd8cf6e0ba1844e5b322 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 19:04:42 -0700 Subject: [PATCH 08/15] updating --- .../datahub/charts/datahub-gms/values.yaml | 2 +- .../charts/datahub-mae-consumer/README.md | 2 +- .../templates/deployment.yaml | 4 +- .../charts/datahub-mae-consumer/values.yaml | 2 +- .../quickstart-values-without-neo4j.yaml | 2 +- .../datahub/quickstart-values.yaml | 2 +- .../datahub-gms/env/docker-without-neo4j.env | 2 +- docker/datahub-gms/env/docker.env | 2 +- ...ocker-compose-without-neo4j.quickstart.yml | 4 +- .../quickstart/docker-compose.quickstart.yml | 2 +- .../factory/common/GraphServiceFactory.java | 29 +++------ .../common/Neo4jGraphServiceFactory.java | 25 ++++++++ .../linkedin/metadata/graph/GraphService.java | 2 +- .../metadata/graph/Neo4jGraphService.java | 2 +- .../graph/elastic/ESGraphQueryDAO.java | 64 +++++-------------- .../graph/elastic/ESGraphWriteDAO.java | 53 ++++++++++++--- .../elastic/ElasticSearchGraphService.java | 19 ++++-- .../graph/ElasticSearchGraphServiceTest.java | 2 +- .../metadata/graph/Neo4jGraphServiceTest.java | 2 +- .../kafka/MetadataAuditEventsProcessor.java | 2 +- 20 files changed, 122 insertions(+), 102 deletions(-) create mode 100644 gms/factories/src/main/java/com/linkedin/gms/factory/common/Neo4jGraphServiceFactory.java diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml b/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml index 37b4cc1a7a270d..f062155cc73bdf 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml @@ -151,7 +151,7 @@ readinessProbe: # helm install datahub-gms datahub-gms/ global: datahub_analytics_enabled: true - graphService: neo4j + graph_service_impl: neo4j elasticsearch: host: "elasticsearch" diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md index 1421b0fdcd112b..392720c6ec5f9f 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/README.md @@ -29,7 +29,7 @@ Current chart version is `0.2.0` | global.hostAliases[0].hostnames[2] | string | `"elasticsearch"` | | | global.hostAliases[0].hostnames[3] | string | `"neo4j"` | | | global.hostAliases[0].ip | string | `"192.168.0.104"` | | -| global.graphService | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. +| global.graph_service_impl | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. | image.pullPolicy | string | `"IfNotPresent"` | | | image.repository | string | `"linkedin/datahub-mae-consumer"` | | | image.tag | string | `"v0.8.1"` | | diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml index 58e93be31a0f09..562be0fa23040a 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml @@ -101,8 +101,8 @@ spec: key: "{{ .password.secretKey }}" {{- end }} - name: GRAPH_SERVICE - value: {{ .Values.global.graphService }} - {{- if eq .Values.global.graphService "neo4j" }} + value: {{ .Values.global.graph_service_impl }} + {{- if eq .Values.global.graph_service_impl "neo4j" }} - name: NEO4J_HOST value: "{{ .Values.global.neo4j.host }}" - name: NEO4J_URI diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml index c8fd4347598c01..706913015150a9 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/values.yaml @@ -145,7 +145,7 @@ readinessProbe: failureThreshold: 8 global: - graphService: neo4j + graph_service_impl: neo4j datahub_analytics_enabled: true elasticsearch: diff --git a/datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml b/datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml index 24f7aebb74a1f3..83c8f679b0883a 100644 --- a/datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml +++ b/datahub-kubernetes/datahub/quickstart-values-without-neo4j.yaml @@ -43,7 +43,7 @@ datahub-ingestion-cron: enabled: false global: - graphService: elasticsearch + graph_service_impl: elasticsearch elasticsearch: host: "elasticsearch-master" diff --git a/datahub-kubernetes/datahub/quickstart-values.yaml b/datahub-kubernetes/datahub/quickstart-values.yaml index 9acee937ea7582..68aa569b0b2578 100644 --- a/datahub-kubernetes/datahub/quickstart-values.yaml +++ b/datahub-kubernetes/datahub/quickstart-values.yaml @@ -43,7 +43,7 @@ datahub-ingestion-cron: enabled: false global: - graphService: neo4j + graph_service_impl: neo4j elasticsearch: host: "elasticsearch-master" diff --git a/docker/datahub-gms/env/docker-without-neo4j.env b/docker/datahub-gms/env/docker-without-neo4j.env index 96e8ccfc7b04e4..ab44c1ada89e9b 100644 --- a/docker/datahub-gms/env/docker-without-neo4j.env +++ b/docker/datahub-gms/env/docker-without-neo4j.env @@ -8,7 +8,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092 KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 ELASTICSEARCH_HOST=elasticsearch ELASTICSEARCH_PORT=9200 -GRAPH_SERVICE=elasticsearch +GRAPH_SERVICE_IMPL=elasticsearch MAE_CONSUMER_ENABLED=true MCE_CONSUMER_ENABLED=true diff --git a/docker/datahub-gms/env/docker.env b/docker/datahub-gms/env/docker.env index 4b579bec3d50e6..8ed91b52d80235 100644 --- a/docker/datahub-gms/env/docker.env +++ b/docker/datahub-gms/env/docker.env @@ -12,7 +12,7 @@ NEO4J_HOST=http://neo4j:7474 NEO4J_URI=bolt://neo4j NEO4J_USERNAME=neo4j NEO4J_PASSWORD=datahub -GRAPH_SERVICE=neo4j +GRAPH_SERVICE_IMPL=neo4j MAE_CONSUMER_ENABLED=true MCE_CONSUMER_ENABLED=true diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml index b921577d12e23f..68c86d9e200f8a 100644 --- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml @@ -51,11 +51,11 @@ services: - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 - ELASTICSEARCH_HOST=elasticsearch - ELASTICSEARCH_PORT=9200 - - GRAPH_SERVICE=elasticsearch + - GRAPH_SERVICE_IMPL=elasticsearch - MAE_CONSUMER_ENABLED=true - MCE_CONSUMER_ENABLED=true hostname: datahub-gms - image: acryldata/datahub-gms:gabe-test + image: linkedin/datahub-gms:${DATAHUB_VERSION:-latest} mem_limit: 850m ports: - 8080:8080 diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index be558a8a1d4e90..0ce1e0e84b0117 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -55,7 +55,7 @@ services: - NEO4J_URI=bolt://neo4j - NEO4J_USERNAME=neo4j - NEO4J_PASSWORD=datahub - - GRAPH_SERVICE=neo4j + - GRAPH_SERVICE_IMPL=neo4j - MAE_CONSUMER_ENABLED=true - MCE_CONSUMER_ENABLED=true hostname: datahub-gms diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java index 318bd21a75204b..16eaac91d39f28 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java @@ -3,10 +3,7 @@ import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.Neo4jGraphService; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; -import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import javax.annotation.Nonnull; -import org.elasticsearch.client.RestHighLevelClient; -import org.neo4j.driver.Driver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -18,39 +15,31 @@ @Configuration -@Import({Neo4jDriverFactory.class, ElasticSearchGraphServiceFactory.class}) +@Import({Neo4jGraphServiceFactory.class, ElasticSearchGraphServiceFactory.class}) public class GraphServiceFactory { @Autowired @Qualifier("elasticSearchGraphService") private ElasticSearchGraphService _elasticSearchGraphService; @Autowired - @Qualifier("neo4jDriver") - private Driver neo4jDriver; + @Qualifier("neo4jGraphService") + private Neo4jGraphService _neo4jGraphService; - @Autowired - @Qualifier("elasticSearchRestHighLevelClient") - private RestHighLevelClient searchClient; - - @Autowired - @Qualifier(IndexConventionFactory.INDEX_CONVENTION_BEAN) - private IndexConvention indexConvention; - - @Value("${GRAPH_SERVICE:neo4j}") - private String graphService; + @Value("${GRAPH_SERVICE_IMPL:elasticsearch}") + private String graphServiceImpl; @Nonnull @DependsOn({"neo4jDriver", "elasticSearchGraphService"}) @Bean(name = "graphService") @Primary protected GraphService createInstance() { - if (graphService.equalsIgnoreCase("neo4j")) { - return new Neo4jGraphService(neo4jDriver); - } else if (graphService.equalsIgnoreCase("elasticsearch")) { + if (graphServiceImpl.equalsIgnoreCase("neo4j")) { + return _neo4jGraphService; + } else if (graphServiceImpl.equalsIgnoreCase("elasticsearch")) { return _elasticSearchGraphService; } else { throw new RuntimeException( - "Error: Failed to initialize graph service. Graph Service provided: " + graphService + "Error: Failed to initialize graph service. Graph Service provided: " + graphServiceImpl + ". Valid options: [neo4j, elasticsearch]."); } } diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/Neo4jGraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/Neo4jGraphServiceFactory.java new file mode 100644 index 00000000000000..8988cc7825c330 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/Neo4jGraphServiceFactory.java @@ -0,0 +1,25 @@ +package com.linkedin.gms.factory.common; + +import com.linkedin.metadata.graph.Neo4jGraphService; +import javax.annotation.Nonnull; +import org.neo4j.driver.Driver; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + + +@Configuration +@Import({Neo4jDriverFactory.class}) +public class Neo4jGraphServiceFactory { + @Autowired + @Qualifier("neo4jDriver") + private Driver neo4jDriver; + + @Bean(name = "neo4jGraphService") + @Nonnull + protected Neo4jGraphService getInstance() { + return new Neo4jGraphService(neo4jDriver); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java index a99b0861128fea..df284fff3b6dc1 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java @@ -24,7 +24,7 @@ List findRelatedUrns( void removeNode(@Nonnull final Urn urn); - void removeEdgeTypesFromNode( + void removeEdgesFromNode( @Nonnull final Urn urn, @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java index f7f81dddb718e3..b557089a8d74e4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java @@ -119,7 +119,7 @@ public void removeNode(@Nonnull final Urn urn) { runQuery(buildStatement(statement, params)).consume(); } - public void removeEdgeTypesFromNode( + public void removeEdgesFromNode( @Nonnull final Urn urn, @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index c8c9874a685ef6..7b50706202f7e3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -18,8 +18,6 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.*; @@ -36,10 +34,8 @@ public class ESGraphQueryDAO { private final IndexConvention indexConvention; /** - * Converts {@link CriterionArray} to neo4j query string. * * @param criterionArray CriterionArray in a Filter - * @return Neo4j criteria string */ @Nonnull public static void addCriterionToQueryBuilder(@Nonnull CriterionArray criterionArray, String node, BoolQueryBuilder finalQuery) { @@ -63,9 +59,6 @@ public SearchResponse getSearchResponse( @Nonnull final RelationshipFilter relationshipFilter, final int offset, final int count) { - // also delete any relationship going to or from it - final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); - SearchRequest searchRequest = new SearchRequest(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); @@ -73,29 +66,14 @@ public SearchResponse getSearchResponse( searchSourceBuilder.from(offset); searchSourceBuilder.size(count); - BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); - - // set source filter - String sourceNode = relationshipDirection == RelationshipDirection.OUTGOING ? "source" : "destination"; - if (sourceType != null && sourceType.length() > 0) { - finalQuery.must(QueryBuilders.termQuery(sourceNode + ".entityType", sourceType)); - } - addCriterionToQueryBuilder(sourceEntityFilter.getCriteria(), sourceNode, finalQuery); - - // set destination filter - String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? "destination" : "source"; - if (destinationType != null && destinationType.length() > 0) { - finalQuery.must(QueryBuilders.termQuery(destinationNode + ".entityType", destinationType)); - } - addCriterionToQueryBuilder(destinationEntityFilter.getCriteria(), destinationNode, finalQuery); - - // set relationship filter - if (relationshipTypes.size() > 0) { - BoolQueryBuilder relationshipQuery = QueryBuilders.boolQuery(); - relationshipTypes.forEach(relationshipType - -> relationshipQuery.should(QueryBuilders.termQuery("relationshipType", relationshipType))); - finalQuery.must(relationshipQuery); - } + BoolQueryBuilder finalQuery = buildQuery( + sourceType, + sourceEntityFilter, + destinationType, + destinationEntityFilter, + relationshipTypes, + relationshipFilter + ); searchSourceBuilder.query(finalQuery); @@ -112,20 +90,18 @@ public SearchResponse getSearchResponse( return null; } - public BulkByScrollResponse deleteByQuery( + public static BoolQueryBuilder buildQuery( @Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter, @Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter, @Nonnull final List relationshipTypes, - @Nonnull final RelationshipFilter relationshipFilter) { - // also delete any relationship going to or from it - final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); - - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); - + @Nonnull final RelationshipFilter relationshipFilter + ) { BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); + final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); + // set source filter String sourceNode = relationshipDirection == RelationshipDirection.OUTGOING ? "source" : "destination"; if (sourceType != null && sourceType.length() > 0) { @@ -147,17 +123,7 @@ public BulkByScrollResponse deleteByQuery( -> relationshipQuery.should(QueryBuilders.termQuery("relationshipType", relationshipType))); finalQuery.must(relationshipQuery); } - - deleteByQueryRequest.setQuery(finalQuery); - - deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME)); - - try { - final BulkByScrollResponse deleteResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); - return deleteResponse; - } catch (IOException e) { - e.printStackTrace(); - } - return null; + return finalQuery; } + } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java index a825112cf46208..80e01b6345c070 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java @@ -1,28 +1,40 @@ package com.linkedin.metadata.graph.elastic; +import com.linkedin.metadata.query.Filter; +import com.linkedin.metadata.query.RelationshipFilter; import com.linkedin.metadata.search.elasticsearch.update.BulkListener; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import java.io.IOException; +import java.util.List; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.*; import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.*; +@Slf4j public class ESGraphWriteDAO { private final BulkProcessor bulkProcessor; private final IndexConvention indexConvention; + private final RestHighLevelClient client; public ESGraphWriteDAO(RestHighLevelClient searchClient, IndexConvention indexConvention, int bulkRequestsLimit, int bulkFlushPeriod, int numRetries, long retryInterval) { + this.client = searchClient; this.indexConvention = indexConvention; this.bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> { @@ -41,7 +53,7 @@ public ESGraphWriteDAO(RestHighLevelClient searchClient, IndexConvention indexCo * @param document the document to update / insert * @param docId the ID of the document */ - public void upsertDocument(@Nonnull String document, @Nonnull String docId) { + public void upsertDocument(@Nonnull String docId, @Nonnull String document) { final IndexRequest indexRequest = new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON); final UpdateRequest updateRequest = new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON) .detectNoop(false) @@ -49,12 +61,35 @@ public void upsertDocument(@Nonnull String document, @Nonnull String docId) { bulkProcessor.add(updateRequest); } - /** - * Deletes the document with the given document ID from the index. - * - * @param docId the ID of the document to delete - */ - public void deleteDocument(@Nonnull String docId) { - bulkProcessor.add(new DeleteRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId)); + public BulkByScrollResponse deleteByQuery( + @Nullable final String sourceType, + @Nonnull final Filter sourceEntityFilter, + @Nullable final String destinationType, + @Nonnull final Filter destinationEntityFilter, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter) { + BoolQueryBuilder finalQuery = buildQuery( + sourceType, + sourceEntityFilter, + destinationType, + destinationEntityFilter, + relationshipTypes, + relationshipFilter + ); + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + + deleteByQueryRequest.setQuery(finalQuery); + + deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME)); + + try { + final BulkByScrollResponse deleteResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + return deleteResponse; + } catch (IOException e) { + log.error("ERROR: Failed to delete by query. See stacktrace for a more detailed error:"); + e.printStackTrace(); + } + return null; } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index fa848585c61a64..e81aadfab0a677 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -86,7 +87,7 @@ private String toDocId(@Nonnull final Edge edge) { public void addEdge(@Nonnull final Edge edge) { String docId = toDocId(edge); String edgeDocument = toDocument(edge); - _graphWriteDAO.upsertDocument(edgeDocument, docId); + _graphWriteDAO.upsertDocument(docId, edgeDocument); } @Nonnull @@ -116,7 +117,7 @@ public List findRelatedUrns( return Arrays.stream(response.getHits().getHits()) .map(hit -> ((HashMap) hit.getSourceAsMap().getOrDefault(destinationNode, EMPTY_HASH)).getOrDefault("urn", null)) - .filter(urn -> urn != null) + .filter(Objects::nonNull) .collect(Collectors.toList()); } @@ -141,7 +142,7 @@ public void removeNode(@Nonnull final Urn urn) { RelationshipFilter outgoingFilter = new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING); RelationshipFilter incomingFilter = new RelationshipFilter().setDirection(RelationshipDirection.INCOMING); - _graphReadDAO.deleteByQuery( + _graphWriteDAO.deleteByQuery( null, urnFilter, null, @@ -150,7 +151,7 @@ public void removeNode(@Nonnull final Urn urn) { outgoingFilter ); - _graphReadDAO.deleteByQuery( + _graphWriteDAO.deleteByQuery( null, urnFilter, null, @@ -162,7 +163,7 @@ public void removeNode(@Nonnull final Urn urn) { return; } - public void removeEdgeTypesFromNode( + public void removeEdgesFromNode( @Nonnull final Urn urn, @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) { @@ -170,7 +171,7 @@ public void removeEdgeTypesFromNode( Filter urnFilter = createUrnFilter(urn); Filter emptyFilter = new Filter().setCriteria(new CriterionArray()); - _graphReadDAO.deleteByQuery( + _graphWriteDAO.deleteByQuery( null, urnFilter, null, @@ -188,7 +189,9 @@ public void configure() { exists = searchClient.indices().exists( new GetIndexRequest(_indexConvention.getIndexName(INDEX_NAME)), RequestOptions.DEFAULT); } catch (IOException e) { + log.error("ERROR: Failed to set up elasticsearch graph index. Could not check if the index exists"); e.printStackTrace(); + return; } // If index doesn't exist, create index @@ -202,10 +205,12 @@ public void configure() { try { searchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); } catch (IOException e) { + log.error("ERROR: Failed to set up elasticsearch graph index. Could not create the index."); e.printStackTrace(); + return; } - log.info("Created Elastic Graph Index"); + log.info("Successfully Created Elastic Graph Index"); } return; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java index fe4cd251a8e9b7..4c8db4825e158c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/ElasticSearchGraphServiceTest.java @@ -166,7 +166,7 @@ public void testRemoveEdgesFromNode() throws Exception { assertEquals(relatedUrns.size(), 1); - _client.removeEdgeTypesFromNode(Urn.createFromString( + _client.removeEdgesFromNode(Urn.createFromString( "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), edgeTypes, relationshipFilter); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/Neo4jGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/Neo4jGraphServiceTest.java index 8c93808cbc7046..1d892dec22d6ed 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/Neo4jGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/Neo4jGraphServiceTest.java @@ -117,7 +117,7 @@ public void testRemoveEdgesFromNode() throws Exception { assertEquals(relatedUrns.size(), 1); - _client.removeEdgeTypesFromNode(Urn.createFromString( + _client.removeEdgesFromNode(Urn.createFromString( "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), edgeTypes, relationshipFilter); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java index cc84e5c03ec2d7..61103e13eb0773 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java @@ -124,7 +124,7 @@ private void updateGraphService(final RecordTemplate snapshot, final EntitySpec } if (edgesToAdd.size() > 0) { new Thread(() -> { - _graphService.removeEdgeTypesFromNode(sourceUrn, new ArrayList<>(relationshipTypesBeingAdded), + _graphService.removeEdgesFromNode(sourceUrn, new ArrayList<>(relationshipTypesBeingAdded), createRelationshipFilter(new Filter().setCriteria(new CriterionArray()), RelationshipDirection.OUTGOING)); edgesToAdd.forEach(edge -> _graphService.addEdge(edge)); }).start(); From dd713ed941acb9640d43b6b7aca1b4281410365a Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 19:12:31 -0700 Subject: [PATCH 09/15] finalizing update --- datahub-kubernetes/datahub/README.md | 2 +- datahub-kubernetes/datahub/charts/datahub-gms/README.md | 2 +- .../datahub/charts/datahub-gms/templates/deployment.yaml | 4 ++-- .../charts/datahub-mae-consumer/templates/deployment.yaml | 2 +- datahub-kubernetes/datahub/values.yaml | 2 +- .../com/linkedin/gms/factory/common/GraphServiceFactory.java | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datahub-kubernetes/datahub/README.md b/datahub-kubernetes/datahub/README.md index fe7260b250aa83..31484dd655d6b9 100644 --- a/datahub-kubernetes/datahub/README.md +++ b/datahub-kubernetes/datahub/README.md @@ -57,7 +57,7 @@ helm install datahub datahub/ | global.sql.datasource.username | string | `"root"` | SQL user name | | global.sql.datasource.password.secretRef | string | `"mysql-secrets"` | Secret that contains the MySQL password | | global.sql.datasource.password.secretKey | string | `"mysql-password"` | Secret key that contains the MySQL password | -| global.graphService | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. +| global.graph_service_impl | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. ## Optional Chart Values diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/README.md b/datahub-kubernetes/datahub/charts/datahub-gms/README.md index ba855ba2ea2d2a..6374b6d73f4b62 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/README.md +++ b/datahub-kubernetes/datahub/charts/datahub-gms/README.md @@ -36,7 +36,7 @@ Current chart version is `0.2.0` | global.sql.datasource.username | string | `"datahub"` | | | global.sql.datasource.password.secretRef | string | `"mysql-secrets"` | | | global.sql.datasource.password.secretKey | string | `"mysql-password"` | | -| global.graphService | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. +| global.graph_service_impl | string | `neo4j` | One of `neo4j` or `elasticsearch`. Determines which backend to use for the GMS graph service. Elastic is recommended for a simplified deployment. Neo4j will be the default for now to maintain backwards compatibility. | image.pullPolicy | string | `"IfNotPresent"` | | | image.repository | string | `"linkedin/datahub-gms"` | | | image.tag | string | `"v0.8.1"` | | diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml index 9ae327e25a5459..cb2f813d82015e 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml @@ -116,8 +116,8 @@ spec: key: "{{ .password.secretKey }}" {{- end }} - name: GRAPH_SERVICE - value: {{ .Values.global.graphService }} - {{- if eq .Values.global.graphService "neo4j" }} + value: {{ .Values.global.graph_service_impl }} + {{- if eq .Values.global.graph_service_impl "neo4j" }} - name: NEO4J_HOST value: "{{ .Values.global.neo4j.host }}" - name: NEO4J_URI diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml index 562be0fa23040a..a8b928512c3c03 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml @@ -100,7 +100,7 @@ spec: name: "{{ .password.secretRef }}" key: "{{ .password.secretKey }}" {{- end }} - - name: GRAPH_SERVICE + - name: GRAPH_SERVICE_IMPL value: {{ .Values.global.graph_service_impl }} {{- if eq .Values.global.graph_service_impl "neo4j" }} - name: NEO4J_HOST diff --git a/datahub-kubernetes/datahub/values.yaml b/datahub-kubernetes/datahub/values.yaml index 9b532edbc4c230..d9e6ecd85dc79d 100644 --- a/datahub-kubernetes/datahub/values.yaml +++ b/datahub-kubernetes/datahub/values.yaml @@ -58,7 +58,7 @@ datahubUpgrade: tag: "v0.8.1" global: - graphService: neo4j + graph_service_impl: neo4j datahub_analytics_enabled: true datahub_standalone_consumers_enabled: false diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java index 16eaac91d39f28..22210df61d48ac 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java @@ -9,7 +9,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; +import org.springfr datahub-kubernetes/datahub/README.mdamework.context.annotation.DependsOn; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; From a9117e0f9262a568fa83b4027403bf2907f97356 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 19:23:01 -0700 Subject: [PATCH 10/15] fix import --- .../com/linkedin/gms/factory/common/GraphServiceFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java index 22210df61d48ac..16eaac91d39f28 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java @@ -9,7 +9,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springfr datahub-kubernetes/datahub/README.mdamework.context.annotation.DependsOn; +import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; From 8e305ee9608a2cad80d95b5e263e2157cb662510 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 19:26:04 -0700 Subject: [PATCH 11/15] fixing typo --- .../datahub/charts/datahub-gms/templates/deployment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml index cb2f813d82015e..ed42fd3acbed6e 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml @@ -115,7 +115,7 @@ spec: name: "{{ .password.secretRef }}" key: "{{ .password.secretKey }}" {{- end }} - - name: GRAPH_SERVICE + - name: GRAPH_SERVICE_IMPL value: {{ .Values.global.graph_service_impl }} {{- if eq .Values.global.graph_service_impl "neo4j" }} - name: NEO4J_HOST From 21bacdd1b8b2233374c8e6bef8def1015aa4256f Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 21 Jun 2021 19:28:21 -0700 Subject: [PATCH 12/15] removing lines --- .../metadata/graph/elastic/ElasticSearchGraphService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index e81aadfab0a677..426dbb7b513c48 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -61,9 +61,7 @@ private String toDocument(@Nonnull final Edge edge) { destinationObject.put("entityType", edge.getDestination().getEntityType()); searchDocument.set("source", sourceObject); - searchDocument.set("destination", destinationObject); - searchDocument.put("relationshipType", edge.getRelationshipType()); return searchDocument.toString(); From dd55e9c8c99457649f9b4ac4504729d92e82291e Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Tue, 22 Jun 2021 07:46:45 -0700 Subject: [PATCH 13/15] fixing depends on --- .../com/linkedin/gms/factory/common/GraphServiceFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java index 16eaac91d39f28..87a0c25676c838 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/common/GraphServiceFactory.java @@ -29,7 +29,7 @@ public class GraphServiceFactory { private String graphServiceImpl; @Nonnull - @DependsOn({"neo4jDriver", "elasticSearchGraphService"}) + @DependsOn({"neo4jGraphService", "elasticSearchGraphService"}) @Bean(name = "graphService") @Primary protected GraphService createInstance() { From 0c85b5a25c4e8da42e6b566b7a7014926bedc7a6 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Tue, 22 Jun 2021 14:45:08 -0700 Subject: [PATCH 14/15] handling null response --- .../metadata/graph/elastic/ElasticSearchGraphService.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index 426dbb7b513c48..f171341949ba65 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableList; import com.linkedin.common.urn.Urn; import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.GraphService; @@ -113,6 +114,10 @@ public List findRelatedUrns( count ); + if (Objects.isNull(response)) { + return ImmutableList.of(); + } + return Arrays.stream(response.getHits().getHits()) .map(hit -> ((HashMap) hit.getSourceAsMap().getOrDefault(destinationNode, EMPTY_HASH)).getOrDefault("urn", null)) .filter(Objects::nonNull) From d551c26d07b60b4038b38243de29be475c7bd3b8 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Tue, 22 Jun 2021 14:50:07 -0700 Subject: [PATCH 15/15] == null --- .../metadata/graph/elastic/ElasticSearchGraphService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index f171341949ba65..5cd7272df91655 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -114,7 +114,7 @@ public List findRelatedUrns( count ); - if (Objects.isNull(response)) { + if (response == null) { return ImmutableList.of(); }