Skip to content

Commit

Permalink
[refactor][java] Improve docs and code quality about KeyValueSchema u…
Browse files Browse the repository at this point in the history
…sages (#17256)
  • Loading branch information
BewareMyPower authored Nov 16, 2022
1 parent 23695a9 commit 5d6a88e
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,12 @@ static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value) {
}

/**
* Key Value Schema using passed in key and value schemas.
* Key Value Schema using passed in key and value schemas with {@link KeyValueEncodingType#INLINE} encoding type.
*
* @see Schema#KeyValue(Schema, Schema, KeyValueEncodingType)
*/
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value) {
return DefaultImplementation.getDefaultImplementation().newKeyValueSchema(key, value);
return KeyValue(key, value, KeyValueEncodingType.INLINE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufNativeSc

Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema();

<K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema);

<K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,6 @@ public Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema() {
return KeyValueSchemaImpl.kvBytes();
}

public <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema) {
return KeyValueSchemaImpl.of(keySchema, valueSchema);
}

public <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType) {
return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
Expand Down Expand Up @@ -105,14 +106,12 @@ public CompletableFuture<MessageId> sendAsync() {

@Override
public TypedMessageBuilder<T> key(String key) {
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
checkArgument(kvSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED,
"This method is not allowed to set keys when in encoding type is SEPARATED");
if (key == null) {
msgMetadata.setNullPartitionKey(true);
return this;
}
getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED,
"This method is not allowed to set keys when in encoding type is SEPARATED"));
if (key == null) {
msgMetadata.setNullPartitionKey(true);
return this;
}
msgMetadata.setPartitionKey(key);
msgMetadata.setPartitionKeyB64Encoded(false);
Expand All @@ -121,14 +120,12 @@ public TypedMessageBuilder<T> key(String key) {

@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
if (schema instanceof KeyValueSchemaImpl && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
if (key == null) {
msgMetadata.setNullPartitionKey(true);
return this;
}
getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED,
"This method is not allowed to set keys when in encoding type is SEPARATED"));
if (key == null) {
msgMetadata.setNullPartitionKey(true);
return this;
}
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(key));
msgMetadata.setPartitionKeyB64Encoded(true);
Expand All @@ -147,31 +144,18 @@ public TypedMessageBuilder<T> value(T value) {
msgMetadata.setNullValue(true);
return this;
}
if (value instanceof org.apache.pulsar.common.schema.KeyValue
&& schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
// set key as the message key
if (kv.getKey() != null) {
msgMetadata.setPartitionKey(
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
msgMetadata.setPartitionKeyB64Encoded(true);
} else {
this.msgMetadata.setNullPartitionKey(true);
}

// set value as the payload
if (kv.getValue() != null) {
this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
} else {
this.msgMetadata.setNullValue(true);
}

return getKeyValueSchema().map(keyValueSchema -> {
if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
setSeparateKeyValue(value, keyValueSchema);
return this;
} else {
return null;
}
}
this.content = ByteBuffer.wrap(schema.encode(value));
return this;
}).orElseGet(() -> {
content = ByteBuffer.wrap(schema.encode(value));
return this;
});
}

@Override
Expand Down Expand Up @@ -300,4 +284,38 @@ public String getKey() {
public ByteBuffer getContent() {
return content;
}

private Optional<KeyValueSchema<?, ?>> getKeyValueSchema() {
if (schema.getSchemaInfo() != null
&& schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE
// The schema's class could also be AutoProduceBytesSchema when its type is KEY_VALUE
&& schema instanceof KeyValueSchema) {
return Optional.of((KeyValueSchema<?, ?>) schema);
} else {
return Optional.empty();
}
}

@SuppressWarnings("unchecked")
private <K, V> void setSeparateKeyValue(T value, KeyValueSchema<K, V> keyValueSchema) {
checkArgument(value instanceof org.apache.pulsar.common.schema.KeyValue);
org.apache.pulsar.common.schema.KeyValue<K, V> keyValue =
(org.apache.pulsar.common.schema.KeyValue<K, V>) value;

// set key as the message key
if (keyValue.getKey() != null) {
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(
keyValueSchema.getKeySchema().encode(keyValue.getKey())));
msgMetadata.setPartitionKeyB64Encoded(true);
} else {
msgMetadata.setNullPartitionKey(true);
}

// set value as the payload
if (keyValue.getValue() != null) {
content = ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue()));
} else {
msgMetadata.setNullValue(true);
}
}
}

0 comments on commit 5d6a88e

Please sign in to comment.