Skip to content

Commit

Permalink
KAFKA-18644: improve generic type names for internal FK-join classes (#…
Browse files Browse the repository at this point in the history
…18700)

Reviewers: Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax authored Feb 4, 2025
1 parent 9f78771 commit 7719b5f
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Objects;

public class CombinedKey<KF, KP> {
private final KF foreignKey;
private final KP primaryKey;
public class CombinedKey<KRight, KLeft> {
private final KRight foreignKey;
private final KLeft primaryKey;

CombinedKey(final KF foreignKey, final KP primaryKey) {
CombinedKey(final KRight foreignKey, final KLeft primaryKey) {
Objects.requireNonNull(primaryKey, "primaryKey can't be null");
this.foreignKey = foreignKey;
this.primaryKey = primaryKey;
}

public KF foreignKey() {
public KRight foreignKey() {
return foreignKey;
}

public KP primaryKey() {
public KLeft primaryKey() {
return primaryKey;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@
/**
* Factory for creating CombinedKey serializers / deserializers.
*/
public class CombinedKeySchema<KO, K> {
public class CombinedKeySchema<KRight, KLeft> {
private final Supplier<String> undecoratedPrimaryKeySerdeTopicSupplier;
private final Supplier<String> undecoratedForeignKeySerdeTopicSupplier;
private String primaryKeySerdeTopic;
private String foreignKeySerdeTopic;
private Serializer<K> primaryKeySerializer;
private Deserializer<K> primaryKeyDeserializer;
private Serializer<KO> foreignKeySerializer;
private Deserializer<KO> foreignKeyDeserializer;
private Serializer<KLeft> primaryKeySerializer;
private Deserializer<KLeft> primaryKeyDeserializer;
private Serializer<KRight> foreignKeySerializer;
private Deserializer<KRight> foreignKeyDeserializer;

public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
final Serde<KO> foreignKeySerde,
final Serde<KRight> foreignKeySerde,
final Supplier<String> primaryKeySerdeTopicSupplier,
final Serde<K> primaryKeySerde) {
final Serde<KLeft> primaryKeySerde) {
undecoratedPrimaryKeySerdeTopicSupplier = primaryKeySerdeTopicSupplier;
undecoratedForeignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
Expand All @@ -50,17 +50,17 @@ public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "resource"})
public void init(final ProcessorContext<?, ?> context) {
primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer<KO>) context.keySerde().deserializer() : foreignKeyDeserializer;
primaryKeySerializer = primaryKeySerializer == null ? (Serializer<KLeft>) context.keySerde().serializer() : primaryKeySerializer;
primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<KLeft>) context.keySerde().deserializer() : primaryKeyDeserializer;
foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KRight>) context.keySerde().serializer() : foreignKeySerializer;
foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer<KRight>) context.keySerde().deserializer() : foreignKeyDeserializer;
}

Bytes toBytes(final KO foreignKey, final K primaryKey) {
Bytes toBytes(final KRight foreignKey, final KLeft primaryKey) {
//The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan
//key is being created.
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
Expand All @@ -79,22 +79,22 @@ Bytes toBytes(final KO foreignKey, final K primaryKey) {
}


public CombinedKey<KO, K> fromBytes(final Bytes data) {
public CombinedKey<KRight, KLeft> fromBytes(final Bytes data) {
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
final byte[] dataArray = data.get();
final ByteBuffer dataBuffer = ByteBuffer.wrap(dataArray);
final int foreignKeyLength = dataBuffer.getInt();
final byte[] foreignKeyRaw = new byte[foreignKeyLength];
dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength);
final KO foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw);
final KRight foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw);

final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES];
dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length);
final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw);
final KLeft primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw);
return new CombinedKey<>(foreignKey, primaryKey);
}

