Skip to content

Commit

Permalink
Merge pull request #60 from mreiferson/deprecated_60
Browse files Browse the repository at this point in the history
clean up some deprecated items
  • Loading branch information
jehiah committed Jul 11, 2014
2 parents 73464c2 + 9c0e4d5 commit f17a1a6
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 31 deletions.
26 changes: 16 additions & 10 deletions api_request.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package nsq

import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand Down Expand Up @@ -39,34 +39,40 @@ func newDeadlineTransport(timeout time.Duration) *http.Transport {
return transport
}

func apiRequest(endpoint string) (*simplejson.Json, error) {
func apiRequestNegotiateV1(method string, endpoint string, body io.Reader) (*simplejson.Json, error) {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
req, err := http.NewRequest("GET", endpoint, nil)
req, err := http.NewRequest(method, endpoint, body)
if err != nil {
return nil, err
}

req.Header.Add("Accept", "application/vnd.nsq; version=1.0")

resp, err := httpclient.Do(req)
if err != nil {
return nil, err
}

body, err := ioutil.ReadAll(resp.Body)
respBody, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("got response %s %q", resp.Status, respBody)
}

if len(respBody) == 0 {
respBody = []byte("{}")
}

data, err := simplejson.NewJson(body)
data, err := simplejson.NewJson(respBody)
if err != nil {
return nil, err
}

statusCode := data.Get("status_code").MustInt()
statusTxt := data.Get("status_txt").MustString()
if statusCode != 200 {
return nil, errors.New(fmt.Sprintf("response status_code = %d, status_txt = %s",
statusCode, statusTxt))
if resp.Header.Get("X-NSQ-Content-Type") == "nsq; version=1.0" {
return data, nil
}
return data.Get("data"), nil
}
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Config struct {

// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // deprecated (defaults: short hostname)
ClientID string `opt:"client_id"` // (defaults: short hostname)
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`

Expand Down
32 changes: 12 additions & 20 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,38 +357,30 @@ func (r *Consumer) queryLookupd() {

r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

data, err := apiRequest(endpoint)
data, err := apiRequestNegotiateV1("GET", endpoint, nil)
if err != nil {
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
return
}

// {
// "data": {
// "channels": [],
// "producers": [
// {
// "broadcast_address": "jehiah-air.local",
// "http_port": 4151,
// "tcp_port": 4150
// }
// ],
// "timestamp": 1340152173
// },
// "status_code": 200,
// "status_txt": "OK"
// "channels": [],
// "producers": [
// {
// "broadcast_address": "jehiah-air.local",
// "http_port": 4151,
// "tcp_port": 4150
// }
// ],
// "timestamp": 1340152173
// }
for i := range data.Get("producers").MustArray() {
producer := data.Get("producers").GetIndex(i)
address := producer.Get("address").MustString()
broadcastAddress, ok := producer.CheckGet("broadcast_address")
if ok {
address = broadcastAddress.MustString()
}
broadcastAddress := producer.Get("broadcast_address").MustString()
port := producer.Get("tcp_port").MustInt()

// make an address, start a connection
joined := net.JoinHostPort(address, strconv.Itoa(port))
joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
err = r.ConnectToNSQD(joined)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError, "(%s) error connecting to nsqd - %s", joined, err)
Expand Down

0 comments on commit f17a1a6

Please sign in to comment.