
OffsetManager Implementation #461

Merged
merged 24 commits into master from offset-manager on Aug 24, 2015

Conversation

eapache
Contributor

@eapache eapache commented May 29, 2015

There are still some TODOs sprinkled throughout, but not all of them necessarily need to be fixed before we ship this (it's already pretty close to the minimum viable implementation). The only critical missing piece is implementing AsyncClose for the PartitionOffsetManager.

Regardless, it's at the state where it can be opened up for comments. This should completely supersede #379.

@eapache
Contributor Author

eapache commented Jun 1, 2015

@Shopify/kafka

@piotrkowalczuk

What is the approximate ETA?

@wvanbergen
Contributor

We don't have any direct need for this, so finishing this PR does not have high priority for us. If you're willing to work on this, the following things need to be added before we can merge it:

  • AsyncClose
  • unit tests
  • functional tests

@eapache
Contributor Author

eapache commented Jun 9, 2015

I just added a quick and dirty AsyncClose implementation, so this is theoretically feature-complete in a minimal state, but I can pretty much guarantee it's broken since I've never actually run it.

@eapache
Contributor Author

eapache commented Jul 9, 2015

It's not clear to me if kafka-based offset management is functional in 0.8.2 since v1 of the request requires fields which won't have meaning until the full consumer rewrite lands in 0.9 (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design)

@wvanbergen
Contributor

It's functional; when I was testing this it was successfully storing and retrieving offsets and metadata.

@eapache
Contributor Author

eapache commented Jul 9, 2015

What did you put in those fields then? Zeros?

(Specifically: ConsumerGroupGeneration, and ConsumerID)

@wvanbergen
Contributor

Yup, I just didn't set them: https://github.com/Shopify/sarama/pull/379/files

@eapache eapache force-pushed the offset-manager branch 2 times, most recently from dc91d49 to 502888a on July 13, 2015 17:50
@horkhe
Contributor

horkhe commented Jul 23, 2015

Hey guys, I was looking at the offset manager interface from the end user perspective and I have a couple of suggestions to make:

  • It would be nice if the consumer group were not read from config, but rather accepted as a parameter along with topic and partition. That would allow using one offset manager for many consumer groups.
  • Why should the user be concerned with creating a committer for each particular topic/partition? This seems to be an unnecessary complication. Why not have the OffsetManager take care of that and provide a dead-simple interface, e.g.:
type OffsetResult struct {
    Offset int64
    Err    error
}

type OffsetManager interface {
    FetchOffset(group, topic string, partition int32, replyTo chan<- OffsetResult)
    CommitOffset(group, topic string, partition int32, offset int64, replyTo chan<- error)
    Close() error
}

func NewOffsetManagerWithClient(client Client) (OffsetManager, error)

I understand that you were modeling the API to align with Consumer/PartitionConsumer, but the OffsetManager seems to be a higher-level abstraction that asks for a simpler interface. Just think how much code needs to be written to make use of the OffsetManager implemented in this PR compared to the suggested interface.
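For illustration, a minimal sketch of what fetching a stored offset would look like with the interface suggested above (the OffsetManager and OffsetResult types exist only in this comment, not in sarama):

func fetchStartingOffset(om OffsetManager, group, topic string, partition int32) (int64, error) {
    // One call plus one receive; the manager owns all per-partition state.
    reply := make(chan OffsetResult, 1)
    om.FetchOffset(group, topic, partition, reply)
    res := <-reply
    if res.Err != nil {
        return 0, res.Err
    }
    return res.Offset + 1, nil // next offset to start consuming at
}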

This is certainly just my subjective opinion. In any case thank you very much, guys, for what you are doing.

@eapache
Contributor Author

eapache commented Jul 24, 2015

allow using one offset manager for many consumer groups

That's actually surprisingly difficult. While you can include multiple topics and partitions in a single offset fetch or commit, you can only include a single consumer group (this is a limitation of the kafka protocol) so having an OffsetManager support multiple consumer groups would require a great deal more work.
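For context, a rough sketch of that wire-level constraint using sarama's existing OffsetCommitRequest (field and method names as in the library; treat the exact signatures as an assumption):

// buildCommit shows that a single OffsetCommitRequest can carry many
// topic/partition blocks but only ever one consumer group.
func buildCommit() *sarama.OffsetCommitRequest {
    req := &sarama.OffsetCommitRequest{
        ConsumerGroup: "my-group", // exactly one group per request
    }
    req.AddBlock("topic-a", 0, 1234, 0, "")
    req.AddBlock("topic-a", 1, 5678, 0, "")
    req.AddBlock("topic-b", 3, 42, 0, "")
    // Committing for a second group means building and sending a second request.
    return req
}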

Why should the user be concerned with creating a committer for each particular topic/partition?

Eventually they won't have to create anything at all, it will be baked into the consumer (so creating a PartitionConsumer will automatically create a PartitionOffsetManager behind the scenes that keeps itself up-to-date with the messages you've consumed). I just haven't written that part yet :)

I understand that you were modeling the API to align with Consumer/PartitionConsumer

Yup, see my previous answer.

@horkhe
Contributor

horkhe commented Jul 27, 2015

@eapache makes sense. Thank you for the clarification.

@horkhe
Contributor

horkhe commented Jul 28, 2015

@eapache Could you please consider making the consumer group a parameter of NewOffsetManager? Having it in sarama.Config forces us to use one sarama.Client per offset manager.

@aaronkavlie-wf
Contributor

We're using Sarama, and looking at our options for offset management.

Is this PR still a low priority for you?

@wvanbergen
Contributor

@aaronkavlie-wf We are not in need of this feature ourselves (we use Zookeeper to manage our offsets). To get this rolling again, it would be great if you could try out this branch to a) see if it works and b) give feedback on the proposed API.

@horkhe
Contributor

horkhe commented Jul 29, 2015

@eapache @wvanbergen I have tried to use this offset manager in our balanced consumer implementation; here are my comments (some of them have already been mentioned above, but I decided to list them all here anyway):

  • It would be nice to have group as an OffsetManager.ManagePartition parameter, or at least as a NewOffsetManager parameter, rather than a part of Config. Keeping it in Config forces the user to make a client connection per consumer group;
  • metadata should probably be a parameter of SetOffset rather than settable on its own.
  • PartitionOffsetManager.Close blocks forever because the errors channel is never closed in the implementation.
  • If a PartitionOffsetManager is closed, a pending offset may not be committed.

@aaronkavlie-wf
Contributor

@wvanbergen are you using your kafka repo for offset management (in production)? Or something else?

@wvanbergen
Contributor

@aaronkavlie-wf yup that is what we are using.

@aaronkavlie-wf
Contributor

I tried this out yesterday and did some simple testing. It seems to be storing/retrieving offsets just fine.

I agree with @horkhe that it's cumbersome to manage the partition offset managers; baking it into the consumer as @eapache mentions will be a welcome improvement.

@horkhe
Contributor

horkhe commented Aug 4, 2015

@aaronkavlie-wf beware of the issues that I listed above.

@horkhe
Contributor

horkhe commented Aug 4, 2015

@aaronkavlie-wf I have been thinking about the offset manager interface and actually agree with @eapache that it is not a higher-level abstraction, as I thought before, but rather a lower-level building block that, along with the partition consumer, can be used to build a high-level balanced consumer.

But the way we implemented it in our clone is a little bit different, to ensure that commits are never lost during shutdown and that it is easy to use with multiple consumer groups. I am not sure whether it is suitable for master, but maybe I will submit a PR later for you to consider.

The interface we came up with is a bit different:

// OffsetManager uses Kafka to store consumed partition offsets.
type OffsetManager interface {
    // ManagePartition creates a PartitionOffsetManager on the given
    // group/topic/partition. It returns an error if this OffsetManager is
    // already managing the given group/topic/partition.
    ManagePartition(group, topic string, partition int32) (PartitionOffsetManager, error)

    // Close terminates all spawned PartitionOffsetManagers and waits for them
    // to commit pending offsets, so it is not necessary to call Close on the
    // spawned PartitionOffsetManagers explicitly.
    Close()
}

// PartitionOffsetManager uses Kafka to fetch consumed partition offsets. You
// MUST call Close() on a partition offset manager to avoid leaks, it will not
// be garbage-collected automatically when it passes out of scope.
type PartitionOffsetManager interface {
    // InitialOffset returns a channel that an initial offset will be sent down
    // to, when retrieved.
    InitialOffset() <-chan FetchedOffset

    // CommitOffset triggers saving of the specified commit in Kafka. Actual
    // commits are performed periodically in a background goroutine. The commit
    // interval is configured by `Config.Consumer.Offsets.CommitInterval`.
    CommitOffset(offset int64, metadata string)

    // Errors returns a read channel of errors that occur during offset
    // management, if enabled. By default errors are not returned. If you want
    // to implement any custom error handling logic then you need to set
    // `Consumer.Return.Errors` to true, and read from this channel.
    Errors() <-chan *OffsetCommitError

    // Close stops the PartitionOffsetManager from managing offsets. It is
    // required to call this function before a PartitionOffsetManager object
    // passes out of scope, as it will otherwise leak memory.
    Close()
}

type FetchedOffset struct {
    Offset   int64
    Metadata string
}

type OffsetCommitError struct {
    Group     string
    Topic     string
    Partition int32
    Err       error
}

The implementation is here.
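A hypothetical usage sketch for the interface above (these types come from our clone, not from this PR):

func consumeWithOffsets(om OffsetManager, group, topic string, partition int32) error {
    pom, err := om.ManagePartition(group, topic, partition)
    if err != nil {
        return err
    }
    // No explicit pom.Close() is needed if om.Close() is called, per the comments above.

    fetched := <-pom.InitialOffset()
    offset := fetched.Offset

    // ... consume messages starting at `offset`; after each processed message:
    offset++
    pom.CommitOffset(offset, "")
    return nil
}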

@eapache
Contributor Author

eapache commented Aug 4, 2015

The most recent push makes some of the API adjustments that have been suggested (group is now per-offset-manager, and offset/metadata are set/fetched together). It also fixes the missing close of the errors channel.

I haven't figured out the best behaviour yet regarding "If a PartitionOffsetManager is closed a pending offset may not be committed." We probably don't want to spin forever trying to commit an offset before we give up, but there's also no harm in retrying as long as Close hasn't been called... do we have the user configure a retry count that is only valid when shutting down? That seems a bit odd.
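Purely as an illustration of that last question, a bounded-retry shutdown could look something like the following; no such knob exists in this PR, and `commit` is a stand-in for whatever actually sends the OffsetCommitRequest:

func flushOnClose(commit func() error, retries int) error {
    var err error
    for attempt := 0; attempt <= retries; attempt++ {
        if err = commit(); err == nil {
            return nil
        }
    }
    // Give up rather than spinning forever once Close has been called,
    // and let the caller surface the error.
    return err
}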

@aaronkavlie-wf
Contributor

@eapache Did you amend your previous commit? This PR is only showing a single commit, "wip".

@aaronkavlie-wf
Contributor

I was actually starting on some tests; separate commits would make it easier to merge those in.

@eapache
Contributor Author

eapache commented Aug 5, 2015

Did you amend your previous commit?

Ya, when I'm working on something incomplete like this I'll typically just amend a single WIP commit.

I was actually starting on some tests; separate commits would make it easier to merge those in.

Sure, I can do that in the future. I'll do one more amend to give it a useful commit message, and then I'll put any other changes as separate commits on top.

aaronkavlie-wf and others added 10 commits August 18, 2015 15:58
Includes fix to error returns in offset_manager.go
Includes small fix to offset manager -- return to avoid
checking nil response.
- Make test function names more verbose (for targeting with -run)
- safeClose io.Closers to catch errors
- Remove debugging time.Sleep call
- Don't use globals for client, brokers, etc.
Add Close() OffsetManager interface
@aaronkavlie-wf
Contributor

Are there any remaining todos for this PR?

// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
// errors are logged and not returned over this channel. If you want to implement any custom error
// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
Errors() <-chan *ConsumerError
Contributor


Should we have an OffsetManagerError instead? I think I am OK with ConsumerError given that offset management only concerns the consumer, but I wanted to bring it up anyway.

Contributor Author


They'd have exactly the same contents, and your point about offset management only concerning the consumer also came to mind. We're on the same page here; let's leave it as is.
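For reference, draining that channel looks roughly like this (PartitionOffsetManager is the interface name on this branch, and ConsumerError's Topic/Partition/Err fields are sarama's existing consumer error type; Consumer.Return.Errors must be set to true, as the doc comment says):

import (
    "log"

    "github.com/Shopify/sarama"
)

// logOffsetErrors reads offset-management errors from the channel and logs them.
func logOffsetErrors(pom sarama.PartitionOffsetManager) {
    for err := range pom.Errors() {
        log.Printf("offset management error on %s/%d: %v", err.Topic, err.Partition, err.Err)
    }
}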

@wvanbergen
Contributor

@aaronkavlie-wf Just one outstanding PR to finish up (#520) and the question about the error type; after we resolve those, I think we can merge this.

I am mostly concerned about getting the interface right; if we find any bugs we can fix them later, but fixing the interface is harder due to API stability guarantees.

@wvanbergen
Contributor

#520 proposes some API changes, and fixes a bug.

@wvanbergen
Contributor

I am happy with this now, but @eapache may want to take a final look.

@eapache
Contributor Author

eapache commented Aug 21, 2015

I will do a final walkthrough (mostly for my own benefit) this weekend some time. If I don't find anything, and nobody else raises any last-minute issues, we can merge this as-is on Monday.

@eapache eapache changed the title from "[WIP] OffsetManager Implementation" to "OffsetManager Implementation" on Aug 21, 2015
@eapache
Contributor Author

eapache commented Aug 21, 2015

@horkhe you may be interested in the minor API changes we made (a short usage sketch follows the list):

  • Offset (the getter) is now NextOffset and returns the current offset + 1, so it is always the correct offset to start consuming at
  • SetOffset (the setter) is now MarkOffset, as that seems more precise
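A minimal end-to-end sketch of the API as merged (constructor and method names are taken from this branch; double-check the godoc before relying on them, and the broker address is just a placeholder):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalln(err)
    }
    defer client.Close()

    // The consumer group is now per-offset-manager rather than part of Config.
    om, err := sarama.NewOffsetManagerFromClient("my-group", client)
    if err != nil {
        log.Fatalln(err)
    }
    defer om.Close()

    pom, err := om.ManagePartition("my-topic", 0)
    if err != nil {
        log.Fatalln(err)
    }
    defer pom.Close()

    // NextOffset already returns the last marked offset + 1, so it can be fed
    // straight into a PartitionConsumer.
    offset, metadata := pom.NextOffset()
    log.Printf("resuming at offset %d (metadata %q)", offset, metadata)

    // ... after processing the message at `offset`:
    pom.MarkOffset(offset, "")
}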

@eapache
Contributor Author

eapache commented Aug 24, 2015

A few more questions to consider (none of them block merging this though):

  • Should MarkOffset validate that the offset being marked is > (or >=) the offset already stored?
  • Since producing a message to kafka is much cheaper than writing a zookeeper node, should the commit interval default to a lower value (1 second maybe)? (See the config sketch after this list.)
  • Do we need any other "flush triggers" than a simple timer (maybe based on number of uncommitted offsets or something)?
  • Should we expose a Dirty() bool reader on the POM?
  • Would it be handy to expose a Flush() method on the POM which blocks until !dirty (just like the equivalent syscall)?
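On the commit-interval question specifically, the value can already be overridden via the config key named earlier in this thread (assuming the merged branch keeps the same name):

package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // Commit marked offsets every second instead of the current default.
    config.Consumer.Offsets.CommitInterval = 1 * time.Second
    fmt.Println(config.Consumer.Offsets.CommitInterval)
}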

@eapache
Contributor Author

eapache commented Aug 24, 2015

OK, I am happy with this now. I would probably like to have a few more of the above questions answered before we put this into a versioned release (1.6?), but it can go into master now and we can iterate from there.

eapache added a commit that referenced this pull request Aug 24, 2015
@eapache eapache merged commit 23a7cd9 into master Aug 24, 2015
@eapache eapache deleted the offset-manager branch August 24, 2015 15:10
@wvanbergen
Contributor

Should MarkOffset validate that the offset being marked is > (or >=) the offset already stored?

I see a valid use case for "resetting" a consumer by setting all the offsets to a negative value. However, that might be easier using low-level primitives (Coordinator() and crafting a custom request), so I am OK with adding this.

Since producing a message to kafka is much cheaper than writing a zookeeper node, should the commit interval default to a lower value (1 second maybe)?

👍 to 1 sec interval by default.

Do we need any other "flush triggers" than a simple timer (maybe based on number of uncommitted offsets or something)?

I have no problem if this gets added, but let's first see if anybody is actually interested in this.

Should we expose a Dirty() bool reader on the POM?
Would it be handy to expose a Flush() method on the POM which blocks until !dirty (just like the equivalent syscall)?

I don't think Dirty() bool is very useful without a Flush() error, so it only makes sense if we add them together. I am not sure I can think of good use cases for this; the only time you really want to be sure the last offset is flushed is when you stop consuming from the partition, and in that case you should always call Close().
