Skip to content

Commit

Permalink
KAFKA-16339: [3/4 KStream#transformValues] Remove Deprecated "transfo…
Browse files Browse the repository at this point in the history
…rmer" methods and classes (#17266)

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
fonsdant authored Nov 22, 2024
1 parent be4ea80 commit 866f0cc
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 1,181 deletions.
511 changes: 26 additions & 485 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
* @see Transformer
* @see ValueTransformer
* @see ValueTransformerSupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @deprecated Since 4.0. Use {@link org.apache.kafka.streams.processor.api.ProcessorSupplier api.ProcessorSupplier} instead.
*/
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@
* @param <VR> transformed value type
* @see ValueTransformerSupplier
* @see ValueTransformerWithKeySupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KTable#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)
* @see Transformer
* @deprecated Since 4.0. Use {@link FixedKeyProcessor} instead.
*/
Expand Down Expand Up @@ -77,7 +76,7 @@ public interface ValueTransformer<V, VR> {

/**
* Transform the given value to a new value.
* Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerSupplier, String...)
* Additionally, any {@link StateStore} that is {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...)
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
* @see ValueTransformer
* @see ValueTransformerWithKey
* @see ValueTransformerWithKeySupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
* @deprecated Since 4.0. Use {@link FixedKeyProcessorSupplier} instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
* @param <VR> transformed value type
* @see ValueTransformer
* @see ValueTransformerWithKeySupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KTable#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
*/

Expand Down Expand Up @@ -77,7 +76,7 @@ public interface ValueTransformerWithKey<K, V, VR> {

/**
* Transform the given [key and] value to a new value.
* Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* Additionally, any {@link StateStore} that is {@link KTable#transformValues(ValueTransformerWithKeySupplier, Named, String...)
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
* @param <VR> transformed value type
* @see ValueTransformer
* @see ValueTransformerWithKey
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K

private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";

private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";

private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";

private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
Expand Down Expand Up @@ -1209,77 +1207,6 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
builder);
}

@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doTransformValues(
toValueTransformerWithKeySupplier(valueTransformerSupplier),
NamedInternal.empty(),
stateStoreNames);
}

@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
Objects.requireNonNull(named, "named can't be null");
return doTransformValues(
toValueTransformerWithKeySupplier(valueTransformerSupplier),
new NamedInternal(named),
stateStoreNames);
}

@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
}

@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
Objects.requireNonNull(named, "named can't be null");
return doTransformValues(valueTransformerSupplier, new NamedInternal(named), stateStoreNames);
}

private <VR> KStream<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier,
final NamedInternal named,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't contain `null` as store name");
}
ApiUtils.checkSupplier(valueTransformerWithKeySupplier);

final String name = named.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
stateStoreNames);
transformNode.setValueChangingOperation(true);

builder.addGraphNode(graphNode, transformNode);

// cannot inherit value serde
return new KStreamImpl<>(
name,
keySerde,
null,
subTopologySourceNodes,
repartitionRequired,
transformNode,
builder);
}

@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;
Expand Down Expand Up @@ -91,10 +92,7 @@
* @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
* @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, String...)
* @see KStream#processValues(FixedKeyProcessorSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,24 +845,6 @@ public void shouldUseSpecifiedNameForForEachOperation() {
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}

@Test
@SuppressWarnings("deprecation")
public void shouldUseSpecifiedNameForTransformValues() {
builder.stream(STREAM_TOPIC).transformValues(() -> new NoopValueTransformer<>(), Named.as(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}

@Test
@SuppressWarnings("deprecation")
public void shouldUseSpecifiedNameForTransformValuesWithKey() {
builder.stream(STREAM_TOPIC).transformValues(() -> new NoopValueTransformerWithKey<>(), Named.as(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}

@Test
public void shouldUseSpecifiedNameForSplitOperation() {
builder.stream(STREAM_TOPIC)
Expand Down
Loading

0 comments on commit 866f0cc

Please sign in to comment.