Skip to content

Commit

Permalink
Make source variables filtering more powerful camunda-community-hub#80
Browse files Browse the repository at this point in the history
  • Loading branch information
aragornfbm authored and Farid Ben Miled committed Jul 30, 2022
1 parent 6e1baa1 commit 97aa255
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 32 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,20 @@ Under the hood, the connector will create one [job worker](https://docs.camunda.

### Filtering Variables

You can filter the variables being sent to Kafka by adding a configuration option "job.variables" to your source properties. It must contain a comma-separated list of variables to pass to Kafka.
You can filter the variables being sent to Kafka by adding a configuration option "job.header.variables" to your source properties. It must contain the name of a [custom service task header](https://docs.camunda.io/docs/components/modeler/bpmn/service-tasks/#task-headers) defined in the Kafka source service task. The value associated with this header is a comma-separated list of variables to pass to Kafka.

If this property is not set, or if the service task does not define the configured header, then all variables in the scope will be sent to Kafka by default.

```properties
source.json
{
"name": "ping",
"config": {
...
"job.variables": "a, b, andSomeVariableC",
"job.header.variables": "kafkaVariables",
...
```

![Variables custom header](doc/images/variables-custom-header.png)
## Configuring Error Handling of Kafka Connect, e.g. Logging or Dead Letter Queues

Kafka Connect allows you to configure what happens if a message cannot be processed. A great explanation can be found in [Kafka Connect Deep Dive – Error Handling and Dead Letter Queues](https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues). This of course also applies to this connector.
Expand Down
Binary file modified doc/images/variables-custom-header.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ZeebeSourceConnectorConfig extends AbstractConfig {
static final String MAX_JOBS_TO_ACTIVATE_CONFIG = ClientProperties.JOB_WORKER_MAX_JOBS_ACTIVE;
static final String JOB_TIMEOUT_CONFIG = ClientProperties.DEFAULT_JOB_TIMEOUT;
static final String JOB_HEADER_TOPICS_CONFIG = "job.header.topics";
static final String JOB_VARIABLES_CONFIG = "job.variables";
static final String JOB_HEADER_VARIABLES_CONFIG = "job.header.variables";
private static final String WORKER_CONFIG_GROUP = "Job Worker";
private static final String WORKER_NAME_DEFAULT = "kafka-connector";
private static final String WORKER_NAME_DOC = "Name of the Zeebe worker that will poll jobs";
Expand All @@ -50,10 +50,10 @@ public class ZeebeSourceConnectorConfig extends AbstractConfig {
"Zeebe service task extension header key which determines to what Kafka topic a job should "
+ "be published. The value of the header is expected to be a comma-separated list of "
+ "Kafka topics on which the source record will be published.";
private static final String JOB_VARIABLES_DEFAULT = "";
private static final String JOB_VARIABLES_DOC =
"A comma-separated list of variables to fetch when activating a job. If none given, then "
+ "all variables are fetched.";
// Default is the empty string, i.e. no header name configured.
private static final String JOB_HEADER_VARIABLES_DEFAULT = "";
// The option holds the NAME of a service task header, not the variable list itself; the
// wording mirrors the documentation of the job topics header option.
private static final String JOB_HEADER_VARIABLES_DOC =
"Zeebe service task extension header key which determines which variables are sent in the "
+ "Kafka message. The value of the header is expected to be a comma-separated list of "
+ "variable names. If the header is not set on a job, then all variables are sent.";

public ZeebeSourceConnectorConfig(final Map<String, String> properties) {
super(DEFINITIONS, properties);
Expand Down Expand Up @@ -122,14 +122,14 @@ private static void defineWorkerGroup(final ConfigDef definitions) {
Width.SHORT,
"Job topics header")
.define(
JOB_VARIABLES_CONFIG,
Type.LIST,
JOB_VARIABLES_DEFAULT,
JOB_HEADER_VARIABLES_CONFIG,
Type.STRING,
JOB_HEADER_VARIABLES_DEFAULT,
Importance.LOW,
JOB_VARIABLES_DOC,
JOB_HEADER_VARIABLES_DOC,
WORKER_CONFIG_GROUP,
++order,
Width.SHORT,
"Job variables");
"Job variables header");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright © 2019 camunda services GmbH ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.kafka.connect.source;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Reads, from a job's custom headers, the list of variable names that should be forwarded.
 *
 * <p>The name of the header to look up is taken from the connector configuration option
 * {@code ZeebeSourceConnectorConfig.JOB_HEADER_VARIABLES_CONFIG}; the header's value is
 * interpreted as a comma-separated list of variable names.
 */
public class ZeebeSourceJobVariablesExtractor {
  /** Name of the custom header whose value lists the variable names. */
  private final String jobVariables;

  /**
   * @param config connector configuration from which the header name is read
   */
  public ZeebeSourceJobVariablesExtractor(ZeebeSourceConnectorConfig config) {
    jobVariables = config.getString(ZeebeSourceConnectorConfig.JOB_HEADER_VARIABLES_CONFIG);
  }

  /**
   * Returns the variable names listed in the job's variables header.
   *
   * <p>Every name is trimmed, whether or not the header value contains a comma, and blank
   * entries (e.g. from {@code "a,,b"} or a trailing comma) are dropped.
   *
   * @param job the activated job whose custom headers are inspected
   * @return the listed variable names, or {@code null} when the job does not carry the header;
   *     {@code null} is kept (rather than an empty list) so that callers can tell "no header"
   *     apart from "header present but empty"
   */
  public List<String> extract(ActivatedJob job) {
    final String jobVariableString = job.getCustomHeaders().get(jobVariables);
    if (jobVariableString == null) {
      return null;
    }

    // split() also copes with values that contain no comma at all, so a single code path
    // guarantees that single names are trimmed exactly like list members
    return Arrays.stream(jobVariableString.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .collect(Collectors.toList());
  }
}
55 changes: 39 additions & 16 deletions src/main/java/io/zeebe/kafka/connect/source/ZeebeSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.impl.ZeebeObjectMapper;
import io.zeebe.kafka.connect.util.ManagedClient;
import io.zeebe.kafka.connect.util.ManagedClient.AlreadyClosedException;
import io.zeebe.kafka.connect.util.VersionInfo;
Expand All @@ -34,17 +35,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Source task for Zeebe which activates jobs, publishes results, and completes jobs */
/**
* Source task for Zeebe which activates jobs, publishes results, and completes
* jobs
*/
public class ZeebeSourceTask extends SourceTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ZeebeSourceTask.class);

private ManagedClient managedClient;
private ZeebeSourceTopicExtractor topicExtractor;
private ZeebeSourceJobVariablesExtractor jobVariablesExtractor;
private ZeebeSourceTaskFetcher taskFetcher;
private ZeebeSourceInflightRegistry inflightRegistry;
private ZeebeSourceBackoff backoff;

public ZeebeSourceTask() {}
public ZeebeSourceTask() {
}

@Override
public void start(final Map<String, String> props) {
Expand All @@ -53,6 +59,7 @@ public void start(final Map<String, String> props) {
managedClient = new ManagedClient(client);

topicExtractor = new ZeebeSourceTopicExtractor(config);
jobVariablesExtractor = new ZeebeSourceJobVariablesExtractor(config);
taskFetcher = new ZeebeSourceTaskFetcher(config, topicExtractor);
inflightRegistry = new ZeebeSourceInflightRegistry(config);
backoff = new ZeebeSourceBackoff(config);
Expand All @@ -67,13 +74,12 @@ public List<SourceRecord> poll() {
return null;
}

final List<SourceRecord> records =
inflightRegistry
.jobTypesWithCapacity()
.flatMap(this::fetchJobs)
.map(inflightRegistry::registerJob)
.map(this::transformJob)
.collect(Collectors.toList());
final List<SourceRecord> records = inflightRegistry
.jobTypesWithCapacity()
.flatMap(this::fetchJobs)
.map(inflightRegistry::registerJob)
.map(this::transformJob)
.collect(Collectors.toList());

// poll interface specifies to return null instead of empty
if (records.isEmpty()) {
Expand Down Expand Up @@ -142,11 +148,14 @@ public String version() {

private SourceRecord transformJob(final ActivatedJob job) {
final String topic = topicExtractor.extract(job);
final Map<String, Integer> sourcePartition =
Collections.singletonMap("partitionId", decodePartitionId(job.getKey()));
// a better sourceOffset would be the position but we don't have it here unfortunately
// key is however a monotonically increasing value, so in a sense it can provide a good
// approximation of an offset
final Map<String, Object> variables = extractJobVariables(jobVariablesExtractor.extract(job),
job.getVariablesAsMap());

final Map<String, Integer> sourcePartition = Collections.singletonMap("partitionId",
decodePartitionId(job.getKey()));
// a better sourceOffset would be the position but we don't have it here
// unfortunately, key is however a monotonically increasing value, so in a sense
// it can provide a good approximation of an offset
final Map<String, Long> sourceOffset = Collections.singletonMap("key", job.getKey());
return new SourceRecord(
sourcePartition,
Expand All @@ -155,12 +164,26 @@ private SourceRecord transformJob(final ActivatedJob job) {
Schema.INT64_SCHEMA,
job.getKey(),
Schema.STRING_SCHEMA,
job.toJson());
new ZeebeObjectMapper().toJson(variables));
}

// Copied from Zeebe Protocol as it is currently fixed to Java 11, and the connector to Java 8
// Copied from Zeebe Protocol as it is currently fixed to Java 11, and the
// connector to Java 8
// Should be fixed eventually and we can use the protocol directly again
/**
 * Derives the partition id from a job key by discarding the lower 51 bits of the key.
 *
 * @param key the job key
 * @return the key shifted right by 51 bits, narrowed to an {@code int}
 */
private int decodePartitionId(final long key) {
  final long shifted = key >> 51;
  return (int) shifted;
}

/**
 * Restricts a job's variables to the requested variable names.
 *
 * @param jobVariableNamesList names of the variables to keep, or {@code null} to keep all
 * @param jobVariablesAsMap all variables of the job
 * @return {@code jobVariablesAsMap} itself when no name list is given; otherwise a new map
 *     holding only the entries whose key is in the list (requested names that the job does not
 *     have are simply absent)
 */
private Map<String, Object> extractJobVariables(final List<String> jobVariableNamesList,
    final Map<String, Object> jobVariablesAsMap) {
  if (jobVariableNamesList == null) {
    return jobVariablesAsMap;
  }

  // Fully qualified names are used so that this method does not depend on additional imports.
  final java.util.Set<String> requestedNames = new java.util.HashSet<>(jobVariableNamesList);

  // Copy with a plain loop rather than Collectors.toMap: toMap throws a NullPointerException
  // for null values, and a variable may legitimately be null.
  final Map<String, Object> variables = new java.util.HashMap<>();
  for (final Map.Entry<String, Object> entry : jobVariablesAsMap.entrySet()) {
    if (requestedNames.contains(entry.getKey())) {
      variables.put(entry.getKey(), entry.getValue());
    }
  }
  return variables;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ class ZeebeSourceTaskFetcher {

private final ZeebeSourceTopicExtractor topicExtractor;

private final List<String> jobVariables;
private final Duration jobTimeout;
private final String workerName;

ZeebeSourceTaskFetcher(
final ZeebeSourceConnectorConfig config, final ZeebeSourceTopicExtractor topicExtractor) {
this.topicExtractor = topicExtractor;

jobVariables = config.getList(ZeebeSourceConnectorConfig.JOB_VARIABLES_CONFIG);
jobTimeout = Duration.ofMillis(config.getLong(ZeebeSourceConnectorConfig.JOB_TIMEOUT_CONFIG));
workerName = config.getString(ZeebeSourceConnectorConfig.WORKER_NAME_CONFIG);
}
Expand Down Expand Up @@ -79,7 +77,6 @@ private List<ActivatedJob> activateJobs(
.maxJobsToActivate(amount)
.workerName(workerName)
.timeout(jobTimeout)
.fetchVariables(jobVariables)
.requestTimeout(requestTimeout)
.send()
.get()
Expand Down

0 comments on commit 97aa255

Please sign in to comment.