From b83b8aea94e5ec01c75cc12bd03abb30841867a7 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 29 Jul 2015 13:51:08 -0400 Subject: [PATCH 1/2] Improve config error formatting (get rid of spurious capitals) --- config.go | 50 +++++++++++++++++++++++++------------------------- errors.go | 2 +- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/config.go b/config.go index 881d630a2..264dfa0af 100644 --- a/config.go +++ b/config.go @@ -183,71 +183,71 @@ func (c *Config) Validate() error { // validate Net values switch { case c.Net.MaxOpenRequests <= 0: - return ConfigurationError("Invalid Net.MaxOpenRequests, must be > 0") + return ConfigurationError("Net.MaxOpenRequests must be > 0") case c.Net.DialTimeout <= 0: - return ConfigurationError("Invalid Net.DialTimeout, must be > 0") + return ConfigurationError("Net.DialTimeout must be > 0") case c.Net.ReadTimeout <= 0: - return ConfigurationError("Invalid Net.ReadTimeout, must be > 0") + return ConfigurationError("Net.ReadTimeout must be > 0") case c.Net.WriteTimeout <= 0: - return ConfigurationError("Invalid Net.WriteTimeout, must be > 0") + return ConfigurationError("Net.WriteTimeout must be > 0") case c.Net.KeepAlive < 0: - return ConfigurationError("Invalid Net.KeepAlive, must be >= 0") + return ConfigurationError("Net.KeepAlive must be >= 0") } // validate the Metadata values switch { case c.Metadata.Retry.Max < 0: - return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0") + return ConfigurationError("Metadata.Retry.Max must be >= 0") case c.Metadata.Retry.Backoff < 0: - return ConfigurationError("Invalid Metadata.Retry.Backoff, must be >= 0") + return ConfigurationError("Metadata.Retry.Backoff must be >= 0") case c.Metadata.RefreshFrequency < 0: - return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0") + return ConfigurationError("Metadata.RefreshFrequency must be >= 0") } // validate the Producer values switch { case c.Producer.MaxMessageBytes <= 0: - return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0") + return ConfigurationError("Producer.MaxMessageBytes must be > 0") case c.Producer.RequiredAcks < -1: - return ConfigurationError("Invalid Producer.RequiredAcks, must be >= -1") + return ConfigurationError("Producer.RequiredAcks must be >= -1") case c.Producer.Timeout <= 0: - return ConfigurationError("Invalid Producer.Timeout, must be > 0") + return ConfigurationError("Producer.Timeout must be > 0") case c.Producer.Partitioner == nil: - return ConfigurationError("Invalid Producer.Partitioner, must not be nil") + return ConfigurationError("Producer.Partitioner must not be nil") case c.Producer.Flush.Bytes < 0: - return ConfigurationError("Invalid Producer.Flush.Bytes, must be >= 0") + return ConfigurationError("Producer.Flush.Bytes must be >= 0") case c.Producer.Flush.Messages < 0: - return ConfigurationError("Invalid Producer.Flush.Messages, must be >= 0") + return ConfigurationError("Producer.Flush.Messages must be >= 0") case c.Producer.Flush.Frequency < 0: - return ConfigurationError("Invalid Producer.Flush.Frequency, must be >= 0") + return ConfigurationError("Producer.Flush.Frequency must be >= 0") case c.Producer.Flush.MaxMessages < 0: - return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= 0") + return ConfigurationError("Producer.Flush.MaxMessages must be >= 0") case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages: - return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set") + return ConfigurationError("Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set") case c.Producer.Retry.Max < 0: - return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0") + return ConfigurationError("Producer.Retry.Max must be >= 0") case c.Producer.Retry.Backoff < 0: - return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0") + return ConfigurationError("Producer.Retry.Backoff must be >= 0") } // validate the Consumer values switch { case c.Consumer.Fetch.Min <= 0: - return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0") + return ConfigurationError("Consumer.Fetch.Min must be > 0") case c.Consumer.Fetch.Default <= 0: - return ConfigurationError("Invalid Consumer.Fetch.Default, must be > 0") + return ConfigurationError("Consumer.Fetch.Default must be > 0") case c.Consumer.Fetch.Max < 0: - return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0") + return ConfigurationError("Consumer.Fetch.Max must be >= 0") case c.Consumer.MaxWaitTime < 1*time.Millisecond: - return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms") + return ConfigurationError("Consumer.MaxWaitTime must be > 1ms") case c.Consumer.Retry.Backoff < 0: - return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0") + return ConfigurationError("Consumer.Retry.Backoff must be >= 0") } // validate misc shared values switch { case c.ChannelBufferSize < 0: - return ConfigurationError("Invalid ChannelBufferSize, must be >= 0") + return ConfigurationError("ChannelBufferSize must be >= 0") } return nil diff --git a/errors.go b/errors.go index 0c81f7bb9..65d4edb35 100644 --- a/errors.go +++ b/errors.go @@ -62,7 +62,7 @@ func (err PacketDecodingError) Error() string { type ConfigurationError string func (err ConfigurationError) Error() string { - return "kafka: Invalid Configuration: " + string(err) + return "kafka: invalid configuration (" + string(err) + ")" } // KError is the type of error that can be returned directly by the Kafka broker. From 66411d8222087377937c6f176dbd4f95f32d821e Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 29 Jul 2015 14:00:12 -0400 Subject: [PATCH 2/2] Cleanup a bunch more error formats --- broker.go | 2 +- encoder_decoder.go | 4 ++-- errors.go | 6 +++--- length_field.go | 2 +- message.go | 6 +++--- prep_encoder.go | 8 ++++---- real_decoder.go | 10 +++++----- request.go | 4 ++-- response_header.go | 2 +- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/broker.go b/broker.go index 4f4d948fc..296b8eb48 100644 --- a/broker.go +++ b/broker.go @@ -359,7 +359,7 @@ func (b *Broker) responseReceiver() { if decodedHeader.correlationID != response.correlationID { // TODO if decoded ID < cur ID, discard until we catch up // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response - response.errors <- PacketDecodingError{fmt.Sprintf("CorrelationID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} + response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} continue } diff --git a/encoder_decoder.go b/encoder_decoder.go index a5d4bf2c4..b91efaa0e 100644 --- a/encoder_decoder.go +++ b/encoder_decoder.go @@ -23,7 +23,7 @@ func encode(e encoder) ([]byte, error) { } if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) { - return nil, PacketEncodingError{fmt.Sprintf("Invalid request size: %d", prepEnc.length)} + return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)} } realEnc.raw = make([]byte, prepEnc.length) @@ -55,7 +55,7 @@ func decode(buf []byte, in decoder) error { } if helper.off != len(buf) { - return PacketDecodingError{"Length was invalid"} + return PacketDecodingError{"invalid length"} } return nil diff --git a/errors.go b/errors.go index 65d4edb35..70f2b9bfd 100644 --- a/errors.go +++ b/errors.go @@ -7,7 +7,7 @@ import ( // ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored // or otherwise failed to respond. -var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?") +var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)") // ErrClosedClient is the error returned when a method is called on a client that has been closed. var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") @@ -44,7 +44,7 @@ type PacketEncodingError struct { } func (err PacketEncodingError) Error() string { - return fmt.Sprintf("kafka: Error while encoding packet: %s", err.Info) + return fmt.Sprintf("kafka: error encoding packet: %s", err.Info) } // PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. @@ -54,7 +54,7 @@ type PacketDecodingError struct { } func (err PacketDecodingError) Error() string { - return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info) + return fmt.Sprintf("kafka: error decoding packet: %s", err.Info) } // ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) diff --git a/length_field.go b/length_field.go index cb6556bfd..70078be5d 100644 --- a/length_field.go +++ b/length_field.go @@ -22,7 +22,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error { func (l *lengthField) check(curOffset int, buf []byte) error { if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) { - return PacketDecodingError{"Lengthfield check failed"} + return PacketDecodingError{"length field invalid"} } return nil diff --git a/message.go b/message.go index fdf6d5849..49b19c5a6 100644 --- a/message.go +++ b/message.go @@ -70,7 +70,7 @@ func (m *Message) encode(pe packetEncoder) error { m.compressedCache = tmp payload = m.compressedCache default: - return PacketEncodingError{fmt.Sprintf("Unsupported compression codec: %d", m.Codec)} + return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)} } } @@ -92,7 +92,7 @@ func (m *Message) decode(pd packetDecoder) (err error) { return err } if format != messageFormat { - return PacketDecodingError{"Unexpected messageFormat"} + return PacketDecodingError{"unexpected messageFormat"} } attribute, err := pd.getInt8() @@ -135,7 +135,7 @@ func (m *Message) decode(pd packetDecoder) (err error) { } return m.decodeSet() default: - return PacketDecodingError{fmt.Sprintf("Invalid compression specified: %d", m.Codec)} + return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)} } err = pd.pop() diff --git a/prep_encoder.go b/prep_encoder.go index a2baf00bc..ddeef780e 100644 --- a/prep_encoder.go +++ b/prep_encoder.go @@ -29,7 +29,7 @@ func (pe *prepEncoder) putInt64(in int64) { func (pe *prepEncoder) putArrayLength(in int) error { if in > math.MaxInt32 { - return PacketEncodingError{fmt.Sprintf("Array too long: %d", in)} + return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)} } pe.length += 4 return nil @@ -43,7 +43,7 @@ func (pe *prepEncoder) putBytes(in []byte) error { return nil } if len(in) > math.MaxInt32 { - return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))} + return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))} } pe.length += len(in) return nil @@ -51,7 +51,7 @@ func (pe *prepEncoder) putBytes(in []byte) error { func (pe *prepEncoder) putRawBytes(in []byte) error { if len(in) > math.MaxInt32 { - return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))} + return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))} } pe.length += len(in) return nil @@ -60,7 +60,7 @@ func (pe *prepEncoder) putRawBytes(in []byte) error { func (pe *prepEncoder) putString(in string) error { pe.length += 2 if len(in) > math.MaxInt16 { - return PacketEncodingError{fmt.Sprintf("String too long: %d", len(in))} + return PacketEncodingError{fmt.Sprintf("string too long (%d)", len(in))} } pe.length += len(in) return nil diff --git a/real_decoder.go b/real_decoder.go index 3c2425eef..b194b9bcc 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -64,7 +64,7 @@ func (rd *realDecoder) getArrayLength() (int, error) { rd.off = len(rd.raw) return -1, ErrInsufficientData } else if tmp > 2*math.MaxUint16 { - return -1, PacketDecodingError{"getArrayLength failed: Invalid array length"} + return -1, PacketDecodingError{"invalid array length"} } return tmp, nil } @@ -82,7 +82,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) { switch { case n < -1: - return nil, PacketDecodingError{"getBytes failed: Invalid length"} + return nil, PacketDecodingError{"invalid byteslice length"} case n == -1: return nil, nil case n == 0: @@ -108,7 +108,7 @@ func (rd *realDecoder) getString() (string, error) { switch { case n < -1: - return "", PacketDecodingError{"getString failed: invalid length"} + return "", PacketDecodingError{"invalid string length"} case n == -1: return "", nil case n == 0: @@ -141,7 +141,7 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) { } if n < 0 { - return nil, PacketDecodingError{"getInt32Array failed: invalid length"} + return nil, PacketDecodingError{"invalid array length"} } ret := make([]int32, n) @@ -170,7 +170,7 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) { } if n < 0 { - return nil, PacketDecodingError{"getInt64Array failed: invalid length"} + return nil, PacketDecodingError{"invalid array length"} } ret := make([]int64, n) diff --git a/request.go b/request.go index cd0341f75..d6d5cdfcd 100644 --- a/request.go +++ b/request.go @@ -51,7 +51,7 @@ func (r *request) decode(pd packetDecoder) (err error) { r.body = allocateBody(key, version) if r.body == nil { - return PacketDecodingError{fmt.Sprintf("Unknown request key: %d", key)} + return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)} } return r.body.decode(pd) } @@ -64,7 +64,7 @@ func decodeRequest(r io.Reader) (req *request, err error) { length := int32(binary.BigEndian.Uint32(lengthBytes)) if length <= 4 || length > MaxRequestSize { - return nil, PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", length)} + return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)} } encodedReq := make([]byte, length) diff --git a/response_header.go b/response_header.go index 89cb363ae..f3f4d27d6 100644 --- a/response_header.go +++ b/response_header.go @@ -13,7 +13,7 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) { return err } if r.length <= 4 || r.length > MaxResponseSize { - return PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", r.length)} + return PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", r.length)} } r.correlationID, err = pd.getInt32()