Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use DebugLogger reference for goldenpath log #2018

Merged
merged 1 commit into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (b *Broker) Open(conf *Config) error {
if b.connErr != nil {
err = b.conn.Close()
if err == nil {
Logger.Printf("Closed connection to broker %s\n", b.addr)
DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}
Expand All @@ -204,9 +204,9 @@ func (b *Broker) Open(conf *Config) error {
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)

if b.id >= 0 {
Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
} else {
Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
}
go withRecover(b.responseReceiver)
})
Expand Down Expand Up @@ -245,7 +245,7 @@ func (b *Broker) Close() error {
b.unregisterMetrics()

if err == nil {
Logger.Printf("Closed connection to broker %s\n", b.addr)
DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}
Expand Down Expand Up @@ -1044,7 +1044,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
return res.Err
}

Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
DebugLogger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
return nil
}

Expand Down Expand Up @@ -1117,7 +1117,7 @@ func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
return err
}

Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
DebugLogger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
return nil
}

Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (b *Broker) sendAndReceiveSASLSCRAMv0() error {
}
}

Logger.Println("SASL authentication succeeded")
DebugLogger.Println("SASL authentication succeeded")
return nil
}

Expand Down Expand Up @@ -1323,7 +1323,7 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
}
}

Logger.Println("SASL authentication succeeded")
DebugLogger.Println("SASL authentication succeeded")
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type client struct {
// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
// be retrieved from any of the given broker addresses, the client is not created.
func NewClient(addrs []string, conf *Config) (Client, error) {
Logger.Println("Initializing new client")
DebugLogger.Println("Initializing new client")

if conf == nil {
conf = NewConfig()
Expand Down Expand Up @@ -182,7 +182,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
}
go withRecover(client.backgroundMetadataUpdater)

Logger.Println("Successfully initialized new client")
DebugLogger.Println("Successfully initialized new client")

return client, nil
}
Expand Down Expand Up @@ -247,7 +247,7 @@ func (client *client) Close() error {

client.lock.Lock()
defer client.lock.Unlock()
Logger.Println("Closing Client")
DebugLogger.Println("Closing Client")

for _, broker := range client.brokers {
safeAsyncClose(broker)
Expand Down Expand Up @@ -614,7 +614,7 @@ func (client *client) updateBroker(brokers []*Broker) {
currentBroker[broker.ID()] = broker
if client.brokers[broker.ID()] == nil { // add new broker
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Expand Down Expand Up @@ -642,7 +642,7 @@ func (client *client) registerBroker(broker *Broker) {

if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Expand All @@ -664,7 +664,7 @@ func (client *client) deregisterBroker(broker *Broker) {
// but we really shouldn't have to; once that loop is made better this case can be
// removed, and the function generally can be renamed from `deregisterBroker` to
// `nextSeedBroker` or something
Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
}
}
Expand Down Expand Up @@ -880,10 +880,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
for ; broker != nil && !pastDeadline(0); broker = client.any() {
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
} else {
allowAutoTopicCreation = false
Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}

req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
Expand Down Expand Up @@ -1047,7 +1047,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
}

for broker := client.any(); broker != nil; broker = client.any() {
Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())
DebugLogger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

request := new(FindCoordinatorRequest)
request.CoordinatorKey = consumerGroup
Expand All @@ -1069,7 +1069,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin

switch response.Err {
case ErrNoError:
Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
DebugLogger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
return response, nil

case ErrConsumerCoordinatorNotAvailable:
Expand Down