[improve][pulsar-io-kafka] Add option to copy Kafka headers to Pulsar properties #17829

Merged · 1 commit · Sep 27, 2022
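This PR adds a copyHeadersEnabled option (default: false) to the Kafka source connector. When enabled, the source copies each record's Kafka topic, partition, and offset into the Pulsar message properties under the keys __kafka_topic, __kafka_partition, and __kafka_offset, and also copies every Kafka record header; because Pulsar properties are a Map<String, String>, header byte-array values are base64 encoded. Below is a minimal sketch of how a downstream Pulsar consumer might read those properties back; the service URL, topic, subscription name, and the header key "k1" are placeholder assumptions, not part of this PR.

import java.util.Base64;
import java.util.Map;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class ReadCopiedHeaders {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder service URL
                .build();
             Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic("kafka-mirrored-topic")           // placeholder topic fed by the Kafka source
                .subscriptionName("header-reader")       // placeholder subscription
                .subscribe()) {
            Message<byte[]> msg = consumer.receive();
            Map<String, String> props = msg.getProperties();

            // Metadata properties added by the connector (plain string values).
            String kafkaTopic = props.get("__kafka_topic");
            String kafkaPartition = props.get("__kafka_partition");
            String kafkaOffset = props.get("__kafka_offset");

            // Kafka header values are base64 encoded by the connector; decode to recover the original bytes.
            byte[] headerValue = Base64.getDecoder().decode(props.get("k1"));

            System.out.printf("record %s-%s@%s, header k1 has %d bytes%n",
                    kafkaTopic, kafkaPartition, kafkaOffset, headerValue.length);
            consumer.acknowledge(msg);
        }
    }
}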
@@ -19,8 +19,10 @@

package org.apache.pulsar.io.kafka;

import io.jsonwebtoken.io.Encoders;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -38,6 +40,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.Header;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -52,6 +55,9 @@
* Simple Kafka Source to transfer messages from a Kafka topic.
*/
public abstract class KafkaAbstractSource<V> extends PushSource<V> {
public static final String HEADER_KAFKA_TOPIC_KEY = "__kafka_topic";
public static final String HEADER_KAFKA_PTN_KEY = "__kafka_partition";
public static final String HEADER_KAFKA_OFFSET_KEY = "__kafka_offset";

private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class);

@@ -189,19 +195,36 @@ public void start() {

public abstract KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord);

protected Map<String, String> copyKafkaHeaders(ConsumerRecord<Object, Object> consumerRecord) {
if (!kafkaSourceConfig.isCopyHeadersEnabled()) {
return Collections.emptyMap();
}
Map<String, String> properties = new HashMap<>();
properties.put(HEADER_KAFKA_TOPIC_KEY, consumerRecord.topic());
properties.put(HEADER_KAFKA_PTN_KEY, Integer.toString(consumerRecord.partition()));
properties.put(HEADER_KAFKA_OFFSET_KEY, Long.toString(consumerRecord.offset()));
for (Header header: consumerRecord.headers()) {
properties.put(header.key(), Encoders.BASE64.encode(header.value()));
}
return properties;
}

@Slf4j
protected static class KafkaRecord<V> implements Record<V> {
private final ConsumerRecord<String, ?> record;
private final V value;
private final Schema<V> schema;
private final Map<String, String> properties;

@Getter
private final CompletableFuture<Void> completableFuture = new CompletableFuture<>();

public KafkaRecord(ConsumerRecord<String, ?> record, V value, Schema<V> schema) {
public KafkaRecord(ConsumerRecord<String, ?> record, V value, Schema<V> schema,
Map<String, String> properties) {
this.record = record;
this.value = value;
this.schema = schema;
this.properties = properties;
}
@Override
public Optional<String> getPartitionId() {
@@ -237,15 +260,21 @@ public void ack() {
public Schema<V> getSchema() {
return schema;
}

@Override
public Map<String, String> getProperties() {
return properties;
}
}
protected static class KeyValueKafkaRecord<V> extends KafkaRecord implements KVRecord<Object, Object> {

private final Schema<Object> keySchema;
private final Schema<Object> valueSchema;

public KeyValueKafkaRecord(ConsumerRecord record, KeyValue value,
Schema<Object> keySchema, Schema<Object> valueSchema) {
super(record, value, null);
Schema<Object> keySchema, Schema<Object> valueSchema,
Map<String, String> properties) {
super(record, value, null, properties);
this.keySchema = keySchema;
this.valueSchema = valueSchema;
}
@@ -124,13 +124,15 @@ public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
return new KeyValueKafkaRecord(consumerRecord,
new KeyValue<>(key, value),
currentKeySchema,
currentValueSchema);
currentValueSchema,
copyKafkaHeaders(consumerRecord));

} else {
Object value = consumerRecord.value();
return new KafkaRecord(consumerRecord,
extractSimpleValue(value),
getSchemaFromObject(value, valueSchema));
getSchemaFromObject(value, valueSchema),
copyKafkaHeaders(consumerRecord));

}
}
@@ -147,6 +147,12 @@ public class KafkaSourceConfig implements Serializable {
"The consumer config properties to be passed to Consumer. Note that other properties specified "
+ "in the connector config file take precedence over this config.")
private Map<String, Object> consumerConfigProperties;
@FieldDoc(
defaultValue = "false",
help =
"If true the Kafka message headers will be copied into Pulsar message properties. Since Pulsar properties "
+ "is a Map<String, String>, byte array values in the Kafka headers will be base64 encoded. ")
private boolean copyHeadersEnabled = false;

public static KafkaSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
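For reference, a minimal sketch of enabling the new flag when the source is opened; the topic, broker address, and consumer group are placeholder values, and in a real deployment the Pulsar IO runtime builds this config map and calls open itself, as the tests further down also do with a mocked SourceContext.

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaStringSource;

public class EnableHeaderCopy {
    // sourceContext would be supplied by the Pulsar IO runtime (mocked in the tests below).
    static void openSource(SourceContext sourceContext) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("topic", "my-kafka-topic");            // placeholder Kafka topic
        config.put("bootstrapServers", "localhost:9092"); // placeholder broker address
        config.put("groupId", "my-group");                // placeholder consumer group
        config.put("copyHeadersEnabled", true);           // option added by this PR; defaults to false

        KafkaStringSource source = new KafkaStringSource();
        source.open(config, sourceContext);
    }
}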
@@ -33,7 +33,8 @@ public class KafkaStringSource extends KafkaAbstractSource<String> {
public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
KafkaRecord record = new KafkaRecord(consumerRecord,
new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8),
Schema.STRING);
Schema.STRING,
copyKafkaHeaders(consumerRecord));
return record;
}

@@ -28,6 +28,7 @@

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
@@ -44,6 +45,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.io.core.SourceContext;
import org.bouncycastle.util.encoders.Base64;
import org.mockito.Mockito;
import org.testng.annotations.Test;

@@ -115,6 +117,64 @@ public void testKeyValueSchema() throws Exception {
ByteBuffer.wrap(new StringSerializer().serialize("test", "test")));
}

@Test
public void testCopyKafkaHeadersEnabled() throws Exception {
ByteBuffer key = ByteBuffer.wrap(new IntegerSerializer().serialize("test", 10));
ByteBuffer value = ByteBuffer.wrap(new StringSerializer().serialize("test", "test"));
KafkaBytesSource source = new KafkaBytesSource();
Map<String, Object> config = new HashMap<>();
config.put("copyHeadersEnabled", true);
config.put("topic","test");
config.put("bootstrapServers","localhost:9092");
config.put("groupId", "test");
config.put("valueDeserializationClass", IntegerDeserializer.class.getName());
config.put("keyDeserializationClass", StringDeserializer.class.getName());
config.put("consumerConfigProperties", ImmutableMap.builder()
.put("schema.registry.url", "http://localhost:8081")
.build());
source.open(config, Mockito.mock(SourceContext.class));
ConsumerRecord record = new ConsumerRecord<Object, Object>("test", 88, 99, key, value);
record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
record.headers().add("k2", new byte[]{0xF});

Map<String, String> props = source.copyKafkaHeaders(record);
assertEquals(props.size(), 5);
assertTrue(props.containsKey("__kafka_topic"));
assertTrue(props.containsKey("__kafka_partition"));
assertTrue(props.containsKey("__kafka_offset"));
assertTrue(props.containsKey("k1"));
assertTrue(props.containsKey("k2"));

assertEquals(props.get("__kafka_topic"), "test");
assertEquals(props.get("__kafka_partition"), "88");
assertEquals(props.get("__kafka_offset"), "99");
assertEquals(Base64.decode(props.get("k1")), "v1".getBytes(StandardCharsets.UTF_8));
assertEquals(Base64.decode(props.get("k2")), new byte[]{0xF});
}

@Test
public void testCopyKafkaHeadersDisabled() throws Exception {
ByteBuffer key = ByteBuffer.wrap(new IntegerSerializer().serialize("test", 10));
ByteBuffer value = ByteBuffer.wrap(new StringSerializer().serialize("test", "test"));
KafkaBytesSource source = new KafkaBytesSource();
Map<String, Object> config = new HashMap<>();
config.put("topic","test");
config.put("bootstrapServers","localhost:9092");
config.put("groupId", "test");
config.put("valueDeserializationClass", IntegerDeserializer.class.getName());
config.put("keyDeserializationClass", StringDeserializer.class.getName());
config.put("consumerConfigProperties", ImmutableMap.builder()
.put("schema.registry.url", "http://localhost:8081")
.build());
source.open(config, Mockito.mock(SourceContext.class));
ConsumerRecord record = new ConsumerRecord<Object, Object>("test", 88, 99, key, value);
record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
record.headers().add("k2", new byte[]{0xF});

Map<String, String> props = source.copyKafkaHeaders(record);
assertTrue(props.isEmpty());
}

private void validateSchemaKeyValue(String keyDeserializationClass, Schema expectedKeySchema,
String valueDeserializationClass, Schema expectedValueSchema,
ByteBuffer key,
@@ -21,6 +21,7 @@


import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -55,10 +56,10 @@ private static class DummySource extends KafkaAbstractSource<String> {
public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
KafkaRecord record = new KafkaRecord(consumerRecord,
new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8),
Schema.STRING);
Schema.STRING,
Collections.emptyMap());
return record;
}

}

@Test