From 5d77a1dcccba668738d092b335204ff78155469e Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Sun, 15 Jun 2014 17:45:40 -0400 Subject: [PATCH] Export Config{} variables * export config variables so they can be set directly (restore type safety) * move defaults to struct tags (and considate docs) * add Config.Valdate() and panic when NewConfig() is not used * copy Config into Consumer and Producer so it's immutable * rename SetMaxInFlight to ChangeMaxInFlight to more appropriately describe functionality --- config.go | 327 ++++++++++++++++++++++------------------------- config_test.go | 12 ++ conn.go | 48 +++---- consumer.go | 75 +++++------ consumer_test.go | 20 +-- mock_test.go | 4 +- producer.go | 29 +++-- producer_test.go | 30 +++-- protocol.go | 4 - 9 files changed, 276 insertions(+), 273 deletions(-) diff --git a/config.go b/config.go index f84f5db9..c1f0d0e2 100644 --- a/config.go +++ b/config.go @@ -9,116 +9,91 @@ import ( "reflect" "strconv" "strings" - "sync" "time" "unsafe" ) // Config is a struct of NSQ options // -// (see Config.Set() for available parameters) +// The only valid way to create a Config is via NewConfig, using a struct literal will panic. +// After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no +// longer mutable (they are copied). +// +// Use Set(key string, value interface{}) as an alternate way to set parameters type Config struct { - sync.RWMutex - initOnce sync.Once - - readTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m"` - writeTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m"` - - lookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m"` - lookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1"` - - maxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m"` - defaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m"` - backoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m"` - - maxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535"` - lowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m"` - - clientID string `opt:"client_id"` - hostname string `opt:"hostname"` - userAgent string `opt:"user_agent"` - - heartbeatInterval time.Duration `opt:"heartbeat_interval"` - sampleRate int32 `opt:"sample_rate" min:"0" max:"99"` - - tlsV1 bool `opt:"tls_v1"` - tlsConfig *tls.Config `opt:"tls_config"` - - deflate bool `opt:"deflate"` - deflateLevel int `opt:"deflate_level" min:"1" max:"9"` - snappy bool `opt:"snappy"` - - outputBufferSize int64 `opt:"output_buffer_size"` - outputBufferTimeout time.Duration `opt:"output_buffer_timeout"` - - maxInFlight int `opt:"max_in_flight" min:"0"` - - maxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m"` - - authSecret string `opt:"auth_secret"` + initialized bool + + // Deadlines for network reads and writes + ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"` + WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"` + + // Duration between polling lookupd for new producers, and fractional jitter to add to + // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers + // restart at the same time + LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m" default:"60s"` + LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"` + + // Maximum duration when REQueueing (for doubling of deferred requeue) + MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"` + DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"` + // Unit of time for calculating consumer backoff + BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"` + + // Maximum number of times this consumer will attempt to process a message before giving up + MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"` + // Amount of time in seconds to wait for a message from a producer when in a state where RDY + // counts are re-distributed (ie. max_in_flight < num_producers) + LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"` + + // Identifiers sent to nsqd representing this client + // UserAgent is in the spirit of HTTP (default: "/") + ClientID string `opt:"client_id"` // deprecated (defaults: short hostname) + Hostname string `opt:"hostname"` + UserAgent string `opt:"user_agent"` + + // Duration of time between heartbeats. This must be less than ReadTimeout + HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"` + // Integer percentage to sample the channel (requires nsqd 0.2.25+) + SampleRate int32 `opt:"sample_rate" min:"0" max:"99"` + + // TLS Settings + TlsV1 bool `opt:"tls_v1"` + TlsConfig *tls.Config `opt:"tls_config"` + + // Compression Settings + Deflate bool `opt:"deflate"` + DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"` + Snappy bool `opt:"snappy"` + + // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection + OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"` + // Timeout used by nsqd before flushing buffered writes (set to 0 to disable). + // + // WARNING: configuring clients with an extremely low + // (< 25ms) output_buffer_timeout has a significant effect + // on nsqd CPU usage (particularly with > 50 clients connected). + OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"` + + // Maximum number of messages to allow in flight (concurrency knob) + MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"` + + // Maximum amount of time to backoff when processing fails 0 == no backoff + MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"` + + // secret for nsqd authentication (requires nsqd 0.2.29+) + AuthSecret string `opt:"auth_secret"` } -// NewConfig returns a new default configuration. -// -// "read_timeout": 60s (min: 100ms, max: 5m) (time.Duration) -// "write_timeout": 1s (min: 100ms, max: 5m) (time.Duration) -// "lookupd_poll_interval": 60s (min: 5s, max: 5m) (time.Duration) -// "lookupd_poll_jitter": 0.3 (min: 0.0, max: 1.0) (float) -// "max_requeue_delay": 15m (min: 0, max: 60m) (time.Duration) -// "default_requeue_delay": 90s (min: 0, max: 60m) (time.Duration) -// "backoff_multiplier": 1s (min: 0, max: 60m) (time.TIme) -// "max_attempts": 5 (min: 0, max: 65535) (int) -// "low_rdy_idle_timeout": 10s (min: 1s, max: 5m) (time.Duration) -// "client_id": "" (string) -// "hostname": os.Hostname() (string) -// "user_agent": "go-nsq/" (string) -// "heartbeat_interval": 30s (time.Duration) -// "sample_rate": 0 (min: 0, max: 99) (int) -// "tls_v1": false (bool) -// "tls_config": nil (*tls.Config) -// "deflate": false (bool) -// "deflate_level": 6 (min: 1, max: 9) (int) -// "snappy": false (bool) -// "output_buffer_size": 16384 (int) -// "output_buffer_timeout": 250ms (time.Duration) -// "max_in_flight": 1 (int) -// "max_backoff_duration": 120s (time.Duration) -// "auth_secret": "" (string) +// NewConfig returns a new default nsq configuration. // -// See Config.Set() for a description of these parameters. +// This must be used to initialize Config structs. Values can be set directly, or through Config.Set() func NewConfig() *Config { - conf := &Config{} - conf.initialize() - return conf -} - -// initialize is used to ensure that a Config has a baseline set of defaults -// despite how it might have been insantiated -func (c *Config) initialize() { - c.initOnce.Do(func() { - hostname, err := os.Hostname() - if err != nil { - log.Fatalf("ERROR: unable to get hostname %s", err.Error()) - } - c.maxInFlight = 1 - c.maxAttempts = 5 - c.lookupdPollInterval = 60 * time.Second - c.lookupdPollJitter = 0.3 - c.lowRdyIdleTimeout = 10 * time.Second - c.defaultRequeueDelay = 90 * time.Second - c.maxRequeueDelay = 15 * time.Minute - c.backoffMultiplier = time.Second - c.maxBackoffDuration = 120 * time.Second - c.readTimeout = DefaultClientTimeout - c.writeTimeout = time.Second - c.deflateLevel = 6 - c.outputBufferSize = 16 * 1024 - c.outputBufferTimeout = 250 * time.Millisecond - c.heartbeatInterval = DefaultClientTimeout / 2 - c.clientID = strings.Split(hostname, ".")[0] - c.hostname = hostname - c.userAgent = fmt.Sprintf("go-nsq/%s", VERSION) - }) + c := &Config{} + c.initialized = true + if err := c.setDefaults(); err != nil { + panic(err.Error()) + } + return c } // Set takes an option as a string and a value as an interface and @@ -140,86 +115,9 @@ func (c *Config) initialize() { // 1 (an int where 1 == true and 0 == false) // // It returns an error for an invalid option or value. -// -// read_timeout (time.Duration): the deadline set for network reads -// (min: 100ms, max: 5m) -// -// write_timeout (time.Duration): the deadline set for network writes -// (min: 100ms, max: 5m) -// -// lookupd_poll_interval (time.Duration): duration between polling lookupd for new -// (min: 5s, max: 5m) -// -// lookupd_poll_jitter (float): fractional amount of jitter to add to the lookupd pool loop, -// this helps evenly distribute requests even if multiple -// consumers restart at the same time. -// (min: 0.0, max: 1.0) -// -// max_requeue_delay (time.Duration): the maximum duration when REQueueing -// (for doubling of deferred requeue) -// (min: 0, max: 60m) -// -// default_requeue_delay (time.Duration): the default duration when REQueueing -// (min: 0, max: 60m) -// -// backoff_multiplier (time.Duration): the unit of time for calculating consumer backoff -// (min: 0, max: 60m) -// -// max_attempts (int): maximum number of times this consumer will attempt to process a message -// (min: 0, max: 65535) -// -// low_rdy_idle_timeout (time.Duration): the amount of time in seconds to wait for a message -// from a producer when in a state where RDY counts -// are re-distributed (ie. max_in_flight < num_producers) -// (min: 1s, max: 5m) -// -// client_id (string): an identifier sent to nsqd representing the client -// (defaults: short hostname) -// -// hostname (string): an identifier sent to nsqd representing the host -// (defaults: long hostname) -// -// user_agent (string): an identifier of the agent for this client (in the spirit of HTTP) -// (default: "/") -// -// heartbeat_interval (time.Duration): duration of time between heartbeats -// -// sample_rate (int): integer percentage to sample the channel (requires nsqd 0.2.25+) -// (min: 0, max: 99) -// -// tls_v1 (bool): negotiate TLS -// -// tls_config (*tls.Config): client TLS configuration -// -// deflate (bool): negotiate Deflate compression -// -// deflate_level (int): the compression level to negotiate for Deflate -// (min: 1, max: 9) -// -// snappy (bool): negotiate Snappy compression -// -// output_buffer_size (int): size of the buffer (in bytes) used by nsqd for -// buffering writes to this connection -// -// output_buffer_timeout (time.Duration): timeout (in ms) used by nsqd before flushing buffered -// writes (set to 0 to disable). -// -// WARNING: configuring clients with an extremely low -// (< 25ms) output_buffer_timeout has a significant effect -// on nsqd CPU usage (particularly with > 50 clients connected). -// -// max_in_flight (int): the maximum number of messages to allow in flight (concurrency knob) -// -// max_backoff_duration (time.Duration): the maximum amount of time to backoff when processing fails -// 0 == no backoff -// -// auth_secret (string): secret for nsqd authentication (requires nsqd 0.2.29+) -// func (c *Config) Set(option string, value interface{}) error { - c.Lock() - defer c.Unlock() - c.initialize() + c.assertInitialized() val := reflect.ValueOf(c).Elem() typ := val.Type() @@ -261,6 +159,81 @@ func (c *Config) Set(option string, value interface{}) error { return fmt.Errorf("invalid option %s", option) } +func (c *Config) assertInitialized() { + if !c.initialized { + panic("Config{} must be created with NewConfig()") + } +} + +// Validate checks that all values are within specified min/max ranges +func (c *Config) Validate() error { + + c.assertInitialized() + + val := reflect.ValueOf(c).Elem() + typ := val.Type() + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + + min := field.Tag.Get("min") + max := field.Tag.Get("max") + + if min == "" && max == "" { + continue + } + + value := val.FieldByName(field.Name) + + if min != "" { + coercedMinVal, _ := coerce(min, field.Type) + if valueCompare(value, coercedMinVal) == -1 { + return fmt.Errorf("invalid %s ! %v < %v", + field.Name, value.Interface(), coercedMinVal.Interface()) + } + } + if max != "" { + coercedMaxVal, _ := coerce(max, field.Type) + if valueCompare(value, coercedMaxVal) == 1 { + return fmt.Errorf("invalid %s ! %v > %v", + field.Name, value.Interface(), coercedMaxVal.Interface()) + } + } + } + + if c.HeartbeatInterval > c.ReadTimeout { + return fmt.Errorf("HeartbeatInterval %v must be less than ReadTimeout %v", c.HeartbeatInterval, c.ReadTimeout) + } + return nil +} + +func (c *Config) setDefaults() error { + val := reflect.ValueOf(c).Elem() + typ := val.Type() + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + opt := field.Tag.Get("opt") + defaultVal := field.Tag.Get("default") + if defaultVal == "" || opt == "" { + continue + } + + if err := c.Set(opt, defaultVal); err != nil { + return err + } + } + + hostname, err := os.Hostname() + if err != nil { + log.Fatalf("ERROR: unable to get hostname %s", err.Error()) + } + + c.ClientID = strings.Split(hostname, ".")[0] + c.Hostname = hostname + c.UserAgent = fmt.Sprintf("go-nsq/%s", VERSION) + + return nil +} + // because Config contains private structs we can't use reflect.Value // directly, instead we need to "unsafely" address the variable func unsafeValueOf(val reflect.Value) reflect.Value { diff --git a/config_test.go b/config_test.go index 78f35132..95445dc7 100644 --- a/config_test.go +++ b/config_test.go @@ -14,3 +14,15 @@ func TestConfigSet(t *testing.T) { t.Errorf("Error setting `tls_v1` config: %v", err) } } + +func TestConfigValidate(t *testing.T) { + c := NewConfig() + if err := c.Validate(); err != nil { + t.Error("initialized config is invalid") + } + c.DeflateLevel = 100 + if err := c.Validate(); err == nil { + t.Error("no error set for invalid value") + } + +} diff --git a/conn.go b/conn.go index 76bf3a63..0ae1d841 100644 --- a/conn.go +++ b/conn.go @@ -90,7 +90,9 @@ type Conn struct { // NewConn returns a new Conn instance func NewConn(addr string, config *Config) *Conn { - config.initialize() + if !config.initialized { + panic("Config must be created with NewConfig()") + } return &Conn{ addr: addr, @@ -144,11 +146,11 @@ func (c *Conn) Connect() (*IdentifyResponse, error) { } if resp != nil && resp.AuthRequired { - if c.config.authSecret == "" { + if c.config.AuthSecret == "" { c.log(LogLevelError, "Auth Required") return nil, errors.New("Auth Required") } - err := c.auth(c.config.authSecret) + err := c.auth(c.config.AuthSecret) if err != nil { c.log(LogLevelError, "Auth Failed %s", err) return nil, err @@ -218,13 +220,13 @@ func (c *Conn) String() string { // Read performs a deadlined read on the underlying TCP connection func (c *Conn) Read(p []byte) (int, error) { - c.conn.SetReadDeadline(time.Now().Add(c.config.readTimeout)) + c.conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout)) return c.r.Read(p) } // Write performs a deadlined write on the underlying TCP connection func (c *Conn) Write(p []byte) (int, error) { - c.conn.SetWriteDeadline(time.Now().Add(c.config.writeTimeout)) + c.conn.SetWriteDeadline(time.Now().Add(c.config.WriteTimeout)) return c.w.Write(p) } @@ -262,20 +264,20 @@ func (c *Conn) Flush() error { func (c *Conn) identify() (*IdentifyResponse, error) { ci := make(map[string]interface{}) - ci["client_id"] = c.config.clientID - ci["hostname"] = c.config.hostname - ci["user_agent"] = c.config.userAgent - ci["short_id"] = c.config.clientID // deprecated - ci["long_id"] = c.config.hostname // deprecated - ci["tls_v1"] = c.config.tlsV1 - ci["deflate"] = c.config.deflate - ci["deflate_level"] = c.config.deflateLevel - ci["snappy"] = c.config.snappy + ci["client_id"] = c.config.ClientID + ci["hostname"] = c.config.Hostname + ci["user_agent"] = c.config.UserAgent + ci["short_id"] = c.config.ClientID // deprecated + ci["long_id"] = c.config.Hostname // deprecated + ci["tls_v1"] = c.config.TlsV1 + ci["deflate"] = c.config.Deflate + ci["deflate_level"] = c.config.DeflateLevel + ci["snappy"] = c.config.Snappy ci["feature_negotiation"] = true - ci["heartbeat_interval"] = int64(c.config.heartbeatInterval / time.Millisecond) - ci["sample_rate"] = c.config.sampleRate - ci["output_buffer_size"] = c.config.outputBufferSize - ci["output_buffer_timeout"] = int64(c.config.outputBufferTimeout / time.Millisecond) + ci["heartbeat_interval"] = int64(c.config.HeartbeatInterval / time.Millisecond) + ci["sample_rate"] = c.config.SampleRate + ci["output_buffer_size"] = c.config.OutputBufferSize + ci["output_buffer_timeout"] = int64(c.config.OutputBufferTimeout / time.Millisecond) cmd, err := Identify(ci) if err != nil { return nil, ErrIdentify{err.Error()} @@ -313,7 +315,7 @@ func (c *Conn) identify() (*IdentifyResponse, error) { if resp.TLSv1 { c.log(LogLevelInfo, "upgrading to TLS") - err := c.upgradeTLS(c.config.tlsConfig) + err := c.upgradeTLS(c.config.TlsConfig) if err != nil { return nil, ErrIdentify{err.Error()} } @@ -321,7 +323,7 @@ func (c *Conn) identify() (*IdentifyResponse, error) { if resp.Deflate { c.log(LogLevelInfo, "upgrading to Deflate") - err := c.upgradeDeflate(c.config.deflateLevel) + err := c.upgradeDeflate(c.config.DeflateLevel) if err != nil { return nil, ErrIdentify{err.Error()} } @@ -639,10 +641,10 @@ func (c *Conn) onMessageFinish(m *Message) { func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) { if delay == -1 { // linear delay - delay = c.config.defaultRequeueDelay * time.Duration(m.Attempts) + delay = c.config.DefaultRequeueDelay * time.Duration(m.Attempts) // bound the requeueDelay to configured max - if delay > c.config.maxRequeueDelay { - delay = c.config.maxRequeueDelay + if delay > c.config.MaxRequeueDelay { + delay = c.config.MaxRequeueDelay } } c.msgResponseChan <- &msgResponse{m, Requeue(m.ID, delay), false, backoff} diff --git a/consumer.go b/consumer.go index b5d5c076..025290e0 100644 --- a/consumer.go +++ b/consumer.go @@ -70,6 +70,7 @@ type Consumer struct { messagesRequeued uint64 totalRdyCount int64 backoffDuration int64 + maxInFlight int32 mtx sync.RWMutex @@ -79,7 +80,7 @@ type Consumer struct { id int64 topic string channel string - config *Config + config Config backoffChan chan bool rdyChan chan *Conn @@ -111,10 +112,14 @@ type Consumer struct { // NewConsumer creates a new instance of Consumer for the specified topic/channel // -// The returned Consumer instance is setup with sane default values. To modify -// configuration, update the values on the returned instance before connecting. +// The only valid way to create a Config is via NewConfig, using a struct literal will panic. +// After Config is passed into NewConsumer the values are no longer mutable (they are copied). func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) { - config.initialize() + config.assertInitialized() + + if err := config.Validate(); err != nil { + return nil, err + } if !IsValidTopicName(topic) { return nil, errors.New("invalid topic name") @@ -129,10 +134,11 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error topic: topic, channel: channel, - config: config, + config: *config, - logger: log.New(os.Stderr, "", log.Flags()), - logLvl: LogLevelInfo, + logger: log.New(os.Stderr, "", log.Flags()), + logLvl: LogLevelInfo, + maxInFlight: int32(config.MaxInFlight), incomingMessages: make(chan *Message), @@ -173,7 +179,7 @@ func (r *Consumer) SetLogger(logger *log.Logger, lvl LogLevel) { // This may change dynamically based on the number of connections to nsqd the Consumer // is responsible for. func (r *Consumer) perConnMaxInFlight() int64 { - b := float64(r.maxInFlight()) + b := float64(r.getMaxInFlight()) s := b / float64(len(r.conns())) return int64(math.Min(math.Max(1, s), b)) } @@ -191,25 +197,22 @@ func (r *Consumer) IsStarved() bool { return false } -func (r *Consumer) maxInFlight() int { - r.config.RLock() - mif := r.config.maxInFlight - r.config.RUnlock() - return mif +func (r *Consumer) getMaxInFlight() int32 { + return atomic.LoadInt32(&r.maxInFlight) } -// SetMaxInFlight sets the maximum number of messages this comsumer instance -// will allow in-flight. +// ChangeMaxInFlight sets a new maximum number of messages this comsumer instance +// will allow in-flight, and updates all existing connections as appropriate. // +// For example, ChangeMaxInFlight(0) would pause message flow +// // If already connected, it updates the reader RDY state for each connection. -func (r *Consumer) SetMaxInFlight(maxInFlight int) { - r.config.RLock() - mif := r.config.maxInFlight - r.config.RUnlock() - if mif == maxInFlight { +func (r *Consumer) ChangeMaxInFlight(maxInFlight int) { + if r.getMaxInFlight() == int32(maxInFlight) { return } - r.config.Set("max_in_flight", maxInFlight) + + atomic.StoreInt32(&r.maxInFlight, int32(maxInFlight)) for _, c := range r.conns() { r.rdyChan <- c @@ -282,8 +285,8 @@ func (r *Consumer) lookupdLoop() { // add some jitter so that multiple consumers discovering the same topic, // when restarted at the same time, dont all connect at once. jitter := time.Duration(int64(rng.Float64() * - r.config.lookupdPollJitter * float64(r.config.lookupdPollInterval))) - ticker := time.NewTicker(r.config.lookupdPollInterval) + r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval))) + ticker := time.NewTicker(r.config.LookupdPollInterval) select { case <-time.After(jitter): @@ -424,7 +427,7 @@ func (r *Consumer) ConnectToNSQD(addr string) error { r.log(LogLevelInfo, "(%s) connecting to nsqd", addr) - conn := NewConn(addr, r.config) + conn := NewConn(addr, &r.config) conn.SetLogger(r.logger, r.logLvl, fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)) conn.Delegate = &consumerConnDelegate{r} @@ -445,10 +448,10 @@ func (r *Consumer) ConnectToNSQD(addr string) error { } if resp != nil { - if resp.MaxRdyCount < int64(r.maxInFlight()) { + if resp.MaxRdyCount < int64(r.getMaxInFlight()) { r.log(LogLevelWarning, "(%s) max RDY count %d < consumer max in flight %d, truncation possible", - conn.String(), resp.MaxRdyCount, r.maxInFlight()) + conn.String(), resp.MaxRdyCount, r.getMaxInFlight()) } } @@ -541,7 +544,7 @@ func (r *Consumer) onConnClose(c *Conn) { r.log(LogLevelWarning, "there are %d connections left alive", left) if (hasRDYRetryTimer || rdyCount > 0) && - (left == r.maxInFlight() || r.inBackoff()) { + (int32(left) == r.getMaxInFlight() || r.inBackoff()) { // we're toggling out of (normal) redistribution cases and this conn // had a RDY count... // @@ -586,10 +589,10 @@ func (r *Consumer) onConnClose(c *Conn) { } func (r *Consumer) backoffDurationForCount(count int32) time.Duration { - backoffDuration := r.config.backoffMultiplier * + backoffDuration := r.config.BackoffMultiplier * time.Duration(math.Pow(2, float64(count))) - if backoffDuration > r.config.maxBackoffDuration { - backoffDuration = r.config.maxBackoffDuration + if backoffDuration > r.config.MaxBackoffDuration { + backoffDuration = r.config.MaxBackoffDuration } return backoffDuration } @@ -673,7 +676,7 @@ func (r *Consumer) rdyLoop() { } } else { maxBackoffCount := int32(math.Max(1, math.Ceil( - math.Log2(r.config.maxBackoffDuration.Seconds())))) + math.Log2(r.config.MaxBackoffDuration.Seconds())))) if backoffCounter < maxBackoffCount { backoffCounter++ backoffUpdated = true @@ -746,7 +749,7 @@ func (r *Consumer) updateRDY(c *Conn, count int64) error { // never exceed our global max in flight. truncate if possible. // this could help a new connection get partial max-in-flight rdyCount := c.RDY() - maxPossibleRdy := int64(r.maxInFlight()) - atomic.LoadInt64(&r.totalRdyCount) + rdyCount + maxPossibleRdy := int64(r.getMaxInFlight()) - atomic.LoadInt64(&r.totalRdyCount) + rdyCount if maxPossibleRdy > 0 && maxPossibleRdy < count { count = maxPossibleRdy } @@ -789,8 +792,8 @@ func (r *Consumer) redistributeRDY() { return } - numConns := len(r.conns()) - maxInFlight := r.maxInFlight() + numConns := int32(len(r.conns())) + maxInFlight := r.getMaxInFlight() if numConns > maxInFlight { r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)", numConns, maxInFlight) @@ -813,7 +816,7 @@ func (r *Consumer) redistributeRDY() { rdyCount := c.RDY() r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)", c.String(), rdyCount, lastMsgDuration) - if rdyCount > 0 && lastMsgDuration > r.config.lowRdyIdleTimeout { + if rdyCount > 0 && lastMsgDuration > r.config.LowRdyIdleTimeout { r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String()) r.updateRDY(c, 0) } @@ -932,7 +935,7 @@ exit: func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool { // message passed the max number of attempts - if r.config.maxAttempts > 0 && message.Attempts > r.config.maxAttempts { + if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts { r.log(LogLevelWarning, "msg %s attempted %d times, giving up", message.ID, message.Attempts) diff --git a/consumer_test.go b/consumer_test.go index 7594f696..622c5c1d 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -23,6 +23,8 @@ type MyTestHandler struct { messagesFailed int } +var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags) + func (h *MyTestHandler) LogFailedMessage(message *Message) { h.messagesFailed++ h.q.Stop() @@ -84,7 +86,6 @@ func TestConsumerTLSSnappy(t *testing.T) { } func consumerTest(t *testing.T, deflate bool, snappy bool, tlsv1 bool) { - logger := log.New(ioutil.Discard, "", log.LstdFlags) topicName := "rdr_test" if deflate { @@ -99,20 +100,19 @@ func consumerTest(t *testing.T, deflate bool, snappy bool, tlsv1 bool) { config := NewConfig() // so that the test can simulate reaching max requeues and a call to LogFailedMessage - config.Set("default_requeue_delay", 0) + config.DefaultRequeueDelay = 0 // so that the test wont timeout from backing off - config.Set("max_backoff_duration", time.Millisecond*50) - config.Set("deflate", deflate) - config.Set("deflate_level", 6) - config.Set("snappy", snappy) - config.Set("tls_v1", tlsv1) + config.MaxBackoffDuration = time.Millisecond * 50 + config.Deflate = deflate + config.Snappy = snappy + config.TlsV1 = tlsv1 if tlsv1 { - config.Set("tls_config", &tls.Config{ + config.TlsConfig = &tls.Config{ InsecureSkipVerify: true, - }) + } } q, _ := NewConsumer(topicName, "ch", config) - q.SetLogger(logger, LogLevelInfo) + q.SetLogger(nullLogger, LogLevelInfo) h := &MyTestHandler{ t: t, diff --git a/mock_test.go b/mock_test.go index 5d6ea55f..ccee1cfd 100644 --- a/mock_test.go +++ b/mock_test.go @@ -218,8 +218,8 @@ func TestConsumerBackoff(t *testing.T) { topicName := "test_consumer_commands" + strconv.Itoa(int(time.Now().Unix())) config := NewConfig() - config.Set("max_in_flight", 5) - config.Set("backoff_multiplier", 10*time.Millisecond) + config.MaxInFlight = 5 + config.BackoffMultiplier = 10 * time.Millisecond q, _ := NewConsumer(topicName, "ch", config) q.SetLogger(logger, LogLevelDebug) q.SetHandler(&testHandler{}) diff --git a/producer.go b/producer.go index fc242c2d..380eee16 100644 --- a/producer.go +++ b/producer.go @@ -18,7 +18,7 @@ type Producer struct { id int64 addr string conn *Conn - config *Config + config Config logger *log.Logger logLvl LogLevel @@ -57,13 +57,21 @@ func (t *ProducerTransaction) finish() { } // NewProducer returns an instance of Producer for the specified address -func NewProducer(addr string, config *Config) *Producer { - config.initialize() - return &Producer{ +// +// The only valid way to create a Config is via NewConfig, using a struct literal will panic. +// After Config is passed into NewProducer the values are no longer mutable (they are copied). +func NewProducer(addr string, config *Config) (*Producer, error) { + config.assertInitialized() + err := config.Validate() + if err != nil { + return nil, err + } + + p := &Producer{ id: atomic.AddInt64(&instCount, 1), addr: addr, - config: config, + config: *config, logger: log.New(os.Stderr, "", log.Flags()), logLvl: LogLevelInfo, @@ -76,6 +84,7 @@ func NewProducer(addr string, config *Config) *Producer { heartbeatChan: make(chan int), closeChan: make(chan int), } + return p, nil } // SetLogger assigns the logger to use as well as a level @@ -111,7 +120,7 @@ func (w *Producer) Stop() { // When the Producer eventually receives the response from `nsqd`, // the supplied `doneChan` (if specified) // will receive a `ProducerTransaction` instance with the supplied variadic arguments -// (and the response `FrameType`, `Data`, and `Error`) +// and the response error if present func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, args ...interface{}) error { return w.sendCommandAsync(Publish(topic, body), doneChan, args) @@ -123,7 +132,7 @@ func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *Produc // When the Producer eventually receives the response from `nsqd`, // the supplied `doneChan` (if specified) // will receive a `ProducerTransaction` instance with the supplied variadic arguments -// (and the response `FrameType`, `Data`, and `Error`) +// and the response error if present func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, args ...interface{}) error { cmd, err := MultiPublish(topic, body) @@ -134,13 +143,13 @@ func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan } // Publish synchronously publishes a message body to the specified topic, returning -// the response frameType, data, and error +// the an error if publish failed func (w *Producer) Publish(topic string, body []byte) error { return w.sendCommand(Publish(topic, body)) } // MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning -// the response frameType, data, and error +// the an error if publish failed func (w *Producer) MultiPublish(topic string, body [][]byte) error { cmd, err := MultiPublish(topic, body) if err != nil { @@ -203,7 +212,7 @@ func (w *Producer) connect() error { w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr) - conn := NewConn(w.addr, w.config) + conn := NewConn(w.addr, &w.config) conn.SetLogger(w.logger, w.logLvl, fmt.Sprintf("%3d (%%s)", w.id)) conn.Delegate = &producerConnDelegate{w} diff --git a/producer_test.go b/producer_test.go index 4ae17cff..73ca37ac 100644 --- a/producer_test.go +++ b/producer_test.go @@ -39,7 +39,8 @@ func TestProducerConnection(t *testing.T) { defer log.SetOutput(os.Stdout) config := NewConfig() - w := NewProducer("127.0.0.1:4150", config) + w, _ := NewProducer("127.0.0.1:4150", config) + w.SetLogger(nullLogger, LogLevelInfo) err := w.Publish("write_test", []byte("test")) if err != nil { @@ -62,7 +63,8 @@ func TestProducerPublish(t *testing.T) { msgCount := 10 config := NewConfig() - w := NewProducer("127.0.0.1:4150", config) + w, _ := NewProducer("127.0.0.1:4150", config) + w.SetLogger(nullLogger, LogLevelInfo) defer w.Stop() for i := 0; i < msgCount; i++ { @@ -88,7 +90,8 @@ func TestProducerMultiPublish(t *testing.T) { msgCount := 10 config := NewConfig() - w := NewProducer("127.0.0.1:4150", config) + w, _ := NewProducer("127.0.0.1:4150", config) + w.SetLogger(nullLogger, LogLevelInfo) defer w.Stop() var testData [][]byte @@ -117,7 +120,8 @@ func TestProducerPublishAsync(t *testing.T) { msgCount := 10 config := NewConfig() - w := NewProducer("127.0.0.1:4150", config) + w, _ := NewProducer("127.0.0.1:4150", config) + w.SetLogger(nullLogger, LogLevelInfo) defer w.Stop() responseChan := make(chan *ProducerTransaction, msgCount) @@ -154,7 +158,8 @@ func TestProducerMultiPublishAsync(t *testing.T) { msgCount := 10 config := NewConfig() - w := NewProducer("127.0.0.1:4150", config) + w, _ := NewProducer("127.0.0.1:4150", config) + w.SetLogger(nullLogger, LogLevelInfo) defer w.Stop() var testData [][]byte @@ -194,8 +199,9 @@ func TestProducerHeartbeat(t *testing.T) { topicName := "heartbeat" + strconv.Itoa(int(time.Now().Unix())) config := NewConfig() - config.Set("heartbeat_interval", 100*time.Millisecond) - w := NewProducer("127.0.0.1:4150", config) + config.HeartbeatInterval = 100 * time.Millisecond + w, _ := NewProducer("127.0.0.1:4150", config) + w.SetLogger(nullLogger, LogLevelInfo) defer w.Stop() err := w.Publish(topicName, []byte("publish_test_case")) @@ -208,8 +214,9 @@ func TestProducerHeartbeat(t *testing.T) { } config = NewConfig() - config.Set("heartbeat_interval", 1000*time.Millisecond) - w = NewProducer("127.0.0.1:4150", config) + config.HeartbeatInterval = 1000 * time.Millisecond + w, _ = NewProducer("127.0.0.1:4150", config) + w.SetLogger(nullLogger, LogLevelInfo) defer w.Stop() err = w.Publish(topicName, []byte("publish_test_case")) @@ -237,9 +244,10 @@ func TestProducerHeartbeat(t *testing.T) { func readMessages(topicName string, t *testing.T, msgCount int) { config := NewConfig() - config.Set("default_requeue_delay", 0) - config.Set("max_backoff_duration", time.Millisecond*50) + config.DefaultRequeueDelay = 0 + config.MaxBackoffDuration = 50 * time.Millisecond q, _ := NewConsumer(topicName, "ch", config) + q.SetLogger(nullLogger, LogLevelInfo) h := &ConsumerHandler{ t: t, diff --git a/protocol.go b/protocol.go index dbf52bb2..82e09356 100644 --- a/protocol.go +++ b/protocol.go @@ -5,7 +5,6 @@ import ( "errors" "io" "regexp" - "time" ) // MagicV1 is the initial identifier sent when connecting for V1 clients @@ -21,9 +20,6 @@ const ( FrameTypeMessage int32 = 2 ) -// The amount of time nsqd will allow a client to idle, can be overriden -const DefaultClientTimeout = 60 * time.Second - var validTopicNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+$`) var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)