Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: export & list defaults #52

Merged
merged 1 commit into from
Jun 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 150 additions & 177 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,116 +9,91 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"
"unsafe"
)

// Config is a struct of NSQ options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(based on the outcome of some other comments) we need to document what the valid ways to create a config are, and specify what happens after a config is passed into a high-level type (i.e. it's copied, so changing the values has no effect)

//
// (see Config.Set() for available parameters)
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no
// longer mutable (they are copied).
//
// Use Set(key string, value interface{}) as an alternate way to set parameters
type Config struct {
sync.RWMutex
initOnce sync.Once

readTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m"`
writeTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m"`

lookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m"`
lookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1"`

maxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m"`
defaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m"`
backoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m"`

maxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535"`
lowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m"`

clientID string `opt:"client_id"`
hostname string `opt:"hostname"`
userAgent string `opt:"user_agent"`

heartbeatInterval time.Duration `opt:"heartbeat_interval"`
sampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

tlsV1 bool `opt:"tls_v1"`
tlsConfig *tls.Config `opt:"tls_config"`

deflate bool `opt:"deflate"`
deflateLevel int `opt:"deflate_level" min:"1" max:"9"`
snappy bool `opt:"snappy"`

outputBufferSize int64 `opt:"output_buffer_size"`
outputBufferTimeout time.Duration `opt:"output_buffer_timeout"`

maxInFlight int `opt:"max_in_flight" min:"0"`

maxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m"`

authSecret string `opt:"auth_secret"`
initialized bool

// Deadlines for network reads and writes
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
// restart at the same time
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
// Amount of time in seconds to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // deprecated (defaults: short hostname)
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`

// Duration of time between heartbeats. This must be less than ReadTimeout
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

// TLS Settings
TlsV1 bool `opt:"tls_v1"`
TlsConfig *tls.Config `opt:"tls_config"`

// Compression Settings
Deflate bool `opt:"deflate"`
DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
Snappy bool `opt:"snappy"`

// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`

// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`

// secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a few of the comments for these vars are located to the right of the code vs above, let's be consistent and format them all above the respected variable, for readability.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

}

// NewConfig returns a new default configuration.
//
// "read_timeout": 60s (min: 100ms, max: 5m) (time.Duration)
// "write_timeout": 1s (min: 100ms, max: 5m) (time.Duration)
// "lookupd_poll_interval": 60s (min: 5s, max: 5m) (time.Duration)
// "lookupd_poll_jitter": 0.3 (min: 0.0, max: 1.0) (float)
// "max_requeue_delay": 15m (min: 0, max: 60m) (time.Duration)
// "default_requeue_delay": 90s (min: 0, max: 60m) (time.Duration)
// "backoff_multiplier": 1s (min: 0, max: 60m) (time.TIme)
// "max_attempts": 5 (min: 0, max: 65535) (int)
// "low_rdy_idle_timeout": 10s (min: 1s, max: 5m) (time.Duration)
// "client_id": "<short host name>" (string)
// "hostname": os.Hostname() (string)
// "user_agent": "go-nsq/<version>" (string)
// "heartbeat_interval": 30s (time.Duration)
// "sample_rate": 0 (min: 0, max: 99) (int)
// "tls_v1": false (bool)
// "tls_config": nil (*tls.Config)
// "deflate": false (bool)
// "deflate_level": 6 (min: 1, max: 9) (int)
// "snappy": false (bool)
// "output_buffer_size": 16384 (int)
// "output_buffer_timeout": 250ms (time.Duration)
// "max_in_flight": 1 (int)
// "max_backoff_duration": 120s (time.Duration)
// "auth_secret": "" (string)
// NewConfig returns a new default nsq configuration.
//
// See Config.Set() for a description of these parameters.
// This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
func NewConfig() *Config {
conf := &Config{}
conf.initialize()
return conf
}

// initialize is used to ensure that a Config has a baseline set of defaults
// despite how it might have been insantiated
func (c *Config) initialize() {
c.initOnce.Do(func() {
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}
c.maxInFlight = 1
c.maxAttempts = 5
c.lookupdPollInterval = 60 * time.Second
c.lookupdPollJitter = 0.3
c.lowRdyIdleTimeout = 10 * time.Second
c.defaultRequeueDelay = 90 * time.Second
c.maxRequeueDelay = 15 * time.Minute
c.backoffMultiplier = time.Second
c.maxBackoffDuration = 120 * time.Second
c.readTimeout = DefaultClientTimeout
c.writeTimeout = time.Second
c.deflateLevel = 6
c.outputBufferSize = 16 * 1024
c.outputBufferTimeout = 250 * time.Millisecond
c.heartbeatInterval = DefaultClientTimeout / 2
c.clientID = strings.Split(hostname, ".")[0]
c.hostname = hostname
c.userAgent = fmt.Sprintf("go-nsq/%s", VERSION)
})
c := &Config{}
c.initialized = true
if err := c.setDefaults(); err != nil {
panic(err.Error())
}
return c
}

