diff --git a/datahub-kubernetes/datahub/Chart.yaml b/datahub-kubernetes/datahub/Chart.yaml index 71589e532fe2d..f2527266654ce 100644 --- a/datahub-kubernetes/datahub/Chart.yaml +++ b/datahub-kubernetes/datahub/Chart.yaml @@ -20,11 +20,11 @@ dependencies: - name: datahub-mae-consumer version: 0.2.1 repository: file://./charts/datahub-mae-consumer - condition: datahub-mae-consumer.enabled + condition: global.datahub_standalone_consumers_enabled - name: datahub-mce-consumer version: 0.2.1 repository: file://./charts/datahub-mce-consumer - condition: datahub-mce-consumer.enabled + condition: global.datahub_standalone_consumers_enabled - name: datahub-ingestion-cron version: 0.2.1 repository: file://./charts/datahub-ingestion-cron diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml index 2389325ae1215..e0ef91188c135 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-gms/templates/deployment.yaml @@ -73,6 +73,14 @@ spec: periodSeconds: {{ .Values.readinessProbe.periodSeconds }} failureThreshold: {{ .Values.readinessProbe.failureThreshold }} env: + {{- if not .Values.global.datahub_standalone_consumers_enabled }} + - name: MCE_CONSUMER_ENABLED + value: "true" + - name: MAE_CONSUMER_ENABLED + value: "true" + {{- end }} + - name: DATAHUB_ANALYTICS_ENABLED + value: "{{ .Values.global.datahub_analytics_enabled }}" - name: EBEAN_DATASOURCE_USERNAME value: "{{ .Values.global.sql.datasource.username }}" - name: EBEAN_DATASOURCE_PASSWORD diff --git a/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml b/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml index 4480e91987fa0..b5e07b5ad0c3d 100644 --- a/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-gms/values.yaml @@ -146,6 +146,8 @@ readinessProbe: #This section is useful if we are installing this chart separately for testing # helm install datahub-gms datahub-gms/ global: + datahub_analytics_enabled: true + elasticsearch: host: "elasticsearch" port: "9200" diff --git a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml index 158c051bb8b1e..553397a62affc 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mae-consumer/templates/deployment.yaml @@ -73,6 +73,8 @@ spec: periodSeconds: {{ .Values.readinessProbe.periodSeconds }} failureThreshold: {{ .Values.readinessProbe.failureThreshold }} env: + - name: MAE_CONSUMER_ENABLED + value: "true" - name: GMS_HOST value: {{ printf "%s-%s" .Release.Name "datahub-gms" }} - name: GMS_PORT diff --git a/datahub-kubernetes/datahub/charts/datahub-mce-consumer/templates/deployment.yaml b/datahub-kubernetes/datahub/charts/datahub-mce-consumer/templates/deployment.yaml index 0c83919dd26d0..650668e6fbfb9 100644 --- a/datahub-kubernetes/datahub/charts/datahub-mce-consumer/templates/deployment.yaml +++ b/datahub-kubernetes/datahub/charts/datahub-mce-consumer/templates/deployment.yaml @@ -69,6 +69,8 @@ spec: periodSeconds: {{ .Values.readinessProbe.periodSeconds }} failureThreshold: {{ .Values.readinessProbe.failureThreshold }} env: + - name: MCE_CONSUMER_ENABLED + value: "true" - name: KAFKA_BOOTSTRAP_SERVER value: "{{ .Values.global.kafka.bootstrap.server }}" - name: 
KAFKA_SCHEMAREGISTRY_URL diff --git a/datahub-kubernetes/datahub/quickstart-values.yaml b/datahub-kubernetes/datahub/quickstart-values.yaml index 130ad93deb828..719598ffd8b28 100644 --- a/datahub-kubernetes/datahub/quickstart-values.yaml +++ b/datahub-kubernetes/datahub/quickstart-values.yaml @@ -15,18 +15,6 @@ datahub-frontend: ingress: enabled: false -datahub-mae-consumer: - enabled: true - image: - repository: linkedin/datahub-mae-consumer - tag: "v0.8.1" - -datahub-mce-consumer: - enabled: true - image: - repository: linkedin/datahub-mce-consumer - tag: "v0.8.1" - elasticsearchSetupJob: enabled: true image: diff --git a/datahub-kubernetes/datahub/values.yaml b/datahub-kubernetes/datahub/values.yaml index be3592fff3204..e7cad25392f06 100644 --- a/datahub-kubernetes/datahub/values.yaml +++ b/datahub-kubernetes/datahub/values.yaml @@ -58,7 +58,9 @@ datahubUpgrade: tag: "v0.8.1" global: + datahub_analytics_enabled: true + datahub_standalone_consumers_enabled: false elasticsearch: host: "elasticsearch-master" diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/MAEQualificationStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/MAEQualificationStep.java deleted file mode 100644 index c68acf53fd5b9..0000000000000 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/MAEQualificationStep.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.linkedin.datahub.upgrade.nocode; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.linkedin.datahub.upgrade.UpgradeContext; -import com.linkedin.datahub.upgrade.UpgradeStep; -import com.linkedin.datahub.upgrade.UpgradeStepResult; -import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URL; -import java.net.URLConnection; -import java.util.function.Function; - - -public class MAEQualificationStep implements UpgradeStep { - - private static String convertStreamToString(InputStream is) { - - BufferedReader reader = new BufferedReader(new InputStreamReader(is)); - StringBuilder sb = new StringBuilder(); - - String line = null; - try { - while ((line = reader.readLine()) != null) { - sb.append(line + "\n"); - } - } catch (IOException e) { - e.printStackTrace(); - } finally { - try { - is.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - return sb.toString(); - } - - MAEQualificationStep() { } - - @Override - public String id() { - return "MAEQualificationStep"; - } - - @Override - public int retryCount() { - return 2; - } - - @Override - public Function executable() { - return (context) -> { - String maeHost = System.getenv("DATAHUB_MAE_CONSUMER_HOST") == null ? "localhost" : System.getenv("DATAHUB_MAE_CONSUMER_HOST"); - String maePort = System.getenv("DATAHUB_MAE_CONSUMER_PORT") == null ? "9091" : System.getenv("DATAHUB_MAE_CONSUMER_PORT"); - try { - String spec = String.format("http://%s:%s/config", maeHost, maePort); - - URLConnection gmsConnection = new URL(spec).openConnection(); - InputStream response = gmsConnection.getInputStream(); - String responseString = convertStreamToString(response); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode configJson = mapper.readTree(responseString); - if (configJson.get("noCode").asBoolean()) { - context.report().addLine("MAE Consumer is running and up to date. 
Proceeding with upgrade..."); - return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); - } else { - context.report().addLine(String.format("Failed to qualify MAE Consumer. It is not running on the latest version." - + "Re-run MAE Consumer on the latest datahub release")); - return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); - } - } catch (Exception e) { - e.printStackTrace(); - context.report().addLine(String.format( - "ERROR: Cannot connect to MAE Consumer" - + "at host %s port %s. Make sure MAE Consumer is on the latest version " - + "and is running at that host before starting the migration.", - maeHost, maePort)); - return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); - } - }; - } -} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java index 658d0b9ab0f89..e719bae8f85eb 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java @@ -58,7 +58,6 @@ private List buildUpgradeSteps( final List steps = new ArrayList<>(); steps.add(new RemoveAspectV2TableStep(server)); steps.add(new GMSQualificationStep()); - steps.add(new MAEQualificationStep()); steps.add(new UpgradeQualificationStep(server)); steps.add(new CreateAspectTableStep(server)); steps.add(new IngestDataPlatformsStep(entityService)); diff --git a/docker/datahub-gms/env/docker.env b/docker/datahub-gms/env/docker.env index af97ac2307c89..b56337df5e3de 100644 --- a/docker/datahub-gms/env/docker.env +++ b/docker/datahub-gms/env/docker.env @@ -13,6 +13,12 @@ NEO4J_URI=bolt://neo4j NEO4J_USERNAME=neo4j NEO4J_PASSWORD=datahub +MAE_CONSUMER_ENABLED=true +MCE_CONSUMER_ENABLED=true + +# Uncomment to disable persistence of client-side analytics events +# DATAHUB_ANALYTICS_ENABLED=false + # Uncomment to configure kafka topic names # Make sure these names are consistent across the whole deployment # METADATA_AUDIT_EVENT_NAME=MetadataAuditEvent_v4 diff --git a/docker/datahub-gms/env/docker.mariadb.env b/docker/datahub-gms/env/docker.mariadb.env index 52f80765962a7..5e75abd87238e 100644 --- a/docker/datahub-gms/env/docker.mariadb.env +++ b/docker/datahub-gms/env/docker.mariadb.env @@ -11,3 +11,5 @@ NEO4J_HOST=http://neo4j:7474 NEO4J_URI=bolt://neo4j NEO4J_USERNAME=neo4j NEO4J_PASSWORD=datahub +MAE_CONSUMER_ENABLED=true +MCE_CONSUMER_ENABLED=true diff --git a/docker/datahub-gms/env/docker.postgres.env b/docker/datahub-gms/env/docker.postgres.env index b92fd5b9e5f5b..efb60a4f55b0e 100644 --- a/docker/datahub-gms/env/docker.postgres.env +++ b/docker/datahub-gms/env/docker.postgres.env @@ -11,3 +11,6 @@ NEO4J_HOST=http://neo4j:7474 NEO4J_URI=bolt://neo4j NEO4J_USERNAME=neo4j NEO4J_PASSWORD=datahub +MAE_CONSUMER_ENABLED=true +MCE_CONSUMER_ENABLED=true + diff --git a/docker/datahub-mae-consumer/env/docker.env b/docker/datahub-mae-consumer/env/docker.env index 290e4df515257..e7f192901ea0e 100644 --- a/docker/datahub-mae-consumer/env/docker.env +++ b/docker/datahub-mae-consumer/env/docker.env @@ -1,3 +1,4 @@ +MAE_CONSUMER_ENABLED=true KAFKA_BOOTSTRAP_SERVER=broker:29092 KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 ELASTICSEARCH_HOST=elasticsearch diff --git a/docker/datahub-mce-consumer/env/docker.env b/docker/datahub-mce-consumer/env/docker.env index c89f317a92114..ec47943a4ab95 100644 --- 
a/docker/datahub-mce-consumer/env/docker.env +++ b/docker/datahub-mce-consumer/env/docker.env @@ -1,3 +1,4 @@ +MCE_CONSUMER_ENABLED=true KAFKA_BOOTSTRAP_SERVER=broker:29092 KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 GMS_HOST=datahub-gms diff --git a/docker/docker-compose.consumers.dev.yml b/docker/docker-compose.consumers.dev.yml new file mode 100644 index 0000000000000..4d354fda97a13 --- /dev/null +++ b/docker/docker-compose.consumers.dev.yml @@ -0,0 +1,23 @@ +version: '3.8' +services: + datahub-mae-consumer: + image: linkedin/datahub-mae-consumer:debug + build: + context: datahub-mae-consumer + dockerfile: Dockerfile + args: + APP_ENV: dev + volumes: + - ./datahub-mae-consumer/start.sh:/datahub/datahub-mae-consumer/scripts/start.sh + - ../metadata-jobs/mae-consumer-job/build/libs/:/datahub/datahub-mae-consumer/bin/ + + datahub-mce-consumer: + image: linkedin/datahub-mce-consumer:debug + build: + context: datahub-mce-consumer + dockerfile: Dockerfile + args: + APP_ENV: dev + volumes: + - ./datahub-mce-consumer/start.sh:/datahub/datahub-mce-consumer/scripts/start.sh + - ../metadata-jobs/mce-consumer-job/build/libs/:/datahub/datahub-mce-consumer/bin \ No newline at end of file diff --git a/docker/docker-compose.consumers.yml b/docker/docker-compose.consumers.yml new file mode 100644 index 0000000000000..3dc39521fd05e --- /dev/null +++ b/docker/docker-compose.consumers.yml @@ -0,0 +1,31 @@ +# Service definitions for standalone Kafka consumer containers. +version: '3.8' +services: + datahub-mae-consumer: + build: + context: ../ + dockerfile: docker/datahub-mae-consumer/Dockerfile + image: linkedin/datahub-mae-consumer:${DATAHUB_VERSION:-latest} + env_file: datahub-mae-consumer/env/docker.env + hostname: datahub-mae-consumer + container_name: datahub-mae-consumer + ports: + - "9091:9091" + depends_on: + - kafka-setup + - elasticsearch-setup + - neo4j + + datahub-mce-consumer: + build: + context: ../ + dockerfile: docker/datahub-mce-consumer/Dockerfile + image: linkedin/datahub-mce-consumer:${DATAHUB_VERSION:-latest} + env_file: datahub-mce-consumer/env/docker.env + hostname: datahub-mce-consumer + container_name: datahub-mce-consumer + ports: + - "9090:9090" + depends_on: + - kafka-setup + - datahub-gms \ No newline at end of file diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index dc82594fa38e1..fe6997ba904a9 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -50,25 +50,3 @@ services: APP_ENV: dev volumes: - ../datahub-frontend/build/stage/datahub-frontend:/datahub-frontend - - datahub-mae-consumer: - image: linkedin/datahub-mae-consumer:debug - build: - context: datahub-mae-consumer - dockerfile: Dockerfile - args: - APP_ENV: dev - volumes: - - ./datahub-mae-consumer/start.sh:/datahub/datahub-mae-consumer/scripts/start.sh - - ../metadata-jobs/mae-consumer-job/build/libs/:/datahub/datahub-mae-consumer/bin/ - - datahub-mce-consumer: - image: linkedin/datahub-mce-consumer:debug - build: - context: datahub-mce-consumer - dockerfile: Dockerfile - args: - APP_ENV: dev - volumes: - - ./datahub-mce-consumer/start.sh:/datahub/datahub-mce-consumer/scripts/start.sh - - ../metadata-jobs/mce-consumer-job/build/libs/:/datahub/datahub-mce-consumer/bin diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 5e38f20eda7fa..74ab7845c7ca7 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -160,35 +160,6 @@ services: depends_on: - datahub-gms - datahub-mae-consumer: - build: - context: ../ - dockerfile: 
docker/datahub-mae-consumer/Dockerfile - image: linkedin/datahub-mae-consumer:${DATAHUB_VERSION:-latest} - env_file: datahub-mae-consumer/env/docker.env - hostname: datahub-mae-consumer - container_name: datahub-mae-consumer - ports: - - "9091:9091" - depends_on: - - kafka-setup - - elasticsearch-setup - - neo4j - - datahub-mce-consumer: - build: - context: ../ - dockerfile: docker/datahub-mce-consumer/Dockerfile - image: linkedin/datahub-mce-consumer:${DATAHUB_VERSION:-latest} - env_file: datahub-mce-consumer/env/docker.env - hostname: datahub-mce-consumer - container_name: datahub-mce-consumer - ports: - - "9090:9090" - depends_on: - - kafka-setup - - datahub-gms - networks: default: name: datahub_network diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index 6b0886e1bad2e..5f92823311269 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -55,48 +55,13 @@ services: - NEO4J_URI=bolt://neo4j - NEO4J_USERNAME=neo4j - NEO4J_PASSWORD=datahub + - MAE_CONSUMER_ENABLED=true + - MCE_CONSUMER_ENABLED=true hostname: datahub-gms image: linkedin/datahub-gms:${DATAHUB_VERSION:-latest} mem_limit: 850m ports: - 8080:8080 - datahub-mae-consumer: - container_name: datahub-mae-consumer - depends_on: - - kafka-setup - - elasticsearch-setup - - neo4j - environment: - - KAFKA_BOOTSTRAP_SERVER=broker:29092 - - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 - - ELASTICSEARCH_HOST=elasticsearch - - ELASTICSEARCH_PORT=9200 - - NEO4J_HOST=http://neo4j:7474 - - NEO4J_URI=bolt://neo4j - - NEO4J_USERNAME=neo4j - - NEO4J_PASSWORD=datahub - - GMS_HOST=datahub-gms - - GMS_PORT=8080 - hostname: datahub-mae-consumer - image: linkedin/datahub-mae-consumer:${DATAHUB_VERSION:-latest} - mem_limit: 256m - ports: - - 9091:9091 - datahub-mce-consumer: - container_name: datahub-mce-consumer - depends_on: - - kafka-setup - - datahub-gms - environment: - - KAFKA_BOOTSTRAP_SERVER=broker:29092 - - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 - - GMS_HOST=datahub-gms - - GMS_PORT=8080 - hostname: datahub-mce-consumer - image: linkedin/datahub-mce-consumer:${DATAHUB_VERSION:-latest} - mem_limit: 384m - ports: - - 9090:9090 elasticsearch: container_name: elasticsearch environment: diff --git a/gms/war/build.gradle b/gms/war/build.gradle index 4ad1448db3a39..7df5e426f6b96 100644 --- a/gms/war/build.gradle +++ b/gms/war/build.gradle @@ -4,6 +4,8 @@ ext.apiProject = project(':gms:api') dependencies { runtime project(':gms:factories') + runtime project(':metadata-jobs:mce-consumer') + runtime project(':metadata-jobs:mae-consumer') runtime externalDependency.h2 runtime externalDependency.logbackClassic diff --git a/metadata-ingestion/src/datahub/cli/docker_check.py b/metadata-ingestion/src/datahub/cli/docker_check.py index 8735d97cee264..ee45b0feedf09 100644 --- a/metadata-ingestion/src/datahub/cli/docker_check.py +++ b/metadata-ingestion/src/datahub/cli/docker_check.py @@ -7,9 +7,7 @@ "elasticsearch-setup", "elasticsearch", "datahub-gms", - "datahub-mce-consumer", "datahub-frontend-react", - "datahub-mae-consumer", "kafka-setup", "schema-registry", "broker", @@ -21,6 +19,8 @@ # "schema-registry-ui", # "kibana", # "kafka-rest-proxy", + # "datahub-mce-consumer", + # "datahub-mae-consumer" ] ENSURE_EXIT_SUCCESS = [ diff --git a/metadata-jobs/mae-consumer-job/build.gradle b/metadata-jobs/mae-consumer-job/build.gradle index a52a468511a62..4e7f422121831 100644 --- 
a/metadata-jobs/mae-consumer-job/build.gradle +++ b/metadata-jobs/mae-consumer-job/build.gradle @@ -3,64 +3,15 @@ plugins { id 'java' } -apply plugin: 'pegasus' - -configurations { - avro -} - dependencies { - avro project(path: ':metadata-models', configuration: 'avroSchema') - - compile project(':li-utils') - compile (project(':gms:factories')) { - exclude group: 'org.neo4j.test' - } - compile project(':metadata-utils') - compile project(":entity-registry") - compile project(':metadata-builders') - compile project(':metadata-dao-impl:restli-dao') - compile project(':metadata-events:mxe-schemas') - compile project(':metadata-events:mxe-avro-1.7') - compile project(':metadata-events:mxe-registration') - compile project(':metadata-events:mxe-utils-avro-1.7') - - compile externalDependency.elasticSearchRest - compile externalDependency.gmaDaoApi - compile externalDependency.gmaNeo4jDao - compile externalDependency.kafkaAvroSerde - compile externalDependency.neo4jJavaDriver - - compile (externalDependency.springBootStarterWeb) { + compile project(':metadata-jobs:mae-consumer') + compile(externalDependency.springBootStarterWeb) { exclude module: "spring-boot-starter-tomcat" } compile externalDependency.springBootStarterJetty compile externalDependency.springKafka - compile externalDependency.springActuator - - compileOnly externalDependency.lombok - - annotationProcessor externalDependency.lombok - - runtime externalDependency.logbackClassic -} - -task avroSchemaSources(type: Copy) { - dependsOn configurations.avro - - from { // use of closure defers evaluation until execution time - configurations.avro.collect { zipTree(it) } - } - into("src/main/resources/") - include("avro/com/linkedin/mxe/") -} - -compileJava.dependsOn avroSchemaSources - -clean { - project.delete("src/main/resources/avro") } bootJar { mainClassName = 'com.linkedin.metadata.kafka.MaeConsumerApplication' -} +} \ No newline at end of file diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java index 389f40477ad13..1a0635c9e2f51 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java @@ -8,8 +8,8 @@ @SpringBootApplication(exclude = {RestClientAutoConfiguration.class}) public class MaeConsumerApplication { - public static void main(String[] args) { - Class[] primarySources = {MaeConsumerApplication.class, MaeConsumerConfig.class}; - SpringApplication.run(primarySources, args); - } -} + public static void main(String[] args) { + Class[] primarySources = {MaeConsumerApplication.class, com.linkedin.metadata.kafka.MaeConsumerConfig.class}; + SpringApplication.run(primarySources, args); + } +} \ No newline at end of file diff --git a/metadata-jobs/mae-consumer-job/.gitignore b/metadata-jobs/mae-consumer/.gitignore similarity index 100% rename from metadata-jobs/mae-consumer-job/.gitignore rename to metadata-jobs/mae-consumer/.gitignore diff --git a/metadata-jobs/mae-consumer/build.gradle b/metadata-jobs/mae-consumer/build.gradle new file mode 100644 index 0000000000000..69fe765ef40ba --- /dev/null +++ b/metadata-jobs/mae-consumer/build.gradle @@ -0,0 +1,57 @@ +plugins { + id 'java' +} + +apply plugin: 'pegasus' + +configurations { + avro +} + +dependencies { + avro project(path: ':metadata-models', 
configuration: 'avroSchema') + + compile project(':li-utils') + compile (project(':gms:factories')) { + exclude group: 'org.neo4j.test' + } + compile project(':metadata-utils') + compile project(":entity-registry") + compile project(':metadata-builders') + compile project(':metadata-dao-impl:restli-dao') + compile project(':metadata-events:mxe-schemas') + compile project(':metadata-events:mxe-avro-1.7') + compile project(':metadata-events:mxe-registration') + compile project(':metadata-events:mxe-utils-avro-1.7') + + compile externalDependency.elasticSearchRest + compile externalDependency.gmaDaoApi + compile externalDependency.gmaNeo4jDao + compile externalDependency.kafkaAvroSerde + compile externalDependency.neo4jJavaDriver + + compile externalDependency.springKafka + compile externalDependency.springActuator + + compileOnly externalDependency.lombok + + annotationProcessor externalDependency.lombok + + runtime externalDependency.logbackClassic +} + +task avroSchemaSources(type: Copy) { + dependsOn configurations.avro + + from { // use of closure defers evaluation until execution time + configurations.avro.collect { zipTree(it) } + } + into("src/main/resources/") + include("avro/com/linkedin/mxe/") +} + +compileJava.dependsOn avroSchemaSources + +clean { + project.delete("src/main/resources/avro") +} \ No newline at end of file diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java similarity index 87% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java index 43cd68802830d..cdb4ecc7450cc 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.kafka; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.kafka.config.DataHubUsageEventsProcessorCondition; import com.linkedin.metadata.kafka.elasticsearch.ElasticsearchConnector; import com.linkedin.metadata.kafka.elasticsearch.JsonElasticEvent; import com.linkedin.metadata.kafka.transformer.DataHubUsageEventTransformer; @@ -11,7 +12,7 @@ import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Conditional; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -19,16 +20,18 @@ @Slf4j @Component -@ConditionalOnProperty(value = "DATAHUB_ANALYTICS_ENABLED", havingValue = "true", matchIfMissing = true) @EnableKafka +@Conditional(DataHubUsageEventsProcessorCondition.class) public class DataHubUsageEventsProcessor { private final ElasticsearchConnector elasticSearchConnector; private final DataHubUsageEventTransformer dataHubUsageEventTransformer; private final String indexName; - public DataHubUsageEventsProcessor(ElasticsearchConnector elasticSearchConnector, - DataHubUsageEventTransformer dataHubUsageEventTransformer, IndexConvention indexConvention) { + public 
DataHubUsageEventsProcessor( + ElasticsearchConnector elasticSearchConnector, + DataHubUsageEventTransformer dataHubUsageEventTransformer, + IndexConvention indexConvention) { this.elasticSearchConnector = elasticSearchConnector; this.dataHubUsageEventTransformer = dataHubUsageEventTransformer; this.indexName = indexConvention.getIndexName("datahub_usage_event"); diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MaeConsumerConfig.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerConfig.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MaeConsumerConfig.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java similarity index 96% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java index 3cce2a2155ab8..d03a5da137689 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java @@ -10,6 +10,7 @@ import com.linkedin.metadata.extractor.FieldExtractor; import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.kafka.config.MetadataAuditEventsProcessorCondition; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.RelationshipFieldSpec; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; @@ -34,6 +35,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; @@ -44,6 +46,7 @@ @Slf4j @Component +@Conditional(MetadataAuditEventsProcessorCondition.class) @Import({GraphServiceFactory.class, SearchServiceFactory.class}) @EnableKafka public class MetadataAuditEventsProcessor { diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/DataHubUsageEventsProcessorCondition.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/DataHubUsageEventsProcessorCondition.java new file mode 100644 index 0000000000000..0413cd09c36b7 --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/DataHubUsageEventsProcessorCondition.java @@ -0,0 +1,19 @@ +package com.linkedin.metadata.kafka.config; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.env.Environment; +import org.springframework.core.type.AnnotatedTypeMetadata; + + +public class DataHubUsageEventsProcessorCondition implements Condition { + @Override + public boolean matches( + ConditionContext context, + AnnotatedTypeMetadata metadata) { + Environment env = 
context.getEnvironment(); + return "true".equals(env.getProperty("MAE_CONSUMER_ENABLED")) && ( + env.getProperty("DATAHUB_ANALYTICS_ENABLED") == null + || "true".equals(env.getProperty("DATAHUB_ANALYTICS_ENABLED"))); + } +} \ No newline at end of file diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/HydratorFactoryConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/HydratorFactoryConfig.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/HydratorFactoryConfig.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/HydratorFactoryConfig.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java similarity index 90% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java index 73f138da446e1..92b4d624fb71c 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java @@ -15,8 +15,6 @@ import com.linkedin.metadata.builders.search.GlossaryTermInfoIndexBuilder; import com.linkedin.metadata.builders.search.GlossaryNodeInfoIndexBuilder; import com.linkedin.metadata.restli.DefaultRestliClientFactory; -import com.linkedin.metadata.utils.elasticsearch.IndexConvention; -import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import com.linkedin.restli.client.Client; import java.util.HashSet; import java.util.Set; @@ -71,12 +69,4 @@ public Set> indexBuilders(@Nonnull Cl public Client restliClient() { return DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort); } - - /** - * Convention for naming search indices - */ - @Bean - public IndexConvention indexConvention() { - return new IndexConventionImpl(indexPrefix); - } } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java similarity index 99% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java index 97fbe48653bae..b054f4086aaa5 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java @@ -20,7 +20,7 @@ @Slf4j @Configuration -public class KafkaConfig { +public class MaeKafkaConfig { 
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") private String kafkaBootstrapServer; @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataAuditEventsProcessorCondition.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataAuditEventsProcessorCondition.java new file mode 100644 index 0000000000000..4dc7598ceaacb --- /dev/null +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataAuditEventsProcessorCondition.java @@ -0,0 +1,17 @@ +package com.linkedin.metadata.kafka.config; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.env.Environment; +import org.springframework.core.type.AnnotatedTypeMetadata; + + +public class MetadataAuditEventsProcessorCondition implements Condition { + @Override + public boolean matches( + ConditionContext context, + AnnotatedTypeMetadata metadata) { + Environment env = context.getEnvironment(); + return "true".equals(env.getProperty("MAE_CONSUMER_ENABLED")); + } +} diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticEvent.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticEvent.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticEvent.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticEvent.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java similarity index 86% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java index bd53263d1ae57..74eb559e8eeba 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java @@ -21,7 +21,7 @@ public class ElasticsearchConnectorFactory { @Bean(name = "elasticsearchConnector") @Nonnull - public ElasticsearchConnector createInstance(RestHighLevelClient elasticSearchRestHighLevelClient) { + public ElasticsearchConnector createInstance(@Nonnull RestHighLevelClient elasticSearchRestHighLevelClient) { return new ElasticsearchConnector(elasticSearchRestHighLevelClient, bulkRequestsLimit, bulkFlushPeriod); } diff --git 
a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/JsonElasticEvent.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/MCEElasticEvent.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/ChartHydrator.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/ChartHydrator.java similarity index 94% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/ChartHydrator.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/ChartHydrator.java index 7ac89c804468b..689445c619992 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/ChartHydrator.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/ChartHydrator.java @@ -9,6 +9,7 @@ import com.linkedin.restli.client.Client; import java.net.URISyntaxException; import java.util.Optional; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -20,7 +21,7 @@ public class ChartHydrator implements Hydrator { private static final String DASHBOARD_TOOL = "dashboardTool"; private static final String TITLE = "title"; - public ChartHydrator(Client restliClient) { + public ChartHydrator(@Nonnull Client restliClient) { _restliClient = restliClient; _remoteDAO = new RestliRemoteDAO<>(ChartSnapshot.class, ChartAspect.class, _restliClient); } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/CorpUserHydrator.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/CorpUserHydrator.java similarity index 94% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/CorpUserHydrator.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/CorpUserHydrator.java index 8741dc36d844f..fc1ab67b2ed59 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/CorpUserHydrator.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/CorpUserHydrator.java @@ -9,6 +9,7 @@ import com.linkedin.restli.client.Client; import java.net.URISyntaxException; import java.util.Optional; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -20,7 +21,7 @@ public class CorpUserHydrator implements Hydrator { private static final String USER_NAME = "username"; private static final String NAME = "name"; - public CorpUserHydrator(Client restliClient) { + public CorpUserHydrator(@Nonnull Client restliClient) { _restliClient = restliClient; _remoteDAO = new RestliRemoteDAO<>(CorpUserSnapshot.class, CorpUserAspect.class, 
_restliClient); } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DashboardHydrator.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DashboardHydrator.java similarity index 94% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DashboardHydrator.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DashboardHydrator.java index ca7e4f2a1b079..c2be304391164 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DashboardHydrator.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DashboardHydrator.java @@ -9,6 +9,7 @@ import com.linkedin.restli.client.Client; import java.net.URISyntaxException; import java.util.Optional; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -20,7 +21,7 @@ public class DashboardHydrator implements Hydrator { private static final String DASHBOARD_TOOL = "dashboardTool"; private static final String TITLE = "title"; - public DashboardHydrator(Client restliClient) { + public DashboardHydrator(@Nonnull Client restliClient) { _restliClient = restliClient; _remoteDAO = new RestliRemoteDAO<>(DashboardSnapshot.class, DashboardAspect.class, _restliClient); } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DataFlowHydrator.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DataFlowHydrator.java similarity index 94% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DataFlowHydrator.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DataFlowHydrator.java index ce6def8ce4c0e..69761b1139262 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DataFlowHydrator.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DataFlowHydrator.java @@ -9,6 +9,7 @@ import com.linkedin.restli.client.Client; import java.net.URISyntaxException; import java.util.Optional; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -20,7 +21,7 @@ public class DataFlowHydrator implements Hydrator { private static final String ORCHESTRATOR = "orchestrator"; private static final String NAME = "name"; - public DataFlowHydrator(Client restliClient) { + public DataFlowHydrator(@Nonnull Client restliClient) { _restliClient = restliClient; _remoteDAO = new RestliRemoteDAO<>(DataFlowSnapshot.class, DataFlowAspect.class, _restliClient); } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DataJobHydrator.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DataJobHydrator.java similarity index 94% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DataJobHydrator.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DataJobHydrator.java index ea4eae333ad03..e1b40e571cce4 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DataJobHydrator.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DataJobHydrator.java @@ -9,6 +9,7 @@ import com.linkedin.restli.client.Client; import java.net.URISyntaxException; import java.util.Optional; +import javax.annotation.Nonnull; 
import lombok.extern.slf4j.Slf4j; @@ -20,7 +21,7 @@ public class DataJobHydrator implements Hydrator { private static final String ORCHESTRATOR = "orchestrator"; private static final String NAME = "name"; - public DataJobHydrator(Client restliClient) { + public DataJobHydrator(@Nonnull Client restliClient) { _restliClient = restliClient; _remoteDAO = new RestliRemoteDAO<>(DataJobSnapshot.class, DataJobAspect.class, _restliClient); } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DatasetHydrator.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DatasetHydrator.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/DatasetHydrator.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/DatasetHydrator.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/EntityType.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/EntityType.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/EntityType.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/EntityType.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/Hydrator.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/Hydrator.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/Hydrator.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/Hydrator.java diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/HydratorFactory.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/HydratorFactory.java similarity index 93% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/HydratorFactory.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/HydratorFactory.java index 3e171fc41f09a..e9e1defaf3a0e 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/hydrator/HydratorFactory.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hydrator/HydratorFactory.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import javax.annotation.Nonnull; public class HydratorFactory { @@ -14,7 +15,7 @@ public class HydratorFactory { public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public HydratorFactory(Client restliClient) { + public HydratorFactory(@Nonnull Client restliClient) { _restliClient = restliClient; _hydratorMap = new HashMap<>(); _hydratorMap.put(EntityType.DATASET, new DatasetHydrator()); diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventConstants.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventConstants.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventConstants.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventConstants.java diff --git 
a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventTransformer.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventTransformer.java similarity index 97% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventTransformer.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventTransformer.java index 83e959187ea7d..c6ea3fa834162 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventTransformer.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventTransformer.java @@ -9,6 +9,7 @@ import com.linkedin.metadata.kafka.hydrator.HydratorFactory; import java.util.Optional; import java.util.Set; +import javax.annotation.Nonnull; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -36,7 +37,7 @@ public static class TransformedDocument { String document; } - public DataHubUsageEventTransformer(HydratorFactory hydratorFactory) { + public DataHubUsageEventTransformer(@Nonnull HydratorFactory hydratorFactory) { this.hydratorFactory = hydratorFactory; } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventType.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventType.java similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventType.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventType.java diff --git a/metadata-jobs/mae-consumer-job/src/main/resources/application.properties b/metadata-jobs/mae-consumer/src/main/resources/application.properties similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/resources/application.properties rename to metadata-jobs/mae-consumer/src/main/resources/application.properties diff --git a/metadata-jobs/mae-consumer-job/src/main/resources/logback.xml b/metadata-jobs/mae-consumer/src/main/resources/logback.xml similarity index 100% rename from metadata-jobs/mae-consumer-job/src/main/resources/logback.xml rename to metadata-jobs/mae-consumer/src/main/resources/logback.xml diff --git a/metadata-jobs/mce-consumer-job/build.gradle b/metadata-jobs/mce-consumer-job/build.gradle index c72b1cea0f7f7..bd0b4b192ca6c 100644 --- a/metadata-jobs/mce-consumer-job/build.gradle +++ b/metadata-jobs/mce-consumer-job/build.gradle @@ -3,59 +3,15 @@ plugins { id 'java' } -apply plugin: 'pegasus' - -configurations { - avro -} - dependencies { - avro project(path: ':metadata-models', configuration: 'avroSchema') - - compile project(':li-utils') - compile project(':metadata-utils') - compile project(':metadata-builders') - compile project(':metadata-events:mxe-schemas') - compile project(':metadata-events:mxe-avro-1.7') - compile project(':metadata-events:mxe-registration') - compile project(':metadata-events:mxe-utils-avro-1.7') - compile project(':metadata-dao-impl:restli-dao') - compile project(':metadata-io') - compile project(':gms:client') - compile spec.product.pegasus.restliClient - compile spec.product.pegasus.restliCommon - compile externalDependency.elasticSearchRest - compile externalDependency.kafkaAvroSerde - compile 
(externalDependency.springBootStarterWeb) { + compile project(':metadata-jobs:mce-consumer') + compile(externalDependency.springBootStarterWeb) { exclude module: "spring-boot-starter-tomcat" } compile externalDependency.springBootStarterJetty compile externalDependency.springKafka - - compile externalDependency.springActuator - - compileOnly externalDependency.lombok - - annotationProcessor externalDependency.lombok - -} - -task avroSchemaSources(type: Copy) { - dependsOn configurations.avro - - from { // use of closure defers evaluation until execution time - configurations.avro.collect { zipTree(it) } - } - into("src/main/resources/") - include("avro/com/linkedin/mxe/") -} - -compileJava.dependsOn avroSchemaSources - -clean { - project.delete("src/main/resources/avro") } bootJar { mainClassName = 'com.linkedin.metadata.kafka.MceConsumerApplication' -} +} \ No newline at end of file diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java index 9d6bc19801374..b308b5dba8b5c 100644 --- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java +++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java @@ -12,4 +12,4 @@ public class MceConsumerApplication { public static void main(String[] args) { SpringApplication.run(MceConsumerApplication.class, args); } -} +} \ No newline at end of file diff --git a/metadata-jobs/mce-consumer-job/.gitignore b/metadata-jobs/mce-consumer/.gitignore similarity index 100% rename from metadata-jobs/mce-consumer-job/.gitignore rename to metadata-jobs/mce-consumer/.gitignore diff --git a/metadata-jobs/mce-consumer/build.gradle b/metadata-jobs/mce-consumer/build.gradle new file mode 100644 index 0000000000000..a895437841f53 --- /dev/null +++ b/metadata-jobs/mce-consumer/build.gradle @@ -0,0 +1,52 @@ +plugins { + id 'java' +} + +apply plugin: 'pegasus' + +configurations { + avro +} + +dependencies { + avro project(path: ':metadata-models', configuration: 'avroSchema') + + compile project(':li-utils') + compile project(':metadata-utils') + compile project(':metadata-builders') + compile project(':metadata-events:mxe-schemas') + compile project(':metadata-events:mxe-avro-1.7') + compile project(':metadata-events:mxe-registration') + compile project(':metadata-events:mxe-utils-avro-1.7') + compile project(':metadata-dao-impl:restli-dao') + compile project(':metadata-io') + compile project(':gms:client') + compile spec.product.pegasus.restliClient + compile spec.product.pegasus.restliCommon + compile externalDependency.elasticSearchRest + compile externalDependency.kafkaAvroSerde + + compile externalDependency.springKafka + compile externalDependency.springActuator + + compileOnly externalDependency.lombok + + annotationProcessor externalDependency.lombok + +} + +task avroSchemaSources(type: Copy) { + dependsOn configurations.avro + + from { // use of closure defers evaluation until execution time + configurations.avro.collect { zipTree(it) } + } + into("src/main/resources/") + include("avro/com/linkedin/mxe/") +} + +compileJava.dependsOn avroSchemaSources + +clean { + project.delete("src/main/resources/avro") +} \ No newline at end of file diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java 
b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java similarity index 92% rename from metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java rename to metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java index a1403f9748ffe..90e68e6eb34ff 100644 --- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java @@ -3,6 +3,7 @@ import com.linkedin.entity.client.EntityClient; import com.linkedin.entity.Entity; import com.linkedin.metadata.EventUtils; +import com.linkedin.metadata.kafka.config.MetadataChangeEventsProcessorCondition; import com.linkedin.metadata.snapshot.Snapshot; import com.linkedin.mxe.FailedMetadataChangeEvent; import com.linkedin.mxe.MetadataChangeEvent; @@ -15,6 +16,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Conditional; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; @@ -23,6 +25,7 @@ @Slf4j @Component +@Conditional(MetadataChangeEventsProcessorCondition.class) @EnableKafka public class MetadataChangeEventsProcessor { @@ -40,7 +43,7 @@ public MetadataChangeEventsProcessor( } @KafkaListener(id = "${KAFKA_CONSUMER_GROUP_ID:mce-consumer-job-client}", topics = "${KAFKA_MCE_TOPIC_NAME:" - + Topics.METADATA_CHANGE_EVENT + "}") + + Topics.METADATA_CHANGE_EVENT + "}", containerFactory = "mceKafkaContainerFactory") public void consume(final ConsumerRecord consumerRecord) { final GenericRecord record = consumerRecord.value(); log.debug("Record ", record); diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/EntityClientConfig.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/EntityClientConfig.java similarity index 100% rename from metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/EntityClientConfig.java rename to metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/EntityClientConfig.java diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java similarity index 98% rename from metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java rename to metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java index 98e9e236a5527..15d264dc9695f 100644 --- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java @@ -24,13 +24,14 @@ @Slf4j @Configuration -public class KafkaConfig { +public class MceKafkaConfig { + @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") private String kafkaBootstrapServers; @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") private String kafkaSchemaRegistryUrl; - @Bean + @Bean(name = "mceKafkaContainerFactory") public KafkaListenerContainerFactory 
kafkaListenerContainerFactory(KafkaProperties properties) { KafkaProperties.Consumer consumerProps = properties.getConsumer(); diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeEventsProcessorCondition.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeEventsProcessorCondition.java new file mode 100644 index 0000000000000..9739a40f5df65 --- /dev/null +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeEventsProcessorCondition.java @@ -0,0 +1,17 @@ +package com.linkedin.metadata.kafka.config; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.env.Environment; +import org.springframework.core.type.AnnotatedTypeMetadata; + + +public class MetadataChangeEventsProcessorCondition implements Condition { + @Override + public boolean matches( + ConditionContext context, + AnnotatedTypeMetadata metadata) { + Environment env = context.getEnvironment(); + return "true".equals(env.getProperty("MCE_CONSUMER_ENABLED")); + } +} diff --git a/metadata-jobs/mce-consumer-job/src/main/resources/application.properties b/metadata-jobs/mce-consumer/src/main/resources/application.properties similarity index 100% rename from metadata-jobs/mce-consumer-job/src/main/resources/application.properties rename to metadata-jobs/mce-consumer/src/main/resources/application.properties diff --git a/metadata-jobs/mce-consumer-job/src/main/resources/logback.xml b/metadata-jobs/mce-consumer/src/main/resources/logback.xml similarity index 100% rename from metadata-jobs/mce-consumer-job/src/main/resources/logback.xml rename to metadata-jobs/mce-consumer/src/main/resources/logback.xml diff --git a/settings.gradle b/settings.gradle index c2c0686e00901..832b65d925b1c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -21,6 +21,8 @@ include 'metadata-ingestion' include 'metadata-ingestion-examples:common' include 'metadata-ingestion-examples:kafka-etl' include 'metadata-ingestion-examples:mce-cli' +include 'metadata-jobs:mae-consumer' +include 'metadata-jobs:mce-consumer' include 'metadata-jobs:mae-consumer-job' include 'metadata-jobs:mce-consumer-job' include 'metadata-models'
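
Reviewer note, not part of the patch: the standalone MAE/MCE containers disappear from the default quickstart because the listener beans are now gated per process. Each processor gains `@Conditional(...)` pointing at one of the new `Condition` classes, and each condition just reads an environment property: `MetadataChangeEventsProcessorCondition` requires `MCE_CONSUMER_ENABLED=true`, `MetadataAuditEventsProcessorCondition` requires `MAE_CONSUMER_ENABLED=true`, and `DataHubUsageEventsProcessorCondition` additionally treats a missing `DATAHUB_ANALYTICS_ENABLED` as enabled. Below is a minimal, self-contained sketch of that gating pattern; the `ConsumerToggleDemo` / `DemoConfig` / `McePlaceholder` names are hypothetical, and only the property name comes from the patch.

```java
// Minimal sketch (not part of this patch) of the env-driven Condition pattern
// used by the new MCE/MAE/usage-event conditions.
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class ConsumerToggleDemo {

  /** Same shape as MetadataChangeEventsProcessorCondition: matches only when the flag is "true". */
  static class MceEnabledCondition implements Condition {
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
      Environment env = context.getEnvironment();
      return "true".equals(env.getProperty("MCE_CONSUMER_ENABLED"));
    }
  }

  /** Hypothetical stand-in for the real Kafka listener component. */
  static class McePlaceholder { }

  @Configuration
  static class DemoConfig {
    @Bean
    @Conditional(MceEnabledCondition.class)
    McePlaceholder mceProcessor() {
      return new McePlaceholder();
    }
  }

  public static void main(String[] args) {
    // In the real deployments the flag arrives via docker.env or the Helm chart
    // (MCE_CONSUMER_ENABLED=true); a system property stands in for it here.
    System.setProperty("MCE_CONSUMER_ENABLED", "true");
    try (AnnotationConfigApplicationContext ctx =
        new AnnotationConfigApplicationContext(DemoConfig.class)) {
      System.out.println("mceProcessor registered: " + ctx.containsBean("mceProcessor")); // true
    }
  }
}
```

Running the sketch prints `mceProcessor registered: true`; leaving the property unset (or setting it to anything other than `"true"`) keeps the bean out of the context, which is how a plain GMS, a standalone consumer image, or an analytics-disabled deployment switches the embedded listeners on and off.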
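
A second consequence of loading both consumer modules into one Spring context (the `gms/war` build now pulls in `:metadata-jobs:mae-consumer` and `:metadata-jobs:mce-consumer`) is that the two Kafka configs can no longer both register an anonymous default container factory, hence the renames to `MaeKafkaConfig` / `MceKafkaConfig`, the `@Bean(name = "mceKafkaContainerFactory")`, and the `containerFactory` attribute added to the MCE `@KafkaListener`. The sketch below illustrates that wiring under simplifying assumptions: String payloads and a hard-coded local broker stand in for the real Avro `GenericRecord` setup, and the topic literal stands in for `${KAFKA_MCE_TOPIC_NAME:...}`; all class and listener-id names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

@Configuration
@EnableKafka
class MceFactoryDemoConfig {

  // Explicit bean name mirrors @Bean(name = "mceKafkaContainerFactory") in MceKafkaConfig,
  // so the MCE listener never depends on whichever factory happens to be the context default.
  @Bean(name = "mceKafkaContainerFactory")
  ConcurrentKafkaListenerContainerFactory<String, String> mceKafkaContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    return factory;
  }
}

@Component
class MceListenerDemo {

  // containerFactory selects the named factory above, the same way the patched
  // @KafkaListener in MetadataChangeEventsProcessor now points at "mceKafkaContainerFactory".
  @KafkaListener(id = "mce-demo", topics = "MetadataChangeEvent_v4",
      containerFactory = "mceKafkaContainerFactory")
  public void consume(String record) {
    System.out.println("MCE record: " + record);
  }
}
```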
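
Deployment-wise, the default paths now run the consumers inside GMS: the `docker/datahub-gms/env/*.env` files, the quickstart compose file, and the Helm GMS deployment all set `MAE_CONSUMER_ENABLED=true` and `MCE_CONSUMER_ENABLED=true`, `docker_check.py` no longer expects the consumer containers, and the no-code upgrade drops `MAEQualificationStep` since there is no separate MAE service to probe. Standalone consumers stay available as an opt-in: `docker/docker-compose.consumers.yml` (plus its `.dev` variant) carries the extracted service definitions, presumably meant to be layered over the base file with something like `docker-compose -f docker-compose.yml -f docker-compose.consumers.yml up`, and in the Helm chart both subcharts are now gated on `global.datahub_standalone_consumers_enabled` (default `false`), which at the same time stops the GMS deployment from setting the embedded-consumer flags when it is turned on.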