Skip to content

Commit

Permalink
Merge branch 'nsqio:master' into topology_hints_312
Browse files Browse the repository at this point in the history
  • Loading branch information
zoemccormick authored Jan 5, 2024
2 parents 7a67d0c + c2c3842 commit ec88db9
Show file tree
Hide file tree
Showing 17 changed files with 268 additions and 112 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: tests

on:
push: {branches: [master]}
pull_request: {branches: [master]}

jobs:
test:
runs-on: ubuntu-20.04
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
imgtag:
- "golang:1.15-buster"
- "golang:1.16-buster"
- "golang:1.17-buster"
goarch:
- "amd64"
nsq_ver:
- "nsq-1.1.0.linux-amd64.go1.10.3"
- "nsq-1.2.0.linux-amd64.go1.12.9"
- "nsq-1.2.1.linux-amd64.go1.16.6"
include:
# test 386 only against latest version of NSQ
- imgtag: "golang:1.17-buster"
goarch: "386"
nsq_ver: "nsq-1.2.1.linux-amd64.go1.16.6"

container: "${{matrix.imgtag}}"
env:
GOPATH: "${{github.workspace}}/go"
GOARCH: "${{matrix.goarch}}"
SRCDIR: "go/src/github.com/nsqio/go-nsq"

steps:
- uses: actions/checkout@v2
with:
path: ${{env.SRCDIR}}

- name: download NSQ
run: |
cd ${{env.SRCDIR}}
curl -sSL "http://bitly-downloads.s3.amazonaws.com/nsq/${{matrix.nsq_ver}}.tar.gz" \
| tar -xzv --strip-components=1
- name: test
run: |
cd ${{env.SRCDIR}}
export PATH=bin:$PATH
./test.sh
16 changes: 0 additions & 16 deletions .travis.yml

This file was deleted.

28 changes: 28 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
## go-nsq Change Log

### 1.1.0 - 2021-10-25

* #275/#281 - support separate Logger for each log level (thanks @crazyweave)
* #282 - consumer: reduce duplicate RDY (ready) count updates (thanks @andyxning)
* #283 - remove redundant Config initialized check (thanks @SwanSpouse)
* #313 - add Authorization header to lookup queries
* #321 - consumer: fix panic with some invalid lookupd http addresses (thanks @martin-sucha)
* #317 - producer: connect() code-style improvement (thanks @martin-sucha)
* #330 - fix random backoff jitter on 32-bit architectures
* #333 - consumer: re-use http client with keepalives for lookupd requests (thanks @JieTrancender)
* #336 - producer: shutdown logging prefix consistent with other logging (thanks @karalabe)
* #294 - docs: fix producer example (thanks @nikitabuyevich)
* #307 - docs: add exit signal handling to consumer example
* #324 - docs: fix Consumer.SetLogger() description (thanks @gabriel-vasile)
* #297 - add AUTHORS file
* #329/#330 - switch to GitHub Actions for CI

### 1.0.8 - 2019-12-24

Thanks to @judwhite, @vitaliytv, and @HaraldNordgren for contributing to testing and dependency management improvements

* #248 - support go modules
* #249 - consumer: update RDY when setting MaxInFlight to 0
* #267 - check response message size is positive (thanks @andyxning)
* #271 - godoc for publisher and consumer (thanks @skateinmars)
* #270 - set log level (thanks @YongHaoWu)
* #255 - go vet tls.Config copying (thanks @iaburton)

### 1.0.7 - 2017-08-04

**Upgrading from 1.0.6**: There are no backward incompatible changes.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## go-nsq

