Skip to content

Commit

Permalink
MINOR: fix NPE in KS Topology for new AutoOffsetReset (#18780)
Browse files Browse the repository at this point in the history
Introduced via KIP-1106.

Reviewers: Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax authored Feb 4, 2025
1 parent ab8ef87 commit ce6f078
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 21 deletions.
72 changes: 64 additions & 8 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,14 @@ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, null, null, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
null,
null,
topics
);
return this;
}

Expand Down Expand Up @@ -215,7 +222,14 @@ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, null, null, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
null,
null,
topicPattern
);
return this;
}

Expand Down Expand Up @@ -304,7 +318,14 @@ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffset
final TimestampExtractor timestampExtractor,
final String name,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
null,
null,
topics
);
return this;
}

Expand Down Expand Up @@ -351,7 +372,14 @@ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffset
final TimestampExtractor timestampExtractor,
final String name,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
null,
null,
topicPattern
);
return this;
}

Expand Down Expand Up @@ -457,7 +485,14 @@ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffset
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, valueDeserializer, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
keyDeserializer,
valueDeserializer,
topics
);
return this;
}

Expand Down Expand Up @@ -514,7 +549,14 @@ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffset
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, valueDeserializer, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
keyDeserializer,
valueDeserializer,
topicPattern
);
return this;
}

Expand Down Expand Up @@ -571,7 +613,14 @@ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffset
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topics
);
return this;
}

Expand Down Expand Up @@ -634,7 +683,14 @@ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffset
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topicPattern
);
return this;
}

Expand Down
25 changes: 12 additions & 13 deletions streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ public void shouldNotAllowNullNameWhenAddingProcessor() {

@Test
public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
assertThrows(NullPointerException.class, () -> topology.addProcessor("name",
(ProcessorSupplier<Object, Object, Object, Object>) null));
assertThrows(NullPointerException.class, () -> topology.addProcessor("name", null));
}

@Test
Expand Down Expand Up @@ -376,6 +375,7 @@ public Set<StoreBuilder<?>> stores() {
}
}

@SuppressWarnings("resource")
@Test
public void shouldThrowOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
Expand Down Expand Up @@ -411,7 +411,7 @@ private static class LocalMockProcessorSupplier implements ProcessorSupplier<Obj

@Override
public Processor<Object, Object, Object, Object> get() {
return new Processor<Object, Object, Object, Object>() {
return new Processor<>() {
@Override
public void init(final ProcessorContext<Object, Object> context) {
context.getStateStore(STORE_NAME);
Expand Down Expand Up @@ -1157,7 +1157,7 @@ public void streamStreamOuterJoinTopologyWithCustomStoresSuppliers() {
public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
final StreamsBuilder builder = new StreamsBuilder();

final TopicNameExtractor<Object, Object> topicNameExtractor = new TopicNameExtractor<Object, Object>() {
final TopicNameExtractor<Object, Object> topicNameExtractor = new TopicNameExtractor<>() {
@Override
public String extract(final Object key, final Object value, final RecordContext recordContext) {
return recordContext.topic() + "-" + key;
Expand Down Expand Up @@ -2257,16 +2257,16 @@ private Topology topologyWithStaticTopicName() {

private TopologyDescription.Source addSource(final String sourceName,
final String... sourceTopic) {
topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourceTopic);
final StringBuilder allSourceTopics = new StringBuilder(sourceTopic[0]);
for (int i = 1; i < sourceTopic.length; ++i) {
allSourceTopics.append(", ").append(sourceTopic[i]);
}
topology.addSource((AutoOffsetReset) null, sourceName, null, null, null, sourceTopic);
return new InternalTopologyBuilder.Source(sourceName, new HashSet<>(Arrays.asList(sourceTopic)), null);
}

@SuppressWarnings("deprecation")
private TopologyDescription.Source addSource(final String sourceName,
final Pattern sourcePattern) {
// we still test the old `Topology.AutoOffsetReset` here, to increase test coverage
// (cf `addSource` about which used the new one)
// When can rewrite this to the new one, when the old one is removed
topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourcePattern);
return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern);
}
Expand Down Expand Up @@ -2338,7 +2338,6 @@ private TopologyDescription.Sink addSink(final String sinkName,
return expectedSinkNode;
}

@Deprecated // testing old PAPI
private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
final String sourceName,
final String globalTopicName,
Expand Down Expand Up @@ -2441,17 +2440,17 @@ public void shouldWrapProcessors() {
topology.addSource("source", "topic");
topology.addProcessor(
"p1",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
() -> record -> System.out.println("Processing: " + random.nextInt()),
"source"
);
topology.addProcessor(
"p2",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
() -> record -> System.out.println("Processing: " + random.nextInt()),
"p1"
);
topology.addProcessor(
"p3",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
() -> record -> System.out.println("Processing: " + random.nextInt()),
"p2"
);
assertThat(counter.numWrappedProcessors(), is(3));
Expand Down

0 comments on commit ce6f078

Please sign in to comment.