Buffer: Can't write already compressed MessagePack EventStream as is #4146

Closed
daipom opened this issue Apr 12, 2023 · 3 comments · Fixed by #4147
Labels
bug Something isn't working work-in-progress

Comments

@daipom
Contributor

daipom commented Apr 12, 2023

Describe the bug

in_forward can receive a compressed MessagePack EventStream from out_forward.

On the in_forward side, there seems to be no way to process the compressed data as is (i.e., without decompressing it).
I get an unexpected error when I set compress gzip in the buffer settings on the in_forward side.

[warn]: #0 emit transaction failed: error_class=ArgumentError error="unknown keyword: :packer" location="/home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'" tag="test"
[error]: #0 unexpected error on reading data host="127.0.0.1" port=51396 error_class=ArgumentError error="unknown keyword: :packer"

This looks like a bug in Buffer and CompressedMessagePackEventStream.

A possible use case is a two-stage transfer.

  • Forwarder1(out_forward) -> Forwarder2(in_forward, out_forward) -> Aggregator(in_forward)

In this case, Forwarder2 should handle the CompressedMessagePackEventStream data as is (i.e., without decompressing it) and forward it to Aggregator.

However, if compression is enabled in both Forwarder1 and Forwarder2, an unexpected error occurs in Forwarder2.

Note: if compression is disabled only in Forwarder2, the data is decompressed in Forwarder2. This decompression is completely unnecessary.

To Reproduce

Run one Fluentd instance on the out_forward side and another on the in_forward side, using the settings in Your Configuration.

As described above, the assumed use case is:

  • Forwarder1(out_forward) -> Forwarder2(in_forward, out_forward) -> Aggregator(in_forward)

However, the error can be reproduced easily with just two Fluentd instances using those settings.
(Of course, it also reproduces if we start three Fluentd instances as above and enable compression for the out_forward in both Forwarder1 and Forwarder2.)

Expected behavior

When buffer compression is enabled and the buffer receives an already compressed MessagePack EventStream, it should write the data as is instead of raising an unexpected error.

Your Environment

- Fluentd version: 1.16.0
- TD Agent version: none
- Operating system: Ubuntu 20.04.6 LTS
- Kernel version: 5.15.0-69-generic

Your Configuration

# out_forward side
<source>
  @type sample
  tag test
</source>

<match test.**>
  @type forward
  compress gzip
  <buffer>
    @type file
    path /test/fluentd/forwarder/buffer
    flush_mode interval
    flush_interval 10s
  </buffer>
  <server>
    host localhost
    port 24224
  </server>
</match>

# in_forward side
<source>
  @type forward
</source>

<match test.**>
  @type null
  <buffer>
    @type file
    path /test/fluentd/aggregator/buffer
    flush_mode interval
    flush_interval 10s
    compress gzip
  </buffer>
</match>

Your Error Log

# in_forward side
2023-04-12 13:21:37 +0900 [info]: init supervisor logger path=nil rotate_age=nil rotate_size=nil
2023-04-12 13:21:37 +0900 [info]: parsing config file is succeeded path="/test/fluentd/aggregator/fluent.conf"
2023-04-12 13:21:37 +0900 [info]: gem 'fluentd' version '1.16.0'
2023-04-12 13:21:37 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type forward
  </source>
  <match test.**>
    @type null
    <buffer>
      @type "file"
      path "/test/fluentd/aggregator/buffer"
      flush_mode interval
      flush_interval 10s
      compress gzip
    </buffer>
  </match>
</ROOT>
2023-04-12 13:21:37 +0900 [info]: starting fluentd-1.16.0 pid=107781 ruby="3.2.0"
2023-04-12 13:21:37 +0900 [info]: spawn command to main:  cmdline=["/home/daipom/.rbenv/versions/3.2.0/bin/ruby", "-r/home/daipom/.rbenv/versions/3.2.0/lib/ruby/site_ruby/3.2.0/bundler/setup", "-Eascii-8bit:ascii-8bit", "/home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/bin/fluentd", "-c", "/test/fluentd/aggregator/fluent.conf", "--under-supervisor"]
2023-04-12 13:21:37 +0900 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil
2023-04-12 13:21:37 +0900 [info]: adding match pattern="test.**" type="null"
2023-04-12 13:21:37 +0900 [info]: adding source type="forward"
2023-04-12 13:21:37 +0900 [info]: #0 starting fluentd worker pid=107802 ppid=107781 worker=0
2023-04-12 13:21:37 +0900 [info]: #0 listening port port=24224 bind="0.0.0.0"
2023-04-12 13:21:37 +0900 [info]: #0 fluentd worker is now running worker=0
2023-04-12 13:21:57 +0900 [warn]: #0 emit transaction failed: error_class=ArgumentError error="unknown keyword: :packer" location="/home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'" tag="test"
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1017:in `block in <class:Output>'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:670:in `block in write_once'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:661:in `write_once'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:345:in `block in write'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `each'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `write'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1095:in `block in handle_stream_simple'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:977:in `write_guard'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1094:in `handle_stream_simple'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:967:in `execute_chunking'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:897:in `emit_buffered'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event_router.rb:115:in `emit_stream'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:318:in `on_message'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:226:in `block in handle_connection'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:263:in `block (3 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `feed_each'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `block (2 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:271:in `block in read_messages'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/server.rb:632:in `on_read_without_connection'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:123:in `on_readable'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in `on_readable'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run_once'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2023-04-12 13:21:57 +0900 [error]: #0 unexpected error on reading data host="127.0.0.1" port=51396 error_class=ArgumentError error="unknown keyword: :packer"
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1017:in `block in <class:Output>'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:670:in `block in write_once'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:661:in `write_once'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:345:in `block in write'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `each'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `write'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1095:in `block in handle_stream_simple'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:977:in `write_guard'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1094:in `handle_stream_simple'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:967:in `execute_chunking'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:897:in `emit_buffered'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event_router.rb:115:in `emit_stream'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:318:in `on_message'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:226:in `block in handle_connection'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:263:in `block (3 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `feed_each'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `block (2 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:271:in `block in read_messages'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/server.rb:632:in `on_read_without_connection'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:123:in `on_readable'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in `on_readable'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run_once'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
^C2023-04-12 13:22:03 +0900 [info]: Received graceful stop
2023-04-12 13:22:04 +0900 [info]: #0 fluentd worker is now stopping worker=0
2023-04-12 13:22:04 +0900 [info]: #0 shutting down fluentd worker worker=0
2023-04-12 13:22:04 +0900 [info]: #0 shutting down input plugin type=:forward plugin_id="object:be0"
2023-04-12 13:22:04 +0900 [info]: #0 shutting down output plugin type=:null plugin_id="object:bb8"
2023-04-12 13:22:04 +0900 [info]: Worker 0 finished with status 0

Additional context

No response

@daipom
Contributor Author

daipom commented Apr 12, 2023

Buffer::write_once()

  • If format is nil, the data is appended as is, but that requires a custom format, which cannot be set in out_forward.

if format
  serialized = format.call(data)
  chunk.concat(serialized, size ? size.call : data.size)
else
  chunk.append(data, compress: @compress)
end
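To make the two branches concrete, here is a minimal, self-contained sketch. FakeChunk and write_once_sketch are hypothetical stand-ins, not Fluentd classes; they only mimic the branch above: with a format proc the data is serialized and concatenated, and with a nil format it is appended unchanged.

```ruby
# Hypothetical stand-in for a buffer chunk that records what was written.
class FakeChunk
  attr_reader :bytes, :appended

  def initialize
    @bytes = +""
    @appended = []
  end

  # Mimics chunk.concat(serialized, size): stores already serialized bytes.
  def concat(serialized, _size)
    @bytes << serialized
  end

  # Mimics chunk.append(data, compress:): stores the data without formatting.
  def append(data, compress: :text)
    @appended.concat(data)
  end
end

# Sketch of the branch in Buffer#write_once (not the real method).
def write_once_sketch(chunk, data, format: nil, size: nil, compress: :text)
  if format
    serialized = format.call(data)
    chunk.concat(serialized, size ? size.call : data.size)
  else
    chunk.append(data, compress: compress)
  end
end

chunk = FakeChunk.new
write_once_sketch(chunk, ["a", "b"], format: ->(d) { d.join(",") })
write_once_sketch(chunk, ["c"])
p chunk.bytes    # => "a,b"
p chunk.appended # => ["c"]
```

Since out_forward does not use a custom format, its data always goes through the first branch, so the format proc decides what happens to a compressed stream.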

Output::generate_format_proc()

  • Either to_msgpack_stream() or to_compressed_msgpack_stream() of the EventStream is called, depending on whether the buffer's compress setting is gzip.

FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }

def generate_format_proc
  if @buffer && @buffer.compress == :gzip
    @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
  else
    @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
  end
end
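The selection logic can be exercised without Fluentd. In this sketch RecordingStream is a hypothetical stub, the packer is replaced by a plain symbol, and select_format_proc takes the compress and time_as_integer settings as arguments instead of reading instance variables. It only shows that with compress gzip the compressed variant is chosen, and that every variant passes a packer: keyword.

```ruby
# Hypothetical stub that records which method the format proc invoked.
class RecordingStream
  attr_reader :calls

  def initialize
    @calls = []
  end

  def to_msgpack_stream(time_int: false, packer: nil)
    @calls << [:to_msgpack_stream, time_int, packer]
    "plain"
  end

  def to_compressed_msgpack_stream(time_int: false, packer: nil)
    @calls << [:to_compressed_msgpack_stream, time_int, packer]
    "gzipped"
  end
end

# A symbol stands in for Fluent::MessagePackFactory.thread_local_msgpack_packer.
PACKER = :thread_local_packer

FORMAT_PLAIN = ->(e) { e.to_msgpack_stream(packer: PACKER) }
FORMAT_COMPRESSED = ->(e) { e.to_compressed_msgpack_stream(packer: PACKER) }
FORMAT_PLAIN_TIME_INT = ->(e) { e.to_msgpack_stream(time_int: true, packer: PACKER) }
FORMAT_COMPRESSED_TIME_INT = ->(e) { e.to_compressed_msgpack_stream(time_int: true, packer: PACKER) }

# Same shape as generate_format_proc, with its inputs passed explicitly.
def select_format_proc(compress, time_as_integer)
  if compress == :gzip
    time_as_integer ? FORMAT_COMPRESSED_TIME_INT : FORMAT_COMPRESSED
  else
    time_as_integer ? FORMAT_PLAIN_TIME_INT : FORMAT_PLAIN
  end
end

es = RecordingStream.new
select_format_proc(:gzip, false).call(es)
select_format_proc(:text, true).call(es)
p es.calls
# => [[:to_compressed_msgpack_stream, false, :thread_local_packer], [:to_msgpack_stream, true, :thread_local_packer]]
```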

CompressedMessagePackEventStream: to_msgpack_stream(), to_compressed_msgpack_stream()

  • If to_msgpack_stream() is called by FORMAT_MSGPACK_STREAM or FORMAT_MSGPACK_STREAM_TIME_INT, the data is decompressed.
  • If to_compressed_msgpack_stream() is called by FORMAT_COMPRESSED_MSGPACK_STREAM or FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, an unexpected ArgumentError (unknown keyword: :packer) is raised because the method signature does not accept the packer: keyword.
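The mismatch can be reproduced in plain Ruby. OldSignatureStream below is a hypothetical class that copies only the relevant signature from event.rb; calling it the way FORMAT_COMPRESSED_MSGPACK_STREAM does raises the same ArgumentError seen in the log (the log was produced with Ruby 3.2).

```ruby
# Hypothetical class reproducing only the method signature from event.rb.
class OldSignatureStream
  def initialize(compressed_data)
    @compressed_data = compressed_data
  end

  # Accepts time_int: but not packer:.
  def to_compressed_msgpack_stream(time_int: false)
    @compressed_data
  end
end

es = OldSignatureStream.new("already-gzipped-bytes")

message =
  begin
    # Same call shape as FORMAT_COMPRESSED_MSGPACK_STREAM.
    es.to_compressed_msgpack_stream(packer: Object.new)
    nil
  rescue ArgumentError => e
    e.message
  end

puts message
```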

fluentd/lib/fluent/event.rb

Lines 292 to 300 in 206b46b

def to_msgpack_stream(time_int: false, packer: nil)
  ensure_decompressed!
  super
end

def to_compressed_msgpack_stream(time_int: false)
  # time_int is always ignored because @data is always packed binary in this class
  @compressed_data
end
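One straightforward way to remove the mismatch, offered here as a sketch and not as the actual change in #4147, is to let to_compressed_msgpack_stream accept (and ignore) packer:, just as to_msgpack_stream already does. PatchedStream is a hypothetical class; Zlib.gzip is used only to produce some compressed bytes to pass through.

```ruby
require "zlib"

# Hypothetical class illustrating the proposed signature.
class PatchedStream
  def initialize(compressed_data)
    @compressed_data = compressed_data
  end

  # time_int and packer are both ignored: @compressed_data is already
  # packed and compressed, so it can be written to the buffer as is.
  def to_compressed_msgpack_stream(time_int: false, packer: nil)
    @compressed_data
  end
end

payload = Zlib.gzip("packed-events")
es = PatchedStream.new(payload)

out = es.to_compressed_msgpack_stream(packer: Object.new)
p out.equal?(payload) # => true (same object, nothing is recompressed)
p Zlib.gunzip(out)    # => "packed-events"
```

With this signature the compressed branch returns the received bytes unchanged, which is the "as is" behavior expected for Forwarder2.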

@daipom
Contributor Author

daipom commented Apr 12, 2023

This signature mismatch looks like a bug. I am looking into the implementation history.

fluentd/lib/fluent/event.rb

Lines 297 to 300 in 206b46b

def to_compressed_msgpack_stream(time_int: false)
  # time_int is always ignored because @data is always packed binary in this class
  @compressed_data
end

Somehow, only the signature of this method was not updated in the following PR.

It is hard to imagine that only this method was overlooked, but since I don't see any intent behind it, it may simply be a bug.

Projects
Archived in project
1 participant