[![Build Status](https://secure.travis-ci.org/nsqio/go-nsq.svg?branch=master)][travis] [![GoDoc](https://godoc.org/github.com/nsqio/go-nsq?status.svg)](https://godoc.org/github.com/nsqio/go-nsq) [![GitHub release](https://img.shields.io/github/release/nsqio/go-nsq.svg)](https://github.com/nsqio/go-nsq/releases/latest)
[![Build Status](https://github.com/nsqio/go-nsq/workflows/tests/badge.svg)](https://github.com/nsqio/go-nsq/actions) [![GoDoc](https://godoc.org/github.com/nsqio/go-nsq?status.svg)](https://godoc.org/github.com/nsqio/go-nsq) [![GitHub release](https://img.shields.io/github/release/nsqio/go-nsq.svg)](https://github.com/nsqio/go-nsq/releases/latest)

The official Go package for [NSQ][nsq].

Expand Down
23 changes: 5 additions & 18 deletions api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package nsq
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand All @@ -25,33 +24,21 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
return c.Conn.Write(b)
}

func newDeadlineTransport(timeout time.Duration) *http.Transport {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
if err != nil {
return nil, err
}
return &deadlinedConn{timeout, c}, nil
},
}
return transport
}

type wrappedResp struct {
Status string `json:"status_txt"`
StatusCode int `json:"status_code"`
Data interface{} `json:"data"`
}

// stores the result in the value pointed to by ret(must be a pointer)
func apiRequestNegotiateV1(method string, endpoint string, body io.Reader, ret interface{}) error {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
req, err := http.NewRequest(method, endpoint, body)
func apiRequestNegotiateV1(httpclient *http.Client, method string, endpoint string, headers http.Header, ret interface{}) error {
req, err := http.NewRequest(method, endpoint, nil)
if err != nil {
return err
}
for k, v := range headers {
req.Header[k] = v
}

req.Header.Add("Accept", "application/vnd.nsq; version=1.0")

Expand Down
10 changes: 8 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *FullJitterStrategy) Calculate(attempt int) time.Duration {

backoffDuration := s.cfg.BackoffMultiplier *
time.Duration(math.Pow(2, float64(attempt)))
return time.Duration(s.rng.Intn(int(backoffDuration)))
return time.Duration(s.rng.Int63n(int64(backoffDuration)))
}

func (s *FullJitterStrategy) setConfig(cfg *Config) {
Expand Down Expand Up @@ -110,6 +110,7 @@ type Config struct {
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
LookupdPollTimeout time.Duration `opt:"lookupd_poll_timeout" default:"1m"`

// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
Expand Down Expand Up @@ -180,8 +181,13 @@ type Config struct {
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

// secret for nsqd authentication (requires nsqd 0.2.29+)
// Maximum size of a single message in bytes (0 means no limit)
MaxMsgSize int32 `opt:"max_msg_size" min:"0" default:"0"`

// Secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
// Use AuthSecret as 'Authorization: Bearer {AuthSecret}' on lookupd queries
LookupdAuthorization bool `opt:"skip_lookupd_authorization" default:"true"`
}

// NewConfig returns a new default nsq configuration.
Expand Down
4 changes: 2 additions & 2 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func TestExponentialBackoff(t *testing.T) {

func TestFullJitterBackoff(t *testing.T) {
expected := []time.Duration{
566028617 * time.Nanosecond,
1365407263 * time.Nanosecond,
724039541 * time.Nanosecond,
1603903257 * time.Nanosecond,
5232470547 * time.Nanosecond,
21467499218 * time.Nanosecond,
}
Expand Down
16 changes: 8 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (c *Conn) identify() (*IdentifyResponse, error) {
return nil, ErrIdentify{err.Error()}
}

frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
Expand Down Expand Up @@ -436,7 +436,7 @@ func (c *Conn) upgradeTLS(tlsConf *tls.Config) error {
}
c.r = c.tlsConn
c.w = c.tlsConn
frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
Expand All @@ -454,7 +454,7 @@ func (c *Conn) upgradeDeflate(level int) error {
fw, _ := flate.NewWriter(conn, level)
c.r = flate.NewReader(conn)
c.w = fw
frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
Expand All @@ -471,7 +471,7 @@ func (c *Conn) upgradeSnappy() error {
}
c.r = snappy.NewReader(conn)
c.w = snappy.NewWriter(conn)
frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
Expand All @@ -492,7 +492,7 @@ func (c *Conn) auth(secret string) error {
return err
}

frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -520,7 +520,7 @@ func (c *Conn) readLoop() {
goto exit
}

frameType, data, err := ReadUnpackedResponse(c)
frameType, data, err := ReadUnpackedResponse(c, c.config.MaxMsgSize)
if err != nil {
if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
Expand Down Expand Up @@ -694,7 +694,7 @@ func (c *Conn) cleanup() {
msgsInFlight = atomic.LoadInt64(&c.messagesInFlight)
}
if msgsInFlight > 0 {
if time.Now().Sub(lastWarning) > time.Second {
if time.Since(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
lastWarning = time.Now()
}
Expand All @@ -703,7 +703,7 @@ func (c *Conn) cleanup() {
// until the readLoop has exited we cannot be sure that there
// still won't be a race
if atomic.LoadInt32(&c.readLoopRunning) == 1 {
if time.Now().Sub(lastWarning) > time.Second {
if time.Since(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... readLoop still running")
lastWarning = time.Now()
}
Expand Down
Loading

0 comments on commit ec88db9

Please sign in to comment.