Skip to content

Commit

Permalink
consumer: fix panic in nextLookupdEndpoint
Browse files Browse the repository at this point in the history
If you use " myhostname:4161" as NSQd HTTP address
then the code panics in nextLookupdEndpoint:

    panic: parse "http:// myhostname:4161": invalid character " " in host name

This is caused different validation of the input when configured
in ConnectToNSQLookupd and when used in nextLookupdEndpoint.

Moving preparation of the URL to the beginning so that we don't
need to do any input validation in nextLookupdEndpoint.

Fixes #316
  • Loading branch information
martin-sucha committed May 15, 2021
1 parent 63a3a23 commit 16d0cdc
Showing 1 changed file with 50 additions and 36 deletions.
86 changes: 50 additions & 36 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type Consumer struct {

// used at connection close to force a possible reconnect
lookupdRecheckChan chan int
lookupdHTTPAddrs []string
lookupdHTTPAddrs []lookupdHTTPAddr
lookupdQueryIndex int

wg sync.WaitGroup
Expand Down Expand Up @@ -340,20 +340,21 @@ func (r *Consumer) ConnectToNSQLookupd(addr string) error {
return errors.New("no handlers")
}

if err := validatedLookupAddr(addr); err != nil {
parsedAddr, err := buildLookupAddr(addr, r.topic)
if err != nil {
return err
}

atomic.StoreInt32(&r.connectedFlag, 1)

r.mtx.Lock()
for _, x := range r.lookupdHTTPAddrs {
if x == addr {
if x.lookupURL == parsedAddr.lookupURL {
r.mtx.Unlock()
return nil
}
}
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, parsedAddr)
numLookupd := len(r.lookupdHTTPAddrs)
r.mtx.Unlock()

Expand Down Expand Up @@ -383,20 +384,6 @@ func (r *Consumer) ConnectToNSQLookupds(addresses []string) error {
return nil
}

func validatedLookupAddr(addr string) error {
if strings.Contains(addr, "/") {
_, err := url.Parse(addr)
if err != nil {
return err
}
return nil
}
if !strings.Contains(addr, ":") {
return errors.New("missing port")
}
return nil
}

// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
// add some jitter so that multiple consumers discovering the same topic,
Expand Down Expand Up @@ -446,23 +433,7 @@ func (r *Consumer) nextLookupdEndpoint() string {
r.mtx.RUnlock()
r.lookupdQueryIndex = (r.lookupdQueryIndex + 1) % num

urlString := addr
if !strings.Contains(urlString, "://") {
urlString = "http://" + addr
}

u, err := url.Parse(urlString)
if err != nil {
panic(err)
}
if u.Path == "/" || u.Path == "" {
u.Path = "/lookup"
}

v, err := url.ParseQuery(u.RawQuery)
v.Add("topic", r.topic)
u.RawQuery = v.Encode()
return u.String()
return addr.lookupURL
}

type lookupResp struct {
Expand Down Expand Up @@ -630,6 +601,15 @@ func indexOf(n string, h []string) int {
return -1
}

func indexOfLookupdHTTPAddr(n lookupdHTTPAddr, h []lookupdHTTPAddr) int {
for i, a := range h {
if n.lookupURL == a.lookupURL {
return i
}
}
return -1
}

// DisconnectFromNSQD closes the connection to and removes the specified
// `nsqd` address from the list
func (r *Consumer) DisconnectFromNSQD(addr string) error {
Expand Down Expand Up @@ -659,10 +639,15 @@ func (r *Consumer) DisconnectFromNSQD(addr string) error {
// DisconnectFromNSQLookupd removes the specified `nsqlookupd` address
// from the list used for periodic discovery.
func (r *Consumer) DisconnectFromNSQLookupd(addr string) error {
parsedAddr, err := buildLookupAddr(addr, r.topic)
if err != nil {
return err
}

r.mtx.Lock()
defer r.mtx.Unlock()

idx := indexOf(addr, r.lookupdHTTPAddrs)
idx := indexOfLookupdHTTPAddr(parsedAddr, r.lookupdHTTPAddrs)
if idx == -1 {
return ErrNotConnected
}
Expand Down Expand Up @@ -1204,3 +1189,32 @@ func (r *Consumer) log(lvl LogLevel, line string, args ...interface{}) {
lvl, r.id, r.topic, r.channel,
fmt.Sprintf(line, args...)))
}

type lookupdHTTPAddr struct {
lookupURL string
}

func buildLookupAddr(addr, topic string) (lookupdHTTPAddr, error) {
urlString := addr
if !strings.Contains(urlString, "://") {
urlString = "http://" + addr
}

u, err := url.Parse(urlString)
if err != nil {
return lookupdHTTPAddr{}, err
}

if u.Port() == "" {
return lookupdHTTPAddr{}, errors.New("missing port")
}

if u.Path == "/" || u.Path == "" {
u.Path = "/lookup"
}

v, err := url.ParseQuery(u.RawQuery)
v.Add("topic", topic)
u.RawQuery = v.Encode()
return lookupdHTTPAddr{lookupURL: u.String()}, nil
}

0 comments on commit 16d0cdc

Please sign in to comment.