Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer: Fix NoMethodError with empty unstaged chunk arrays #4303

Merged
merged 2 commits into from
Oct 31, 2023
Merged

Buffer: Fix NoMethodError with empty unstaged chunk arrays #4303

merged 2 commits into from
Oct 31, 2023

Conversation

amdoolittle
Copy link
Contributor

When the write function encounters a chunk that is defined as a Nil object, the call to synchronize fails. This failure causes an unexpected error that is caught and handled like a failure to connect, with a 30 second retry delay.

In an environment with a large amount of data, and this Nil scenario occurring frequently, it introduces significant processing delays since all processing activity stops during the 30 second window. The topic being queried continues to fill but no agents parse the data, leading to an ever-increasing backlog.

Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution.

When the write function encounters a chunk that is defined as a Nil object, the call to synchronize fails.

Is it reproducible?
Could you tell us how to reproduce it?

The error will occur if unstaged_chunks[m] is an empty list or contains a nil object.
However, it does not seem possible.

elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end

@amdoolittle
Copy link
Contributor Author

amdoolittle commented Sep 21, 2023

However, it does not seem possible.

elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end

Tell that to my constant 100MB/sec data flow that throws this error frequently. :)

I'll see if I can forcefully reproduce it outside that environment.

Here's the stack trace, for clarification:

2023-09-15 17:33:32 +0000 [warn]: #0 emit transaction failed: error_class=NoMethodError error="undefined method `synchronize' for nil:NilClass" location="/opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:422:in `block in write'" tag="[redacted]"
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:422:in `block in write'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:416:in `each'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:416:in `write'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:1095:in `block in handle_stream_simple'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:977:in `write_guard'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:1094:in `handle_stream_simple'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:967:in `execute_chunking'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:897:in `emit_buffered'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:287:in `emit_events'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:269:in `block in run'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:214:in `each_batch'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:229:in `run'
  2023-09-15 17:33:32 +0000 [warn]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2023-09-15 17:33:32 +0000 [error]: #0 unexpected error during consuming events from kafka. Re-fetch events. error="undefined method `synchronize' for nil:NilClass"
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:422:in `block in write'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:416:in `each'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:416:in `write'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:1095:in `block in handle_stream_simple'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:977:in `write_guard'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:1094:in `handle_stream_simple'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:967:in `execute_chunking'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:897:in `emit_buffered'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:287:in `emit_events'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:269:in `block in run'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:214:in `each_batch'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluent-plugin-kafka-0.19.0/lib/fluent/plugin/in_rdkafka_group.rb:229:in `run'
  2023-09-15 17:33:32 +0000 [error]: #0 /opt/fluent/lib/ruby/gems/3.2.0/gems/fluentd-1.16.2/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

It's not the entire chunk array unstaged_chunks[m] that is nil, just one of the values being popped off the end of the array itself. The conditional on line 420 succeeds, so there is value in unstaged_chunks[m] overall, but the value being parsed is nil.

@amdoolittle
Copy link
Contributor Author

amdoolittle commented Sep 21, 2023

The issue is that unstaged_chunks[m] is an empty array. Once we've popped all the values off the array, it'll still evaluate truthy while empty (thanks Ruby), so we could end up attempting to pop another value off the end of an empty array, resulting in a nil return to "u".

A cleaner fix might be to adjust the conditional on line 420 to also include a check for empty. Ruby doesn't evaluate an empty array to false, so the existing conditional can/does allow empty values to be parsed.

I updated the PR with this adjustment, as it's cleaner than the additional nil check instead.

@amdoolittle amdoolittle changed the title Added conditional handling and logging of nil unstaged chunks Adjusted handling to avoid empty unstaged chunk arrays Sep 21, 2023
Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll see if I can forcefully reproduce it outside that environment.

Thanks!

Once we've popped all the values off the array, it'll still evaluate truthy while empty (thanks Ruby), so we could end up attempting to pop another value off the end of an empty array, resulting in a nil return to "u".

I see!
Certainly, it seems possible to execute pop against an empty list because this process can be repeated.

chunks_to_enqueue.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop

If so, this fix would be reasonable.
I want to confirm the reproduction conditions a little more.

@daipom daipom added the bug Something isn't working label Oct 17, 2023
@daipom
Copy link
Contributor

daipom commented Oct 17, 2023

Sorry for the delay.
I think this fix would be reasonable, but I'm still checking the mechanism and impact of this bug.
I will merge this after it.

@daipom daipom added this to the v1.16.3 milestone Oct 17, 2023
@daipom
Copy link
Contributor

daipom commented Oct 31, 2023

I think this fix would be reasonable, but I'm still checking the mechanism and impact of this bug.

I don't yet fully understand the role of unstaged_chunks in Buffer::write()...
At least, we can merge this to prevent the unexpected error.
It does not change the specification.
It only prevents this error.

So, I merge this now for the next release.

@daipom daipom changed the title Adjusted handling to avoid empty unstaged chunk arrays Buffer: Fix NoMethodError with empty unstaged chunk arrays Oct 31, 2023
@daipom daipom merged commit 36220e5 into fluent:master Oct 31, 2023
daipom pushed a commit to daipom/fluentd that referenced this pull request Oct 31, 2023
@daipom
Copy link
Contributor

daipom commented Dec 13, 2023

Seeing #4342, I understand now some mechanics of this bug.
When retrying write_step_by_step, the first staged chunk is added to operated_chunks duplicatedly.
So, unstaged_chunks[m].pop can be called more than once for the same metadata.

I believe this duplication of operated_chunks is another bug and we should fix it.

Anyway, these unstaged chunks will be enqueued immediately in the Buffer::write(),
so, for now, it is sufficient to avoid the error by this fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants