Skip to content

Commit

Permalink
fix: handle revoke with no partitions assigned (issue #214)
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Nov 25, 2023
1 parent f2a3821 commit 1623072
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix>-beta.7</BaseVersionSuffix>
<BaseVersionSuffix>-beta.8</BaseVersionSuffix>
<BaseVersion>4.4.0$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ConsumerChannelsManager(
Func<ISequenceStore> sequenceStoreFactory,
ISilverbackLogger logger)
: base(
consumer.Endpoint.ProcessPartitionsIndependently ? partitions.Count : 1,
GetChannelsCount(partitions, consumer),
consumer.Endpoint.BackpressureLimit,
sequenceStoreFactory)
{
Expand Down Expand Up @@ -76,7 +76,7 @@ public void StartReading()
{
if (_consumer.Endpoint.ProcessPartitionsIndependently)
_partitions.ForEach(StartReading);
else
else if (_partitions.Count > 0)
StartReading(0);
}

Expand Down Expand Up @@ -161,6 +161,11 @@ protected override void Dispose(bool disposing)
_readingSemaphoreSlim.Dispose();
}

private static int GetChannelsCount(IReadOnlyList<TopicPartition> partitions, KafkaConsumer consumer) =>
consumer.Endpoint.ProcessPartitionsIndependently
? partitions.Count
: Math.Min(partitions.Count, 1);

[SuppressMessage("", "VSTHRD110", Justification = Justifications.FireAndForget)]
private void StartReading(int channelIndex)
{
Expand Down

0 comments on commit 1623072

Please sign in to comment.