// Set takes an option as a string and a value as an interface and
Expand All @@ -140,86 +115,9 @@ func (c *Config) initialize() {
// 1 (an int where 1 == true and 0 == false)
//
// It returns an error for an invalid option or value.
//
// read_timeout (time.Duration): the deadline set for network reads
// (min: 100ms, max: 5m)
//
// write_timeout (time.Duration): the deadline set for network writes
// (min: 100ms, max: 5m)
//
// lookupd_poll_interval (time.Duration): duration between polling lookupd for new
// (min: 5s, max: 5m)
//
// lookupd_poll_jitter (float): fractional amount of jitter to add to the lookupd pool loop,
// this helps evenly distribute requests even if multiple
// consumers restart at the same time.
// (min: 0.0, max: 1.0)
//
// max_requeue_delay (time.Duration): the maximum duration when REQueueing
// (for doubling of deferred requeue)
// (min: 0, max: 60m)
//
// default_requeue_delay (time.Duration): the default duration when REQueueing
// (min: 0, max: 60m)
//
// backoff_multiplier (time.Duration): the unit of time for calculating consumer backoff
// (min: 0, max: 60m)
//
// max_attempts (int): maximum number of times this consumer will attempt to process a message
// (min: 0, max: 65535)
//
// low_rdy_idle_timeout (time.Duration): the amount of time in seconds to wait for a message
// from a producer when in a state where RDY counts
// are re-distributed (ie. max_in_flight < num_producers)
// (min: 1s, max: 5m)
//
// client_id (string): an identifier sent to nsqd representing the client
// (defaults: short hostname)
//
// hostname (string): an identifier sent to nsqd representing the host
// (defaults: long hostname)
//
// user_agent (string): an identifier of the agent for this client (in the spirit of HTTP)
// (default: "<client_library_name>/<version>")
//
// heartbeat_interval (time.Duration): duration of time between heartbeats
//
// sample_rate (int): integer percentage to sample the channel (requires nsqd 0.2.25+)
// (min: 0, max: 99)
//
// tls_v1 (bool): negotiate TLS
//
// tls_config (*tls.Config): client TLS configuration
//
// deflate (bool): negotiate Deflate compression
//
// deflate_level (int): the compression level to negotiate for Deflate
// (min: 1, max: 9)
//
// snappy (bool): negotiate Snappy compression
//
// output_buffer_size (int): size of the buffer (in bytes) used by nsqd for
// buffering writes to this connection
//
// output_buffer_timeout (time.Duration): timeout (in ms) used by nsqd before flushing buffered
// writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
//
// max_in_flight (int): the maximum number of messages to allow in flight (concurrency knob)
//
// max_backoff_duration (time.Duration): the maximum amount of time to backoff when processing fails
// 0 == no backoff
//
// auth_secret (string): secret for nsqd authentication (requires nsqd 0.2.29+)
//
func (c *Config) Set(option string, value interface{}) error {
c.Lock()
defer c.Unlock()

c.initialize()
c.assertInitialized()

val := reflect.ValueOf(c).Elem()
typ := val.Type()
Expand Down Expand Up @@ -261,6 +159,81 @@ func (c *Config) Set(option string, value interface{}) error {
return fmt.Errorf("invalid option %s", option)
}

func (c *Config) assertInitialized() {
if !c.initialized {
panic("Config{} must be created with NewConfig()")
}
}

// Validate checks that all values are within specified min/max ranges
func (c *Config) Validate() error {

c.assertInitialized()

val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)

min := field.Tag.Get("min")
max := field.Tag.Get("max")

if min == "" && max == "" {
continue
}

value := val.FieldByName(field.Name)

if min != "" {
coercedMinVal, _ := coerce(min, field.Type)
if valueCompare(value, coercedMinVal) == -1 {
return fmt.Errorf("invalid %s ! %v < %v",
field.Name, value.Interface(), coercedMinVal.Interface())
}
}
if max != "" {
coercedMaxVal, _ := coerce(max, field.Type)
if valueCompare(value, coercedMaxVal) == 1 {
return fmt.Errorf("invalid %s ! %v > %v",
field.Name, value.Interface(), coercedMaxVal.Interface())
}
}
}

if c.HeartbeatInterval > c.ReadTimeout {
return fmt.Errorf("HeartbeatInterval %v must be less than ReadTimeout %v", c.HeartbeatInterval, c.ReadTimeout)
}
return nil
}

func (c *Config) setDefaults() error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
defaultVal := field.Tag.Get("default")
if defaultVal == "" || opt == "" {
continue
}

if err := c.Set(opt, defaultVal); err != nil {
return err
}
}

hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}

c.ClientID = strings.Split(hostname, ".")[0]
c.Hostname = hostname
c.UserAgent = fmt.Sprintf("go-nsq/%s", VERSION)

return nil
}

// because Config contains private structs we can't use reflect.Value
// directly, instead we need to "unsafely" address the variable
func unsafeValueOf(val reflect.Value) reflect.Value {
Expand Down
12 changes: 12 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ func TestConfigSet(t *testing.T) {
t.Errorf("Error setting `tls_v1` config: %v", err)
}
}

func TestConfigValidate(t *testing.T) {
c := NewConfig()
if err := c.Validate(); err != nil {
t.Error("initialized config is invalid")
}
c.DeflateLevel = 100
if err := c.Validate(); err == nil {
t.Error("no error set for invalid value")
}

}
Loading