diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java index 016f24cb4a055..dda47a2181bbd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java @@ -14,26 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import java.util.Objects; -public class CombinedKey { - private final KF foreignKey; - private final KP primaryKey; +public class CombinedKey { + private final KRight foreignKey; + private final KLeft primaryKey; - CombinedKey(final KF foreignKey, final KP primaryKey) { + CombinedKey(final KRight foreignKey, final KLeft primaryKey) { Objects.requireNonNull(primaryKey, "primaryKey can't be null"); this.foreignKey = foreignKey; this.primaryKey = primaryKey; } - public KF foreignKey() { + public KRight foreignKey() { return foreignKey; } - public KP primaryKey() { + public KLeft primaryKey() { return primaryKey; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java index 6b735f5b56a01..176205c724870 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java @@ -28,20 +28,20 @@ /** * Factory for creating CombinedKey serializers / deserializers. */ -public class CombinedKeySchema { +public class CombinedKeySchema { private final Supplier undecoratedPrimaryKeySerdeTopicSupplier; private final Supplier undecoratedForeignKeySerdeTopicSupplier; private String primaryKeySerdeTopic; private String foreignKeySerdeTopic; - private Serializer primaryKeySerializer; - private Deserializer primaryKeyDeserializer; - private Serializer foreignKeySerializer; - private Deserializer foreignKeyDeserializer; + private Serializer primaryKeySerializer; + private Deserializer primaryKeyDeserializer; + private Serializer foreignKeySerializer; + private Deserializer foreignKeyDeserializer; public CombinedKeySchema(final Supplier foreignKeySerdeTopicSupplier, - final Serde foreignKeySerde, + final Serde foreignKeySerde, final Supplier primaryKeySerdeTopicSupplier, - final Serde primaryKeySerde) { + final Serde primaryKeySerde) { undecoratedPrimaryKeySerdeTopicSupplier = primaryKeySerdeTopicSupplier; undecoratedForeignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier; primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer(); @@ -50,17 +50,17 @@ public CombinedKeySchema(final Supplier foreignKeySerdeTopicSupplier, foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer(); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "resource"}) public void init(final ProcessorContext context) { primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get(); foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get(); - primaryKeySerializer = primaryKeySerializer == null ? (Serializer) context.keySerde().serializer() : primaryKeySerializer; - primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer) context.keySerde().deserializer() : primaryKeyDeserializer; - foreignKeySerializer = foreignKeySerializer == null ? (Serializer) context.keySerde().serializer() : foreignKeySerializer; - foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer) context.keySerde().deserializer() : foreignKeyDeserializer; + primaryKeySerializer = primaryKeySerializer == null ? (Serializer) context.keySerde().serializer() : primaryKeySerializer; + primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer) context.keySerde().deserializer() : primaryKeyDeserializer; + foreignKeySerializer = foreignKeySerializer == null ? (Serializer) context.keySerde().serializer() : foreignKeySerializer; + foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer) context.keySerde().deserializer() : foreignKeyDeserializer; } - Bytes toBytes(final KO foreignKey, final K primaryKey) { + Bytes toBytes(final KRight foreignKey, final KLeft primaryKey) { //The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan //key is being created. //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized} @@ -79,22 +79,22 @@ Bytes toBytes(final KO foreignKey, final K primaryKey) { } - public CombinedKey fromBytes(final Bytes data) { + public CombinedKey fromBytes(final Bytes data) { //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized} final byte[] dataArray = data.get(); final ByteBuffer dataBuffer = ByteBuffer.wrap(dataArray); final int foreignKeyLength = dataBuffer.getInt(); final byte[] foreignKeyRaw = new byte[foreignKeyLength]; dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength); - final KO foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw); + final KRight foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw); final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES]; dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length); - final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw); + final KLeft primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw); return new CombinedKey<>(foreignKey, primaryKey); } - Bytes prefixBytes(final KO key) { + Bytes prefixBytes(final KRight key) { //The serialization format. Note that primaryKeySerialized is not required/used in this function. //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java index 1d422d26fd6b5..9f597bad4b252 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java @@ -30,19 +30,19 @@ *
  • {@link #fromBiFunction(BiFunction)} - when the foreign key depends on both key and value
  • * * - * @param Type of primary table's key - * @param Type of primary table's value - * @param Type of the foreign key to extract + * @param Type of primary table's key + * @param Type of primary table's value + * @param Type of the foreign key to extract */ @FunctionalInterface -public interface ForeignKeyExtractor { - KO extract(K key, V value); +public interface ForeignKeyExtractor { + KRight extract(KLeft key, VLeft value); - static ForeignKeyExtractor fromFunction(Function function) { + static ForeignKeyExtractor fromFunction(Function function) { return (key, value) -> function.apply(value); } - static ForeignKeyExtractor fromBiFunction(BiFunction biFunction) { + static ForeignKeyExtractor fromBiFunction(BiFunction biFunction) { return biFunction::apply; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java index a8b8228ed552f..99d7b22dedc80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.metrics.Sensor; @@ -43,15 +42,16 @@ import java.util.Collections; import java.util.Set; -public class ForeignTableJoinProcessorSupplier implements - ProcessorSupplier, K, SubscriptionResponseWrapper> { +public class ForeignTableJoinProcessorSupplier + implements ProcessorSupplier, KLeft, SubscriptionResponseWrapper> { + private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class); private final StoreFactory subscriptionStoreFactory; - private final CombinedKeySchema keySchema; + private final CombinedKeySchema keySchema; private boolean useVersionedSemantics = false; public ForeignTableJoinProcessorSupplier(final StoreFactory subscriptionStoreFactory, - final CombinedKeySchema keySchema) { + final CombinedKeySchema keySchema) { this.subscriptionStoreFactory = subscriptionStoreFactory; this.keySchema = keySchema; } @@ -62,7 +62,7 @@ public Set> stores() { } @Override - public Processor, K, SubscriptionResponseWrapper> get() { + public Processor, KLeft, SubscriptionResponseWrapper> get() { return new KTableKTableJoinProcessor(); } @@ -75,12 +75,12 @@ public boolean isUseVersionedSemantics() { return useVersionedSemantics; } - private final class KTableKTableJoinProcessor extends ContextualProcessor, K, SubscriptionResponseWrapper> { + private final class KTableKTableJoinProcessor extends ContextualProcessor, KLeft, SubscriptionResponseWrapper> { private Sensor droppedRecordsSensor; - private TimestampedKeyValueStore> subscriptionStore; + private TimestampedKeyValueStore> subscriptionStore; @Override - public void init(final ProcessorContext> context) { + public void init(final ProcessorContext> context) { super.init(context); final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; droppedRecordsSensor = TaskMetrics.droppedRecordsSensor( @@ -92,7 +92,7 @@ public void init(final ProcessorContext> cont } @Override - public void process(final Record> record) { + public void process(final Record> record) { // if the key is null, we do not need to proceed aggregating // the record with the table if (record.key() == null) { @@ -122,14 +122,14 @@ public void process(final Record> record) { final Bytes prefixBytes = keySchema.prefixBytes(record.key()); //Perform the prefixScan and propagate the results - try (final KeyValueIterator>> prefixScanResults = + try (final KeyValueIterator>> prefixScanResults = subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) { while (prefixScanResults.hasNext()) { - final KeyValue>> next = prefixScanResults.next(); + final KeyValue>> next = prefixScanResults.next(); // have to check the prefix because the range end is inclusive :( if (prefixEquals(next.key.get(), prefixBytes.get())) { - final CombinedKey combinedKey = keySchema.fromBytes(next.key); + final CombinedKey combinedKey = keySchema.fromBytes(next.key); context().forward( record.withKey(combinedKey.primaryKey()) .withValue(new SubscriptionResponseWrapper<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java index 42e9c1cca45ab..a81700b6c0165 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -43,23 +42,25 @@ * of the primary key. This eliminates race-condition results for rapidly-changing foreign-keys for a given primary key. * Applies the join and emits nulls according to LEFT/INNER rules. * - * @param Type of primary keys - * @param Type of primary values - * @param Type of foreign values - * @param Type of joined result of primary and foreign values + * @param Type of primary keys + * @param Type of primary values + * @param Type of foreign values + * @param Type of joined result of primary and foreign values */ -public class ResponseJoinProcessorSupplier implements ProcessorSupplier, K, VR> { +public class ResponseJoinProcessorSupplier + implements ProcessorSupplier, KLeft, VOut> { + private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class); - private final KTableValueGetterSupplier valueGetterSupplier; - private final Serializer constructionTimeValueSerializer; + private final KTableValueGetterSupplier valueGetterSupplier; + private final Serializer constructionTimeValueSerializer; private final Supplier valueHashSerdePseudoTopicSupplier; - private final ValueJoiner joiner; + private final ValueJoiner joiner; private final boolean leftJoin; - public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier valueGetterSupplier, - final Serializer valueSerializer, + public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier valueGetterSupplier, + final Serializer valueSerializer, final Supplier valueHashSerdePseudoTopicSupplier, - final ValueJoiner joiner, + final ValueJoiner joiner, final boolean leftJoin) { this.valueGetterSupplier = valueGetterSupplier; constructionTimeValueSerializer = valueSerializer; @@ -69,24 +70,24 @@ public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier value } @Override - public Processor, K, VR> get() { + public Processor, KLeft, VOut> get() { return new ContextualProcessor<>() { private String valueHashSerdePseudoTopic; - private Serializer runtimeValueSerializer = constructionTimeValueSerializer; + private Serializer runtimeValueSerializer = constructionTimeValueSerializer; - private KTableValueGetter valueGetter; + private KTableValueGetter valueGetter; private Sensor droppedRecordsSensor; @SuppressWarnings({"unchecked", "resource"}) @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { super.init(context); valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get(); valueGetter = valueGetterSupplier.get(); valueGetter.init(context); if (runtimeValueSerializer == null) { - runtimeValueSerializer = (Serializer) context.valueSerde().serializer(); + runtimeValueSerializer = (Serializer) context.valueSerde().serializer(); } final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; @@ -98,14 +99,14 @@ public void init(final ProcessorContext context) { } @Override - public void process(final Record> record) { + public void process(final Record> record) { if (record.value().version() != SubscriptionResponseWrapper.CURRENT_VERSION) { //Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is //compatibility with previous versions to enable rolling upgrades. Must develop a strategy for //upgrading from older SubscriptionWrapper versions to newer versions. throw new UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible version."); } - final ValueAndTimestamp currentValueWithTimestamp = valueGetter.get(record.key()); + final ValueAndTimestamp currentValueWithTimestamp = valueGetter.get(record.key()); final long[] currentHash = currentValueWithTimestamp == null ? null : @@ -115,7 +116,7 @@ public void process(final Record> record) { //If this value doesn't match the current value from the original table, it is stale and should be discarded. if (java.util.Arrays.equals(messageHash, currentHash)) { - final VR result; + final VOut result; if (record.value().foreignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) { result = null; //Emit tombstone diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java index 008ef665817a1..90dd8c1a365e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -31,53 +30,44 @@ import java.util.Objects; /** - * Receives {@code SubscriptionWrapper} events and processes them according to their Instruction. + * Receives {@code SubscriptionWrapper} events and processes them according to their Instruction. * Depending on the results, {@code SubscriptionResponseWrapper}s are created, which will be propagated to * the {@code ResponseJoinProcessorSupplier} instance. * - * @param Type of primary keys - * @param Type of foreign key - * @param Type of foreign value + * @param Type of primary keys + * @param Type of foreign key + * @param Type of foreign value */ -public class SubscriptionJoinProcessorSupplier - implements ProcessorSupplier, Change>>, K, SubscriptionResponseWrapper> { +public class SubscriptionJoinProcessorSupplier + implements ProcessorSupplier, Change>>, KLeft, SubscriptionResponseWrapper> { - private final KTableValueGetterSupplier foreignValueGetterSupplier; + private final KTableValueGetterSupplier foreignValueGetterSupplier; - public SubscriptionJoinProcessorSupplier(final KTableValueGetterSupplier foreignValueGetterSupplier) { + public SubscriptionJoinProcessorSupplier(final KTableValueGetterSupplier foreignValueGetterSupplier) { this.foreignValueGetterSupplier = foreignValueGetterSupplier; } @Override - public Processor, Change>>, K, SubscriptionResponseWrapper> get() { - - return new ContextualProcessor, Change>>, K, SubscriptionResponseWrapper>() { - - private KTableValueGetter foreignValues; + public Processor, Change>>, KLeft, SubscriptionResponseWrapper> get() { + return new ContextualProcessor<>() { + private KTableValueGetter foreignValues; @Override - public void init(final ProcessorContext> context) { + public void init(final ProcessorContext> context) { super.init(context); foreignValues = foreignValueGetterSupplier.get(); foreignValues.init(context); } @Override - public void process(final Record, Change>>> record) { + public void process(final Record, Change>>> record) { Objects.requireNonNull(record.key(), "This processor should never see a null key."); Objects.requireNonNull(record.value(), "This processor should never see a null value."); - final ValueAndTimestamp> valueAndTimestamp = record.value().newValue; + final ValueAndTimestamp> valueAndTimestamp = record.value().newValue; Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue."); - final SubscriptionWrapper value = valueAndTimestamp.value(); - - if (value.version() > SubscriptionWrapper.CURRENT_VERSION) { - //Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility - //with previous versions to enable rolling upgrades. Must develop a strategy for upgrading - //from older SubscriptionWrapper versions to newer versions. - throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version."); - } + final SubscriptionWrapper value = subscriptionWrapper(valueAndTimestamp); - final ValueAndTimestamp foreignValueAndTime = + final ValueAndTimestamp foreignValueAndTime = record.key().foreignKey() == null ? null : foreignValues.get(record.key().foreignKey()); @@ -91,7 +81,7 @@ public void process(final Record, Change( + .withValue(new SubscriptionResponseWrapper( value.hash(), null, value.primaryPartition() @@ -103,7 +93,7 @@ public void process(final Record, Change, Change subscriptionWrapper(final ValueAndTimestamp> valueAndTimestamp) { + final SubscriptionWrapper value = valueAndTimestamp.value(); + + if (value.version() > SubscriptionWrapper.CURRENT_VERSION) { + //Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility + //with previous versions to enable rolling upgrades. Must develop a strategy for upgrading + //from older SubscriptionWrapper versions to newer versions. + throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version."); + } + + return value; + } }; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java index 825283e98f581..e654cd752af16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -41,16 +40,16 @@ import java.util.Collections; import java.util.Set; -public class SubscriptionReceiveProcessorSupplier - implements ProcessorSupplier, CombinedKey, Change>>> { +public class SubscriptionReceiveProcessorSupplier + implements ProcessorSupplier, CombinedKey, Change>>> { + private static final Logger LOG = LoggerFactory.getLogger(SubscriptionReceiveProcessorSupplier.class); private final StoreFactory subscriptionStoreFactory; - private final CombinedKeySchema keySchema; + private final CombinedKeySchema keySchema; public SubscriptionReceiveProcessorSupplier(final StoreFactory subscriptionStoreFactory, - final CombinedKeySchema keySchema) { - + final CombinedKeySchema keySchema) { this.subscriptionStoreFactory = subscriptionStoreFactory; this.keySchema = keySchema; } @@ -61,15 +60,13 @@ public Set> stores() { } @Override - public Processor, CombinedKey, Change>>> get() { - + public Processor, CombinedKey, Change>>> get() { return new ContextualProcessor<>() { - - private TimestampedKeyValueStore> store; + private TimestampedKeyValueStore> store; private Sensor droppedRecordsSensor; @Override - public void init(final ProcessorContext, Change>>> context) { + public void init(final ProcessorContext, Change>>> context) { super.init(context); final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; @@ -84,7 +81,7 @@ public void init(final ProcessorContext, Change> record) { + public void process(final Record> record) { if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) { dropRecord(); return; @@ -102,7 +99,7 @@ public void process(final Record> record) { ); } - private Change>> inferChange(final Record> record) { + private Change>> inferChange(final Record> record) { if (record.key() == null) { return new Change<>(ValueAndTimestamp.make(record.value(), record.timestamp()), null); } else { @@ -110,11 +107,11 @@ private Change>> inferChange(final Reco } } - private Change>> inferBasedOnState(final Record> record) { + private Change>> inferBasedOnState(final Record> record) { final Bytes subscriptionKey = keySchema.toBytes(record.key(), record.value().primaryKey()); - final ValueAndTimestamp> newValue = ValueAndTimestamp.make(record.value(), record.timestamp()); - final ValueAndTimestamp> oldValue = store.get(subscriptionKey); + final ValueAndTimestamp> newValue = ValueAndTimestamp.make(record.value(), record.timestamp()); + final ValueAndTimestamp> oldValue = store.get(subscriptionKey); //This store is used by the prefix scanner in ForeignTableJoinProcessorSupplier if (record.value().instruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) || diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java index 501edcd243298..e7927e56ea664 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java @@ -21,22 +21,22 @@ import java.util.Arrays; import java.util.Objects; -public class SubscriptionResponseWrapper { +public class SubscriptionResponseWrapper { static final byte CURRENT_VERSION = 0; // v0 fields: private final long[] originalValueHash; - private final FV foreignValue; + private final VRight foreignValue; private final byte version; // non-serializing fields private final Integer primaryPartition; - public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue, final Integer primaryPartition) { + public SubscriptionResponseWrapper(final long[] originalValueHash, final VRight foreignValue, final Integer primaryPartition) { this(originalValueHash, foreignValue, CURRENT_VERSION, primaryPartition); } public SubscriptionResponseWrapper( final long[] originalValueHash, - final FV foreignValue, + final VRight foreignValue, final byte version, final Integer primaryPartition) { if (version < 0 || version > CURRENT_VERSION) { @@ -52,7 +52,7 @@ public long[] originalValueHash() { return originalValueHash; } - public FV foreignValue() { + public VRight foreignValue() { return foreignValue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java index 72a918e455f88..0523ed2fe79cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java @@ -26,22 +26,22 @@ import java.nio.ByteBuffer; -public class SubscriptionResponseWrapperSerde implements Serde> { - private final SubscriptionResponseWrapperSerializer serializer; - private final SubscriptionResponseWrapperDeserializer deserializer; +public class SubscriptionResponseWrapperSerde implements Serde> { + private final SubscriptionResponseWrapperSerializer serializer; + private final SubscriptionResponseWrapperDeserializer deserializer; - public SubscriptionResponseWrapperSerde(final Serde foreignValueSerde) { + public SubscriptionResponseWrapperSerde(final Serde foreignValueSerde) { serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde == null ? null : foreignValueSerde.serializer()); deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde == null ? null : foreignValueSerde.deserializer()); } @Override - public Serializer> serializer() { + public Serializer> serializer() { return serializer; } @Override - public Deserializer> deserializer() { + public Deserializer> deserializer() { return deserializer; } @@ -54,7 +54,7 @@ private SubscriptionResponseWrapperSerializer(final Serializer serializer) { this.serializer = serializer; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "resource"}) @Override public void setIfUnset(final SerdeGetter getter) { if (serializer == null) { @@ -101,7 +101,7 @@ private SubscriptionResponseWrapperDeserializer(final Deserializer deserializ this.deserializer = deserializer; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "resource"}) @Override public void setIfUnset(final SerdeGetter getter) { if (deserializer == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java index 425dbe83bc405..40e404bf301b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.metrics.Sensor; @@ -43,22 +42,24 @@ import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE; import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE; -public class SubscriptionSendProcessorSupplier implements ProcessorSupplier, KO, SubscriptionWrapper> { +public class SubscriptionSendProcessorSupplier + implements ProcessorSupplier, KRight, SubscriptionWrapper> { + private static final Logger LOG = LoggerFactory.getLogger(SubscriptionSendProcessorSupplier.class); - private final ForeignKeyExtractor foreignKeyExtractor; + private final ForeignKeyExtractor foreignKeyExtractor; private final Supplier foreignKeySerdeTopicSupplier; private final Supplier valueSerdeTopicSupplier; private final boolean leftJoin; - private Serializer foreignKeySerializer; - private Serializer valueSerializer; + private Serializer foreignKeySerializer; + private Serializer valueSerializer; private boolean useVersionedSemantics; - public SubscriptionSendProcessorSupplier(final ForeignKeyExtractor foreignKeyExtractor, + public SubscriptionSendProcessorSupplier(final ForeignKeyExtractor foreignKeyExtractor, final Supplier foreignKeySerdeTopicSupplier, final Supplier valueSerdeTopicSupplier, - final Serde foreignKeySerde, - final Serializer valueSerializer, + final Serde foreignKeySerde, + final Serializer valueSerializer, final boolean leftJoin) { this.foreignKeyExtractor = foreignKeyExtractor; this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier; @@ -69,7 +70,7 @@ public SubscriptionSendProcessorSupplier(final ForeignKeyExtractor, KO, SubscriptionWrapper> get() { + public Processor, KRight, SubscriptionWrapper> get() { return new UnbindChangeProcessor(); } @@ -82,7 +83,7 @@ public boolean isUseVersionedSemantics() { return useVersionedSemantics; } - private class UnbindChangeProcessor extends ContextualProcessor, KO, SubscriptionWrapper> { + private class UnbindChangeProcessor extends ContextualProcessor, KRight, SubscriptionWrapper> { private Sensor droppedRecordsSensor; private String foreignKeySerdeTopic; @@ -91,16 +92,16 @@ private class UnbindChangeProcessor extends ContextualProcessor, KO @SuppressWarnings({"unchecked", "resource"}) @Override - public void init(final ProcessorContext> context) { + public void init(final ProcessorContext> context) { super.init(context); foreignKeySerdeTopic = foreignKeySerdeTopicSupplier.get(); valueSerdeTopic = valueSerdeTopicSupplier.get(); // get default key serde if it wasn't supplied directly at construction if (foreignKeySerializer == null) { - foreignKeySerializer = (Serializer) context.keySerde().serializer(); + foreignKeySerializer = (Serializer) context.keySerde().serializer(); } if (valueSerializer == null) { - valueSerializer = (Serializer) context.valueSerde().serializer(); + valueSerializer = (Serializer) context.valueSerde().serializer(); } droppedRecordsSensor = TaskMetrics.droppedRecordsSensor( Thread.currentThread().getName(), @@ -110,7 +111,7 @@ public void init(final ProcessorContext> context) { } @Override - public void process(final Record> record) { + public void process(final Record> record) { // clear cashed hash from previous record recordHash = null; // drop out-of-order records from versioned tables (cf. KIP-914) @@ -126,29 +127,29 @@ public void process(final Record> record) { } } - private void leftJoinInstructions(final Record> record) { + private void leftJoinInstructions(final Record> record) { if (record.value().oldValue != null) { - final KO oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue); - final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue); + final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue); + final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue); if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); } forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); } else if (record.value().newValue != null) { - final KO newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); + final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); } } - private void defaultJoinInstructions(final Record> record) { + private void defaultJoinInstructions(final Record> record) { if (record.value().oldValue != null) { - final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().oldValue); + final KRight oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().oldValue); if (oldForeignKey == null) { logSkippedRecordDueToNullForeignKey(); return; } if (record.value().newValue != null) { - final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue); + final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue); if (newForeignKey == null) { logSkippedRecordDueToNullForeignKey(); return; @@ -166,7 +167,7 @@ private void defaultJoinInstructions(final Record> record) { forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); } } else if (record.value().newValue != null) { - final KO newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); + final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); if (newForeignKey == null) { logSkippedRecordDueToNullForeignKey(); } else { @@ -175,12 +176,12 @@ private void defaultJoinInstructions(final Record> record) { } } - private byte[] serialize(final KO key) { + private byte[] serialize(final KRight key) { return foreignKeySerializer.serialize(foreignKeySerdeTopic, key); } - private void forward(final Record> record, final KO foreignKey, final Instruction deleteKeyNoPropagate) { - final SubscriptionWrapper wrapper = new SubscriptionWrapper<>( + private void forward(final Record> record, final KRight foreignKey, final Instruction deleteKeyNoPropagate) { + final SubscriptionWrapper wrapper = new SubscriptionWrapper<>( hash(record), deleteKeyNoPropagate, record.key(), @@ -189,7 +190,7 @@ private void forward(final Record> record, final KO foreignKey, fin context().forward(record.withKey(foreignKey).withValue(wrapper)); } - private long[] hash(final Record> record) { + private long[] hash(final Record> record) { if (recordHash == null) { recordHash = record.value().newValue == null ? null diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java index b104d420c704b..0a14eedcb6eab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java @@ -21,8 +21,7 @@ import java.util.Arrays; import java.util.Objects; - -public class SubscriptionWrapper { +public class SubscriptionWrapper { static final byte VERSION_0 = 0; static final byte VERSION_1 = 1; @@ -32,7 +31,7 @@ public class SubscriptionWrapper { private final long[] hash; private final Instruction instruction; private final byte version; - private final K primaryKey; + private final KLeft primaryKey; // v1 fields: private final Integer primaryPartition; @@ -71,11 +70,11 @@ public static Instruction fromValue(final byte value) { } } - public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final Integer primaryPartition) { + public SubscriptionWrapper(final long[] hash, final Instruction instruction, final KLeft primaryKey, final Integer primaryPartition) { this(hash, instruction, primaryKey, CURRENT_VERSION, primaryPartition); } - public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final byte version, final Integer primaryPartition) { + public SubscriptionWrapper(final long[] hash, final Instruction instruction, final KLeft primaryKey, final byte version, final Integer primaryPartition) { Objects.requireNonNull(instruction, "instruction cannot be null. Required by downstream processor."); Objects.requireNonNull(primaryKey, "primaryKey cannot be null. Required by downstream processor."); if (version < 0 || version > CURRENT_VERSION) { @@ -97,7 +96,7 @@ public long[] hash() { return hash; } - public K primaryKey() { + public KLeft primaryKey() { return primaryKey; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java index ed639eba0fe25..b03b24749e0b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java @@ -31,9 +31,10 @@ import java.util.Map; import java.util.function.Supplier; -public class SubscriptionWrapperSerde extends WrappingNullableSerde, K, Void> { +public class SubscriptionWrapperSerde extends WrappingNullableSerde, KLeft, Void> { + public SubscriptionWrapperSerde(final Supplier primaryKeySerializationPseudoTopicSupplier, - final Serde primaryKeySerde) { + final Serde primaryKeySerde) { super( new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier, primaryKeySerde == null ? null : primaryKeySerde.serializer()), @@ -42,25 +43,25 @@ public SubscriptionWrapperSerde(final Supplier primaryKeySerializationPs ); } - private static class SubscriptionWrapperSerializer - implements Serializer>, WrappingNullableSerializer, K, Void> { + private static class SubscriptionWrapperSerializer + implements Serializer>, WrappingNullableSerializer, KLeft, Void> { private final Supplier primaryKeySerializationPseudoTopicSupplier; private String primaryKeySerializationPseudoTopic = null; - private Serializer primaryKeySerializer; + private Serializer primaryKeySerializer; private boolean upgradeFromV0 = false; SubscriptionWrapperSerializer(final Supplier primaryKeySerializationPseudoTopicSupplier, - final Serializer primaryKeySerializer) { + final Serializer primaryKeySerializer) { this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier; this.primaryKeySerializer = primaryKeySerializer; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "resource"}) @Override public void setIfUnset(final SerdeGetter getter) { if (primaryKeySerializer == null) { - primaryKeySerializer = (Serializer) getter.keySerde().serializer(); + primaryKeySerializer = (Serializer) getter.keySerde().serializer(); } } @@ -103,12 +104,11 @@ private static boolean upgradeFromV0(final Map configs) { } @Override - public byte[] serialize(final String ignored, final SubscriptionWrapper data) { + public byte[] serialize(final String ignored, final SubscriptionWrapper data) { //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition} - //7-bit (0x7F) maximum for data version. - if (Byte.compare((byte) 0x7F, data.version()) < 0) { - throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F"); + if (data.version() < 0) { + throw new UnsupportedVersionException("SubscriptionWrapper version cannot be negative"); } final int version = data.version(); @@ -121,7 +121,7 @@ public byte[] serialize(final String ignored, final SubscriptionWrapper data) } } - private byte[] serializePrimaryKey(final SubscriptionWrapper data) { + private byte[] serializePrimaryKey(final SubscriptionWrapper data) { if (primaryKeySerializationPseudoTopic == null) { primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get(); } @@ -132,7 +132,7 @@ private byte[] serializePrimaryKey(final SubscriptionWrapper data) { ); } - private ByteBuffer serializeCommon(final SubscriptionWrapper data, final byte version, final int extraLength) { + private ByteBuffer serializeCommon(final SubscriptionWrapper data, final byte version, final int extraLength) { final byte[] primaryKeySerializedData = serializePrimaryKey(data); final ByteBuffer buf; int dataLength = 2 + primaryKeySerializedData.length + extraLength; @@ -155,40 +155,40 @@ private ByteBuffer serializeCommon(final SubscriptionWrapper data, final byte return buf; } - private byte[] serializeV0(final SubscriptionWrapper data) { + private byte[] serializeV0(final SubscriptionWrapper data) { return serializeCommon(data, (byte) 0, 0).array(); } - private byte[] serializeV1(final SubscriptionWrapper data) { + private byte[] serializeV1(final SubscriptionWrapper data) { final ByteBuffer buf = serializeCommon(data, data.version(), Integer.BYTES); buf.putInt(data.primaryPartition()); return buf.array(); } } - private static class SubscriptionWrapperDeserializer - implements Deserializer>, WrappingNullableDeserializer, K, Void> { + private static class SubscriptionWrapperDeserializer + implements Deserializer>, WrappingNullableDeserializer, KLeft, Void> { private final Supplier primaryKeySerializationPseudoTopicSupplier; private String primaryKeySerializationPseudoTopic = null; - private Deserializer primaryKeyDeserializer; + private Deserializer primaryKeyDeserializer; SubscriptionWrapperDeserializer(final Supplier primaryKeySerializationPseudoTopicSupplier, - final Deserializer primaryKeyDeserializer) { + final Deserializer primaryKeyDeserializer) { this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier; this.primaryKeyDeserializer = primaryKeyDeserializer; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "resource"}) @Override public void setIfUnset(final SerdeGetter getter) { if (primaryKeyDeserializer == null) { - primaryKeyDeserializer = (Deserializer) getter.keySerde().deserializer(); + primaryKeyDeserializer = (Deserializer) getter.keySerde().deserializer(); } } @Override - public SubscriptionWrapper deserialize(final String ignored, final byte[] data) { + public SubscriptionWrapper deserialize(final String ignored, final byte[] data) { //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition} final ByteBuffer buf = ByteBuffer.wrap(data); final byte versionAndIsHashNull = buf.get(); @@ -220,7 +220,7 @@ public SubscriptionWrapper deserialize(final String ignored, final byte[] dat primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get(); } - final K primaryKey = primaryKeyDeserializer.deserialize( + final KLeft primaryKey = primaryKeyDeserializer.deserialize( primaryKeySerializationPseudoTopic, primaryKeyRaw ); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java index 87366bd5334a9..fe262840a432e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java @@ -44,7 +44,7 @@ public class SubscriptionSendProcessorSupplierTest { private final Processor, String, SubscriptionWrapper> leftJoinProcessor = - new SubscriptionSendProcessorSupplier( + new SubscriptionSendProcessorSupplier( ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey), () -> "subscription-topic-fk", () -> "value-serde-topic", @@ -54,7 +54,7 @@ public class SubscriptionSendProcessorSupplierTest { ).get(); private final Processor, String, SubscriptionWrapper> innerJoinProcessor = - new SubscriptionSendProcessorSupplier( + new SubscriptionSendProcessorSupplier( ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey), () -> "subscription-topic-fk", () -> "value-serde-topic", @@ -329,7 +329,7 @@ public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() { // Bi-function tests: inner join, left join private final Processor, String, SubscriptionWrapper> biFunctionLeftJoinProcessor = - new SubscriptionSendProcessorSupplier( + new SubscriptionSendProcessorSupplier( ForeignKeyExtractor.fromBiFunction((key, value) -> value.getForeignKey() == null ? null : key + value.getForeignKey()), () -> "subscription-topic-fk", () -> "value-serde-topic", @@ -339,7 +339,7 @@ public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() { ).get(); private final Processor, String, SubscriptionWrapper> biFunctionInnerJoinProcessor = - new SubscriptionSendProcessorSupplier( + new SubscriptionSendProcessorSupplier( ForeignKeyExtractor.fromBiFunction((key, value) -> value.getForeignKey() == null ? null : key + value.getForeignKey()), () -> "subscription-topic-fk", () -> "value-serde-topic", @@ -628,6 +628,7 @@ public void biFunctionInnerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNul } private static class LeftValueSerializer implements Serializer { + @SuppressWarnings("resource") @Override public byte[] serialize(final String topic, final LeftValue data) { if (data == null) return null; @@ -648,6 +649,7 @@ public String getForeignKey() { } } + @SuppressWarnings("resource") private static long[] hash(final LeftValue value) { return Murmur3.hash128(new LeftValueSerializer().serialize("value-serde-topic", value)); }