Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

public API refactoring #30

Merged
merged 32 commits into from
May 26, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c303198
reader/writer: refactor configuration
mreiferson Apr 6, 2014
4e267e8
command: refactor to implement WriteTo interface; drop binary.Write
mreiferson Apr 12, 2014
baaa363
conn/reader/writer: refactor write buffering; switch to WriteCommand
mreiferson Apr 12, 2014
7f1b245
reader/conn: update comments; go fmt
mreiferson Apr 12, 2014
fffdd4c
writer: simplify public error handling
mreiferson Apr 12, 2014
fa643f3
test: fix fatal test output
mreiferson Apr 13, 2014
822f1c1
message/command: use callback API for message responses; switch to ti…
mreiferson Apr 13, 2014
31c80db
conn: expose FIN/REQ callbacks; refactor Message response plumbing
mreiferson Apr 13, 2014
16673ed
reader: remove AsyncHandler interface
mreiferson Apr 13, 2014
803365c
conn: close read side first, then write; trap conn closed errors
mreiferson Apr 14, 2014
70cf198
conn: don't publicly embed net.Conn
mreiferson Apr 14, 2014
6a5997e
reader/conn: s/Stop/Close; CloseRead if no messages in flight upon Cl…
mreiferson Apr 14, 2014
fff3368
reader: use wg for rdyLoop and lookupdLoop; s/ExitChan/StopChan
mreiferson Apr 14, 2014
88ee1b9
reader/writer/conn: conn doesn't take topic/channel
mreiferson Apr 21, 2014
9b7d6f3
reader: add global reader instance counter
mreiferson Apr 21, 2014
e260758
reader: consistent exit path for long running handler goroutine
mreiferson Apr 21, 2014
b1e3b77
reader/writer/conn: drop individual callbacks for delegate interface
mreiferson Apr 21, 2014
6d9c225
reader: comment fixes; make atomic counters private
mreiferson Apr 21, 2014
8929d25
reader/conn: un-export embedded sync.RWMutex
mreiferson Apr 21, 2014
65d8c5a
reader/writer/conn: log levels and delegates
mreiferson Apr 29, 2014
1099f5b
reader/conn: add requeue without backoff
mreiferson May 6, 2014
07a9461
reader: add HandlerFunc helper interface
mreiferson May 6, 2014
d7aa9bd
message: drop EncodeBytes; s/Write/WriteTo; drop binary.{Read,Write}
mreiferson Apr 29, 2014
1e5876a
reader: consistent connect method names
mreiferson May 6, 2014
fd1da49
reader: drop SetMaxInFlight; un-export ConnectionMaxInFlight
mreiferson Apr 29, 2014
db8c50e
reader: s/AddHandler/SetHandler and change semantics
mreiferson May 6, 2014
5a131ab
reader: move around ReadUnpackedResponse
mreiferson Apr 29, 2014
5b9217e
un-export apiRequest and newDeadlineTransport
mreiferson May 6, 2014
ccb206b
reader/conn: capitalize message.ID
mreiferson May 6, 2014
3543214
golint
mreiferson May 6, 2014
c654a44
reader/writer: rename to consumer/producer
mreiferson May 25, 2014
84426ed
bump v1.0.0-alpha
mreiferson Apr 30, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

`go-nsq` is the official Go package for [NSQ][nsq].

