Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: return underlying sasl error message #2164

Merged
merged 2 commits into from
Feb 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
return res.Err
}

DebugLogger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
DebugLogger.Print("Completed pre-auth SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
return nil
}

Expand Down Expand Up @@ -1268,7 +1268,9 @@ func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {

// With v1 sasl we get an error message set in the response we can return
if err != nil {
Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
Logger.Printf(
"Error returned from broker %s during SASL authentication: %v\n",
b.addr, err.Error())
return err
}

Expand Down Expand Up @@ -1579,7 +1581,11 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl
}

if !errors.Is(res.Err, ErrNoError) {
return bytesRead, res.Err
var err error = res.Err
if res.ErrorMessage != nil {
err = Wrap(res.Err, errors.New(*res.ErrorMessage))
}
return bytesRead, err
}

return bytesRead, nil
Expand Down
20 changes: 18 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"fmt"
"strings"

"github.com/hashicorp/go-multierror"
)
Expand Down Expand Up @@ -63,8 +64,23 @@ var ErrReassignPartitions = errors.New("failed to reassign partitions for topic"
// ErrDeleteRecords is the type of error returned when fail to delete the required records
var ErrDeleteRecords = errors.New("kafka server: failed to delete records")

// The formatter used to format multierrors
var MultiErrorFormat multierror.ErrorFormatFunc
// MultiErrorFormat specifies the formatter applied to format multierrors. The
// default implementation is a consensed version of the hashicorp/go-multierror
// default one
var MultiErrorFormat multierror.ErrorFormatFunc = func(es []error) string {
if len(es) == 1 {
return es[0].Error()
}

points := make([]string, len(es))
for i, err := range es {
points[i] = fmt.Sprintf("* %s", err)
}

return fmt.Sprintf(
"%d errors occurred:\n\t%s\n",
len(es), strings.Join(points, "\n\t"))
}

type sentinelError struct {
sentinel error
Expand Down
4 changes: 2 additions & 2 deletions errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"testing"
)

func TestSentinelWithWrappedError(t *testing.T) {
func TestSentinelWithSingleWrappedError(t *testing.T) {
t.Parallel()
myNetError := &net.OpError{Op: "mock", Err: errors.New("op error")}
error := Wrap(ErrOutOfBrokers, myNetError)

expected := fmt.Sprintf("%s: 1 error occurred:\n\t* %s\n\n", ErrOutOfBrokers, myNetError)
expected := fmt.Sprintf("%s: %s", ErrOutOfBrokers, myNetError)
actual := error.Error()
if actual != expected {
t.Errorf("unexpected value '%s' vs '%v'", expected, actual)
Expand Down