Skip to content

Commit

Permalink
Merge pull request #108 from judwhite/master
Browse files Browse the repository at this point in the history
producer/consumer: clean up logging races
  • Loading branch information
jehiah committed Feb 6, 2015
2 parents b30f73e + ffe397d commit e461358
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
27 changes: 20 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ type Conn struct {

delegate ConnDelegate

logger logger
logLvl LogLevel
logFmt string
logger logger
logLvl LogLevel
logFmt string
logGuard sync.RWMutex

r io.Reader
w io.Writer
Expand Down Expand Up @@ -121,6 +122,9 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
// Output(calldepth int, s string)
//
func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
c.logGuard.Lock()
defer c.logGuard.Unlock()

c.logger = l
c.logLvl = lvl
c.logFmt = format
Expand All @@ -129,6 +133,13 @@ func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
}
}

func (c *Conn) getLogger() (logger, LogLevel, string) {
c.logGuard.RLock()
defer c.logGuard.RUnlock()

return c.logger, c.logLvl, c.logFmt
}

// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
Expand Down Expand Up @@ -693,15 +704,17 @@ func (c *Conn) onMessageTouch(m *Message) {
}

func (c *Conn) log(lvl LogLevel, line string, args ...interface{}) {
if c.logger == nil {
logger, logLvl, logFmt := c.getLogger()

if logger == nil {
return
}

if c.logLvl > lvl {
if logLvl > lvl {
return
}

c.logger.Output(2, fmt.Sprintf("%-4s %s %s", logPrefix(lvl),
fmt.Sprintf(c.logFmt, c.String()),
logger.Output(2, fmt.Sprintf("%-4s %s %s", logPrefix(lvl),
fmt.Sprintf(logFmt, c.String()),
fmt.Sprintf(line, args...)))
}
27 changes: 21 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ type Consumer struct {

mtx sync.RWMutex

logger logger
logLvl LogLevel
logger logger
logLvl LogLevel
logGuard sync.RWMutex

behaviorDelegate interface{}

Expand Down Expand Up @@ -206,10 +207,20 @@ func (r *Consumer) conns() []*Conn {
// Output(calldepth int, s string)
//
func (r *Consumer) SetLogger(l logger, lvl LogLevel) {
r.logGuard.Lock()
defer r.logGuard.Unlock()

r.logger = l
r.logLvl = lvl
}

func (r *Consumer) getLogger() (logger, LogLevel) {
r.logGuard.RLock()
defer r.logGuard.RUnlock()

return r.logger, r.logLvl
}

// SetBehaviorDelegate takes a type implementing one or more
// of the following interfaces that modify the behavior
// of the `Consumer`:
Expand Down Expand Up @@ -485,8 +496,10 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

atomic.StoreInt32(&r.connectedFlag, 1)

logger, logLvl := r.getLogger()

conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
conn.SetLogger(r.logger, r.logLvl,
conn.SetLogger(logger, logLvl,
fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))

r.mtx.Lock()
Expand Down Expand Up @@ -1094,15 +1107,17 @@ func (r *Consumer) exit() {
}

func (r *Consumer) log(lvl LogLevel, line string, args ...interface{}) {
if r.logger == nil {
logger, logLvl := r.getLogger()

if logger == nil {
return
}

if r.logLvl > lvl {
if logLvl > lvl {
return
}

r.logger.Output(2, fmt.Sprintf("%-4s %3d [%s/%s] %s",
logger.Output(2, fmt.Sprintf("%-4s %3d [%s/%s] %s",
logPrefix(lvl), r.id, r.topic, r.channel,
fmt.Sprintf(line, args...)))
}
29 changes: 22 additions & 7 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type Producer struct {
conn *Conn
config Config

logger logger
logLvl LogLevel
logger logger
logLvl LogLevel
logGuard sync.RWMutex

responseChan chan []byte
errorChan chan []byte
Expand Down Expand Up @@ -90,10 +91,20 @@ func NewProducer(addr string, config *Config) (*Producer, error) {
// Output(calldepth int, s string)
//
func (w *Producer) SetLogger(l logger, lvl LogLevel) {
w.logGuard.Lock()
defer w.logGuard.Unlock()

w.logger = l
w.logLvl = lvl
}

func (w *Producer) getLogger() (logger, LogLevel) {
w.logGuard.RLock()
defer w.logGuard.RUnlock()

return w.logger, w.logLvl
}

// String returns the address of the Producer
func (w *Producer) String() string {
return w.addr
Expand Down Expand Up @@ -213,8 +224,11 @@ func (w *Producer) connect() error {

w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

logger, logLvl := w.getLogger()

w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
w.conn.SetLogger(w.logger, w.logLvl, fmt.Sprintf("%3d (%%s)", w.id))
w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))

_, err := w.conn.Connect()
if err != nil {
w.conn.Close()
Expand Down Expand Up @@ -301,21 +315,22 @@ func (w *Producer) transactionCleanup() {
}
// give the runtime a chance to schedule other racing goroutines
time.Sleep(5 * time.Millisecond)
continue
}
}
}

func (w *Producer) log(lvl LogLevel, line string, args ...interface{}) {
if w.logger == nil {
logger, logLvl := w.getLogger()

if logger == nil {
return
}

if w.logLvl > lvl {
if logLvl > lvl {
return
}

w.logger.Output(2, fmt.Sprintf("%-4s %3d %s", logPrefix(lvl), w.id, fmt.Sprintf(line, args...)))
logger.Output(2, fmt.Sprintf("%-4s %3d %s", logPrefix(lvl), w.id, fmt.Sprintf(line, args...)))
}

func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }
Expand Down

0 comments on commit e461358

Please sign in to comment.