From fa5a0cb0a625ae65b64793d2a98cb7538190f7cf Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Mon, 13 Sep 2021 15:34:02 +0100 Subject: [PATCH] feat: use DebugLogger reference for goldenpath log Replace some of the more verbose golden path log statements with a call to DebugLogger rather than Logger. With the out of the box Sarama configuration, these will still just get proxied to whatever the regular Logger is set to use. However, the caller can override DebugLogger with their own implementation which may include a Discard logger. --- broker.go | 16 ++++++++-------- client.go | 20 ++++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/broker.go b/broker.go index cddfa6463..cb4e9cf8d 100644 --- a/broker.go +++ b/broker.go @@ -190,7 +190,7 @@ func (b *Broker) Open(conf *Config) error { if b.connErr != nil { err = b.conn.Close() if err == nil { - Logger.Printf("Closed connection to broker %s\n", b.addr) + DebugLogger.Printf("Closed connection to broker %s\n", b.addr) } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } @@ -204,9 +204,9 @@ func (b *Broker) Open(conf *Config) error { b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1) if b.id >= 0 { - Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id) + DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id) } else { - Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr) + DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr) } go withRecover(b.responseReceiver) }) @@ -245,7 +245,7 @@ func (b *Broker) Close() error { b.unregisterMetrics() if err == nil { - Logger.Printf("Closed connection to broker %s\n", b.addr) + DebugLogger.Printf("Closed connection to broker %s\n", b.addr) } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } @@ -1044,7 +1044,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int return res.Err } - Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms) + DebugLogger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms) return nil } @@ -1117,7 +1117,7 @@ func (b *Broker) sendAndReceiveV0SASLPlainAuth() error { return err } - Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) + DebugLogger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) return nil } @@ -1275,7 +1275,7 @@ func (b *Broker) sendAndReceiveSASLSCRAMv0() error { } } - Logger.Println("SASL authentication succeeded") + DebugLogger.Println("SASL authentication succeeded") return nil } @@ -1323,7 +1323,7 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error { } } - Logger.Println("SASL authentication succeeded") + DebugLogger.Println("SASL authentication succeeded") return nil } diff --git a/client.go b/client.go index f0cfb0d32..1c69cb8d6 100644 --- a/client.go +++ b/client.go @@ -138,7 +138,7 @@ type client struct { // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot // be retrieved from any of the given broker addresses, the client is not created. func NewClient(addrs []string, conf *Config) (Client, error) { - Logger.Println("Initializing new client") + DebugLogger.Println("Initializing new client") if conf == nil { conf = NewConfig() @@ -182,7 +182,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) { } go withRecover(client.backgroundMetadataUpdater) - Logger.Println("Successfully initialized new client") + DebugLogger.Println("Successfully initialized new client") return client, nil } @@ -247,7 +247,7 @@ func (client *client) Close() error { client.lock.Lock() defer client.lock.Unlock() - Logger.Println("Closing Client") + DebugLogger.Println("Closing Client") for _, broker := range client.brokers { safeAsyncClose(broker) @@ -614,7 +614,7 @@ func (client *client) updateBroker(brokers []*Broker) { currentBroker[broker.ID()] = broker if client.brokers[broker.ID()] == nil { // add new broker client.brokers[broker.ID()] = broker - Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) + DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) } else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address safeAsyncClose(client.brokers[broker.ID()]) client.brokers[broker.ID()] = broker @@ -642,7 +642,7 @@ func (client *client) registerBroker(broker *Broker) { if client.brokers[broker.ID()] == nil { client.brokers[broker.ID()] = broker - Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) + DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) } else if broker.Addr() != client.brokers[broker.ID()].Addr() { safeAsyncClose(client.brokers[broker.ID()]) client.brokers[broker.ID()] = broker @@ -664,7 +664,7 @@ func (client *client) deregisterBroker(broker *Broker) { // but we really shouldn't have to; once that loop is made better this case can be // removed, and the function generally can be renamed from `deregisterBroker` to // `nextSeedBroker` or something - Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr()) + DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr()) delete(client.brokers, broker.ID()) } } @@ -880,10 +880,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, for ; broker != nil && !pastDeadline(0); broker = client.any() { allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation if len(topics) > 0 { - Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr) + DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr) } else { allowAutoTopicCreation = false - Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr) + DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr) } req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation} @@ -1047,7 +1047,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin } for broker := client.any(); broker != nil; broker = client.any() { - Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr()) + DebugLogger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr()) request := new(FindCoordinatorRequest) request.CoordinatorKey = consumerGroup @@ -1069,7 +1069,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin switch response.Err { case ErrNoError: - Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr()) + DebugLogger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr()) return response, nil case ErrConsumerCoordinatorNotAvailable: