From 502888ad2a959652bca5ead796fab15443fc2c3d Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Thu, 7 May 2015 21:07:09 +0000
Subject: [PATCH] wip

---
 config.go                |  12 +
 offset_fetch_response.go |  14 +
 offset_manager.go        | 538 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 564 insertions(+)
 create mode 100644 offset_manager.go

diff --git a/config.go b/config.go
index 881d630a2..f03a8956c 100644
--- a/config.go
+++ b/config.go
@@ -111,6 +111,18 @@ type Config struct {
 			// If enabled, any errors that occured while consuming are returned on the Errors channel (default disabled).
 			Errors bool
 		}
+
+		// Group is the consumer group name sent with the offset fetch and commit requests
+		// issued by the OffsetManager.
+		// TODO needs a default and validation.
+		Group string
+
+		Offsets struct {
+			// CommitInterval is how frequently the OffsetManager commits offsets to the
+			// coordinating broker. TODO needs a default and validation; the OffsetManager
+			// currently falls back to one second when this is not positive.
+			CommitInterval time.Duration
+		}
 	}
 
 	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
diff --git a/offset_fetch_response.go b/offset_fetch_response.go
index 0a00b2d43..8727e60a2 100644
--- a/offset_fetch_response.go
+++ b/offset_fetch_response.go
@@ -80,3 +80,17 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+// GetBlock returns the response block for the given topic/partition, or nil if the response
+// does not contain one.
+func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
+	if r.Blocks == nil {
+		return nil
+	}
+
+	if r.Blocks[topic] == nil {
+		return nil
+	}
+
+	return r.Blocks[topic][partition]
+}
diff --git a/offset_manager.go b/offset_manager.go
new file mode 100644
index 000000000..4b89bf0b5
--- /dev/null
+++ b/offset_manager.go
@@ -0,0 +1,538 @@
+package sarama
+
+import (
+	"sync"
+	"time"
+)
+
+// defaultOffsetCommitInterval is the commit interval used when Consumer.Offsets.CommitInterval
+// is not positive; time.NewTicker panics when given a non-positive duration.
+const defaultOffsetCommitInterval = 1 * time.Second
+
+// Offset Manager
+
+// OffsetManager uses Kafka to store and fetch consumed partition offsets.
+type OffsetManager interface {
+	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
+	// return an error if this OffsetManager is already managing the given topic/partition.
+	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
+}
+
+// offsetManager is the OffsetManager implementation. lock guards poms, boms and the reference
+// counts of the broker offset managers.
+type offsetManager struct {
+	client Client
+	conf   *Config
+
+	lock sync.Mutex
+	poms map[string]map[int32]*partitionOffsetManager
+	boms map[*Broker]*brokerOffsetManager
+}
+
+// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
+// It is still necessary to call Close() on the underlying client when finished with the partition manager.
+func NewOffsetManagerFromClient(client Client) (OffsetManager, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	om := &offsetManager{
+		client: client,
+		conf:   client.Config(),
+		poms:   make(map[string]map[int32]*partitionOffsetManager),
+		boms:   make(map[*Broker]*brokerOffsetManager),
+	}
+
+	return om, nil
+}
+
+// ManagePartition creates and registers a PartitionOffsetManager for the given topic/partition.
+// It returns a ConfigurationError if that topic/partition is already being managed.
+func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
+	pom, err := om.newPartitionOffsetManager(topic, partition)
+	if err != nil {
+		return nil, err
+	}
+
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	topicManagers := om.poms[topic]
+	if topicManagers == nil {
+		topicManagers = make(map[int32]*partitionOffsetManager)
+		om.poms[topic] = topicManagers
+	}
+
+	if topicManagers[partition] != nil {
+		// The redundant manager is already running and subscribed to a broker; shut it down
+		// so that its goroutine and broker reference are not leaked.
+		pom.AsyncClose()
+		return nil, ConfigurationError("That topic/partition is already being managed")
+	}
+
+	topicManagers[partition] = pom
+	return pom, nil
+}
+
+// abandonPartitionOffsetManager forgets a closed partition manager so that its topic/partition
+// can be managed again. The identity check keeps a rejected duplicate from evicting the
+// manager that is actually registered.
+func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	if om.poms[pom.topic][pom.partition] == pom {
+		delete(om.poms[pom.topic], pom.partition)
+	}
+}
+
+// refBrokerOffsetManager returns the brokerOffsetManager for the given broker, creating it if
+// necessary, and increments its reference count.
+func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	bom := om.boms[broker]
+	if bom == nil {
+		bom = om.newBrokerOffsetManager(broker)
+		om.boms[broker] = bom
+	}
+
+	bom.refs++
+
+	return bom
+}
+
+// unrefBrokerOffsetManager drops one reference to bom. When the last reference is released the
+// subscription channel is closed, which terminates the manager's main loop.
+func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	bom.refs--
+
+	if bom.refs == 0 {
+		close(bom.updateSubscriptions)
+		if om.boms[bom.broker] == bom {
+			delete(om.boms, bom.broker)
+		}
+	}
+}
+
+// abandonBroker removes bom from the registry so that later lookups for its broker create a
+// fresh manager. It only deletes the entry if it still refers to bom, so a replacement that
+// was registered in the meantime is left alone.
+func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	if om.boms[bom.broker] == bom {
+		delete(om.boms, bom.broker)
+	}
+}
+
+// Partition Offset Manager
+
+// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
+// on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes
+// out of scope.
+type PartitionOffsetManager interface {
+	// Offset returns the current offset according to the manager; this value has not necessarily
+	// been flushed to the cluster yet.
+	Offset() int64
+	// SetOffset sets the current offset according to the manager; this value (or a subsequent update)
+	// will eventually be flushed to the cluster based on configuration.
+	SetOffset(offset int64)
+
+	// Metadata returns the current metadata according to the manager; this value has not necessarily
+	// been flushed to the cluster yet.
+	Metadata() string
+	// SetMetadata sets the current metadata according to the manager; this value (or a subsequent update)
+	// will eventually be flushed to the cluster based on configuration.
+	SetMetadata(metadata string)
+
+	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
+	// errors are logged and not returned over this channel. If you want to implement any custom error
+	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
+	Errors() <-chan *ConsumerError
+	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
+	// after which you should wait until the 'errors' channel has been drained and closed.
+	// It is required to call this function, or Close before a PartitionOffsetManager object passes out of scope,
+	// as it will otherwise leak memory. You must call this before calling Close on the underlying
+	// client.
+	AsyncClose()
+	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
+	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
+	// leak memory. You must call this before calling Close on the underlying client.
+	Close() error
+}
+
+// partitionOffsetManager is the PartitionOffsetManager implementation. lock guards offset and
+// metadata.
+type partitionOffsetManager struct {
+	parent    *offsetManager
+	topic     string
+	partition int32
+
+	lock     sync.Mutex
+	offset   int64
+	metadata string
+	broker   *brokerOffsetManager
+
+	errors    chan *ConsumerError
+	rebalance chan none
+	dying     chan none
+}
+
+// newPartitionOffsetManager builds a manager for topic/partition, subscribes it to the group's
+// coordinating broker, loads the last committed offset and starts its main loop.
+func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
+	pom := &partitionOffsetManager{
+		parent:    om,
+		topic:     topic,
+		partition: partition,
+		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
+		rebalance: make(chan none, 1),
+		dying:     make(chan none),
+	}
+
+	if err := pom.selectBroker(); err != nil {
+		return nil, err
+	}
+
+	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
+		// We may still be subscribed to a broker manager; detach from it so that it does not
+		// keep committing offsets for a manager the caller never received.
+		pom.releaseBroker()
+		return nil, err
+	}
+
+	go withRecover(pom.mainLoop)
+
+	return pom, nil
+}
+
+// mainLoop re-selects the coordinating broker whenever a rebalance is signalled and tears the
+// manager down once it is closed.
+func (pom *partitionOffsetManager) mainLoop() {
+	for {
+		select {
+		case <-pom.rebalance:
+			if err := pom.selectBroker(); err != nil {
+				pom.handleError(err)
+				// Re-queue the signal so the selection is retried on the next iteration.
+				pom.rebalance <- none{}
+			}
+		case <-pom.dying:
+			pom.releaseBroker()
+			pom.parent.abandonPartitionOffsetManager(pom)
+			// The broker manager has been detached, so closing the channel lets Close() and
+			// anyone ranging over Errors() finish.
+			close(pom.errors)
+			return
+		}
+	}
+}
+
+// releaseBroker detaches the manager from its current broker manager, if any, and drops the
+// reference it holds on it.
+func (pom *partitionOffsetManager) releaseBroker() {
+	if pom.broker == nil {
+		return
+	}
+
+	// Subscriptions are toggled by sending on updateSubscriptions, so only send when the
+	// broker manager has not already dropped us; a pending rebalance signal means it has.
+	select {
+	case <-pom.rebalance:
+	default:
+		select {
+		case <-pom.rebalance:
+		case pom.broker.updateSubscriptions <- pom:
+		}
+	}
+
+	pom.parent.unrefBrokerOffsetManager(pom.broker)
+	pom.broker = nil
+}
+
+// selectBroker drops the reference to the current broker manager (if any), looks up the
+// coordinating broker for the consumer group and subscribes to its broker offset manager.
+// NOTE(review): the previous broker manager is only unreferenced here, not unsubscribed, so
+// callers must make sure it has already dropped this manager.
+func (pom *partitionOffsetManager) selectBroker() error {
+	if pom.broker != nil {
+		pom.parent.unrefBrokerOffsetManager(pom.broker)
+		pom.broker = nil
+	}
+
+	var broker *Broker
+	var err error
+
+	if err = pom.parent.client.RefreshCoordinator(pom.parent.conf.Consumer.Group); err != nil {
+		return err
+	}
+
+	if broker, err = pom.parent.client.Coordinator(pom.parent.conf.Consumer.Group); err != nil {
+		return err
+	}
+
+	pom.broker = pom.parent.refBrokerOffsetManager(broker)
+	pom.broker.updateSubscriptions <- pom
+	return nil
+}
+
+// fetchInitialOffset loads the last committed offset and metadata for this partition from the
+// coordinating broker, retrying up to retries times when the coordinator has moved or is still
+// loading offsets.
+func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
+	request := new(OffsetFetchRequest)
+	request.Version = 1
+	request.ConsumerGroup = pom.parent.conf.Consumer.Group
+	request.AddPartition(pom.topic, pom.partition)
+
+	response, err := pom.broker.broker.FetchOffset(request)
+	if err != nil {
+		return err
+	}
+
+	block := response.GetBlock(pom.topic, pom.partition)
+	if block == nil {
+		return ErrIncompleteResponse
+	}
+
+	// err is always nil past this point, so failures must be reported as block.Err;
+	// returning err would tell the caller that the fetch succeeded.
+	switch block.Err {
+	case ErrNoError:
+		pom.offset = block.Offset
+		pom.metadata = block.Metadata
+		return nil
+	case ErrNotCoordinatorForConsumer:
+		if retries <= 0 {
+			return block.Err
+		}
+		if err := pom.selectBroker(); err != nil {
+			return err
+		}
+		return pom.fetchInitialOffset(retries - 1)
+	case ErrOffsetsLoadInProgress:
+		if retries <= 0 {
+			return block.Err
+		}
+		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
+		return pom.fetchInitialOffset(retries - 1)
+	default:
+		return block.Err
+	}
+}
+
+// handleError delivers err on the errors channel when Consumer.Return.Errors is set and logs
+// it otherwise.
+func (pom *partitionOffsetManager) handleError(err error) {
+	cErr := &ConsumerError{
+		Topic:     pom.topic,
+		Partition: pom.partition,
+		Err:       err,
+	}
+
+	if pom.parent.conf.Consumer.Return.Errors {
+		pom.errors <- cErr
+	} else {
+		Logger.Println(cErr)
+	}
+}
+
+// Errors implements PartitionOffsetManager.
+func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
+	return pom.errors
+}
+
+// SetOffset implements PartitionOffsetManager.
+func (pom *partitionOffsetManager) SetOffset(offset int64) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	pom.offset = offset
+}
+
+// SetMetadata implements PartitionOffsetManager.
+func (pom *partitionOffsetManager) SetMetadata(metadata string) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	pom.metadata = metadata
+}
+
+// Offset implements PartitionOffsetManager.
+func (pom *partitionOffsetManager) Offset() int64 {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	return pom.offset
+}
+
+// Metadata implements PartitionOffsetManager.
+func (pom *partitionOffsetManager) Metadata() string {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	return pom.metadata
+}
+
+// AsyncClose implements PartitionOffsetManager. It must not be called more than once, since
+// closing an already-closed channel panics.
+func (pom *partitionOffsetManager) AsyncClose() {
+	close(pom.dying)
+}
+
+// Close implements PartitionOffsetManager. It blocks until the errors channel has been closed
+// by the main loop and returns everything that was still queued on it.
+func (pom *partitionOffsetManager) Close() error {
+	pom.AsyncClose()
+
+	var errors ConsumerErrors
+	for err := range pom.errors {
+		errors = append(errors, err)
+	}
+
+	if len(errors) > 0 {
+		return errors
+	}
+	return nil
+}
+
+// Broker Offset Manager
+
+// brokerOffsetManager periodically commits the offsets of its subscribed partition managers
+// to a single broker. subscriptions is only accessed from the manager's own goroutine; refs is
+// guarded by the parent's lock.
+type brokerOffsetManager struct {
+	parent              *offsetManager
+	broker              *Broker
+	timer               *time.Ticker
+	updateSubscriptions chan *partitionOffsetManager
+	subscriptions       map[*partitionOffsetManager]none
+	refs                int
+}
+
+// newBrokerOffsetManager creates the manager for broker and starts its main loop.
+func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
+	interval := om.conf.Consumer.Offsets.CommitInterval
+	if interval <= 0 {
+		// time.NewTicker panics on a non-positive interval and the setting has no default yet.
+		interval = defaultOffsetCommitInterval
+	}
+
+	bom := &brokerOffsetManager{
+		parent:              om,
+		broker:              broker,
+		timer:               time.NewTicker(interval),
+		updateSubscriptions: make(chan *partitionOffsetManager),
+		subscriptions:       make(map[*partitionOffsetManager]none),
+	}
+
+	go withRecover(bom.mainLoop)
+
+	return bom
+}
+
+// mainLoop flushes offsets on every tick and toggles a partition manager's subscription each
+// time it is received on updateSubscriptions. It exits when that channel is closed.
+func (bom *brokerOffsetManager) mainLoop() {
+	for {
+		select {
+		case <-bom.timer.C:
+			bom.flushToBroker()
+		case s, ok := <-bom.updateSubscriptions:
+			if !ok {
+				bom.timer.Stop()
+				return
+			}
+			if _, ok := bom.subscriptions[s]; ok {
+				delete(bom.subscriptions, s)
+			} else {
+				bom.subscriptions[s] = none{}
+			}
+		}
+	}
+}
+
+// flushToBroker commits the current offsets of all subscriptions. Subscribers whose commit
+// failed are dropped and told to rebalance.
+func (bom *brokerOffsetManager) flushToBroker() {
+	if len(bom.subscriptions) == 0 {
+		return // nothing to commit
+	}
+
+	request := bom.constructRequest()
+	response, err := bom.broker.CommitOffset(request)
+	if err != nil {
+		// response is not usable after a failed request; abort notifies every subscriber.
+		bom.abort(err)
+		return
+	}
+
+	for s := range bom.subscriptions {
+		// Indexing a missing topic yields a nil inner map, so one lookup covers both a
+		// missing topic and a missing partition.
+		kerr, ok := response.Errors[s.topic][s.partition]
+		if !ok {
+			s.handleError(ErrIncompleteResponse)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+			continue
+		}
+
+		switch kerr {
+		case ErrNoError:
+			// committed, keep the subscription
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+		default:
+			s.handleError(kerr)
+			delete(bom.subscriptions, s)
+			s.rebalance <- none{}
+		}
+	}
+}
+
+// constructRequest builds an offset commit request covering every subscription.
+func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
+	r := &OffsetCommitRequest{
+		Version:       1,
+		ConsumerGroup: bom.parent.conf.Consumer.Group,
+	}
+	for s := range bom.subscriptions {
+		s.lock.Lock()
+		r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
+		s.lock.Unlock()
+	}
+	return r
+}
+
+// abort closes the broker connection after a failed commit, reports err to every partition
+// manager attached to this broker manager and tells them to rebalance. It returns once
+// updateSubscriptions has been closed, i.e. once the last reference has been dropped.
+func (bom *brokerOffsetManager) abort(err error) {
+	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
+	bom.parent.abandonBroker(bom)
+
+	for pom := range bom.subscriptions {
+		pom.handleError(err)
+		pom.rebalance <- none{}
+	}
+
+	for s := range bom.updateSubscriptions {
+		if _, ok := bom.subscriptions[s]; !ok {
+			s.handleError(err)
+			s.rebalance <- none{}
+		}
+	}
+
+	// Everyone has been notified; forget the subscribers so that a tick racing with the
+	// channel close cannot notify them a second time.
+	bom.subscriptions = make(map[*partitionOffsetManager]none)
+}