Skip to content

Commit

Permalink
Merge pull request #495 from Shopify/cleanup-error-formatting
Browse files Browse the repository at this point in the history
Cleanup error formatting
  • Loading branch information
eapache committed Jul 29, 2015
2 parents 1b953bb + 66411d8 commit d4ed94c
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 48 deletions.
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (b *Broker) responseReceiver() {
if decodedHeader.correlationID != response.correlationID {
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
response.errors <- PacketDecodingError{fmt.Sprintf("CorrelationID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
continue
}

Expand Down
50 changes: 25 additions & 25 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,71 +183,71 @@ func (c *Config) Validate() error {
// validate Net values
switch {
case c.Net.MaxOpenRequests <= 0:
return ConfigurationError("Invalid Net.MaxOpenRequests, must be > 0")
return ConfigurationError("Net.MaxOpenRequests must be > 0")
case c.Net.DialTimeout <= 0:
return ConfigurationError("Invalid Net.DialTimeout, must be > 0")
return ConfigurationError("Net.DialTimeout must be > 0")
case c.Net.ReadTimeout <= 0:
return ConfigurationError("Invalid Net.ReadTimeout, must be > 0")
return ConfigurationError("Net.ReadTimeout must be > 0")
case c.Net.WriteTimeout <= 0:
return ConfigurationError("Invalid Net.WriteTimeout, must be > 0")
return ConfigurationError("Net.WriteTimeout must be > 0")
case c.Net.KeepAlive < 0:
return ConfigurationError("Invalid Net.KeepAlive, must be >= 0")
return ConfigurationError("Net.KeepAlive must be >= 0")
}

// validate the Metadata values
switch {
case c.Metadata.Retry.Max < 0:
return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0")
return ConfigurationError("Metadata.Retry.Max must be >= 0")
case c.Metadata.Retry.Backoff < 0:
return ConfigurationError("Invalid Metadata.Retry.Backoff, must be >= 0")
return ConfigurationError("Metadata.Retry.Backoff must be >= 0")
case c.Metadata.RefreshFrequency < 0:
return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
}

// validate the Producer values
switch {
case c.Producer.MaxMessageBytes <= 0:
return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
return ConfigurationError("Producer.MaxMessageBytes must be > 0")
case c.Producer.RequiredAcks < -1:
return ConfigurationError("Invalid Producer.RequiredAcks, must be >= -1")
return ConfigurationError("Producer.RequiredAcks must be >= -1")
case c.Producer.Timeout <= 0:
return ConfigurationError("Invalid Producer.Timeout, must be > 0")
return ConfigurationError("Producer.Timeout must be > 0")
case c.Producer.Partitioner == nil:
return ConfigurationError("Invalid Producer.Partitioner, must not be nil")
return ConfigurationError("Producer.Partitioner must not be nil")
case c.Producer.Flush.Bytes < 0:
return ConfigurationError("Invalid Producer.Flush.Bytes, must be >= 0")
return ConfigurationError("Producer.Flush.Bytes must be >= 0")
case c.Producer.Flush.Messages < 0:
return ConfigurationError("Invalid Producer.Flush.Messages, must be >= 0")
return ConfigurationError("Producer.Flush.Messages must be >= 0")
case c.Producer.Flush.Frequency < 0:
return ConfigurationError("Invalid Producer.Flush.Frequency, must be >= 0")
return ConfigurationError("Producer.Flush.Frequency must be >= 0")
case c.Producer.Flush.MaxMessages < 0:
return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= 0")
return ConfigurationError("Producer.Flush.MaxMessages must be >= 0")
case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
return ConfigurationError("Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set")
case c.Producer.Retry.Max < 0:
return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0")
return ConfigurationError("Producer.Retry.Max must be >= 0")
case c.Producer.Retry.Backoff < 0:
return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0")
return ConfigurationError("Producer.Retry.Backoff must be >= 0")
}

// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
return ConfigurationError("Consumer.Fetch.Min must be > 0")
case c.Consumer.Fetch.Default <= 0:
return ConfigurationError("Invalid Consumer.Fetch.Default, must be > 0")
return ConfigurationError("Consumer.Fetch.Default must be > 0")
case c.Consumer.Fetch.Max < 0:
return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
return ConfigurationError("Consumer.Fetch.Max must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0")
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
}

// validate misc shared values
switch {
case c.ChannelBufferSize < 0:
return ConfigurationError("Invalid ChannelBufferSize, must be >= 0")
return ConfigurationError("ChannelBufferSize must be >= 0")
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func encode(e encoder) ([]byte, error) {
}

if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
return nil, PacketEncodingError{fmt.Sprintf("Invalid request size: %d", prepEnc.length)}
return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
}

realEnc.raw = make([]byte, prepEnc.length)
Expand Down Expand Up @@ -55,7 +55,7 @@ func decode(buf []byte, in decoder) error {
}

if helper.off != len(buf) {
return PacketDecodingError{"Length was invalid"}
return PacketDecodingError{"invalid length"}
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
// or otherwise failed to respond.
var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

// ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
Expand Down Expand Up @@ -44,7 +44,7 @@ type PacketEncodingError struct {
}

func (err PacketEncodingError) Error() string {
return fmt.Sprintf("kafka: Error while encoding packet: %s", err.Info)
return fmt.Sprintf("kafka: error encoding packet: %s", err.Info)
}

// PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
Expand All @@ -54,15 +54,15 @@ type PacketDecodingError struct {
}

func (err PacketDecodingError) Error() string {
return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info)
return fmt.Sprintf("kafka: error decoding packet: %s", err.Info)
}

// ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer)
// when the specified configuration is invalid.
type ConfigurationError string

func (err ConfigurationError) Error() string {
return "kafka: Invalid Configuration: " + string(err)
return "kafka: invalid configuration (" + string(err) + ")"
}

// KError is the type of error that can be returned directly by the Kafka broker.
Expand Down
2 changes: 1 addition & 1 deletion length_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {

func (l *lengthField) check(curOffset int, buf []byte) error {
if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
return PacketDecodingError{"Lengthfield check failed"}
return PacketDecodingError{"length field invalid"}
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *Message) encode(pe packetEncoder) error {
m.compressedCache = tmp
payload = m.compressedCache
default:
return PacketEncodingError{fmt.Sprintf("Unsupported compression codec: %d", m.Codec)}
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
}
}

Expand All @@ -92,7 +92,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}
if format != messageFormat {
return PacketDecodingError{"Unexpected messageFormat"}
return PacketDecodingError{"unexpected messageFormat"}
}

attribute, err := pd.getInt8()
Expand Down Expand Up @@ -135,7 +135,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
}
return m.decodeSet()
default:
return PacketDecodingError{fmt.Sprintf("Invalid compression specified: %d", m.Codec)}
return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
}

