Skip to content

Commit

Permalink
consumer: re-use http client with keepalives for lookup requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason authored and ploxiln committed Oct 17, 2021
1 parent 3bce8d0 commit c238b8d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
17 changes: 1 addition & 16 deletions api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,14 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
return c.Conn.Write(b)
}

func newDeadlineTransport(timeout time.Duration) *http.Transport {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
if err != nil {
return nil, err
}
return &deadlinedConn{timeout, c}, nil
},
}
return transport
}

type wrappedResp struct {
Status string `json:"status_txt"`
StatusCode int `json:"status_code"`
Data interface{} `json:"data"`
}

// stores the result in the value pointed to by ret(must be a pointer)
func apiRequestNegotiateV1(method string, endpoint string, headers http.Header, ret interface{}) error {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
func apiRequestNegotiateV1(httpclient *http.Client, method string, endpoint string, headers http.Header, ret interface{}) error {
req, err := http.NewRequest(method, endpoint, nil)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Config struct {
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
LookupdPollTimeout time.Duration `opt:"lookupd_poll_timeout" default:"1m"`

// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
Expand Down
25 changes: 24 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type Consumer struct {
lookupdRecheckChan chan int
lookupdHTTPAddrs []string
lookupdQueryIndex int
lookupdHttpClient *http.Client

wg sync.WaitGroup
runningHandlers int32
Expand Down Expand Up @@ -326,6 +327,11 @@ func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
}
}

// set lookupd http client
func (r *Consumer) SetLookupdHttpClient(httpclient *http.Client) {
r.lookupdHttpClient = httpclient
}

// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
Expand Down Expand Up @@ -355,6 +361,23 @@ func (r *Consumer) ConnectToNSQLookupd(addr string) error {
}
}
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, parsedAddr)
if r.lookupdHttpClient == nil {
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: r.config.LookupdPollTimeout,
KeepAlive: 30 * time.Second,
}).DialContext,
ResponseHeaderTimeout: r.config.LookupdPollTimeout,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}
r.lookupdHttpClient = &http.Client{
Transport: transport,
Timeout: r.config.LookupdPollTimeout,
}
}

numLookupd := len(r.lookupdHTTPAddrs)
r.mtx.Unlock()

Expand Down Expand Up @@ -468,7 +491,7 @@ retry:
if r.config.AuthSecret != "" && r.config.LookupdAuthorization {
headers.Set("Authorization", fmt.Sprintf("Bearer %s", r.config.AuthSecret))
}
err := apiRequestNegotiateV1("GET", endpoint, headers, &data)
err := apiRequestNegotiateV1(r.lookupdHttpClient, "GET", endpoint, headers, &data)
if err != nil {
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
retries++
Expand Down

0 comments on commit c238b8d

Please sign in to comment.