out_elasticsearch: Process error information properly #9236

Merged

Conversation

cosmo0920
Contributor

@cosmo0920 cosmo0920 commented Aug 15, 2024

The current out_elasticsearch implementation stops processing a response as soon as it encounters error information. With this patch, processing continues until the last element of the msgpack-converted JSON response.
If the converted response contains any success entry, Fluent Bit assumes the requested payload was sent to the ES cluster successfully.

This is an alternative fix for #6341.
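
To make the described behavior concrete, here is a minimal Python sketch of the idea, not the actual C code in out_elasticsearch. It walks every item of a bulk response instead of stopping at the first error, and records whether any item succeeded. The names `BulkStatus`, `check_bulk_response`, and `payload_accepted`, as well as the flag values, are hypothetical and chosen only for illustration.

```python
import json
from enum import Flag


class BulkStatus(Flag):
    """Hypothetical result flags; not Fluent Bit's actual constants."""
    NONE = 0
    SUCCESS = 1
    ERROR = 2


def check_bulk_response(body: str) -> BulkStatus:
    """Scan every item of a bulk response rather than stopping at the first error."""
    response = json.loads(body)
    if not response.get("errors", False):
        # The server reported no per-item errors at all.
        return BulkStatus.SUCCESS

    result = BulkStatus.NONE
    for item in response.get("items", []):
        # Each item is a single-key object such as {"create": {...}}.
        for outcome in item.values():
            status = outcome.get("status", 0)
            if 200 <= status < 300:
                result |= BulkStatus.SUCCESS
            else:
                result |= BulkStatus.ERROR
    return result


def payload_accepted(result: BulkStatus) -> bool:
    """Assumed policy from the description: any success marks the payload as sent."""
    return bool(result & BulkStatus.SUCCESS)
```

For a response whose items carry statuses 201, 400, 400, and 201, as in the debug log in this PR, the sketch sets both flags, so `payload_accepted` returns `True`.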


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
[SERVICE]
    Flush        5
    Daemon       Off
    Log_Level    debug
    HTTP_Server  Off
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020

[INPUT]
    Name forward
    Port 24225

[OUTPUT]
    Name es
    Http_User elastic
    Http_passwd <ELASTIC_PASSWORD>
    suppress_type_name On
    trace_error On
    Write_Operation create
    tls On
    tls.verify Off
  • Debug log output from testing the change
Fluent Bit v3.1.7
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  __  
|  ___| |                | |   | ___ (_) |         |____ |/  | 
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`| | 
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \ | | 
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /_| |_
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)___/

[2024/08/19 18:50:29] [ info] Configuration:
[2024/08/19 18:50:29] [ info]  flush time     | 5.000000 seconds
[2024/08/19 18:50:29] [ info]  grace          | 5 seconds
[2024/08/19 18:50:29] [ info]  daemon         | 0
[2024/08/19 18:50:29] [ info] ___________
[2024/08/19 18:50:29] [ info]  inputs:
[2024/08/19 18:50:29] [ info]      forward
[2024/08/19 18:50:29] [ info] ___________
[2024/08/19 18:50:29] [ info]  filters:
[2024/08/19 18:50:29] [ info] ___________
[2024/08/19 18:50:29] [ info]  outputs:
[2024/08/19 18:50:29] [ info]      es.0
[2024/08/19 18:50:29] [ info] ___________
[2024/08/19 18:50:29] [ info]  collectors:
[2024/08/19 18:50:29] [ info] [fluent bit] version=3.1.7, commit=fa88100fd7, pid=6092
[2024/08/19 18:50:29] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2024/08/19 18:50:29] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/08/19 18:50:29] [ info] [cmetrics] version=0.9.4
[2024/08/19 18:50:29] [ info] [ctraces ] version=0.5.5
[2024/08/19 18:50:29] [ info] [input:forward:forward.0] initializing
[2024/08/19 18:50:29] [ info] [input:forward:forward.0] storage_strategy='memory' (memory only)
[2024/08/19 18:50:29] [debug] [forward:forward.0] created event channels: read=21 write=22
[2024/08/19 18:50:29] [debug] [in_fw] Listen='0.0.0.0' TCP_Port=24225
[2024/08/19 18:50:29] [debug] [downstream] listening on 0.0.0.0:24225
[2024/08/19 18:50:29] [ info] [input:forward:forward.0] listening on 0.0.0.0:24225
[2024/08/19 18:50:29] [debug] [es:es.0] created event channels: read=24 write=25
[2024/08/19 18:50:29] [debug] [output:es:es.0] host=127.0.0.1 port=9200 uri=/_bulk index=fluent-bit type=_doc
[2024/08/19 18:50:29] [ info] [output:es:es.0] worker #0 started
[2024/08/19 18:50:29] [ info] [sp] stream processor started
[2024/08/19 18:50:29] [ info] [output:es:es.0] worker #1 started
[2024/08/19 18:50:39] [debug] [task] created task=0x600003fbc000 id=0 OK
[2024/08/19 18:50:39] [debug] [output:es:es.0] task_id=0 assigned to thread #0
[2024/08/19 18:50:39] [debug] [upstream] KA connection #49 to 127.0.0.1:9200 is connected
[2024/08/19 18:50:39] [debug] [http_client] not using http_proxy for header
[2024/08/19 18:50:39] [debug] [output:es:es.0] HTTP Status=200 URI=/_bulk
[2024/08/19 18:50:39] [ info] [output:es:es.0] ret = 129
[2024/08/19 18:50:39] [debug] [output:es:es.0] Elasticsearch response
{"errors":true,"took":316737025,"items":[{"create":{"_index":"fluent-bit","_id":"hxELapEB_XqxG5Ydupgb","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":7,"_primary_term":1,"status":201}},{"create":{"_index":"fluent-bit","_id":"iBELapEB_XqxG5Ydupgb","status":400,"error":{"type":"document_parsing_exception","reason":"[1:65] failed to parse field [_id] of type [_id] in document with id 'iBELapEB_XqxG5Ydupgb'. Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'","caused_by":{"type":"document_parsing_exception","reason":"[1:65] Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}},{"create":{"_index":"fluent-bit","_id":"iRELapEB_XqxG5Ydupgb","status":400,"error":{"type":"document_parsing_exception","reason":"[1:65] failed to parse field [_id] of type [_id] in document with id 'iRELapEB_XqxG5Ydupgb'. Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'","caused_by":{"type":"document_parsing_exception","reason":"[1:65] Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}}}},{"create":{"_index":"fluent-bit","_id":"ihELapEB_XqxG5Ydupgb","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":8,"_primary_term":1,"status":201}}]}
[2024/08/19 18:50:39] [debug] [upstream] KA connection #49 to 127.0.0.1:9200 is now available
[2024/08/19 18:50:39] [debug] [out flush] cb_destroy coro_id=0
[2024/08/19 18:50:39] [debug] [task] destroy task=0x600003fbc000 (task_id=0)
^C[2024/08/19 18:50:41] [engine] caught signal (SIGINT)
[2024/08/19 18:50:41] [ info] [input] pausing forward.0
[2024/08/19 18:50:41] [ info] [output:es:es.0] thread worker #0 stopping...
[2024/08/19 18:50:41] [ info] [output:es:es.0] thread worker #0 stopped
[2024/08/19 18:50:41] [ info] [output:es:es.0] thread worker #1 stopping...
[2024/08/19 18:50:41] [ info] [output:es:es.0] thread worker #1 stopped
  • Attached Valgrind output that shows no leaks or memory corruption was found
==2174370== 
==2174370== HEAP SUMMARY:
==2174370==     in use at exit: 0 bytes in 0 blocks
==2174370==   total heap usage: 15,854 allocs, 15,854 frees, 6,883,930 bytes allocated
==2174370== 
==2174370== All heap blocks were freed -- no leaks are possible
==2174370== 
==2174370== For lists of detected and suppressed errors, rerun with: -s
==2174370== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

The current out_elasticsearch implementation stops processing a
response as soon as it encounters error information.
With this patch, processing continues until the last element of the
msgpack-converted JSON response.
If the converted response contains any success entry, Fluent Bit
assumes the requested payload was sent to the ES cluster successfully.

Signed-off-by: Hiroshi Hatake <[email protected]>
@edsiper
Member

edsiper commented Aug 21, 2024

Thanks for the PR.

This led me to think about output response testing (#9260), because we don't have any yet.

In addition, is this issue also present in our OpenSearch connector?

@cosmo0920
Contributor Author

In addition, is this issue also present in our OpenSearch connector?

Yes, the same issue most likely exists there.
I followed the situation around OpenSearch and Elasticsearch when OpenSearch was created as the "OSS fork".
The two should have nearly identical HTTP bulk mechanisms.

@kevholmes

kevholmes commented Aug 23, 2024

Seeing this issue on both OpenSearch 1.3.x and 2.13.x hosted in AWS, with both the ES and OS connectors. Let me know if I can help in any way.

@edsiper edsiper merged commit e37b56b into master Aug 27, 2024
52 checks passed
@edsiper edsiper deleted the cosmo0920-process-success-and-errors-on-response-properly branch August 27, 2024 15:46
Development

Successfully merging this pull request may close these issues.

Fluent-Bit does not handle 201-Created with 409-Conflicts errors from ElasticSearch correctly
3 participants