From 568b80501c709847f89b3a3c30aee911ecb1f811 Mon Sep 17 00:00:00 2001 From: Oleh Date: Sun, 14 Apr 2024 14:10:29 +0300 Subject: [PATCH 1/6] Updated in_opensearch.rb and added retry logic for both client creation and search operations in case of connection failures. Signed-off-by: Oleh Palanskyi Signed-off-by: Oleh Signed-off-by: OlehPalanskyi --- README.OpenSearchInput.md | 10 +++ lib/fluent/plugin/in_opensearch.rb | 125 +++++++++++++++++++---------- 2 files changed, 91 insertions(+), 44 deletions(-) diff --git a/README.OpenSearchInput.md b/README.OpenSearchInput.md index a89535f..a8de566 100644 --- a/README.OpenSearchInput.md +++ b/README.OpenSearchInput.md @@ -24,6 +24,7 @@ + [docinfo_fields](#docinfo_fields) + [docinfo_target](#docinfo_target) + [docinfo](#docinfo) + + [infinite_check_connection](#infinite_check_connection) * [Advanced Usage](#advanced-usage) ## Usage @@ -274,6 +275,15 @@ This parameter specifies whether docinfo information including or not. The defau docinfo false ``` +### infinite_check_connection + +The parameter infinite checking on connection availability with Elasticsearch or opensearch hosts, every request_timeout (default 5) seconds. The default value is `true,`. But if value is `false` then checking of connection will be only 3 times + +``` +infinite_check_connection true +``` + + ## Advanced Usage OpenSearch Input plugin and OpenSearch output plugin can combine to transfer records into another cluster. diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb index 380c0d7..7e33872 100644 --- a/lib/fluent/plugin/in_opensearch.rb +++ b/lib/fluent/plugin/in_opensearch.rb @@ -80,6 +80,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id'] config_param :docinfo_target, :string, :default => METADATA config_param :docinfo, :bool, :default => false + config_param :infinite_check_connection, :bool, :default => true include Fluent::Plugin::OpenSearchConstants @@ -240,40 +241,55 @@ def parse_time(value, event_time, tag) end def client(host = nil) - # check here to see if we already have a client connection for the given host - connection_options = get_connection_options(host) + retry_count = 0 + max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed - @_os = nil unless is_existing_connection(connection_options[:hosts]) + begin + # check here to see if we already have a client connection for the given host + connection_options = get_connection_options(host) - @_os ||= begin - @current_config = connection_options[:hosts].clone - adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options } - local_reload_connections = @reload_connections - if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER - local_reload_connections = @reload_after - end + @_os = nil unless is_existing_connection(connection_options[:hosts]) + + @_os ||= begin + @current_config = connection_options[:hosts].clone + adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options } + local_reload_connections = @reload_connections + if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER + local_reload_connections = @reload_after + end - headers = { 'Content-Type' => "application/json" }.merge(@custom_headers) - - transport = OpenSearch::Transport::Transport::HTTP::Faraday.new( - connection_options.merge( - options: { - reload_connections: local_reload_connections, - reload_on_failure: @reload_on_failure, - resurrect_after: @resurrect_after, - logger: @transport_logger, - transport_options: { - headers: headers, - request: { timeout: @request_timeout }, - ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } - }, - http: { - user: @user, - password: @password - }, - sniffer_class: @sniffer_class, - }), &adapter_conf) - OpenSearch::Client.new transport: transport + headers = { 'Content-Type' => "application/json" }.merge(@custom_headers) + + transport = OpenSearch::Transport::Transport::HTTP::Faraday.new( + connection_options.merge( + options: { + reload_connections: local_reload_connections, + reload_on_failure: @reload_on_failure, + resurrect_after: @resurrect_after, + logger: @transport_logger, + transport_options: { + headers: headers, + request: { timeout: @request_timeout }, + ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } + }, + http: { + user: @user, + password: @password + }, + sniffer_class: @sniffer_class, + }), &adapter_conf) + OpenSearch::Client.new transport: transport + end + rescue Faraday::ConnectionFailed => e + # Retry logic for connection failures during client creation + if retry_count < max_retry + log.warn "Connection to OpenSearch failed during client creation: #{e.message}. Retrying (Attempt #{retry_count + 1})..." + retry_count += 1 + sleep(@request_timeout) + retry + else + raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation." + end end end @@ -305,23 +321,44 @@ def run end def run_slice(slice_id=nil) - slice_query = @base_query - slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil? - result = client.search(@options.merge(:body => Yajl.dump(slice_query) )) - es = Fluent::MultiEventStream.new - - result["hits"]["hits"].each {|hit| process_events(hit, es)} - has_hits = result['hits']['hits'].any? - scroll_id = result['_scroll_id'] - - while has_hits && scroll_id - result = process_next_scroll_request(es, scroll_id) - has_hits = result['has_hits'] + retry_count = 0 + max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed + begin + slice_query = @base_query + slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil? + result = client.search(@options.merge(:body => Yajl.dump(slice_query) )) + es = Fluent::MultiEventStream.new + + result["hits"]["hits"].each {|hit| process_events(hit, es)} + has_hits = result['hits']['hits'].any? scroll_id = result['_scroll_id'] + + while has_hits && scroll_id + result = process_next_scroll_request(es, scroll_id) + has_hits = result['has_hits'] + scroll_id = result['_scroll_id'] + end + + router.emit_stream(@tag, es) + clear_scroll(scroll_id) + rescue Faraday::ConnectionFailed => e + # Retry logic for connection failures during search + if retry_count < max_retry + log.warn "Connection to OpenSearch failed during search: #{e.message}. Retrying (Attempt #{retry_count + 1})..." + retry_count += 1 + sleep(@request_timeout) + retry + else + raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation." + end end + end - router.emit_stream(@tag, es) + def clear_scroll(scroll_id) client.clear_scroll(scroll_id: scroll_id) if scroll_id + rescue => e + # ignore & log any clear_scroll errors + log.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class) end def process_scroll_request(scroll_id) From dc27c67328aaee04c5ffe523c772010176409aba Mon Sep 17 00:00:00 2001 From: OlehPalanskyi Date: Mon, 15 Apr 2024 08:13:31 +0300 Subject: [PATCH 2/6] dded a service availability check function for high availability. I created a service availability check function for high availability. New function get_reachable_hosts in in_opensearch.rb I used one library opensearch to solve this problem. Signed-off-by: OlehPalanskyi --- lib/fluent/plugin/in_opensearch.rb | 163 +++++++++++++++-------------- test/plugin/test_in_opensearch.rb | 7 ++ 2 files changed, 94 insertions(+), 76 deletions(-) diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb index 7e33872..3a4cd83 100644 --- a/lib/fluent/plugin/in_opensearch.rb +++ b/lib/fluent/plugin/in_opensearch.rb @@ -179,7 +179,7 @@ def get_connection_options(con_host=nil) end { - hosts: hosts + hosts: get_reachable_hosts(hosts) } end @@ -240,56 +240,78 @@ def parse_time(value, event_time, tag) return Time.at(event_time).to_time end - def client(host = nil) - retry_count = 0 - max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed - - begin - # check here to see if we already have a client connection for the given host - connection_options = get_connection_options(host) - - @_os = nil unless is_existing_connection(connection_options[:hosts]) - - @_os ||= begin - @current_config = connection_options[:hosts].clone - adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options } - local_reload_connections = @reload_connections - if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER - local_reload_connections = @reload_after + def get_reachable_hosts(hosts=nil) + reachable_hosts = [] + attempt = 0 + loop do + hosts.each do |host| + begin + if @infinite_check_connection == true + check_host = OpenSearch::Client.new( + host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"], + user: host[:user], + password: host[:password], + reload_connections: true, + resurrect_after: @resurrect_after, + reload_on_failure: @reload_on_failure, + transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } } + ) + response = check_host.ping #https://github.com/opensearch-project/opensearch-ruby/blob/136e1c975fc91b8cb80d7d1134e32c6dbefdb3eb/lib/opensearch/api/actions/ping.rb#L33 + if response == true + reachable_hosts << host + else + log.warn "Connection to #{host[:scheme]}://#{host[:host]}:#{host[:port]} failed with status code #{response.status}" + end + else + reachable_hosts << host + end + rescue => e + log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}" end + end + break unless reachable_hosts.empty? + log.info "Attempt ##{attempt += 1} to get reachable hosts" + log.info "No reachable hosts found. Retrying in #{@request_timeout} seconds..." + sleep(@request_timeout) + end + reachable_hosts + end + + def client(host = nil) + # check here to see if we already have a client connection for the given host + connection_options = get_connection_options(host) - headers = { 'Content-Type' => "application/json" }.merge(@custom_headers) + @_os = nil unless is_existing_connection(connection_options[:hosts]) - transport = OpenSearch::Transport::Transport::HTTP::Faraday.new( - connection_options.merge( - options: { - reload_connections: local_reload_connections, - reload_on_failure: @reload_on_failure, - resurrect_after: @resurrect_after, - logger: @transport_logger, - transport_options: { - headers: headers, - request: { timeout: @request_timeout }, - ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } - }, - http: { - user: @user, - password: @password - }, - sniffer_class: @sniffer_class, - }), &adapter_conf) - OpenSearch::Client.new transport: transport - end - rescue Faraday::ConnectionFailed => e - # Retry logic for connection failures during client creation - if retry_count < max_retry - log.warn "Connection to OpenSearch failed during client creation: #{e.message}. Retrying (Attempt #{retry_count + 1})..." - retry_count += 1 - sleep(@request_timeout) - retry - else - raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation." + @_os ||= begin + @current_config = connection_options[:hosts].clone + adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options } + local_reload_connections = @reload_connections + if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER + local_reload_connections = @reload_after end + + headers = { 'Content-Type' => "application/json" }.merge(@custom_headers) + + transport = OpenSearch::Transport::Transport::HTTP::Faraday.new( + connection_options.merge( + options: { + reload_connections: local_reload_connections, + reload_on_failure: @reload_on_failure, + resurrect_after: @resurrect_after, + logger: @transport_logger, + transport_options: { + headers: headers, + request: { timeout: @request_timeout }, + ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } + }, + http: { + user: @user, + password: @password + }, + sniffer_class: @sniffer_class, + }), &adapter_conf) + OpenSearch::Client.new transport: transport end end @@ -318,40 +340,29 @@ def run run_slice(slice_id) end end + rescue Faraday::ConnectionFailed => e + log.warn "Connection to OpenSearch failed during search in the 'run' method: #{e.message}. Retrying..." + retry end def run_slice(slice_id=nil) - retry_count = 0 - max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed - begin - slice_query = @base_query - slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil? - result = client.search(@options.merge(:body => Yajl.dump(slice_query) )) - es = Fluent::MultiEventStream.new - - result["hits"]["hits"].each {|hit| process_events(hit, es)} - has_hits = result['hits']['hits'].any? + slice_query = @base_query + slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil? + result = client.search(@options.merge(:body => Yajl.dump(slice_query) )) + es = Fluent::MultiEventStream.new + + result["hits"]["hits"].each {|hit| process_events(hit, es)} + has_hits = result['hits']['hits'].any? + scroll_id = result['_scroll_id'] + + while has_hits && scroll_id + result = process_next_scroll_request(es, scroll_id) + has_hits = result['has_hits'] scroll_id = result['_scroll_id'] - - while has_hits && scroll_id - result = process_next_scroll_request(es, scroll_id) - has_hits = result['has_hits'] - scroll_id = result['_scroll_id'] - end - - router.emit_stream(@tag, es) - clear_scroll(scroll_id) - rescue Faraday::ConnectionFailed => e - # Retry logic for connection failures during search - if retry_count < max_retry - log.warn "Connection to OpenSearch failed during search: #{e.message}. Retrying (Attempt #{retry_count + 1})..." - retry_count += 1 - sleep(@request_timeout) - retry - else - raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation." - end end + + router.emit_stream(@tag, es) + clear_scroll(scroll_id) end def clear_scroll(scroll_id) diff --git a/test/plugin/test_in_opensearch.rb b/test/plugin/test_in_opensearch.rb index 43be253..ce81351 100644 --- a/test/plugin/test_in_opensearch.rb +++ b/test/plugin/test_in_opensearch.rb @@ -39,6 +39,7 @@ class OpenSearchInputTest < Test::Unit::TestCase CONFIG = %[ tag raw.opensearch interval 2 + infinite_check_connection false ] def setup @@ -190,6 +191,7 @@ def test_configure user john password doe tag raw.opensearch + infinite_check_connection false } instance = driver(config).instance @@ -228,6 +230,7 @@ def test_single_host_params_and_defaults user john password doe tag raw.opensearch + infinite_check_connection false } instance = driver(config).instance @@ -249,6 +252,7 @@ def test_single_host_params_and_defaults_with_escape_placeholders user %{j+hn} password %{d@e} tag raw.opensearch + infinite_check_connection false } instance = driver(config).instance @@ -271,6 +275,7 @@ def test_legacy_hosts_list path /es/ port 123 tag raw.opensearch + infinite_check_connection false } instance = driver(config).instance @@ -295,6 +300,7 @@ def test_hosts_list user default_user password default_password tag raw.opensearch + infinite_check_connection false } instance = driver(config).instance @@ -323,6 +329,7 @@ def test_hosts_list_with_escape_placeholders user default_user password default_password tag raw.opensearch + infinite_check_connection false } instance = driver(config).instance From e72411f05d56b2c7773a7d02f3bbcd5ad430b947 Mon Sep 17 00:00:00 2001 From: OlehPalanskyi Date: Mon, 15 Apr 2024 08:28:32 +0300 Subject: [PATCH 3/6] Updated README.OpenSearchInput Signed-off-by: OlehPalanskyi --- README.OpenSearchInput.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.OpenSearchInput.md b/README.OpenSearchInput.md index a8de566..adb8138 100644 --- a/README.OpenSearchInput.md +++ b/README.OpenSearchInput.md @@ -277,7 +277,7 @@ docinfo false ### infinite_check_connection -The parameter infinite checking on connection availability with Elasticsearch or opensearch hosts, every request_timeout (default 5) seconds. The default value is `true,`. But if value is `false` then checking of connection will be only 3 times +The parameter infinite checking on connection availability with Elasticsearch or opensearch hosts, every request_timeout (default 5) seconds. The default value is `true`. ``` infinite_check_connection true From 97083057f11d4a7965cd73c71dffd0df50ebbba0 Mon Sep 17 00:00:00 2001 From: OlehPalanskyi Date: Wed, 24 Apr 2024 18:33:27 +0300 Subject: [PATCH 4/6] rewritten with the above recommendations in mind Signed-off-by: OlehPalanskyi --- README.OpenSearchInput.md | 90 +++++++++++++++++++++++-- lib/fluent/plugin/in_opensearch.rb | 101 +++++++++++++++++------------ test/plugin/test_in_opensearch.rb | 14 ++-- 3 files changed, 151 insertions(+), 54 deletions(-) diff --git a/README.OpenSearchInput.md b/README.OpenSearchInput.md index adb8138..85157b6 100644 --- a/README.OpenSearchInput.md +++ b/README.OpenSearchInput.md @@ -24,7 +24,16 @@ + [docinfo_fields](#docinfo_fields) + [docinfo_target](#docinfo_target) + [docinfo](#docinfo) - + [infinite_check_connection](#infinite_check_connection) + + [check_connection](#check_connection) + + [retry_forever](#retry_forever) + + [retry_timeout](#retry_timeout) + + [retry_max_times](#retry_max_times) + + [retry_type](#retry_type) + + [retry_wait](#retry_wait) + + [retry_exponential_backoff_base](#retry_exponential_backoff_base) + + [retry_max_interval](#retry_max_interval) + + [retry_randomize](#retry_randomize) + * [Advanced Usage](#advanced-usage) ## Usage @@ -275,14 +284,87 @@ This parameter specifies whether docinfo information including or not. The defau docinfo false ``` -### infinite_check_connection +### check_connection + +The parameter for checking on connection availability with Elasticsearch or Opensearch hosts. The default value is `true`. + +``` +check_connection true +``` +### retry_forever + +The parameter If true, plugin will ignore retry_timeout and retry_max_times options and retry forever. The default value is `true`. + +``` +retry_forever true +``` + +### retry_timeout + +The parameter maximum time (seconds) to retry again the failed try, until the plugin discards the retry. +If the next retry is going to exceed this time limit, the last retry will be made at exactly this time limit.. +The default value is `72h`. +72hours == 17 times with exponential backoff (not to change default behavior) + +``` +retry_timeout 72 * 60 * 60 +``` + +### retry_max_times + +The parameter maximum number of times to retry the failed try. The default value is `5` + +``` +retry_max_times 5 +``` + +### retry_type -The parameter infinite checking on connection availability with Elasticsearch or opensearch hosts, every request_timeout (default 5) seconds. The default value is `true`. +The parameter needs for how long need to wait (time in seconds) to retry again: +`exponential_backoff`: wait in seconds will become large exponentially per failure, +`periodic`: plugin will retry periodically with fixed intervals (configured via retry_wait). The default value is `:exponential_backoff` +Periodic -> fixed :retry_wait +Exponential backoff: k is number of retry times +c: constant factor, @retry_wait +b: base factor, @retry_exponential_backoff_base +k: times +total retry time: c + c * b^1 + (...) + c*b^k = c*b^(k+1) - 1 ``` -infinite_check_connection true +retry_type exponential_backoff ``` +### retry_wait + +The parameter needs for wait in seconds before the next retry to again or constant factor of exponential backoff. The default value is `5` + +``` +retry_wait 5 +``` + +### retry_exponential_backoff_base + +The parameter The base number of exponential backoff for retries. The default value is `2` + +``` +retry_exponential_backoff_base 2 +``` + +### retry_max_interval + +The parameter maximum interval (seconds) for exponential backoff between retries while failing. The default value is `nil` + +``` +retry_max_interval nil +``` + +### retry_randomize + +The parameter If true, the plugin will retry after randomized interval not to do burst retries. The default value is `false` + +``` +retry_randomize false +``` ## Advanced Usage diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb index 3a4cd83..a758f0e 100644 --- a/lib/fluent/plugin/in_opensearch.rb +++ b/lib/fluent/plugin/in_opensearch.rb @@ -29,6 +29,7 @@ require 'faraday/excon' require 'fluent/log-ext' require 'fluent/plugin/input' +require 'fluent/plugin_helper' require_relative 'opensearch_constants' module Fluent::Plugin @@ -39,7 +40,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end DEFAULT_STORAGE_TYPE = 'local' METADATA = "@metadata".freeze - helpers :timer, :thread + helpers :timer, :thread, :retry_state Fluent::Plugin.register_input('opensearch', self) @@ -80,7 +81,23 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id'] config_param :docinfo_target, :string, :default => METADATA config_param :docinfo, :bool, :default => false - config_param :infinite_check_connection, :bool, :default => true + config_param :check_connection, :bool, :default => true + config_param :retry_forever, :bool, default: true, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry forever.' + config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry' + # 72hours == 17 times with exponential backoff (not to change default behavior) + config_param :retry_max_times, :integer, default: 5, desc: 'The maximum number of times to retry' + # exponential backoff sequence will be initialized at the time of this threshold + config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff + ### Periodic -> fixed :retry_wait + ### Exponential backoff: k is number of retry times + # c: constant factor, @retry_wait + # b: base factor, @retry_exponential_backoff_base + # k: times + # total retry time: c + c * b^1 + (...) + c*b^k = c*b^(k+1) - 1 + config_param :retry_wait, :time, default: 5, desc: 'Seconds to wait before next retry , or constant factor of exponential backoff.' + config_param :retry_exponential_backoff_base, :float, default: 2, desc: 'The base number of exponential backoff for retries.' + config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.' + config_param :retry_randomize, :bool, default: false, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.' include Fluent::Plugin::OpenSearchConstants @@ -93,6 +110,7 @@ def configure(conf) @timestamp_parser = create_time_parser @backend_options = backend_options + @retry = retry_state(@retry_randomize) raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil? @@ -139,6 +157,15 @@ def backend_options raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}" end + def retry_state(randomize) + retry_state_create( + :input_retries, @retry_type, @retry_wait, @retry_timeout, + forever: @retry_forever, max_steps: @retry_max_times, + max_interval: @retry_max_interval, backoff_base: @retry_exponential_backoff_base, + randomize: randomize + ) + end + def get_escaped_userinfo(host_str) if m = host_str.match(/(?.*)%{(?.*)}:%{(?.*)}(?@.*)/) m["scheme"] + @@ -177,12 +204,29 @@ def get_connection_options(con_host=nil) host.merge!(user: @user, password: @password) if !host[:user] && @user host.merge!(path: @path) if !host[:path] && @path end - + live_hosts = @check_connection ? hosts.select { |host| reachable_host?(host) } : hosts { - hosts: get_reachable_hosts(hosts) + hosts: live_hosts } end + def reachable_host?(host) + client = OpenSearch::Client.new( + host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"], + user: host[:user], + password: host[:password], + reload_connections: @reload_connections, + request_timeout: @request_timeout, + resurrect_after: @resurrect_after, + reload_on_failure: @reload_on_failure, + transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } } + ) + client.ping + rescue => e + log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}: #{e.message}" + false + end + def emit_error_label_event(&block) # If `emit_error_label_event` is specified as false, error event emittions are not occurred. if emit_error_label_event @@ -240,43 +284,6 @@ def parse_time(value, event_time, tag) return Time.at(event_time).to_time end - def get_reachable_hosts(hosts=nil) - reachable_hosts = [] - attempt = 0 - loop do - hosts.each do |host| - begin - if @infinite_check_connection == true - check_host = OpenSearch::Client.new( - host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"], - user: host[:user], - password: host[:password], - reload_connections: true, - resurrect_after: @resurrect_after, - reload_on_failure: @reload_on_failure, - transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } } - ) - response = check_host.ping #https://github.com/opensearch-project/opensearch-ruby/blob/136e1c975fc91b8cb80d7d1134e32c6dbefdb3eb/lib/opensearch/api/actions/ping.rb#L33 - if response == true - reachable_hosts << host - else - log.warn "Connection to #{host[:scheme]}://#{host[:host]}:#{host[:port]} failed with status code #{response.status}" - end - else - reachable_hosts << host - end - rescue => e - log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}" - end - end - break unless reachable_hosts.empty? - log.info "Attempt ##{attempt += 1} to get reachable hosts" - log.info "No reachable hosts found. Retrying in #{@request_timeout} seconds..." - sleep(@request_timeout) - end - reachable_hosts - end - def client(host = nil) # check here to see if we already have a client connection for the given host connection_options = get_connection_options(host) @@ -340,8 +347,13 @@ def run run_slice(slice_id) end end - rescue Faraday::ConnectionFailed => e - log.warn "Connection to OpenSearch failed during search in the 'run' method: #{e.message}. Retrying..." + rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => e + @retry.step + #Raise error if the retry limit has been reached + raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{e.message}" if @retry.limit? + #Retry if the retry limit hasn't been reached + log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: e.message) + sleep(@retry.next_time - Time.now) retry end @@ -363,6 +375,9 @@ def run_slice(slice_id=nil) router.emit_stream(@tag, es) clear_scroll(scroll_id) + #reset steps and next_time if our function successful ends + @retry.instance_variable_set(:@steps, 0) + @retry.instance_variable_set(:@next_time, nil) end def clear_scroll(scroll_id) diff --git a/test/plugin/test_in_opensearch.rb b/test/plugin/test_in_opensearch.rb index ce81351..1b75c25 100644 --- a/test/plugin/test_in_opensearch.rb +++ b/test/plugin/test_in_opensearch.rb @@ -39,7 +39,7 @@ class OpenSearchInputTest < Test::Unit::TestCase CONFIG = %[ tag raw.opensearch interval 2 - infinite_check_connection false + check_connection false ] def setup @@ -191,7 +191,7 @@ def test_configure user john password doe tag raw.opensearch - infinite_check_connection false + check_connection false } instance = driver(config).instance @@ -230,7 +230,7 @@ def test_single_host_params_and_defaults user john password doe tag raw.opensearch - infinite_check_connection false + check_connection false } instance = driver(config).instance @@ -252,7 +252,7 @@ def test_single_host_params_and_defaults_with_escape_placeholders user %{j+hn} password %{d@e} tag raw.opensearch - infinite_check_connection false + check_connection false } instance = driver(config).instance @@ -275,7 +275,7 @@ def test_legacy_hosts_list path /es/ port 123 tag raw.opensearch - infinite_check_connection false + check_connection false } instance = driver(config).instance @@ -300,7 +300,7 @@ def test_hosts_list user default_user password default_password tag raw.opensearch - infinite_check_connection false + check_connection false } instance = driver(config).instance @@ -329,7 +329,7 @@ def test_hosts_list_with_escape_placeholders user default_user password default_password tag raw.opensearch - infinite_check_connection false + check_connection false } instance = driver(config).instance From e340a299a27c08d8acf5b817a2a3b23d4f89269a Mon Sep 17 00:00:00 2001 From: OlehPalanskyi Date: Thu, 25 Apr 2024 14:43:00 +0300 Subject: [PATCH 5/6] rewritten retry state with the above recommendations in mind. Signed-off-by: Oleh Signed-off-by: OlehPalanskyi --- lib/fluent/plugin/in_opensearch.rb | 32 ++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb index a758f0e..692c8fd 100644 --- a/lib/fluent/plugin/in_opensearch.rb +++ b/lib/fluent/plugin/in_opensearch.rb @@ -110,7 +110,7 @@ def configure(conf) @timestamp_parser = create_time_parser @backend_options = backend_options - @retry = retry_state(@retry_randomize) + @retry = nil raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil? @@ -337,6 +337,23 @@ def is_existing_connection(host) return true end + def update_retry_state(error=nil) + if error + unless @retry + @retry = retry_state(@retry_randomize) + end + @retry.step + #Raise error if the retry limit has been reached + raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{error.message}" if @retry.limit? + #Retry if the limit hasn't been reached + log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message) + sleep(@retry.next_time - Time.now) + else + log.debug("retry succeeded.") unless @retry.nil? + @retry = nil unless @retry.nil? + end + end + def run return run_slice if @num_slices <= 1 @@ -347,13 +364,8 @@ def run run_slice(slice_id) end end - rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => e - @retry.step - #Raise error if the retry limit has been reached - raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{e.message}" if @retry.limit? - #Retry if the retry limit hasn't been reached - log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: e.message) - sleep(@retry.next_time - Time.now) + rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => error + update_retry_state(error) retry end @@ -375,9 +387,7 @@ def run_slice(slice_id=nil) router.emit_stream(@tag, es) clear_scroll(scroll_id) - #reset steps and next_time if our function successful ends - @retry.instance_variable_set(:@steps, 0) - @retry.instance_variable_set(:@next_time, nil) + update_retry_state end def clear_scroll(scroll_id) From 3abd9eb7b545d04d6371dbbc239a692a227b5c4d Mon Sep 17 00:00:00 2001 From: OlehPalanskyi Date: Fri, 26 Apr 2024 01:03:48 +0300 Subject: [PATCH 6/6] rewritten retry state with the above recommendations in mind. Signed-off-by: Oleh Signed-off-by: OlehPalanskyi --- lib/fluent/plugin/in_opensearch.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb index 692c8fd..3b98712 100644 --- a/lib/fluent/plugin/in_opensearch.rb +++ b/lib/fluent/plugin/in_opensearch.rb @@ -349,8 +349,10 @@ def update_retry_state(error=nil) log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message) sleep(@retry.next_time - Time.now) else - log.debug("retry succeeded.") unless @retry.nil? - @retry = nil unless @retry.nil? + unless @retry.nil? + log.debug("retry succeeded.") + @retry = nil + end end end