err = pd.pop()
Expand Down
8 changes: 4 additions & 4 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (pe *prepEncoder) putInt64(in int64) {

func (pe *prepEncoder) putArrayLength(in int) error {
if in > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("Array too long: %d", in)}
return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)}
}
pe.length += 4
return nil
Expand All @@ -43,15 +43,15 @@ func (pe *prepEncoder) putBytes(in []byte) error {
return nil
}
if len(in) > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
}
pe.length += len(in)
return nil
}

func (pe *prepEncoder) putRawBytes(in []byte) error {
if len(in) > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
}
pe.length += len(in)
return nil
Expand All @@ -60,7 +60,7 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
func (pe *prepEncoder) putString(in string) error {
pe.length += 2
if len(in) > math.MaxInt16 {
return PacketEncodingError{fmt.Sprintf("String too long: %d", len(in))}
return PacketEncodingError{fmt.Sprintf("string too long (%d)", len(in))}
}
pe.length += len(in)
return nil
Expand Down
10 changes: 5 additions & 5 deletions real_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
rd.off = len(rd.raw)
return -1, ErrInsufficientData
} else if tmp > 2*math.MaxUint16 {
return -1, PacketDecodingError{"getArrayLength failed: Invalid array length"}
return -1, PacketDecodingError{"invalid array length"}
}
return tmp, nil
}
Expand All @@ -82,7 +82,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) {

switch {
case n < -1:
return nil, PacketDecodingError{"getBytes failed: Invalid length"}
return nil, PacketDecodingError{"invalid byteslice length"}
case n == -1:
return nil, nil
case n == 0:
Expand All @@ -108,7 +108,7 @@ func (rd *realDecoder) getString() (string, error) {

switch {
case n < -1:
return "", PacketDecodingError{"getString failed: invalid length"}
return "", PacketDecodingError{"invalid string length"}
case n == -1:
return "", nil
case n == 0:
Expand Down Expand Up @@ -141,7 +141,7 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
}

if n < 0 {
return nil, PacketDecodingError{"getInt32Array failed: invalid length"}
return nil, PacketDecodingError{"invalid array length"}
}

ret := make([]int32, n)
Expand Down Expand Up @@ -170,7 +170,7 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
}

if n < 0 {
return nil, PacketDecodingError{"getInt64Array failed: invalid length"}
return nil, PacketDecodingError{"invalid array length"}
}

ret := make([]int64, n)
Expand Down
4 changes: 2 additions & 2 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *request) decode(pd packetDecoder) (err error) {

r.body = allocateBody(key, version)
if r.body == nil {
return PacketDecodingError{fmt.Sprintf("Unknown request key: %d", key)}
return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
}
return r.body.decode(pd)
}
Expand All @@ -64,7 +64,7 @@ func decodeRequest(r io.Reader) (req *request, err error) {

length := int32(binary.BigEndian.Uint32(lengthBytes))
if length <= 4 || length > MaxRequestSize {
return nil, PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", length)}
return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
}

encodedReq := make([]byte, length)
Expand Down
2 changes: 1 addition & 1 deletion response_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) {
return err
}
if r.length <= 4 || r.length > MaxResponseSize {
return PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", r.length)}
return PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", r.length)}
}

r.correlationID, err = pd.getInt32()
Expand Down

0 comments on commit d4ed94c

Please sign in to comment.