diff --git a/consumer.go b/consumer.go index 4e5d03a7..da5b6942 100644 --- a/consumer.go +++ b/consumer.go @@ -10,6 +10,7 @@ import ( "net" "net/url" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -200,11 +201,14 @@ func (r *Consumer) maxInFlight() int { // // A goroutine is spawned to handle continual polling. func (r *Consumer) ConnectToNSQLookupd(addr string) error { + if err := validatedLookupAddr(addr); err != nil { + return err + } r.mtx.Lock() for _, x := range r.lookupdHTTPAddrs { if x == addr { r.mtx.Unlock() - return ErrLookupdAddressExists + return nil } } r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr) @@ -221,6 +225,20 @@ func (r *Consumer) ConnectToNSQLookupd(addr string) error { return nil } +func validatedLookupAddr(addr string) error { + if strings.Contains(addr, "/") { + _, err := url.Parse(addr) + if err != nil { + return err + } + return nil + } + if !strings.Contains(addr, ":") { + return errors.New("missing port") + } + return nil +} + // poll all known lookup servers every LookupdPollInterval func (r *Consumer) lookupdLoop() { // add some jitter so that multiple consumers discovering the same topic, @@ -252,24 +270,46 @@ exit: r.wg.Done() } -// make an HTTP req to the /lookup endpoint of one of the -// configured nsqlookupd instances to discover which nsqd provide -// the topic we are consuming. -// -// initiate a connection to any new producers that are identified. -func (r *Consumer) queryLookupd() { +// return the next lookupd endpoint to query +// keeping track of which one was last used +func (r *Consumer) nextLookupdEndpoint() string { r.mtx.RLock() addr := r.lookupdHTTPAddrs[r.lookupdQueryIndex] num := len(r.lookupdHTTPAddrs) r.mtx.RUnlock() r.lookupdQueryIndex = (r.lookupdQueryIndex + 1) % num - endpoint := fmt.Sprintf("http://%s/lookup?topic=%s", addr, url.QueryEscape(r.topic)) + + urlString := addr + if !strings.Contains(urlString, "/") { + urlString = fmt.Sprintf("http://%s/", addr) + } + + u, err := url.Parse(urlString) + if err != nil { + panic(err) + } + if u.Path == "/" { + u.Path = "/lookup" + } + + v, err := url.ParseQuery(u.RawQuery) + v.Add("topic", r.topic) + u.RawQuery = v.Encode() + return u.String() +} + +// make an HTTP req to one of the configured nsqlookupd instances to discover +// which nsqd's provide the topic we are consuming. +// +// initiate a connection to any new producers that are identified. +func (r *Consumer) queryLookupd() { + endpoint := r.nextLookupdEndpoint() r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint) data, err := apiRequest(endpoint) if err != nil { - r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", addr, err) + r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err) return } diff --git a/errors.go b/errors.go index 4c0bf957..7e1cd63d 100644 --- a/errors.go +++ b/errors.go @@ -19,10 +19,6 @@ var ErrAlreadyConnected = errors.New("already connected") // ErrOverMaxInFlight is returned from Consumer if over max-in-flight var ErrOverMaxInFlight = errors.New("over configure max-inflight") -// ErrLookupdAddressExists is returned from ConnectToNSQLookupd -// when given lookupd address exists already -var ErrLookupdAddressExists = errors.New("lookupd address already exists") - // ErrIdentify is returned from Conn as part of the IDENTIFY handshake type ErrIdentify struct { Reason string