diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index 92beccd067e2..b7e6e981effe 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -54,7 +54,6 @@ def sdk_provided_project_dependencies = [ ":runners:google-cloud-dataflow-java", ":sdks:java:extensions:avro", ":sdks:java:extensions:google-cloud-platform-core", - ":sdks:java:io:kafka", // For metric propagation into worker ":sdks:java:io:google-cloud-platform", ] diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java index 77f867793ae2..91baefa0be4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java @@ -32,7 +32,6 @@ import java.util.Map.Entry; import java.util.Optional; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; -import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; @@ -43,6 +42,9 @@ * converter. */ public class MetricsToPerStepNamespaceMetricsConverter { + // Avoids to introduce mandatory kafka-io dependency to Dataflow worker + // keep in sync with org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE + public static String KAFKA_SINK_METRICS_NAMESPACE = "KafkaSink"; private static Optional getParsedMetricName( MetricName metricName, @@ -70,7 +72,7 @@ private static Optional convertCounterToMetricValue( if (value == 0 || (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE) - && !metricName.getNamespace().equals(KafkaSinkMetrics.METRICS_NAMESPACE))) { + && !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) { return Optional.empty(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 088a28e9b2db..0112ab4af80a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -110,7 +110,6 @@ import org.apache.beam.sdk.fn.JvmInitializers; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; -import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -835,10 +834,6 @@ public static void main(String[] args) throws Exception { enableBigQueryMetrics(); } - if (DataflowRunner.hasExperiment(options, "enable_kafka_metrics")) { - KafkaSinkMetrics.setSupportKafkaMetrics(true); - } - JvmInitializers.runBeforeProcessing(options); worker.startStatusPages(); worker.start(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java index 525464ef2e1f..d9fe95f3421b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.streaming; +import static org.apache.beam.runners.dataflow.worker.MetricsToPerStepNamespaceMetricsConverter.KAFKA_SINK_METRICS_NAMESPACE; import static org.apache.beam.sdk.metrics.Metrics.THROTTLE_TIME_COUNTER_NAME; import com.google.api.services.dataflow.model.CounterStructuredName; @@ -35,7 +36,6 @@ import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; -import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; /** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */ @@ -120,8 +120,7 @@ private void translateKnownPerWorkerCounters(List metri for (PerStepNamespaceMetrics perStepnamespaceMetrics : metrics) { if (!BigQuerySinkMetrics.METRICS_NAMESPACE.equals( perStepnamespaceMetrics.getMetricsNamespace()) - && !KafkaSinkMetrics.METRICS_NAMESPACE.equals( - perStepnamespaceMetrics.getMetricsNamespace())) { + && !KAFKA_SINK_METRICS_NAMESPACE.equals(perStepnamespaceMetrics.getMetricsNamespace())) { continue; } for (MetricValue metric : perStepnamespaceMetrics.getMetricValues()) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOInitializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOInitializer.java new file mode 100644 index 000000000000..3dfb31715ced --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOInitializer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.harness.JvmInitializer; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** Initialize KafkaIO feature flags on worker. */ +@AutoService(JvmInitializer.class) +public class KafkaIOInitializer implements JvmInitializer { + @Override + public void beforeProcessing(PipelineOptions options) { + if (ExperimentalOptions.hasExperiment(options, "enable_kafka_metrics")) { + KafkaSinkMetrics.setSupportKafkaMetrics(true); + } + } +}