diff --git a/README.OpenSearchInput.md b/README.OpenSearchInput.md
index a89535f..85157b6 100644
--- a/README.OpenSearchInput.md
+++ b/README.OpenSearchInput.md
@@ -24,6 +24,16 @@
     + [docinfo_fields](#docinfo_fields)
     + [docinfo_target](#docinfo_target)
     + [docinfo](#docinfo)
+    + [check_connection](#check_connection)
+    + [retry_forever](#retry_forever)
+    + [retry_timeout](#retry_timeout)
+    + [retry_max_times](#retry_max_times)
+    + [retry_type](#retry_type)
+    + [retry_wait](#retry_wait)
+    + [retry_exponential_backoff_base](#retry_exponential_backoff_base)
+    + [retry_max_interval](#retry_max_interval)
+    + [retry_randomize](#retry_randomize)
+
 * [Advanced Usage](#advanced-usage)
 
 ## Usage
@@ -274,6 +284,88 @@ This parameter specifies whether docinfo information including or not. The defau
 docinfo false
 ```
 
+### check_connection
+
+Whether to check the availability of the configured Elasticsearch or OpenSearch hosts before searching. The default value is `true`.
+
+```
+check_connection true
+```
+### retry_forever
+
+If `true`, the plugin ignores the `retry_timeout` and `retry_max_times` options and retries forever. The default value is `true`.
+
+```
+retry_forever true
+```
+
+### retry_timeout
+
+The maximum time (in seconds) to keep retrying a failed request before the plugin gives up.
+If the next retry would exceed this limit, the last retry is made exactly at the limit.
+The default value is `72h`.
+72 hours equals roughly 17 retries with the default exponential backoff (chosen so as not to change the default behavior).
+
+```
+retry_timeout 72h
+```
+
+### retry_max_times
+
+The maximum number of times to retry a failed request. The default value is `5`.
+
+```
+retry_max_times 5
+```
+
+### retry_type
+
+How the wait time before the next retry is computed:
+`exponential_backoff`: the wait grows exponentially with each failure.
+`periodic`: the plugin retries at fixed intervals configured via `retry_wait`.
+The default value is `exponential_backoff`.
+With exponential backoff:
+c: constant factor (`retry_wait`)
+b: base factor (`retry_exponential_backoff_base`)
+k: number of retries
+total retry time: c + c*b + ... + c*b^k = c * (b^(k+1) - 1) / (b - 1)
+
+```
+retry_type exponential_backoff
+```
+
+### retry_wait
+
+Seconds to wait before the next retry, or the constant factor of the exponential backoff. The default value is `5`.
+
+```
+retry_wait 5
+```
+
+### retry_exponential_backoff_base
+
+The base of the exponential backoff for retries. The default value is `2`.
+
+```
+retry_exponential_backoff_base 2
+```
+
+### retry_max_interval
+
+The maximum interval (in seconds) between exponential backoff retries. The default value is `nil`, which means no upper limit.
+
+```
+retry_max_interval 300
+```
+
+### retry_randomize
+
+If `true`, the plugin retries after a randomized interval to avoid bursts of retries. The default value is `false`.
+
+```
+retry_randomize false
+```
+
 ## Advanced Usage
 
 OpenSearch Input plugin and OpenSearch output plugin can combine to transfer records into another cluster.
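Putting the documented options together, the following is a minimal, illustrative `<source>` sketch. The host, port, tag, and option values are placeholders rather than recommendations, and `retry_forever` is set to `false` here only so that `retry_timeout` and `retry_max_times` take effect.

```
<source>
  @type opensearch
  tag raw.opensearch
  host localhost
  port 9200
  check_connection true
  # disable infinite retries so the limits below apply
  retry_forever false
  retry_timeout 72h
  retry_max_times 5
  retry_type exponential_backoff
  retry_wait 5
  retry_exponential_backoff_base 2
  retry_max_interval 300
  retry_randomize true
</source>
```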
diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb
index 380c0d7..3b98712 100644
--- a/lib/fluent/plugin/in_opensearch.rb
+++ b/lib/fluent/plugin/in_opensearch.rb
@@ -29,6 +29,7 @@
 require 'faraday/excon'
 require 'fluent/log-ext'
 require 'fluent/plugin/input'
+require 'fluent/plugin_helper'
 require_relative 'opensearch_constants'
 
 module Fluent::Plugin
@@ -39,7 +40,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
     DEFAULT_STORAGE_TYPE = 'local'
     METADATA = "@metadata".freeze
 
-    helpers :timer, :thread
+    helpers :timer, :thread, :retry_state
 
     Fluent::Plugin.register_input('opensearch', self)
 
@@ -80,6 +81,23 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
     config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id']
     config_param :docinfo_target, :string, :default => METADATA
     config_param :docinfo, :bool, :default => false
+    config_param :check_connection, :bool, :default => true
+    config_param :retry_forever, :bool, default: true, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry forever.'
+    config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry'
+    # 72 hours == roughly 17 retries with exponential backoff (not to change default behavior)
+    config_param :retry_max_times, :integer, default: 5, desc: 'The maximum number of times to retry'
+    # retry_type determines how the wait interval before the next retry is computed
+    config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
+    ### Periodic -> fixed :retry_wait
+    ### Exponential backoff: k is the number of retries
+    # c: constant factor, @retry_wait
+    # b: base factor, @retry_exponential_backoff_base
+    # k: times
+    # total retry time: c + c*b + (...) + c*b^k = c * (b^(k+1) - 1) / (b - 1)
+    config_param :retry_wait, :time, default: 5, desc: 'Seconds to wait before next retry, or constant factor of exponential backoff.'
+    config_param :retry_exponential_backoff_base, :float, default: 2, desc: 'The base number of exponential backoff for retries.'
+    config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'
+    config_param :retry_randomize, :bool, default: false, desc: 'If true, plugin will retry after randomized interval not to do burst retries.'
 
     include Fluent::Plugin::OpenSearchConstants
 
@@ -92,6 +110,7 @@ def configure(conf)
 
       @timestamp_parser = create_time_parser
       @backend_options = backend_options
+      @retry = nil
 
       raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?
 
@@ -138,6 +157,15 @@ def backend_options
       raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
     end
 
+    def retry_state(randomize)
+      retry_state_create(
+        :input_retries, @retry_type, @retry_wait, @retry_timeout,
+        forever: @retry_forever, max_steps: @retry_max_times,
+        max_interval: @retry_max_interval, backoff_base: @retry_exponential_backoff_base,
+        randomize: randomize
+      )
+    end
+
     def get_escaped_userinfo(host_str)
       if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
         m["scheme"] +
@@ -176,12 +204,29 @@ def get_connection_options(con_host=nil)
         host.merge!(user: @user, password: @password) if !host[:user] && @user
         host.merge!(path: @path) if !host[:path] && @path
       end
-
+      live_hosts = @check_connection ? hosts.select { |host| reachable_host?(host) } : hosts
       {
-        hosts: hosts
+        hosts: live_hosts
       }
     end
 
+    def reachable_host?(host)
+      client = OpenSearch::Client.new(
+        host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
+        user: host[:user],
+        password: host[:password],
+        reload_connections: @reload_connections,
+        request_timeout: @request_timeout,
+        resurrect_after: @resurrect_after,
+        reload_on_failure: @reload_on_failure,
+        transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
+      )
+      client.ping
+    rescue => e
+      log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}: #{e.message}"
+      false
+    end
+
     def emit_error_label_event(&block)
       # If `emit_error_label_event` is specified as false, error event emittions are not occurred.
       if emit_error_label_event
@@ -292,6 +337,25 @@ def is_existing_connection(host)
       return true
     end
 
+    def update_retry_state(error=nil)
+      if error
+        unless @retry
+          @retry = retry_state(@retry_randomize)
+        end
+        @retry.step
+        # Raise an error if the retry limit has been reached
+        raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{error.message}" if @retry.limit?
+        # The limit has not been reached yet; wait until the next retry time
+        log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message)
+        sleep(@retry.next_time - Time.now)
+      else
+        unless @retry.nil?
+          log.debug("retry succeeded.")
+          @retry = nil
+        end
+      end
+    end
+
     def run
       return run_slice if @num_slices <= 1
 
@@ -302,6 +366,9 @@ def run
           run_slice(slice_id)
         end
       end
+    rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => error
+      update_retry_state(error)
+      retry
     end
 
     def run_slice(slice_id=nil)
@@ -321,7 +388,15 @@ def run_slice(slice_id=nil)
       end
 
       router.emit_stream(@tag, es)
+      clear_scroll(scroll_id)
+      update_retry_state
+    end
+
+    def clear_scroll(scroll_id)
       client.clear_scroll(scroll_id: scroll_id) if scroll_id
+    rescue => e
+      # Ignore and log any clear_scroll errors
+      log.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
     end
 
     def process_scroll_request(scroll_id)
diff --git a/test/plugin/test_in_opensearch.rb b/test/plugin/test_in_opensearch.rb
index 43be253..1b75c25 100644
--- a/test/plugin/test_in_opensearch.rb
+++ b/test/plugin/test_in_opensearch.rb
@@ -39,6 +39,7 @@ class OpenSearchInputTest < Test::Unit::TestCase
   CONFIG = %[
     tag raw.opensearch
     interval 2
+    check_connection false
   ]
 
   def setup
@@ -190,6 +191,7 @@ def test_configure
       user john
       password doe
       tag raw.opensearch
+      check_connection false
     }
     instance = driver(config).instance
 
@@ -228,6 +230,7 @@ def test_single_host_params_and_defaults
       user john
       password doe
       tag raw.opensearch
+      check_connection false
     }
     instance = driver(config).instance
 
@@ -249,6 +252,7 @@ def test_single_host_params_and_defaults_with_escape_placeholders
       user %{j+hn}
       password %{d@e}
       tag raw.opensearch
+      check_connection false
     }
     instance = driver(config).instance
 
@@ -271,6 +275,7 @@ def test_legacy_hosts_list
       path /es/
       port 123
       tag raw.opensearch
+      check_connection false
     }
     instance = driver(config).instance
 
@@ -295,6 +300,7 @@ def test_hosts_list
       user default_user
       password default_password
       tag raw.opensearch
+      check_connection false
     }
     instance = driver(config).instance
 
@@ -323,6 +329,7 @@ def test_hosts_list_with_escape_placeholders
       user default_user
       password default_password
       tag raw.opensearch
+      check_connection false
     }
     instance = driver(config).instance
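To sanity-check the backoff arithmetic documented above, here is a small standalone Ruby sketch of the schedule described by the `retry_*` options (c = `retry_wait`, b = `retry_exponential_backoff_base`). It is not the plugin's implementation (the patch delegates to Fluentd's `retry_state` helper, which also handles randomization and the overall `retry_timeout`), and the `retry_max_interval` value below is illustrative, since the plugin default is `nil`.

```
# Standalone sketch of the exponential backoff schedule (not the plugin code).
retry_wait         = 5.0    # c: constant factor in seconds (plugin default)
backoff_base       = 2.0    # b: base factor (plugin default)
retry_max_times    = 10     # number of retries to print
retry_max_interval = 300.0  # illustrative cap; the plugin default is nil (no cap)

total = 0.0
retry_max_times.times do |k|
  interval = retry_wait * (backoff_base ** k)                    # c * b^k
  interval = [interval, retry_max_interval].min if retry_max_interval
  total += interval
  puts format("retry %2d: wait %7.1fs (cumulative %9.1fs)", k + 1, interval, total)
end
```

With these values the per-retry wait is capped at 300 seconds from the seventh retry onward, and the cumulative time after ten retries is about 25 minutes, well inside the default `retry_timeout` of 72 hours.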