Bytes prefixBytes(final KO key) {
Bytes prefixBytes(final KRight key) {
//The serialization format. Note that primaryKeySerialized is not required/used in this function.
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@
* <li>{@link #fromBiFunction(BiFunction)} - when the foreign key depends on both key and value</li>
* </ul>
*
* @param <K> Type of primary table's key
* @param <V> Type of primary table's value
* @param <KO> Type of the foreign key to extract
* @param <KLeft> Type of primary table's key
* @param <VLeft> Type of primary table's value
* @param <KRight> Type of the foreign key to extract
*/
@FunctionalInterface
public interface ForeignKeyExtractor<K, V, KO> {
KO extract(K key, V value);
public interface ForeignKeyExtractor<KLeft, VLeft, KRight> {
KRight extract(KLeft key, VLeft value);

static <K, V, KO> ForeignKeyExtractor<? super K, ? super V, ? extends KO> fromFunction(Function<? super V, ? extends KO> function) {
static <KLeft, VLeft, KRight> ForeignKeyExtractor<? super KLeft, ? super VLeft, ? extends KRight> fromFunction(Function<? super VLeft, ? extends KRight> function) {
return (key, value) -> function.apply(value);
}

static <K, V, KO> ForeignKeyExtractor<? super K, ? super V, ? extends KO> fromBiFunction(BiFunction<? super K, ? super V, ? extends KO> biFunction) {
static <KLeft, VLeft, KRight> ForeignKeyExtractor<? super KLeft, ? super VLeft, ? extends KRight> fromBiFunction(BiFunction<? super KLeft, ? super VLeft, ? extends KRight> biFunction) {
return biFunction::apply;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -43,15 +42,16 @@
import java.util.Collections;
import java.util.Set;

public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
public class ForeignTableJoinProcessorSupplier<KLeft, KRight, VRight>
implements ProcessorSupplier<KRight, Change<VRight>, KLeft, SubscriptionResponseWrapper<VRight>> {

private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
private final StoreFactory subscriptionStoreFactory;
private final CombinedKeySchema<KO, K> keySchema;
private final CombinedKeySchema<KRight, KLeft> keySchema;
private boolean useVersionedSemantics = false;

public ForeignTableJoinProcessorSupplier(final StoreFactory subscriptionStoreFactory,
final CombinedKeySchema<KO, K> keySchema) {
final CombinedKeySchema<KRight, KLeft> keySchema) {
this.subscriptionStoreFactory = subscriptionStoreFactory;
this.keySchema = keySchema;
}
Expand All @@ -62,7 +62,7 @@ public Set<StoreBuilder<?>> stores() {
}

@Override
public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
public Processor<KRight, Change<VRight>, KLeft, SubscriptionResponseWrapper<VRight>> get() {
return new KTableKTableJoinProcessor();
}

Expand All @@ -75,12 +75,12 @@ public boolean isUseVersionedSemantics() {
return useVersionedSemantics;
}

private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
private final class KTableKTableJoinProcessor extends ContextualProcessor<KRight, Change<VRight>, KLeft, SubscriptionResponseWrapper<VRight>> {
private Sensor droppedRecordsSensor;
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> subscriptionStore;
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<KLeft>> subscriptionStore;

@Override
public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
public void init(final ProcessorContext<KLeft, SubscriptionResponseWrapper<VRight>> context) {
super.init(context);
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
Expand All @@ -92,7 +92,7 @@ public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> cont
}

@Override
public void process(final Record<KO, Change<VO>> record) {
public void process(final Record<KRight, Change<VRight>> record) {
// if the key is null, we do not need to proceed aggregating
// the record with the table
if (record.key() == null) {
Expand Down Expand Up @@ -122,14 +122,14 @@ public void process(final Record<KO, Change<VO>> record) {
final Bytes prefixBytes = keySchema.prefixBytes(record.key());

//Perform the prefixScan and propagate the results
try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<KLeft>>> prefixScanResults =
subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {

while (prefixScanResults.hasNext()) {
final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<KLeft>>> next = prefixScanResults.next();
// have to check the prefix because the range end is inclusive :(
if (prefixEquals(next.key.get(), prefixBytes.get())) {
final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
final CombinedKey<KRight, KLeft> combinedKey = keySchema.fromBytes(next.key);
context().forward(
record.withKey(combinedKey.primaryKey())
.withValue(new SubscriptionResponseWrapper<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -43,23 +42,25 @@
* of the primary key. This eliminates race-condition results for rapidly-changing foreign-keys for a given primary key.
* Applies the join and emits nulls according to LEFT/INNER rules.
*
* @param <K> Type of primary keys
* @param <V> Type of primary values
* @param <VO> Type of foreign values
* @param <VR> Type of joined result of primary and foreign values
* @param <KLeft> Type of primary keys
* @param <VLeft> Type of primary values
* @param <VRight> Type of foreign values
* @param <VOut> Type of joined result of primary and foreign values
*/
public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
public class ResponseJoinProcessorSupplier<KLeft, VLeft, VRight, VOut>
implements ProcessorSupplier<KLeft, SubscriptionResponseWrapper<VRight>, KLeft, VOut> {

private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class);
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
private final KTableValueGetterSupplier<KLeft, VLeft> valueGetterSupplier;
private final Serializer<VLeft> constructionTimeValueSerializer;
private final Supplier<String> valueHashSerdePseudoTopicSupplier;
private final ValueJoiner<? super V, ? super VO, ? extends VR> joiner;
private final ValueJoiner<? super VLeft, ? super VRight, ? extends VOut> joiner;
private final boolean leftJoin;

public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
final Serializer<V> valueSerializer,
public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<KLeft, VLeft> valueGetterSupplier,
final Serializer<VLeft> valueSerializer,
final Supplier<String> valueHashSerdePseudoTopicSupplier,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final ValueJoiner<? super VLeft, ? super VRight, ? extends VOut> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
constructionTimeValueSerializer = valueSerializer;
Expand All @@ -69,24 +70,24 @@ public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> value
}

@Override
public Processor<K, SubscriptionResponseWrapper<VO>, K, VR> get() {
public Processor<KLeft, SubscriptionResponseWrapper<VRight>, KLeft, VOut> get() {
return new ContextualProcessor<>() {
private String valueHashSerdePseudoTopic;
private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
private Serializer<VLeft> runtimeValueSerializer = constructionTimeValueSerializer;

private KTableValueGetter<K, V> valueGetter;
private KTableValueGetter<KLeft, VLeft> valueGetter;
private Sensor droppedRecordsSensor;


@SuppressWarnings({"unchecked", "resource"})
@Override
public void init(final ProcessorContext<K, VR> context) {
public void init(final ProcessorContext<KLeft, VOut> context) {
super.init(context);
valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get();
valueGetter = valueGetterSupplier.get();
valueGetter.init(context);
if (runtimeValueSerializer == null) {
runtimeValueSerializer = (Serializer<V>) context.valueSerde().serializer();
runtimeValueSerializer = (Serializer<VLeft>) context.valueSerde().serializer();
}

final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
Expand All @@ -98,14 +99,14 @@ public void init(final ProcessorContext<K, VR> context) {
}

@Override
public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) {
public void process(final Record<KLeft, SubscriptionResponseWrapper<VRight>> record) {
if (record.value().version() != SubscriptionResponseWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is
//compatibility with previous versions to enable rolling upgrades. Must develop a strategy for
//upgrading from older SubscriptionWrapper versions to newer versions.
throw new UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible version.");
}
final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(record.key());
final ValueAndTimestamp<VLeft> currentValueWithTimestamp = valueGetter.get(record.key());

final long[] currentHash = currentValueWithTimestamp == null ?
null :
Expand All @@ -115,7 +116,7 @@ public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) {

//If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
final VR result;
final VOut result;

if (record.value().foreignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
result = null; //Emit tombstone
Expand Down
Loading

0 comments on commit 7719b5f

Please sign in to comment.