diff --git a/message.go b/message.go index 7c7062e40..7f1f86b0b 100644 --- a/message.go +++ b/message.go @@ -8,7 +8,7 @@ import ( ) // CompressionCodec represents the various compression codecs recognized by Kafka in messages. -type CompressionCodec uint8 +type CompressionCodec int8 const ( CompressionNone CompressionCodec = 0 @@ -33,7 +33,7 @@ func (m *Message) encode(pe packetEncoder) error { pe.putInt8(messageFormat) - attributes := int8(m.Codec << 5) + attributes := int8(m.Codec) & 0x07 pe.putInt8(attributes) err := pe.putBytes(m.Key) @@ -95,7 +95,7 @@ func (m *Message) decode(pd packetDecoder) (err error) { if err != nil { return err } - m.Codec = CompressionCodec(attribute >> 5) + m.Codec = CompressionCodec(attribute & 0x07) m.Key, err = pd.getBytes() if err != nil { diff --git a/message_test.go b/message_test.go index 111a1346b..49ec50bf0 100644 --- a/message_test.go +++ b/message_test.go @@ -11,9 +11,9 @@ var ( 0xFF, 0xFF, 0xFF, 0xFF} // value emptyGzipMessage = []byte{ - 0xAA, 0x27, 0x4D, 0x22, //CRC + 97, 79, 149, 90, //CRC 0x00, // magic version byte - 0x20, // attribute flags + 0x01, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key // value 0x00, 0x00, 0x00, 0x17,