Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer: Fix that compress setting causes unexpected error when receiving already compressed MessagePack #4147

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def to_msgpack_stream(time_int: false, packer: nil)
super
end

def to_compressed_msgpack_stream(time_int: false)
def to_compressed_msgpack_stream(time_int: false, packer: nil)
# time_int is always ignored because @data is always packed binary in this class
@compressed_data
end
Expand Down
50 changes: 32 additions & 18 deletions test/plugin/test_output_as_buffered_compress.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ def format(tag, time, record)
@format ? @format.call(tag, time, record) : [tag, time, record].to_json
end
end

def self.dummy_event_stream
Fluent::ArrayEventStream.new(
[
[event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
[event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
[event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
]
)
end
end

class BufferedOutputCompressTest < Test::Unit::TestCase
Expand All @@ -60,16 +70,6 @@ def waiting(seconds)
end
end

def dummy_event_stream
Fluent::ArrayEventStream.new(
[
[event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
[event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
[event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
]
)
end

TMP_DIR = File.expand_path('../../tmp/test_output_as_buffered_compress', __FILE__)

setup do
Expand All @@ -89,20 +89,34 @@ def dummy_event_stream
end

data(
handle_simple_stream: config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
handle_stream_with_standard_format: config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
handle_simple_stream_and_file_chunk: config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
handle_stream_with_standard_format_and_file_chunk: config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
:buffer_config,
[
config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
],
)
test 'call a standard format when output plugin adds data to chunk' do |buffer_config|
data(
:input_es,
[
FluentPluginOutputAsBufferedCompressTest.dummy_event_stream,
# If already compressed data is incoming, it must be written as is (i.e. without decompressed).
# https://github.com/fluent/fluentd/issues/4146
Fluent::CompressedMessagePackEventStream.new(FluentPluginOutputAsBufferedCompressTest.dummy_event_stream.to_compressed_msgpack_stream),
],
)
test 'call a standard format when output plugin adds data to chunk' do |data|
buffer_config = data[:buffer_config]
es = data[:input_es].dup # Note: the data matrix is shared in all patterns, so we need `dup` here.

@i = create_output(:async)
@i.configure(config_element('ROOT','', {}, [buffer_config]))
@i.start
@i.after_start

io = StringIO.new
es = dummy_event_stream
expected = es.map { |e| e }
expected = es.dup.map { |t, r| [t, r] }
ashie marked this conversation as resolved.
Show resolved Hide resolved
compressed_data = ''

assert_equal :gzip, @i.buffer.compress
Expand Down Expand Up @@ -138,7 +152,7 @@ def dummy_event_stream
@i.after_start

io = StringIO.new
es = dummy_event_stream
es = FluentPluginOutputAsBufferedCompressTest.dummy_event_stream
expected = es.map { |e| "#{e[1]}\n" }.join # e[1] is record
compressed_data = ''

Expand Down