The latest stable release is **[0.3.7](https://github.com/bitly/go-nsq/releases/tag/v0.3.7)**
NOTE: The public API was improved in [pull request #30][pr30] and is not backwards compatible with
previous releases. **[0.3.7][legacy]** is the last stable release compatible with the legacy API.

[![Build Status](https://secure.travis-ci.org/bitly/go-nsq.png?branch=master)](http://travis-ci.org/bitly/go-nsq)
The latest stable release is **[0.3.7][latest_tag]**

It provides high-level [Reader][reader] and [Writer][writer] types to implement consumers and
producers as well as low-level functions to communicate over the [NSQ protocol][protocol].
[![Build Status](https://secure.travis-ci.org/bitly/go-nsq.png?branch=master)][travis]

It provides high-level [Consumer][consumer] and [Producer][producer] types as well as low-level
functions to communicate over the [NSQ protocol][protocol].

See the [main repo apps][apps] directory for examples of clients built using this package.

Expand All @@ -26,5 +29,9 @@ See [godoc][nsq_gopkgdoc] for pretty documentation or:
[nsq_gopkgdoc]: http://godoc.org/github.com/bitly/go-nsq
[protocol]: http://bitly.github.io/nsq/clients/tcp_protocol_spec.html
[apps]: https://github.com/bitly/nsq/tree/master/apps
[reader]: http://godoc.org/github.com/bitly/go-nsq#Reader
[writer]: http://godoc.org/github.com/bitly/go-nsq#Writer
[consumer]: http://godoc.org/github.com/bitly/go-nsq#Consumer
[producer]: http://godoc.org/github.com/bitly/go-nsq#Producer
[pr30]: https://github.com/bitly/go-nsq/pull/30
[legacy]: https://github.com/bitly/go-nsq/releases/tag/v0.3.7
[travis]: http://travis-ci.org/bitly/go-nsq
[latest_tag]: https://github.com/bitly/go-nsq/releases/tag/v0.3.7
11 changes: 3 additions & 8 deletions api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
return c.Conn.Write(b)
}

// A custom http.Transport with support for deadline timeouts
func NewDeadlineTransport(timeout time.Duration) *http.Transport {
func newDeadlineTransport(timeout time.Duration) *http.Transport {
transport := &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
Expand All @@ -40,12 +39,8 @@ func NewDeadlineTransport(timeout time.Duration) *http.Transport {
return transport
}

// ApiRequest is a helper function to perform an HTTP request
// and parse our NSQ daemon's expected response format, with deadlines.
//
// {"status_code":200, "status_txt":"OK", "data":{...}}
func ApiRequest(endpoint string) (*simplejson.Json, error) {
httpclient := &http.Client{Transport: NewDeadlineTransport(2 * time.Second)}
func apiRequest(endpoint string) (*simplejson.Json, error) {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, err
Expand Down
57 changes: 35 additions & 22 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"strconv"
"time"
)

var byteSpace = []byte(" ")
Expand All @@ -27,44 +28,56 @@ func (c *Command) String() string {
return string(c.Name)
}

// Write serializes the Command to the supplied Writer.
// WriteTo implements the WriterTo interface and
// serializes the Command to the supplied Writer.
//
// It is suggested that the target Writer is buffered to avoid performing many system calls.
func (c *Command) Write(w io.Writer) error {
_, err := w.Write(c.Name)
// It is suggested that the target Writer is buffered
// to avoid performing many system calls.
func (c *Command) WriteTo(w io.Writer) (int64, error) {
var total int64
var buf [4]byte

n, err := w.Write(c.Name)
total += int64(n)
if err != nil {
return err
return total, err
}

for _, param := range c.Params {
_, err := w.Write(byteSpace)
n, err := w.Write(byteSpace)
total += int64(n)
if err != nil {
return err
return total, err
}
_, err = w.Write(param)
n, err = w.Write(param)
total += int64(n)
if err != nil {
return err
return total, err
}
}

_, err = w.Write(byteNewLine)
n, err = w.Write(byteNewLine)
total += int64(n)
if err != nil {
return err
return total, err
}

if c.Body != nil {
bodySize := int32(len(c.Body))
err := binary.Write(w, binary.BigEndian, &bodySize)
bufs := buf[:]
binary.BigEndian.PutUint32(bufs, uint32(len(c.Body)))
n, err := w.Write(bufs)
total += int64(n)
if err != nil {
return err
return total, err
}
_, err = w.Write(c.Body)
n, err = w.Write(c.Body)
total += int64(n)
if err != nil {
return err
return total, err
}
}

return nil
return total, nil
}

// Identify creates a new Command to provide information about the client. After connecting,
Expand Down Expand Up @@ -92,7 +105,7 @@ func Register(topic string, channel string) *Command {
return &Command{[]byte("REGISTER"), params, nil}
}

// Unregister creates a new Command to remove a topic/channel for the connected nsqd
// UnRegister creates a new Command to remove a topic/channel for the connected nsqd
func UnRegister(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
Expand Down Expand Up @@ -165,10 +178,10 @@ func Finish(id MessageID) *Command {
}

// Requeue creates a new Command to indicate that
// a given message (by id) should be requeued after the given timeout (in ms)
// NOTE: a timeout of 0 indicates immediate requeue
func Requeue(id MessageID, timeoutMs int) *Command {
var params = [][]byte{id[:], []byte(strconv.Itoa(timeoutMs))}
// a given message (by id) should be requeued after the given delay
// NOTE: a delay of 0 indicates immediate requeue
func Requeue(id MessageID, delay time.Duration) *Command {
var params = [][]byte{id[:], []byte(strconv.Itoa(int(delay / time.Millisecond)))}
return &Command{[]byte("REQ"), params, nil}
}

Expand Down
2 changes: 1 addition & 1 deletion command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ func BenchmarkCommand(b *testing.B) {
b.StartTimer()

for i := 0; i < b.N; i++ {
cmd.Write(&buf)
cmd.WriteTo(&buf)
}
}
Loading