diff --git a/conn.go b/conn.go index 66576862..97a15ec0 100644 --- a/conn.go +++ b/conn.go @@ -65,9 +65,10 @@ type Conn struct { delegate ConnDelegate - logger logger - logLvl LogLevel - logFmt string + logger logger + logLvl LogLevel + logFmt string + logGuard sync.RWMutex r io.Reader w io.Writer @@ -121,6 +122,9 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn { // Output(calldepth int, s string) // func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) { + c.logGuard.Lock() + defer c.logGuard.Unlock() + c.logger = l c.logLvl = lvl c.logFmt = format @@ -129,6 +133,13 @@ func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) { } } +func (c *Conn) getLogger() (logger, LogLevel, string) { + c.logGuard.RLock() + defer c.logGuard.RUnlock() + + return c.logger, c.logLvl, c.logFmt +} + // Connect dials and bootstraps the nsqd connection // (including IDENTIFY) and returns the IdentifyResponse func (c *Conn) Connect() (*IdentifyResponse, error) { @@ -693,15 +704,17 @@ func (c *Conn) onMessageTouch(m *Message) { } func (c *Conn) log(lvl LogLevel, line string, args ...interface{}) { - if c.logger == nil { + logger, logLvl, logFmt := c.getLogger() + + if logger == nil { return } - if c.logLvl > lvl { + if logLvl > lvl { return } - c.logger.Output(2, fmt.Sprintf("%-4s %s %s", logPrefix(lvl), - fmt.Sprintf(c.logFmt, c.String()), + logger.Output(2, fmt.Sprintf("%-4s %s %s", logPrefix(lvl), + fmt.Sprintf(logFmt, c.String()), fmt.Sprintf(line, args...))) } diff --git a/consumer.go b/consumer.go index 7a4203cb..cbef610d 100644 --- a/consumer.go +++ b/consumer.go @@ -86,8 +86,9 @@ type Consumer struct { mtx sync.RWMutex - logger logger - logLvl LogLevel + logger logger + logLvl LogLevel + logGuard sync.RWMutex behaviorDelegate interface{} @@ -206,10 +207,20 @@ func (r *Consumer) conns() []*Conn { // Output(calldepth int, s string) // func (r *Consumer) SetLogger(l logger, lvl LogLevel) { + r.logGuard.Lock() + defer r.logGuard.Unlock() + r.logger = l r.logLvl = lvl } +func (r *Consumer) getLogger() (logger, LogLevel) { + r.logGuard.RLock() + defer r.logGuard.RUnlock() + + return r.logger, r.logLvl +} + // SetBehaviorDelegate takes a type implementing one or more // of the following interfaces that modify the behavior // of the `Consumer`: @@ -482,8 +493,10 @@ func (r *Consumer) ConnectToNSQD(addr string) error { atomic.StoreInt32(&r.connectedFlag, 1) + logger, logLvl := r.getLogger() + conn := NewConn(addr, &r.config, &consumerConnDelegate{r}) - conn.SetLogger(r.logger, r.logLvl, + conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)) r.mtx.Lock() @@ -1090,15 +1103,17 @@ func (r *Consumer) exit() { } func (r *Consumer) log(lvl LogLevel, line string, args ...interface{}) { - if r.logger == nil { + logger, logLvl := r.getLogger() + + if logger == nil { return } - if r.logLvl > lvl { + if logLvl > lvl { return } - r.logger.Output(2, fmt.Sprintf("%-4s %3d [%s/%s] %s", + logger.Output(2, fmt.Sprintf("%-4s %3d [%s/%s] %s", logPrefix(lvl), r.id, r.topic, r.channel, fmt.Sprintf(line, args...))) } diff --git a/producer.go b/producer.go index 5dac12d1..f2fe9d02 100644 --- a/producer.go +++ b/producer.go @@ -20,8 +20,9 @@ type Producer struct { conn *Conn config Config - logger logger - logLvl LogLevel + logger logger + logLvl LogLevel + logGuard sync.RWMutex responseChan chan []byte errorChan chan []byte @@ -90,10 +91,20 @@ func NewProducer(addr string, config *Config) (*Producer, error) { // Output(calldepth int, s string) // func (w *Producer) SetLogger(l logger, lvl LogLevel) { + w.logGuard.Lock() + defer w.logGuard.Unlock() + w.logger = l w.logLvl = lvl } +func (w *Producer) getLogger() (logger, LogLevel) { + w.logGuard.RLock() + defer w.logGuard.RUnlock() + + return w.logger, w.logLvl +} + // String returns the address of the Producer func (w *Producer) String() string { return w.addr @@ -213,8 +224,11 @@ func (w *Producer) connect() error { w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr) + logger, logLvl := w.getLogger() + w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w}) - w.conn.SetLogger(w.logger, w.logLvl, fmt.Sprintf("%3d (%%s)", w.id)) + w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id)) + _, err := w.conn.Connect() if err != nil { w.conn.Close() @@ -301,21 +315,22 @@ func (w *Producer) transactionCleanup() { } // give the runtime a chance to schedule other racing goroutines time.Sleep(5 * time.Millisecond) - continue } } } func (w *Producer) log(lvl LogLevel, line string, args ...interface{}) { - if w.logger == nil { + logger, logLvl := w.getLogger() + + if logger == nil { return } - if w.logLvl > lvl { + if logLvl > lvl { return } - w.logger.Output(2, fmt.Sprintf("%-4s %3d %s", logPrefix(lvl), w.id, fmt.Sprintf(line, args...))) + logger.Output(2, fmt.Sprintf("%-4s %3d %s", logPrefix(lvl), w.id, fmt.Sprintf(line, args...))) } func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }