Skip to content

Commit

Permalink
Merge pull request #48 from jehiah/concurrent_handlers_48
Browse files Browse the repository at this point in the history
separate handlers for concurrent handlers
  • Loading branch information
mreiferson committed Jul 7, 2014
2 parents c8fa005 + 8068575 commit 463df49
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 15 deletions.
39 changes: 27 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Handler interface {
// HandlerFunc is a convenience type to avoid having to declare a struct
// to implement the Handler interface, it can be used like this:
//
// consumer.SetHandler(nsq.HandlerFunc(func(m *Message) error {
// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
// // handle the message
// })
type HandlerFunc func(message *Message) error
Expand Down Expand Up @@ -103,6 +103,7 @@ type Consumer struct {
wg sync.WaitGroup
runningHandlers int32
stopFlag int32
connectedFlag int32
stopHandler sync.Once

// read from this channel to block until consumer is cleanly stopped
Expand Down Expand Up @@ -226,9 +227,19 @@ func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
//
// A goroutine is spawned to handle continual polling.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
if atomic.LoadInt32(&r.stopFlag) == 1 {
return errors.New("consumer stopped")
}
if atomic.LoadInt32(&r.runningHandlers) == 0 {
return errors.New("no handlers")
}

if err := validatedLookupAddr(addr); err != nil {
return err
}

atomic.StoreInt32(&r.connectedFlag, 1)

r.mtx.Lock()
for _, x := range r.lookupdHTTPAddrs {
if x == addr {
Expand Down Expand Up @@ -416,6 +427,8 @@ func (r *Consumer) ConnectToNSQD(addr string) error {
return errors.New("no handlers")
}

atomic.StoreInt32(&r.connectedFlag, 1)

_, pendingOk := r.pendingConnections[addr]
r.mtx.RLock()
_, ok := r.connections[addr]
Expand Down Expand Up @@ -871,26 +884,28 @@ func (r *Consumer) stopHandlers() {
})
}

// SetHandler sets the Handler for messages received by this Consumer.
// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ration to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) SetHandler(handler Handler) {
r.setHandlers(handler, 1)
func (r *Consumer) AddHandler(handler Handler) {
r.AddConcurrentHandlers(handler, 1)
}

// SetConcurrentHandlers sets the Handler for messages received by this Consumer. It
// AddConcurrentHandlers sets the Handler for messages received by this Consumer. It
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) SetConcurrentHandlers(handler Handler, concurrency int) {
r.setHandlers(handler, concurrency)
}

func (r *Consumer) setHandlers(handler Handler, concurrency int) {
if atomic.LoadInt32(&r.runningHandlers) > 0 {
panic("cannot call setHandlers() multiple times")
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
if atomic.LoadInt32(&r.connectedFlag) == 1 {
panic("already connected")
}

atomic.AddInt32(&r.runningHandlers, int32(concurrency))
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
Expand Down
2 changes: 1 addition & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func consumerTest(t *testing.T, deflate bool, snappy bool, tlsv1 bool) {
t: t,
q: q,
}
q.SetHandler(h)
q.AddHandler(h)

SendMessage(t, 4151, topicName, "put", []byte(`{"msg":"single"}`))
SendMessage(t, 4151, topicName, "mput", []byte("{\"msg\":\"double\"}\n{\"msg\":\"double\"}"))
Expand Down
2 changes: 1 addition & 1 deletion mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestConsumerBackoff(t *testing.T) {
config.BackoffMultiplier = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(logger, LogLevelDebug)
q.SetHandler(&testHandler{})
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
t.Fatalf(err.Error())
Expand Down
2 changes: 1 addition & 1 deletion producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func readMessages(topicName string, t *testing.T, msgCount int) {
t: t,
q: q,
}
q.SetHandler(h)
q.AddHandler(h)

err := q.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
Expand Down

0 comments on commit 463df49

Please sign in to comment.