Skip to content

Commit

Permalink
Merge pull request #22 from elubow/channel_sampling
Browse files Browse the repository at this point in the history
Adding channel sampling capability to the driver
  • Loading branch information
mreiferson committed Dec 19, 2013
2 parents b7f48b8 + 2b3a478 commit 316a2d7
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type Reader struct {
DeflateLevel int // the compression level to negotiate for Deflate
Snappy bool // negotiate enabling Snappy compression

SampleRate int32 // set the sampleRate of the client's messagePump (requires nsqd 0.2.25+)

// internal variables
maxBackoffDuration time.Duration
maxBackoffCount int32
Expand Down Expand Up @@ -361,6 +363,15 @@ func (q *Reader) Configure(option string, value interface{}) error {
return errors.New(fmt.Sprintf("invalid %s ! 1 <= %d <= 9", option, v))
}
q.DeflateLevel = int(v)
case "sample_rate":
v, err := getInt64(value)
if err != nil {
return errors.New(fmt.Sprintf("invalid %s - %s", option, err))
}
if v < 0 || v > 99 {
return errors.New(fmt.Sprintf("invalid %s ! 0 <= %d <= 99", option, err))
}
q.SampleRate = int32(v)
case "snappy":
v, err := getBool(value)
if err != nil {
Expand Down Expand Up @@ -623,6 +634,7 @@ func (q *Reader) ConnectToNSQ(addr string) error {
ci["deflate_level"] = q.DeflateLevel
ci["snappy"] = q.Snappy
ci["feature_negotiation"] = true
ci["sample_rate"] = q.SampleRate
cmd, err := Identify(ci)
if err != nil {
cleanupConnection()
Expand All @@ -648,6 +660,7 @@ func (q *Reader) ConnectToNSQ(addr string) error {
TLSv1 bool `json:"tls_v1"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
SampleRate int32 `json:"sample_rate"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
Expand Down

0 comments on commit 316a2d7

Please sign in to comment.