diff --git a/reader.go b/reader.go index 349d8d4c..20350c7e 100644 --- a/reader.go +++ b/reader.go @@ -123,6 +123,8 @@ type Reader struct { DeflateLevel int // the compression level to negotiate for Deflate Snappy bool // negotiate enabling Snappy compression + SampleRate int32 // set the sampleRate of the client's messagePump (requires nsqd 0.2.25+) + // internal variables maxBackoffDuration time.Duration maxBackoffCount int32 @@ -361,6 +363,15 @@ func (q *Reader) Configure(option string, value interface{}) error { return errors.New(fmt.Sprintf("invalid %s ! 1 <= %d <= 9", option, err)) } q.DeflateLevel = int(v) + case "sample_rate": + v, err := getInt64(value) + if err != nil { + return errors.New(fmt.Sprintf("invalid %s - %s", option, err)) + } + if v < 0 || v > 99 { + return errors.New(fmt.Sprintf("invalid %s ! 0 <= %d <= 99", option, err)) + } + q.SampleRate = int32(v) case "snappy": v, err := getBool(value) if err != nil { @@ -623,6 +634,7 @@ func (q *Reader) ConnectToNSQ(addr string) error { ci["deflate_level"] = q.DeflateLevel ci["snappy"] = q.Snappy ci["feature_negotiation"] = true + ci["sample_rate"] = q.SampleRate cmd, err := Identify(ci) if err != nil { cleanupConnection() @@ -648,6 +660,7 @@ func (q *Reader) ConnectToNSQ(addr string) error { TLSv1 bool `json:"tls_v1"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` + SampleRate int32 `json:"sample_rate"` }{} err := json.Unmarshal(data, &resp) if err != nil {