From 5d6a88efa78969073c9dad015b4727b59e8a76d8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 16 Nov 2022 09:31:36 +0800 Subject: [PATCH] [refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256) --- .../org/apache/pulsar/client/api/Schema.java | 6 +- .../PulsarClientImplementationBinding.java | 2 - ...PulsarClientImplementationBindingImpl.java | 4 - .../client/impl/TypedMessageBuilderImpl.java | 98 +++++++++++-------- 4 files changed, 62 insertions(+), 48 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index 0c61e5e0e963a..ef1e0cc1feade 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -384,10 +384,12 @@ static Schema> KeyValue(Class key, Class value) { } /** - * Key Value Schema using passed in key and value schemas. + * Key Value Schema using passed in key and value schemas with {@link KeyValueEncodingType#INLINE} encoding type. + * + * @see Schema#KeyValue(Schema, Schema, KeyValueEncodingType) */ static Schema> KeyValue(Schema key, Schema value) { - return DefaultImplementation.getDefaultImplementation().newKeyValueSchema(key, value); + return KeyValue(key, value, KeyValueEncodingType.INLINE); } /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index 2f0a909d97f85..875a793023523 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -131,8 +131,6 @@ Schema newProtobufNativeSc Schema> newKeyValueBytesSchema(); - Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema); - Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema, KeyValueEncodingType keyValueEncodingType); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index f9d4cf7dd5a9c..1b069c5172dd7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -242,10 +242,6 @@ public Schema> newKeyValueBytesSchema() { return KeyValueSchemaImpl.kvBytes(); } - public Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema) { - return KeyValueSchemaImpl.of(keySchema, valueSchema); - } - public Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema, KeyValueEncodingType keyValueEncodingType) { return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 2ad25dacb42df..026f8a1e69e0b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; @@ -32,7 +33,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -105,14 +106,12 @@ public CompletableFuture sendAsync() { @Override public TypedMessageBuilder key(String key) { - if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { - KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema; - checkArgument(kvSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, - "This method is not allowed to set keys when in encoding type is SEPARATED"); - if (key == null) { - msgMetadata.setNullPartitionKey(true); - return this; - } + getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument( + keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, + "This method is not allowed to set keys when in encoding type is SEPARATED")); + if (key == null) { + msgMetadata.setNullPartitionKey(true); + return this; } msgMetadata.setPartitionKey(key); msgMetadata.setPartitionKeyB64Encoded(false); @@ -121,14 +120,12 @@ public TypedMessageBuilder key(String key) { @Override public TypedMessageBuilder keyBytes(byte[] key) { - if (schema instanceof KeyValueSchemaImpl && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { - KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema; - checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED), - "This method is not allowed to set keys when in encoding type is SEPARATED"); - if (key == null) { - msgMetadata.setNullPartitionKey(true); - return this; - } + getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument( + keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, + "This method is not allowed to set keys when in encoding type is SEPARATED")); + if (key == null) { + msgMetadata.setNullPartitionKey(true); + return this; } msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(key)); msgMetadata.setPartitionKeyB64Encoded(true); @@ -147,31 +144,18 @@ public TypedMessageBuilder value(T value) { msgMetadata.setNullValue(true); return this; } - if (value instanceof org.apache.pulsar.common.schema.KeyValue - && schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { - KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema; - org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value; - if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { - // set key as the message key - if (kv.getKey() != null) { - msgMetadata.setPartitionKey( - Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey()))); - msgMetadata.setPartitionKeyB64Encoded(true); - } else { - this.msgMetadata.setNullPartitionKey(true); - } - - // set value as the payload - if (kv.getValue() != null) { - this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue())); - } else { - this.msgMetadata.setNullValue(true); - } + + return getKeyValueSchema().map(keyValueSchema -> { + if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { + setSeparateKeyValue(value, keyValueSchema); return this; + } else { + return null; } - } - this.content = ByteBuffer.wrap(schema.encode(value)); - return this; + }).orElseGet(() -> { + content = ByteBuffer.wrap(schema.encode(value)); + return this; + }); } @Override @@ -300,4 +284,38 @@ public String getKey() { public ByteBuffer getContent() { return content; } + + private Optional> getKeyValueSchema() { + if (schema.getSchemaInfo() != null + && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE + // The schema's class could also be AutoProduceBytesSchema when its type is KEY_VALUE + && schema instanceof KeyValueSchema) { + return Optional.of((KeyValueSchema) schema); + } else { + return Optional.empty(); + } + } + + @SuppressWarnings("unchecked") + private void setSeparateKeyValue(T value, KeyValueSchema keyValueSchema) { + checkArgument(value instanceof org.apache.pulsar.common.schema.KeyValue); + org.apache.pulsar.common.schema.KeyValue keyValue = + (org.apache.pulsar.common.schema.KeyValue) value; + + // set key as the message key + if (keyValue.getKey() != null) { + msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString( + keyValueSchema.getKeySchema().encode(keyValue.getKey()))); + msgMetadata.setPartitionKeyB64Encoded(true); + } else { + msgMetadata.setNullPartitionKey(true); + } + + // set value as the payload + if (keyValue.getValue() != null) { + content = ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue())); + } else { + msgMetadata.setNullValue(true); + } + } }