Skip to content

Commit

Permalink
Merge pull request #2164 from Shopify/dnwe/return-sasl-failure-message
Browse files Browse the repository at this point in the history
fix: return underlying sasl error message
  • Loading branch information
dnwe authored Feb 27, 2022
2 parents 34bcb7c + f436782 commit 3f90bc7
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
12 changes: 9 additions & 3 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
return res.Err
}

DebugLogger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
DebugLogger.Print("Completed pre-auth SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
return nil
}

Expand Down Expand Up @@ -1268,7 +1268,9 @@ func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {

// With v1 sasl we get an error message set in the response we can return
if err != nil {
Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
Logger.Printf(
"Error returned from broker %s during SASL authentication: %v\n",
b.addr, err.Error())
return err
}

Expand Down Expand Up @@ -1579,7 +1581,11 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl
}

if !errors.Is(res.Err, ErrNoError) {
return bytesRead, res.Err
var err error = res.Err
if res.ErrorMessage != nil {
err = Wrap(res.Err, errors.New(*res.ErrorMessage))
}
return bytesRead, err
}

return bytesRead, nil
Expand Down
20 changes: 18 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"fmt"
"strings"

"github.com/hashicorp/go-multierror"
)
Expand Down Expand Up @@ -63,8 +64,23 @@ var ErrReassignPartitions = errors.New("failed to reassign partitions for topic"
// ErrDeleteRecords is the type of error returned when fail to delete the required records
var ErrDeleteRecords = errors.New("kafka server: failed to delete records")

// The formatter used to format multierrors
var MultiErrorFormat multierror.ErrorFormatFunc
// MultiErrorFormat specifies the formatter applied to format multierrors. The
// default implementation is a consensed version of the hashicorp/go-multierror
// default one
var MultiErrorFormat multierror.ErrorFormatFunc = func(es []error) string {
if len(es) == 1 {
return es[0].Error()
}

points := make([]string, len(es))
for i, err := range es {
points[i] = fmt.Sprintf("* %s", err)
}

return fmt.Sprintf(
"%d errors occurred:\n\t%s\n",
len(es), strings.Join(points, "\n\t"))
}

type sentinelError struct {
sentinel error
Expand Down
4 changes: 2 additions & 2 deletions errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"testing"
)

func TestSentinelWithWrappedError(t *testing.T) {
func TestSentinelWithSingleWrappedError(t *testing.T) {
t.Parallel()
myNetError := &net.OpError{Op: "mock", Err: errors.New("op error")}
error := Wrap(ErrOutOfBrokers, myNetError)

expected := fmt.Sprintf("%s: 1 error occurred:\n\t* %s\n\n", ErrOutOfBrokers, myNetError)
expected := fmt.Sprintf("%s: %s", ErrOutOfBrokers, myNetError)
actual := error.Error()
if actual != expected {
t.Errorf("unexpected value '%s' vs '%v'", expected, actual)
Expand Down

0 comments on commit 3f90bc7

Please sign in to comment.