From b2c65b9d0b69d47713de64fcfe221a0c86c4e590 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 12 Dec 2023 13:17:56 +0900 Subject: [PATCH] Add a test for issue #3089 https://github.com/fluent/fluentd/issues/3089 Signed-off-by: Takuro Ashie --- test/plugin/test_buffer.rb | 49 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index d35d2d6ce7..4af9182f3d 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -850,6 +850,55 @@ def create_chunk_es(metadata, es) test '#compress returns :text' do assert_equal :text, @p.compress end + + # https://github.com/fluent/fluentd/issues/3089 + test "closed chunk should not be committed" do + assert_equal 8 * 1024 * 1024, @p.chunk_limit_size + assert_equal 0.95, @p.chunk_full_threshold + + purge_count = 0 + + stub.proxy(@p).generate_chunk(anything) do |chunk| + stub.proxy(chunk).purge do |result| + purge_count += 1 + result + end + stub.proxy(chunk).commit do |result| + assert_false(chunk.closed?) + result + end + stub.proxy(chunk).rollback do |result| + assert_false(chunk.closed?) + result + end + chunk + end + + m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) + small_row = "x" * 1024 * 400 + row = "x" * 1024 * 1024 * 8 # just chunk_size_limit + @p.write({m => [small_row] * 41 + [row]}) # 42 events in 1 event stream + + # Above event strem will be splitted twice by `Buffer#write_step_by_step` + # + # 1. `write_once`: 42 [events] * 1 [stream] + # 2. `write_step_by_step`: 4 [events]* 10 [streams] + 2 [events] * 1 [stream] + # 3. `write_step_by_step` (by ShouldRetry): 1 [event] * 42 [streams] + # + # The problematic data is built in the 2nd stage. + # In the 2nd stage, 5 streams are packed in a chunk. + # ((1024 * 400) [bytes] * 4 [events] * 5 [streams] = 8192000 [bytes] < `chunk_limit_size` (8MB)). + # So 3 chunks are used to store all data. + # The 1st chunk is already staged by `write_once`. + # The 2nd & 3rd chunks are newly created as unstaged. + # The 2nd chunk is purged before `ShouldRetry`, it's no problem: + # https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L850 + # The 3rd chunk is purged in `rescue ShouldRetry`: + # https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L862 + # The last one causes the issue described in https://github.com/fluent/fluentd/issues/3089#issuecomment-1811839198 + + assert_equal 2, purge_count + end end sub_test_case 'standard format with configuration for test with lower chunk limit size' do