[Feature Request] provide options to retag, send to label, and/or add keys for individual message failures #36

Open
1 task done
ryn9 opened this issue Mar 19, 2022 · 5 comments
Labels
question User forum like issues

Comments

@ryn9

ryn9 commented Mar 19, 2022

Is your feature request related to a problem? Please describe.

We have messages that reach the @ERROR label without a clear indication of why they are there.

Currently, the plugin sends messages to @ERROR for errors such as mapper_parsing_exception.
The @ERROR label, though, can also receive messages from other plugins and from various other kinds of failure.
It would be beneficial to have these options for individual message failures:

  • retag
  • send to an alternate label (i.e., other than @ERROR)
  • add keys describing why the message ended up in an error state

Describe alternatives you've considered
N/A - there appear to be no available options when a message is errored

Additional context

N/A

@cosmo0920
Collaborator

Oh, this plugin is based on fluent-plugin-elasticsearch.
The OpenSearch plugin accepts a retry_tag parameter, which is used during the retry phase:
https://github.com/fluent/fluent-plugin-opensearch/blob/main/lib/fluent/plugin/out_opensearch.rb#L1082

For more detail, please refer to this section: https://github.com/fluent/fluent-plugin-opensearch#retry_tag

@cosmo0920 cosmo0920 added the question User forum like issues label Mar 22, 2022
@ryn9
Author

ryn9 commented Mar 22, 2022

@cosmo0920 -

I will need to do some more testing (unfortunately I am tied up through next week), but I believe the retry_tag and retry label approaches address failures that occur when flushing and retrying a chunk. Is that correct?

I do not believe they handle the case described here: 'Records that have “hard” errors (schema violations, corrupted data, etc.) that cannot be retried will be sent to the @error handler.'
ref: https://richm.github.io/fluent-plugin-elasticsearch-retry.html

@ryn9
Author

ryn9 commented Mar 26, 2022

@cosmo0920 -

My attempts at testing with retry_tag or the retry label logic while the OpenSearch cluster is unavailable have been unsuccessful.
I fail to get messages to the match tag and/or label, and instead end up with log messages like the following: "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue" and "buffer queue cleared".

My attempts with copy and a store using ignore_if_prev_success have also been unsuccessful. I get the message "ignore copy because prev_success", with no copy to the next destination.

Have retry_tag, the retry label, and/or copy with store ignore_if_prev_success been tested recently?

@ryn9
Author

ryn9 commented Mar 26, 2022

More details in relation to my above comment...

With each of the following tests, I tested with this curl command...

curl -kv -X POST -d '{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"e69565e1-1232-49b3-85fe-71d31b2aeea4"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"4a789769-079d-4fe3-a676-00b37a7409e7"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"78b65f68-ed71-4655-808d-2fb8d6222992"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"51e7dd4d-76e9-4147-bd39-96d1c039bbb5"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"1cc87991-28ad-4553-b4ca-68bd3612ce35"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"085d5341-b063-4189-b111-27ed5ee0943c"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"bdc7c081-441e-4918-82be-c702cb0c8a16"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"69354baa-c8c2-4ca3-8abd-5c03dfb22289"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"80316e99-904a-416a-9e62-04b0e16b6a6b"}
{"foo":"bar","timestamp":"2022-03-25T13:52:51.315315315+00:00","uuid":"0decdee3-b08d-4124-ba07-8fde7e107a09"}' -H 'Content-Type: application/x-ndjson' http://localhost:8080/

First test - baseline with working opensearch cluster....

  <system>
    log_level debug
  </system>
  <source>
    @type http
    port 8080
    bind "0.0.0.0"
    body_size_limit 10m
    keepalive_timeout 10s
    @label @STANDARD
  </source>
  <label @STANDARD>
    <match **>
      @type opensearch
      @id out_opensearch
      scheme https
      ssl_verify true
      host "xxxxxx"
      port 443
      user "xxxxxx"
      password xxxxxx
      index_name "demo-test"
      verify_os_version_at_startup false
      default_opensearch_version 1
      max_retry_get_os_version 1
      max_retry_putting_template 1
      <buffer tag>
        flush_interval 1s
        retry_type periodic
        retry_wait 2s
        retry_max_times 3
      </buffer>
    </match>
  </label>
  <label @RETRY>
    <match **>
      @type stdout
    </match>
  </label>
  <label @ERROR>
    <match **>
      @type stdout
    </match>
  </label>

Result: messages made it to the OpenSearch cluster


Second test - change to an alternate port in the config to force connection failures

  <system>
    log_level debug
  </system>
  <source>
    @type http
    port 8080
    bind "0.0.0.0"
    body_size_limit 10m
    keepalive_timeout 10s
    @label @STANDARD
  </source>
  <label @STANDARD>
    <match **>
      @type opensearch
      @id out_opensearch
      scheme https
      ssl_verify true
      host "xxxxxx"
      port 444
      user "xxxxxx"
      password xxxxxx
      index_name "demo-test"
      verify_os_version_at_startup false
      default_opensearch_version 1
      max_retry_get_os_version 1
      max_retry_putting_template 1
      <buffer tag>
        flush_interval 1s
        retry_type periodic
        retry_wait 2s
        retry_max_times 3
      </buffer>
    </match>
  </label>
  <label @RETRY>
    <match **>
      @type stdout
    </match>
  </label>
  <label @ERROR>
    <match **>
      @type stdout
    </match>
  </label>

Result: could not connect, could not flush buffer, ultimately dropped the messages

2022-03-26 19:46:15 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db2454b0e10fd500b940e82e91c1673"
2022-03-26 19:46:15 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=0 next_retry_time=2022-03-26 19:46:17 +0000 chunk="5db2454b0e10fd500b940e82e91c1673" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1101:in `rescue in send_bulk'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1063:in `send_bulk'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:878:in `block in write'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `each'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `write'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1179:in `try_flush'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1495:in `flush_thread_run'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:499:in `block (2 levels) in start'
  2022-03-26 19:46:15 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2022-03-26 19:46:22 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db2454b0e10fd500b940e82e91c1673"
2022-03-26 19:46:22 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=1 next_retry_time=2022-03-26 19:46:25 +0000 chunk="5db2454b0e10fd500b940e82e91c1673" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:46:22 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 19:46:29 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db2454b0e10fd500b940e82e91c1673"
2022-03-26 19:46:29 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=2 next_retry_time=2022-03-26 19:46:32 +0000 chunk="5db2454b0e10fd500b940e82e91c1673" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:46:29 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 19:46:36 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db2454b0e10fd500b940e82e91c1673"
2022-03-26 19:46:36 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=3 next_retry_time=2022-03-26 19:46:39 +0000 chunk="5db2454b0e10fd500b940e82e91c1673" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:46:36 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 19:46:43 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db2454b0e10fd500b940e82e91c1673"
2022-03-26 19:46:43 +0000 [error]: #0 [out_opensearch] failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue. retry_times=3 records=10 error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:46:43 +0000 [error]: #0 suppressed same stacktrace
2022-03-26 19:46:43 +0000 [debug]: #0 [out_opensearch] buffer queue cleared
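
To compare the configured retry settings with what the log above shows, a small Ruby script can pull the timestamps out of the "failed to flush the buffer" warnings. This is only a sketch: the sample lines are abbreviated copies of the warnings above (everything after next_retry_time is trimmed), and the helper names (parse_attempts, scheduled_waits, observed_gaps) are made up for illustration.

```ruby
require 'time'

# Abbreviated copies of the four "failed to flush the buffer" warnings above;
# everything after next_retry_time is trimmed.
LOG_LINES = [
  '2022-03-26 19:46:15 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=0 next_retry_time=2022-03-26 19:46:17 +0000',
  '2022-03-26 19:46:22 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=1 next_retry_time=2022-03-26 19:46:25 +0000',
  '2022-03-26 19:46:29 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=2 next_retry_time=2022-03-26 19:46:32 +0000',
  '2022-03-26 19:46:36 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=3 next_retry_time=2022-03-26 19:46:39 +0000'
].freeze

TIMESTAMP = '\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [+-]\d{4}'
FLUSH_FAILURE = /\A(?<at>#{TIMESTAMP}) \[warn\]: .*failed to flush the buffer\. retry_times=(?<n>\d+) next_retry_time=(?<retry_at>#{TIMESTAMP})/

Attempt = Struct.new(:retry_times, :logged_at, :next_retry_at)

# Turn every matching log line into an Attempt; other lines are ignored.
def parse_attempts(lines)
  lines.filter_map do |line|
    m = FLUSH_FAILURE.match(line)
    Attempt.new(m[:n].to_i, Time.parse(m[:at]), Time.parse(m[:retry_at])) if m
  end
end

# Seconds between a failure being logged and its scheduled next retry.
def scheduled_waits(attempts)
  attempts.map { |a| (a.next_retry_at - a.logged_at).to_i }
end

# Seconds between consecutive logged failures.
def observed_gaps(attempts)
  attempts.each_cons(2).map { |a, b| (b.logged_at - a.logged_at).to_i }
end

attempts = parse_attempts(LOG_LINES)
puts "scheduled waits: #{scheduled_waits(attempts).inspect}"
puts "observed gaps:   #{observed_gaps(attempts).inspect}"
```

For the lines above this reports scheduled waits of [2, 3, 3, 3] seconds and gaps of [7, 7, 7] seconds between failures. The gaps are longer than retry_wait, presumably because each attempt also waits for the connection timeout before it fails.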

Third test - trying with label RETRY

  <system>
    log_level debug
  </system>
  <source>
    @type http
    port 8080
    bind "0.0.0.0"
    body_size_limit 10m
    keepalive_timeout 10s
    @label @STANDARD
  </source>
  <label @STANDARD>
    <match **>
      @type opensearch
      @id out_opensearch
      scheme https
      ssl_verify true
      host "xxxxxx"
      port 444
      user "xxxxxx"
      password xxxxxx
      index_name "demo-test"
      verify_os_version_at_startup false
      default_opensearch_version 1
      max_retry_get_os_version 1
      max_retry_putting_template 1
      @label @RETRY
      <buffer tag>
        flush_interval 1s
        retry_type periodic
        retry_wait 2s
        retry_max_times 3
      </buffer>
    </match>
  </label>
  <label @RETRY>
    <match **>
      @type stdout
    </match>
  </label>
  <label @ERROR>
    <match **>
      @type stdout
    </match>
  </label>

EXPECTED: could not connect, could not flush buffer, would send message to the RETRY label and print to stdout (DID NOT HAPPEN)

2022-03-26 19:53:36 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db246ef226b9754b1a76c9cc1513483"
2022-03-26 19:53:36 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=0 next_retry_time=2022-03-26 19:53:39 +0000 chunk="5db246ef226b9754b1a76c9cc1513483" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1101:in `rescue in send_bulk'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1063:in `send_bulk'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:878:in `block in write'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `each'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `write'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1179:in `try_flush'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1495:in `flush_thread_run'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:499:in `block (2 levels) in start'
  2022-03-26 19:53:36 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2022-03-26 19:53:43 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db246ef226b9754b1a76c9cc1513483"
2022-03-26 19:53:43 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=1 next_retry_time=2022-03-26 19:53:46 +0000 chunk="5db246ef226b9754b1a76c9cc1513483" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:53:43 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 19:53:50 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db246ef226b9754b1a76c9cc1513483"
2022-03-26 19:53:50 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=2 next_retry_time=2022-03-26 19:53:53 +0000 chunk="5db246ef226b9754b1a76c9cc1513483" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:53:50 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 19:53:57 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db246ef226b9754b1a76c9cc1513483"
2022-03-26 19:53:57 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=3 next_retry_time=2022-03-26 19:54:00 +0000 chunk="5db246ef226b9754b1a76c9cc1513483" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:53:57 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 19:54:05 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db246ef226b9754b1a76c9cc1513483"
2022-03-26 19:54:05 +0000 [error]: #0 [out_opensearch] failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue. retry_times=3 records=10 error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 19:54:05 +0000 [error]: #0 suppressed same stacktrace
2022-03-26 19:54:05 +0000 [debug]: #0 [out_opensearch] buffer queue cleared

Fourth test - removing label RETRY, trying retry_tag retry_opensearch (match statement in both STANDARD and ROOT labels)

  <system>
    log_level debug
  </system>
  <source>
    @type http
    port 8080
    bind "0.0.0.0"
    body_size_limit 10m
    keepalive_timeout 10s
    @label @STANDARD
  </source>
  <label @STANDARD>
    <match retry_opensearch>
      @type stdout
    </match>
    <match **>
      @type opensearch
      @id out_opensearch
      scheme https
      ssl_verify true
      host "xxxxxx"
      port 444
      user "xxxxxx"
      password xxxxxx
      index_name "demo-test"
      verify_os_version_at_startup false
      default_opensearch_version 1
      max_retry_get_os_version 1
      max_retry_putting_template 1
      retry_tag "retry_opensearch"
      <buffer tag>
        flush_interval 1s
        retry_type periodic
        retry_wait 2s
        retry_max_times 3
      </buffer>
    </match>
  </label>
  <match retry_opensearch>
    @type stdout
  </match>
  <label @RETRY>
    <match **>
      @type stdout
    </match>
  </label>
  <label @ERROR>
    <match **>
      @type stdout
    </match>
  </label>

Result: could not connect, could not flush buffer, ultimately dropped the messages
EXPECTED: could not connect, could not flush buffer, the retry would be tagged, picked up by match retry_opensearch, and printed to stdout (DID NOT HAPPEN)

2022-03-26 20:12:56 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24b41b1e1effe040335dfc832c424"
2022-03-26 20:12:56 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=0 next_retry_time=2022-03-26 20:12:59 +0000 chunk="5db24b41b1e1effe040335dfc832c424" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1101:in `rescue in send_bulk'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1063:in `send_bulk'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:878:in `block in write'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `each'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `write'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1179:in `try_flush'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1495:in `flush_thread_run'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:499:in `block (2 levels) in start'
  2022-03-26 20:12:56 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2022-03-26 20:13:03 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24b41b1e1effe040335dfc832c424"
2022-03-26 20:13:03 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=1 next_retry_time=2022-03-26 20:13:06 +0000 chunk="5db24b41b1e1effe040335dfc832c424" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:13:03 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 20:13:10 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24b41b1e1effe040335dfc832c424"
2022-03-26 20:13:10 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=2 next_retry_time=2022-03-26 20:13:13 +0000 chunk="5db24b41b1e1effe040335dfc832c424" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:13:10 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 20:13:17 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24b41b1e1effe040335dfc832c424"
2022-03-26 20:13:17 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=3 next_retry_time=2022-03-26 20:13:20 +0000 chunk="5db24b41b1e1effe040335dfc832c424" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:13:17 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 20:13:24 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24b41b1e1effe040335dfc832c424"
2022-03-26 20:13:24 +0000 [error]: #0 [out_opensearch] failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue. retry_times=3 records=10 error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:13:24 +0000 [error]: #0 suppressed same stacktrace
2022-03-26 20:13:24 +0000 [debug]: #0 [out_opensearch] buffer queue cleared

Fifth test - removing retry_tag retry_opensearch logic, trying copy with store ignore_if_prev_success

  <system>
    log_level debug
  </system>
  <source>
    @type http
    port 8080
    bind "0.0.0.0"
    body_size_limit 10m
    keepalive_timeout 10s
    @label @STANDARD
  </source>
  <label @STANDARD>
    <match **>
      @type copy
      <store ignore_error>
        @type "opensearch"
        @id out_opensearch
        scheme https
        ssl_verify true
        host "xxxxxx"
        port 444
        user "xxxxxx"
        password xxxxxx
        index_name "demo-test"
        verify_os_version_at_startup false
        default_opensearch_version 1
        max_retry_get_os_version 1
        max_retry_putting_template 1
        <buffer tag>
          flush_interval 1s
          retry_type periodic
          retry_wait 2s
          retry_max_times 3
        </buffer>
      </store>
      <store ignore_if_prev_success ignore_error>
        @type "stdout"
      </store>
    </match>
  </label>
  <label @RETRY>
    <match **>
      @type stdout
    </match>
  </label>
  <label @ERROR>
    <match **>
      @type stdout
    </match>
  </label>

Result: could not connect, could not flush buffer, ultimately dropped the messages
EXPECTED: could not connect, could not flush buffer, would fall back to the store defined with ignore_if_prev_success and print to stdout (DID NOT HAPPEN)

2022-03-26 20:18:51 +0000 [debug]: #0 ignore copy because prev_success in object:b90 index=1
2022-03-26 20:18:57 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24c999e186d030e87a9c442ca2fb9"
2022-03-26 20:18:57 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=0 next_retry_time=2022-03-26 20:19:00 +0000 chunk="5db24c999e186d030e87a9c442ca2fb9" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1101:in `rescue in send_bulk'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:1063:in `send_bulk'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:878:in `block in write'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `each'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluent-plugin-opensearch-1.0.2/lib/fluent/plugin/out_opensearch.rb:877:in `write'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1179:in `try_flush'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:1495:in `flush_thread_run'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin/output.rb:499:in `block (2 levels) in start'
  2022-03-26 20:18:57 +0000 [warn]: #0 /usr/local/bundle/gems/fluentd-1.14.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2022-03-26 20:19:04 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24c999e186d030e87a9c442ca2fb9"
2022-03-26 20:19:04 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=1 next_retry_time=2022-03-26 20:19:07 +0000 chunk="5db24c999e186d030e87a9c442ca2fb9" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:19:04 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 20:19:11 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24c999e186d030e87a9c442ca2fb9"
2022-03-26 20:19:11 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=2 next_retry_time=2022-03-26 20:19:14 +0000 chunk="5db24c999e186d030e87a9c442ca2fb9" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:19:11 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 20:19:18 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24c999e186d030e87a9c442ca2fb9"
2022-03-26 20:19:18 +0000 [warn]: #0 [out_opensearch] failed to flush the buffer. retry_times=3 next_retry_time=2022-03-26 20:19:21 +0000 chunk="5db24c999e186d030e87a9c442ca2fb9" error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:19:18 +0000 [warn]: #0 suppressed same stacktrace
2022-03-26 20:19:25 +0000 [debug]: #0 [out_opensearch] taking back chunk for errors. chunk="5db24c999e186d030e87a9c442ca2fb9"
2022-03-26 20:19:25 +0000 [error]: #0 [out_opensearch] failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue. retry_times=3 records=10 error_class=Fluent::Plugin::OpenSearchOutput::RecoverableRequestFailure error="could not push logs to OpenSearch cluster ({:host=>\"xxxxxx\", :port=>444, :scheme=>\"https\", :user=>\"xxxxxx\", :password=>\"obfuscated\"}): connect_write timeout reached"
  2022-03-26 20:19:25 +0000 [error]: #0 suppressed same stacktrace
2022-03-26 20:19:25 +0000 [debug]: #0 [out_opensearch] buffer queue cleared
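
One possible reading of the log above, offered as an assumption rather than confirmed out_copy behavior: the "ignore copy because prev_success" line is logged at 20:18:51, before the first flush failure at 20:18:57. That would be consistent with the copy step judging success when events are handed to the buffered store, not when the chunk is later flushed. The Ruby sketch below models that timing with hypothetical stand-in classes (BufferedStore, StdoutStore, copy_emit); none of these are Fluentd APIs.

```ruby
# Hypothetical stand-in for a buffered output: emitting only enqueues events,
# and the network write (which can fail) happens later in flush.
class BufferedStore
  attr_reader :queue

  def initialize
    @queue = []
  end

  def emit(events)
    @queue.concat(events)
  end

  def flush
    raise IOError, 'connect_write timeout reached'
  end
end

# Hypothetical stand-in for the fallback store.
class StdoutStore
  attr_reader :seen

  def initialize
    @seen = []
  end

  def emit(events)
    @seen.concat(events)
  end
end

# Copy-like dispatch: the fallback only runs when the primary emit raises.
def copy_emit(primary, fallback, events)
  primary.emit(events)
  :fallback_skipped
rescue StandardError
  fallback.emit(events)
  :fallback_used
end

primary = BufferedStore.new
fallback = StdoutStore.new
result = copy_emit(primary, fallback, [{ 'foo' => 'bar' }])

# The flush failure happens after the fallback decision was already made.
flush_error =
  begin
    primary.flush
    nil
  rescue IOError => e
    e.message
  end

puts result
puts flush_error
```

In this model the fallback is skipped because enqueueing succeeded, and the later flush error never reaches the copy step. If the real plugin works the same way, ignore_if_prev_success would not act as a fallback for flush failures of a buffered store.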

@ryn9
Author

ryn9 commented Apr 2, 2022

@cosmo0920 -

After really digging in, I understand that the retry and relabel are more or less what I was asking for in the first place, in that they should be used for individual messages failing during bulk inserts.

Reading through the code, I see that 409s are dropped, 400s are strictly sent to ERROR, and the remaining status codes are evaluated through some conditionals and then either sent to ERROR, marked UnrecoverableError, or added to the retry_stream.
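
The routing described above can be modeled roughly as follows. This is a simplified sketch of my reading of the code, not the plugin's implementation: the function name classify_bulk_item, the returned symbols, and the two type lists passed as keyword arguments are all made up for illustration, and it assumes the bulk item has already failed.

```ruby
# Simplified model of where a failed bulk item ends up.
# abort_types:        error types that abort the whole chunk (UnrecoverableError)
# record_error_types: error types that send the single record to @ERROR
# Both lists are placeholders supplied by the caller.
def classify_bulk_item(status, error, abort_types: [], record_error_types: [])
  return :drop if status == 409          # dropped
  return :error_label if status == 400   # always sent to @ERROR

  # Remaining statuses: the decision depends on the shape of the error.
  type = error.is_a?(Hash) ? error['type'] : nil
  return :error_label if type.nil?       # string error, or hash without a type
  return :abort_chunk if abort_types.include?(type)
  return :error_label if record_error_types.include?(type)

  :retry_stream                          # re-emitted, where retry_tag applies
end

puts classify_bulk_item(409, nil)
puts classify_bulk_item(400, { 'type' => 'mapper_parsing_exception' })
puts classify_bulk_item(429, { 'type' => 'example_type' })
```

Only the last outcome, :retry_stream, is where retry_tag would come into play; every :error_label outcome arrives at @ERROR with no record of which branch sent it there.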

The retry_stream is where it looks like the retry_tag gets used.

With all of this in mind (and admittedly not having a good test of retry_tag and/or the retry label yet), I was again hoping to ask about adding keys describing why the message failed in the first place, and optionally allowing messages that would otherwise be sent to ERROR to be retried.

I am hoping this pseudocode gives a better idea of what I am looking for.

Config file definitions:
############################################
inject_on_bulk_insert_error: true / false
inject_on_bulk_insert_error_key: <STRING>
inject_on_bulk_insert_retry: true / false
inject_on_bulk_insert_retry_key: <STRING>
retry_on_bulk_insert_status_400: true / false     # must be used with retry_tag; use with caution so as not to cause a loop!
retry_on_bulk_insert_status_other: true / false   # must be used with retry_tag; use with caution so as not to cause a loop!
############################################
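
As a sketch of how these proposed options and their retry_tag constraint could be checked, here is a small Ruby Struct. The option names come from the list above; the BulkErrorOptions name, the validate! method, and the rule that an inject flag needs a non-empty key are assumptions for illustration, not part of the plugin.

```ruby
# Hypothetical container for the proposed options plus the existing retry_tag.
BulkErrorOptions = Struct.new(
  :inject_on_bulk_insert_error, :inject_on_bulk_insert_error_key,
  :inject_on_bulk_insert_retry, :inject_on_bulk_insert_retry_key,
  :retry_on_bulk_insert_status_400, :retry_on_bulk_insert_status_other,
  :retry_tag,
  keyword_init: true
) do
  # Raises ArgumentError when the combination of options is unusable.
  def validate!
    retrying = retry_on_bulk_insert_status_400 || retry_on_bulk_insert_status_other
    if retrying && retry_tag.to_s.empty?
      raise ArgumentError, 'retry_on_bulk_insert_status_* must be used with retry_tag'
    end
    # Assumed rule: an inject flag is meaningless without a key to write to.
    if inject_on_bulk_insert_error && inject_on_bulk_insert_error_key.to_s.empty?
      raise ArgumentError, 'inject_on_bulk_insert_error needs inject_on_bulk_insert_error_key'
    end
    if inject_on_bulk_insert_retry && inject_on_bulk_insert_retry_key.to_s.empty?
      raise ArgumentError, 'inject_on_bulk_insert_retry needs inject_on_bulk_insert_retry_key'
    end
    self
  end
end

valid = BulkErrorOptions.new(
  inject_on_bulk_insert_error: true,
  inject_on_bulk_insert_error_key: 'bulk_error', # example key name
  retry_on_bulk_insert_status_400: true,
  retry_tag: 'retry_opensearch'
).validate!
puts valid.retry_tag
```

Validating at configuration time would catch the case where a retry option is enabled without retry_tag. It does not by itself prevent the retry loop mentioned above.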

Pseudocode for opensearch_error_handler.rb:


# When status is 400

if !retry_on_bulk_insert_status_400
  # Not retrying: optionally record the reason, then send to @ERROR.
  if inject_on_bulk_insert_error
    rawrecord[inject_on_bulk_insert_error_key] = "400 - Rejected by OpenSearch#{reason}"
  end
  @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
else
  # Retrying: optionally record the reason, then add to the retry stream.
  if inject_on_bulk_insert_retry
    rawrecord[inject_on_bulk_insert_retry_key] = "400 - Rejected by OpenSearch#{reason}"
  end
  retry_stream.add(time, rawrecord)
end

...

# When status is not 400 and error is a string

if !retry_on_bulk_insert_status_other
  if inject_on_bulk_insert_error
    rawrecord[inject_on_bulk_insert_error_key] = "#{status} - #{reason}"
  end
  @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{reason}"))
else
  if inject_on_bulk_insert_retry
    rawrecord[inject_on_bulk_insert_retry_key] = "#{status} - #{reason}"
  end
  retry_stream.add(time, rawrecord)
end

...

# When status is not 400, error is a hash, type exists, and unrecoverable_record_error

if inject_on_bulk_insert_error
  rawrecord[inject_on_bulk_insert_error_key] = "#{status} - #{type}: #{reason}"
end
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{type}: #{reason}"))

...

# When status is not 400, error is a hash, type exists, and not unrecoverable_record_error

if inject_on_bulk_insert_retry
  rawrecord[inject_on_bulk_insert_retry_key] = "#{status} - #{type}: #{reason}"
end
retry_stream.add(time, rawrecord) unless unrecoverable_record_error?(type)

...

# When status is not 400, error is a hash, and type is missing

if !retry_on_bulk_insert_status_other
  if inject_on_bulk_insert_error
    rawrecord[inject_on_bulk_insert_error_key] = "#{status} - No error type provided in the response"
  end
  @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - No error type provided in the response"))
else
  if inject_on_bulk_insert_retry
    rawrecord[inject_on_bulk_insert_retry_key] = "#{status} - No error type provided in the response"
  end
  retry_stream.add(time, rawrecord)
end
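
The branches above share one shape, so they could be collapsed into a single helper. The following is a runnable sketch of that idea only: FakeRouter and FakeRetryStream are stand-ins so it runs without Fluentd, handle_failed_record is a made-up name, StandardError replaces OpenSearchError, and the key names and reason strings are example values.

```ruby
# Hypothetical stand-in for @plugin.router; it just records error events.
class FakeRouter
  attr_reader :error_events

  def initialize
    @error_events = []
  end

  def emit_error_event(tag, time, record, error)
    @error_events << { tag: tag, time: time, record: record, error: error }
  end
end

# Hypothetical stand-in for retry_stream; it just records added entries.
class FakeRetryStream
  attr_reader :entries

  def initialize
    @entries = []
  end

  def add(time, record)
    @entries << [time, record]
  end
end

# Shared body of the branches: annotate and retry, or annotate and emit an
# error event. `opts` is keyed by the proposed option names; `retry_flag`
# selects which retry_on_bulk_insert_status_* option applies.
def handle_failed_record(opts, retry_flag, message, tag, time, record, router, retry_stream)
  if opts[retry_flag]
    record[opts[:inject_on_bulk_insert_retry_key]] = message if opts[:inject_on_bulk_insert_retry]
    retry_stream.add(time, record)
    :retried
  else
    record[opts[:inject_on_bulk_insert_error_key]] = message if opts[:inject_on_bulk_insert_error]
    router.emit_error_event(tag, time, record, StandardError.new(message))
    :errored
  end
end

opts = {
  inject_on_bulk_insert_error: true,
  inject_on_bulk_insert_error_key: 'bulk_error', # example key name
  inject_on_bulk_insert_retry: true,
  inject_on_bulk_insert_retry_key: 'bulk_retry', # example key name
  retry_on_bulk_insert_status_400: false,
  retry_on_bulk_insert_status_other: true
}
router = FakeRouter.new
retry_stream = FakeRetryStream.new

# Example reason strings; real ones would come from the bulk response.
r400 = handle_failed_record(opts, :retry_on_bulk_insert_status_400,
                            '400 - Rejected by OpenSearch (example reason)',
                            'demo', 0, { 'foo' => 'bar' }, router, retry_stream)
r429 = handle_failed_record(opts, :retry_on_bulk_insert_status_other,
                            '429 - example reason',
                            'demo', 0, { 'foo' => 'bar' }, router, retry_stream)
puts r400
puts r429
```

With these example options the 400 record is sent to the error path with a bulk_error key, and the 429 record is added to the retry stream with a bulk_retry key.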
