Skip to content

Commit

Permalink
Merge pull request #1141 from adwinsky/fix-overflow-max-message-bytes…
Browse files Browse the repository at this point in the history
…-limit

Respect MaxMessageBytes limit for uncompressed messages
  • Loading branch information
bai authored Jan 7, 2019
2 parents bb6b5ba + 0017e59 commit 4c93f1f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
5 changes: 2 additions & 3 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,8 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
return true
// Would we overflow the size-limit of a compressed message-batch for this partition?
case ps.parent.conf.Producer.Compression != CompressionNone &&
ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
// Would we overflow the size-limit of a message-batch for this partition?
case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
return true
// Would we overflow simply in number of messages?
Expand Down
32 changes: 28 additions & 4 deletions produce_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ func TestProduceSetInitial(t *testing.T) {
}

func TestProduceSetAddingMessages(t *testing.T) {
parent, ps := makeProduceSet()
parent.conf.Producer.Flush.MaxMessages = 1000

_, ps := makeProduceSet()
msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}

safeAddMessage(t, ps, msg)

if ps.empty() {
Expand All @@ -48,8 +47,15 @@ func TestProduceSetAddingMessages(t *testing.T) {
if !ps.readyToFlush() {
t.Error("by default set should be ready to flush when any message is in place")
}
}

for i := 0; i < 999; i++ {
func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) {
parent, ps := makeProduceSet()
parent.conf.Producer.Flush.MaxMessages = 1000

msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}

for i := 0; i < 1000; i++ {
if ps.wouldOverflow(msg) {
t.Error("set shouldn't fill up after only", i+1, "messages")
}
Expand All @@ -61,6 +67,24 @@ func TestProduceSetAddingMessages(t *testing.T) {
}
}

func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) {
parent, ps := makeProduceSet()
parent.conf.Producer.MaxMessageBytes = 1000

msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}

for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes {
if ps.wouldOverflow(msg) {
t.Error("set shouldn't fill up before 1000 bytes")
}
safeAddMessage(t, ps, msg)
}

if !ps.wouldOverflow(msg) {
t.Error("set should be full after 1000 bytes")
}
}

func TestProduceSetPartitionTracking(t *testing.T) {
_, ps := makeProduceSet()

Expand Down

0 comments on commit 4c93f1f

Please sign in to comment.