Skip to content

Commit

Permalink
[fix][broker] Fix NPE after publishing a tombstone to the service uni…
Browse files Browse the repository at this point in the history
…t channel

### Motivation

NPE will happen in `UnloadManager#handleEvent` after
apache#22743. It's because the `Init`
state is always associated with a null `ServiceUnitStateData`.

```
java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.force()" because "data" is null
        at org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager.handleEvent(UnloadManager.java:204) ~[classes/:?]
        at org.apache.pulsar.broker.loadbalance.extensions.channel.StateChangeListeners.lambda$notify$3(StateChangeListeners.java:74) ~[classes/:?]
        at java.base/java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:807) ~[?:?]
        at org.apache.pulsar.broker.loadbalance.extensions.channel.StateChangeListeners.notify(StateChangeListeners.java:72) ~[classes/:?]
        at org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.handleInitEvent(ServiceUnitStateChannelImpl.java:902) ~[classes/:?]
        at org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.handle(ServiceUnitStateChannelImpl.java:715) ~[classes/:?]
```

### Modifications

In `UnloadManager#handleEvent`, assume `data` is null and call
`complete` directly. Fix `UnloadManagerTest`, which passes a non-null
`ServiceUnitStateData` and `Init` to `handleEvent`.
  • Loading branch information
BewareMyPower committed Jun 6, 2024
1 parent d74010c commit 9a30a17
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Histogram;
import java.util.Map;
Expand Down Expand Up @@ -201,9 +202,8 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
}
}
case Init -> {
if (data.force()) {
complete(serviceUnit, t);
}
checkArgument(data == null, "Init state must be associated with null data");
complete(serviceUnit, t);
}
case Owned -> complete(serviceUnit, t);
case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,7 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
assertEquals(inFlightUnloadRequestMap.size(), 1);

// Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null);
manager.handleEvent(bundle, null, null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
Expand Down

0 comments on commit 9a30a17

Please sign in to comment.