
Null pointer exception when starting another instance for the same consumer group #35

Closed
bayan opened this issue Mar 4, 2015 · 2 comments · Fixed by #37

Comments

@bayan

bayan commented Mar 4, 2015

I start multiple processes which join the same consumer group.

If existing processes are consuming Kafka messages while a new consumer process is spawned, some of them often panic and crash with a null pointer exception (NPE) when I try to commit the latest offset (CommitUpto).

The NPE occurs because the partition has already been deleted from the zookeeperOffsetManager by the time the following function runs:

func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
    zom.l.RLock()
    defer zom.l.RUnlock()
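    // zom.offsets[topic][partition] is nil once the partition has been
    // finalized and removed, so the markAsProcessed call below panics with
    // a nil pointer dereference.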
    return zom.offsets[topic][partition].markAsProcessed(offset)
}

My local workaround has been to add a check and short-circuit:

func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
    zom.l.RLock()
    defer zom.l.RUnlock()
    currentOffset := zom.offsets[topic][partition]
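    // A nil tracker means the partition has already been removed
    // (e.g. after a rebalance), so report failure instead of panicking.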
    if currentOffset == nil {
        return false
    }
    return currentOffset.markAsProcessed(offset)
}
@wvanbergen
Owner

I managed to reproduce this. It's actually a pretty basic mistake: the consumer finalizes the partition and commits its offset as soon as it is done receiving messages from it, but that doesn't mean all of those messages have been processed yet; depending on what your app does, that can take a while.

Your workaround works, but it basically means you will reprocess some messages because their offsets never got committed. While that is OK within Kafka's at-least-once guarantee, I'd like to prevent this if possible. Looking into it.
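For illustration only (not necessarily the change that ended up in #37), one way to avoid both the panic and the reprocessing is to have each partition wait, with a timeout, for the last consumed offset to be processed before its offset is committed and its tracker is removed. Below is a minimal, self-contained sketch of that idea, using hypothetical names (partitionTracker, waitForProcessing) rather than the library's internals:

package main

import (
    "fmt"
    "sync"
    "time"
)

// partitionTracker tracks the highest consumed and highest processed offsets
// for a single partition. (Hypothetical type, for illustration only.)
type partitionTracker struct {
    mu            sync.Mutex
    cond          *sync.Cond
    lastConsumed  int64
    lastProcessed int64
}

func newPartitionTracker() *partitionTracker {
    t := &partitionTracker{lastConsumed: -1, lastProcessed: -1}
    t.cond = sync.NewCond(&t.mu)
    return t
}

// markConsumed records that a message with this offset was handed to the app.
func (t *partitionTracker) markConsumed(offset int64) {
    t.mu.Lock()
    defer t.mu.Unlock()
    if offset > t.lastConsumed {
        t.lastConsumed = offset
    }
}

// markProcessed records that the app finished processing this offset.
func (t *partitionTracker) markProcessed(offset int64) {
    t.mu.Lock()
    if offset > t.lastProcessed {
        t.lastProcessed = offset
    }
    t.mu.Unlock()
    t.cond.Broadcast()
}

// waitForProcessing blocks until every consumed offset has been processed or
// the timeout expires, and reports whether processing caught up. Only after it
// returns true is it safe to commit the final offset and drop the tracker.
func (t *partitionTracker) waitForProcessing(timeout time.Duration) bool {
    expired := false
    timer := time.AfterFunc(timeout, func() {
        t.mu.Lock()
        expired = true
        t.mu.Unlock()
        t.cond.Broadcast()
    })
    defer timer.Stop()

    t.mu.Lock()
    defer t.mu.Unlock()
    for t.lastProcessed < t.lastConsumed && !expired {
        t.cond.Wait()
    }
    return t.lastProcessed >= t.lastConsumed
}

func main() {
    t := newPartitionTracker()
    t.markConsumed(42)
    go func() {
        time.Sleep(100 * time.Millisecond) // simulate slow message processing
        t.markProcessed(42)
    }()
    fmt.Println("caught up:", t.waitForProcessing(time.Second)) // caught up: true
}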

@bayan
Author

bayan commented Mar 8, 2015

Thanks for investigating.

Yes, messages will be reprocessed with my workaround.

FWIW, I was able to mitigate the redundant processing by setting ChannelBufferSize to zero, which is OK for my use case since my message throughput is relatively low (fewer than 100 messages per minute, but each one can take many minutes to process).
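For reference, a minimal sketch of that mitigation, assuming the consumergroup package's NewConfig/JoinConsumerGroup/Messages/CommitUpto API and the ChannelBufferSize field from the embedded sarama config; the group name, topic, and ZooKeeper address below are placeholders from my setup:

package main

import (
    "log"

    "github.com/Shopify/sarama"
    "github.com/wvanbergen/kafka/consumergroup"
)

func main() {
    config := consumergroup.NewConfig()
    config.Offsets.Initial = sarama.OffsetNewest
    // Unbuffered message channel: fewer in-flight messages, so fewer
    // messages get reprocessed after a rebalance.
    config.ChannelBufferSize = 0

    consumer, err := consumergroup.JoinConsumerGroup("my-group", []string{"my-topic"}, []string{"localhost:2181"}, config)
    if err != nil {
        log.Fatalln(err)
    }
    defer consumer.Close()

    for msg := range consumer.Messages() {
        // ... process msg, which in my case can take many minutes ...
        consumer.CommitUpto(msg)
    }
}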
