Skip to content

Commit

Permalink
Merge pull request #132 from mreiferson/deadlocks_132
Browse files Browse the repository at this point in the history
consumer: don't run redistribute without connections
  • Loading branch information
jehiah committed Apr 8, 2015
2 parents da20c0d + 2e31ccc commit a76e331
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 32 deletions.
10 changes: 8 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ type Config struct {
// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
// restart at the same time
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m" default:"60s"`
//
// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

// Maximum duration when REQueueing (for doubling of deferred requeue)
Expand All @@ -128,10 +131,13 @@ type Config struct {
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

// Amount of time in seconds to wait for a message from a producer when in a state where RDY
// Duration to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // (defaults: short hostname)
Expand Down
35 changes: 23 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,8 @@ func (r *Consumer) onConnClose(c *Conn) {
// try to reconnect after a bit
go func(addr string) {
for {
r.log(LogLevelInfo, "(%s) re-connecting in 15 seconds...", addr)
time.Sleep(15 * time.Second)
r.log(LogLevelInfo, "(%s) re-connecting in %.04f seconds...", addr, r.config.LookupdPollInterval)
time.Sleep(r.config.LookupdPollInterval)
if atomic.LoadInt32(&r.stopFlag) == 1 {
break
}
Expand Down Expand Up @@ -817,7 +817,8 @@ func (r *Consumer) resume() {
// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
r.log(LogLevelWarning, "no connection available to resume")
r.log(LogLevelWarning, "backing off for %.04f seconds", 1)
r.backoff(time.Second)
return
}
Expand All @@ -831,7 +832,8 @@ func (r *Consumer) resume() {
// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
r.log(LogLevelWarning, "(%s) error resuming RDY 1 - %s", choice.String(), err)
r.log(LogLevelWarning, "backing off for %.04f seconds", 1)
r.backoff(time.Second)
return
}
Expand All @@ -848,7 +850,11 @@ func (r *Consumer) inBackoffTimeout() bool {
}

func (r *Consumer) maybeUpdateRDY(conn *Conn) {
if r.inBackoff() || r.inBackoffTimeout() {
inBackoff := r.inBackoff()
inBackoffTimeout := r.inBackoffTimeout()
if inBackoff || inBackoffTimeout {
r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
conn, inBackoff, inBackoffTimeout)
return
}

Expand All @@ -868,7 +874,7 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {
}

func (r *Consumer) rdyLoop() {
redistributeTicker := time.NewTicker(5 * time.Second)
redistributeTicker := time.NewTicker(r.config.RDYRedistributeInterval)

for {
select {
Expand Down Expand Up @@ -949,24 +955,29 @@ func (r *Consumer) redistributeRDY() {
return
}

numConns := int32(len(r.conns()))
// if an external heuristic set needRDYRedistributed we want to wait
// until we can actually redistribute to proceed
conns := r.conns()
if len(conns) == 0 {
return
}

maxInFlight := r.getMaxInFlight()
if numConns > maxInFlight {
if len(conns) > int(maxInFlight) {
r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
numConns, maxInFlight)
len(conns), maxInFlight)
atomic.StoreInt32(&r.needRDYRedistributed, 1)
}

if r.inBackoff() && numConns > 1 {
r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", numConns)
if r.inBackoff() && len(conns) > 1 {
r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
atomic.StoreInt32(&r.needRDYRedistributed, 1)
}

if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
return
}

conns := r.conns()
possibleConns := make([]*Conn, 0, len(conns))
for _, c := range conns {
lastMsgDuration := time.Now().Sub(c.LastMessageTime())
Expand Down
163 changes: 146 additions & 17 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,30 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"strconv"
"testing"
"time"
)

type tbLog interface {
Log(...interface{})
}

type testLogger struct {
tbLog
}

func (tl *testLogger) Output(maxdepth int, s string) error {
tl.Log(s)
return nil
}

func newTestLogger(tbl tbLog) logger {
return &testLogger{tbl}
}

type instruction struct {
delay time.Duration
frameType int32
Expand All @@ -29,14 +45,13 @@ type mockNSQD struct {
exitChan chan int
}

func newMockNSQD(script []instruction) *mockNSQD {
func newMockNSQD(script []instruction, addr string) *mockNSQD {
n := &mockNSQD{
script: script,
exitChan: make(chan int),
}

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
tcpListener, err := net.Listen("tcp", addr.String())
tcpListener, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err)
}
Expand Down Expand Up @@ -147,6 +162,7 @@ func (n *mockNSQD) handle(conn net.Conn) {

exit:
n.tcpListener.Close()
conn.Close()
}

func framedResponse(frameType int32, data []byte) []byte {
Expand Down Expand Up @@ -174,18 +190,17 @@ func framedResponse(frameType int32, data []byte) []byte {
type testHandler struct{}

func (h *testHandler) HandleMessage(message *Message) error {
if bytes.Equal(message.Body, []byte("requeue")) {
switch string(message.Body) {
case "requeue":
message.Requeue(-1)
return nil
}
if bytes.Equal(message.Body, []byte("requeue_no_backoff_1")) {
case "requeue_no_backoff_1":
if message.Attempts > 1 {
return nil
}
message.RequeueWithoutBackoff(-1)
return nil
}
if bytes.Equal(message.Body, []byte("bad")) {
case "bad":
return errors.New("bad")
}
return nil
Expand All @@ -198,8 +213,6 @@ func frameMessage(m *Message) []byte {
}

func TestConsumerBackoff(t *testing.T) {
logger := log.New(ioutil.Discard, "", log.LstdFlags)

msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgGood := NewMessage(msgIDGood, []byte("good"))

Expand All @@ -221,14 +234,16 @@ func TestConsumerBackoff(t *testing.T) {
// needed to exit test
instruction{200 * time.Millisecond, -1, []byte("exit")},
}
n := newMockNSQD(script)

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_consumer_commands" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 5
config.BackoffMultiplier = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(logger, LogLevelDebug)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
Expand Down Expand Up @@ -272,8 +287,6 @@ func TestConsumerBackoff(t *testing.T) {
}

func TestConsumerRequeueNoBackoff(t *testing.T) {
// logger := log.New(ioutil.Discard, "", log.LstdFlags)

msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeueNoBackoff := MessageID{'r', 'e', 'q', 'n', 'b', 'a', 'c', 'k', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
Expand All @@ -293,14 +306,16 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}
n := newMockNSQD(script)

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 1
config.BackoffMultiplier = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
// q.SetLogger(logger, LogLevelDebug)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
Expand Down Expand Up @@ -341,3 +356,117 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
}
}
}

func TestConsumerBackoffDisconnect(t *testing.T) {
msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}

msgGood := NewMessage(msgIDGood, []byte("good"))
msgRequeue := NewMessage(msgIDRequeue, []byte("requeue"))

script := []instruction{
// SUB
instruction{0, FrameTypeResponse, []byte("OK")},
// IDENTIFY
instruction{0, FrameTypeResponse, []byte("OK")},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 5
config.BackoffMultiplier = 10 * time.Millisecond
config.LookupdPollInterval = 10 * time.Millisecond
config.RDYRedistributeInterval = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
t.Fatalf(err.Error())
}

select {
case <-n.exitChan:
log.Printf("clean exit")
case <-time.After(500 * time.Millisecond):
log.Printf("timeout")
}

for i, r := range n.got {
log.Printf("%d: %s", i, r)
}

expected := []string{
"IDENTIFY",
"SUB " + topicName + " ch",
"RDY 5",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDRequeue),
"RDY 1",
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDRequeue),
"RDY 1",
"RDY 0",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 1",
}
if len(n.got) != len(expected) {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
}
for i, r := range n.got {
if string(r) != expected[i] {
t.Fatalf("cmd %d bad %s != %s", i, r, expected[i])
}
}

script = []instruction{
// SUB
instruction{0, FrameTypeResponse, []byte("OK")},
// IDENTIFY
instruction{0, FrameTypeResponse, []byte("OK")},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}

n = newMockNSQD(script, n.tcpAddr.String())

select {
case <-n.exitChan:
log.Printf("clean exit")
case <-time.After(500 * time.Millisecond):
log.Printf("timeout")
}

for i, r := range n.got {
log.Printf("%d: %s", i, r)
}

expected = []string{
"IDENTIFY",
"SUB " + topicName + " ch",
"RDY 1",
"RDY 5",
fmt.Sprintf("FIN %s", msgIDGood),
fmt.Sprintf("FIN %s", msgIDGood),
}
if len(n.got) != len(expected) {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
}
for i, r := range n.got {
if string(r) != expected[i] {
t.Fatalf("cmd %d bad %s != %s", i, r, expected[i])
}
}
}
2 changes: 1 addition & 1 deletion producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestProducerConnection(t *testing.T) {

err := w.Publish("write_test", []byte("test"))
if err != nil {
t.Fatalf("should lazily connect")
t.Fatalf("should lazily connect - %s", err)
}

conn := w.conn.(*Conn)
Expand Down

0 comments on commit a76e331

Please sign in to comment.