diff --git a/api_request.go b/api_request.go index 2c0c8d63..d4e389c9 100644 --- a/api_request.go +++ b/api_request.go @@ -27,6 +27,7 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) { func newDeadlineTransport(timeout time.Duration) *http.Transport { transport := &http.Transport{ + DisableKeepAlives: true, Dial: func(netw, addr string) (net.Conn, error) { c, err := net.DialTimeout(netw, addr, timeout) if err != nil { diff --git a/conn.go b/conn.go index e18c3629..315148ee 100644 --- a/conn.go +++ b/conn.go @@ -487,7 +487,6 @@ func (c *Conn) readLoop() { } frameType, data, err := ReadUnpackedResponse(c) - if err != nil { if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 { goto exit diff --git a/consumer.go b/consumer.go index c56a1647..78d64941 100644 --- a/consumer.go +++ b/consumer.go @@ -454,6 +454,9 @@ type peerInfo struct { // // initiate a connection to any new producers that are identified. func (r *Consumer) queryLookupd() { + retries := 0 + +retry: endpoint := r.nextLookupdEndpoint() r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint) @@ -462,6 +465,11 @@ func (r *Consumer) queryLookupd() { err := apiRequestNegotiateV1("GET", endpoint, nil, &data) if err != nil { r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err) + retries++ + if retries < 3 { + r.log(LogLevelInfo, "retrying with next nsqlookupd") + goto retry + } return } diff --git a/consumer_test.go b/consumer_test.go index 5c5816d5..4079dc91 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -64,6 +64,9 @@ func SendMessage(t *testing.T, port int, topic string, method string, body []byt t.Fatalf(err.Error()) return } + if resp.StatusCode != 200 { + t.Fatalf("%s status code: %d", method, resp.StatusCode) + } resp.Body.Close() } @@ -165,7 +168,7 @@ func consumerTest(t *testing.T, cb func(c *Config)) { } topicName = topicName + strconv.Itoa(int(time.Now().Unix())) q, _ := NewConsumer(topicName, "ch", config) - // q.SetLogger(nullLogger, LogLevelInfo) + q.SetLogger(log.New(os.Stderr, "", log.Flags()), LogLevelDebug) h := &MyTestHandler{ t: t, @@ -173,9 +176,9 @@ func consumerTest(t *testing.T, cb func(c *Config)) { } q.AddHandler(h) - SendMessage(t, 4151, topicName, "put", []byte(`{"msg":"single"}`)) - SendMessage(t, 4151, topicName, "mput", []byte("{\"msg\":\"double\"}\n{\"msg\":\"double\"}")) - SendMessage(t, 4151, topicName, "put", []byte("TOBEFAILED")) + SendMessage(t, 4151, topicName, "pub", []byte(`{"msg":"single"}`)) + SendMessage(t, 4151, topicName, "mpub", []byte("{\"msg\":\"double\"}\n{\"msg\":\"double\"}")) + SendMessage(t, 4151, topicName, "pub", []byte("TOBEFAILED")) h.messagesSent = 4 addr := "127.0.0.1:4150"