Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AuthSecret as Bearer token for lookupd queries #313

Merged
merged 1 commit into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package nsq
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand Down Expand Up @@ -46,12 +45,15 @@ type wrappedResp struct {
}

// stores the result in the value pointed to by ret(must be a pointer)
func apiRequestNegotiateV1(method string, endpoint string, body io.Reader, ret interface{}) error {
func apiRequestNegotiateV1(method string, endpoint string, headers http.Header, ret interface{}) error {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
req, err := http.NewRequest(method, endpoint, body)
req, err := http.NewRequest(method, endpoint, nil)
if err != nil {
return err
}
for k, v := range headers {
req.Header[k] = v
}

req.Header.Add("Accept", "application/vnd.nsq; version=1.0")

Expand Down
4 changes: 3 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ type Config struct {
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

// secret for nsqd authentication (requires nsqd 0.2.29+)
// Secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
// Use AuthSecret as 'Authorization: Bearer {AuthSecret}' on lookupd queries
LookupdAuthorization bool `opt:"skip_lookupd_authorization" default:"true"`
}

// NewConfig returns a new default nsq configuration.
Expand Down
7 changes: 6 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"strconv"
Expand Down Expand Up @@ -492,7 +493,11 @@ retry:
r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

var data lookupResp
err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
headers := make(http.Header)
if r.config.AuthSecret != "" && r.config.LookupdAuthorization {
headers.Set("Authorization", fmt.Sprintf("Bearer %s", r.config.AuthSecret))
}
err := apiRequestNegotiateV1("GET", endpoint, headers, &data)
if err != nil {
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
retries++
Expand Down
31 changes: 31 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log"
"net"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -125,6 +126,36 @@ func TestConsumerTLSClientCert(t *testing.T) {
})
}

func TestConsumerLookupdAuthorization(t *testing.T) {
// confirm that LookupAuthorization = true sets Authorization header on lookudp call
config := NewConfig()
config.AuthSecret = "AuthSecret"
topicName := "auth" + strconv.Itoa(int(time.Now().Unix()))
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(newTestLogger(t), LogLevelDebug)

var req bool
lookupd := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
req = true
if h := r.Header.Get("Authorization"); h != "Bearer AuthSecret" {
t.Errorf("got Auth header %q", h)
}
w.WriteHeader(404)
}))
defer lookupd.Close()

h := &MyTestHandler{
t: t,
q: q,
}
q.AddHandler(h)

q.ConnectToNSQLookupd(lookupd.URL)
if req == false {
t.Errorf("lookupd call not completed")
}
}

func TestConsumerTLSClientCertViaSet(t *testing.T) {
consumerTest(t, func(c *Config) {
c.Set("tls_v1", true)
Expand Down