
Merge pull request #4147 from daipom/buffer-fix-error-compressing-alreay-compressed-es

Buffer: Fix that `compress` setting causes unexpected error when receiving already compressed MessagePack
ashie authored Apr 13, 2023
2 parents 8112d8f + 2d1c313 commit c989079
Showing 2 changed files with 33 additions and 19 deletions.
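
For context before the diff, here is a minimal sketch of the failure this commit addresses (a hypothetical repro, not part of the commit itself). With `compress gzip` set on a buffer, the chunk asks the event stream for its compressed MessagePack representation and passes a packer along. For a stream that is already compressed, a `Fluent::CompressedMessagePackEventStream` such as `in_forward` produces when the sender ships gzip-compressed MessagePack, the override lacked the `packer:` keyword, so the call presumably failed instead of writing the data through unchanged:

    require 'fluent/event'
    require 'fluent/msgpack_factory'

    # Build an ordinary stream, compress it, then wrap it the way it arrives
    # when the sender has already gzip-compressed the MessagePack payload.
    es = Fluent::ArrayEventStream.new([[Fluent::EventTime.now, { 'message' => 'data1' }]])
    compressed = Fluent::CompressedMessagePackEventStream.new(es.to_compressed_msgpack_stream)

    # Before this commit the override's signature was
    # `to_compressed_msgpack_stream(time_int: false)`, so a call like this
    # raised ArgumentError (unknown keyword: :packer), presumably the
    # "unexpected error" in the commit title. With the fix, the keyword is
    # accepted and the already-compressed bytes are returned as is.
    packer = Fluent::MessagePackFactory.msgpack_packer
    data = compressed.to_compressed_msgpack_stream(time_int: false, packer: packer)
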
2 changes: 1 addition & 1 deletion lib/fluent/event.rb
@@ -294,7 +294,7 @@ def to_msgpack_stream(time_int: false, packer: nil)
       super
     end
 
-    def to_compressed_msgpack_stream(time_int: false)
+    def to_compressed_msgpack_stream(time_int: false, packer: nil)
       # time_int is always ignored because @data is always packed binary in this class
       @compressed_data
     end
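Note on the change above: the new `packer:` keyword is accepted only for signature compatibility with callers that pass a packer along; like `time_int`, it is ignored, because `@compressed_data` is already packed and compressed and is returned as is.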
50 changes: 32 additions & 18 deletions test/plugin/test_output_as_buffered_compress.rb
@@ -35,6 +35,16 @@ def format(tag, time, record)
       @format ? @format.call(tag, time, record) : [tag, time, record].to_json
     end
   end
+
+  def self.dummy_event_stream
+    Fluent::ArrayEventStream.new(
+      [
+        [event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
+        [event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
+        [event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
+      ]
+    )
+  end
 end
 
 class BufferedOutputCompressTest < Test::Unit::TestCase
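
A side note on the move above: `dummy_event_stream` changes from an instance method of the test case (removed in the next hunk) to a module-level class method, so that the `data(...)` declarations further down, which are evaluated at class-definition time, can reference it as `FluentPluginOutputAsBufferedCompressTest.dummy_event_stream`.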
@@ -60,16 +70,6 @@ def waiting(seconds)
     end
   end
 
-  def dummy_event_stream
-    Fluent::ArrayEventStream.new(
-      [
-        [event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
-        [event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
-        [event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
-      ]
-    )
-  end
-
   TMP_DIR = File.expand_path('../../tmp/test_output_as_buffered_compress', __FILE__)
 
   setup do
@@ -89,20 +89,34 @@ def dummy_event_stream
   end
 
   data(
-    handle_simple_stream: config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
-    handle_stream_with_standard_format: config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
-    handle_simple_stream_and_file_chunk: config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
-    handle_stream_with_standard_format_and_file_chunk: config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+    :buffer_config,
+    [
+      config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+      config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+      config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+      config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+    ],
   )
-  test 'call a standard format when output plugin adds data to chunk' do |buffer_config|
+  data(
+    :input_es,
+    [
+      FluentPluginOutputAsBufferedCompressTest.dummy_event_stream,
+      # If already compressed data is incoming, it must be written as is (i.e. without decompressed).
+      # https://github.com/fluent/fluentd/issues/4146
+      Fluent::CompressedMessagePackEventStream.new(FluentPluginOutputAsBufferedCompressTest.dummy_event_stream.to_compressed_msgpack_stream),
+    ],
+  )
+  test 'call a standard format when output plugin adds data to chunk' do |data|
+    buffer_config = data[:buffer_config]
+    es = data[:input_es].dup # Note: the data matrix is shared in all patterns, so we need `dup` here.
+
     @i = create_output(:async)
     @i.configure(config_element('ROOT','', {}, [buffer_config]))
     @i.start
     @i.after_start
 
     io = StringIO.new
-    es = dummy_event_stream
-    expected = es.map { |e| e }
+    expected = es.dup.map { |t, r| [t, r] }
     compressed_data = ''
 
     assert_equal :gzip, @i.buffer.compress
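
The rewrite above switches to test-unit's two-argument `data(label, patterns)` form. When a test declares several labeled data sets, test-unit runs the block once per combination and yields a Hash keyed by the labels, which is why the block signature becomes `|data|` and the body reads `data[:buffer_config]` and `data[:input_es]`. A minimal self-contained sketch of that mechanism (names are illustrative):

    require 'test/unit'

    class DataMatrixExampleTest < Test::Unit::TestCase
      data(:x, [1, 2])
      data(:y, ['a', 'b'])
      # With two labeled sets, test-unit runs this 2 x 2 = 4 times,
      # yielding each combination as a Hash keyed by the labels above.
      test 'receives one combination per run' do |data|
        assert_include([1, 2], data[:x])
        assert_include(['a', 'b'], data[:y])
      end
    end
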
@@ -138,7 +152,7 @@ def dummy_event_stream
     @i.after_start
 
     io = StringIO.new
-    es = dummy_event_stream
+    es = FluentPluginOutputAsBufferedCompressTest.dummy_event_stream
     expected = es.map { |e| "#{e[1]}\n" }.join # e[1] is record
     compressed_data = ''
 
