Skip to content

Commit

Permalink
Transition filter_parser from single record to stream
Browse files Browse the repository at this point in the history
Signed-off-by: Athish Pranav D <[email protected]>
  • Loading branch information
Athishpranav2003 committed Sep 10, 2024
1 parent 7f13c7a commit 90c8501
Showing 1 changed file with 33 additions and 49 deletions.
82 changes: 33 additions & 49 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,32 @@ def configure(conf)
FAILED_RESULT = [nil, nil].freeze # reduce allocation cost
REPLACE_CHAR = '?'.freeze

def filter_with_time(tag, time, record)
raw_value = @accessor.call(record)
if raw_value.nil?
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
return FAILED_RESULT
def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each do |time, record|
begin
raw_value = @accessor.call(record)
if raw_value.nil?
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
new_es.add(time, handle_parsed(tag, record, time, {}))
end
next
end
filter_one_record(tag, time, record, raw_value) do |result_time, result_record|
new_es.add(result_time, result_record)
end
end
end
begin
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# This should be fixed in the future version.
result_time = nil
result_record = nil
new_es
end

private

def filter_one_record(tag, time, record, raw_value)
begin
@parser.parse(raw_value) do |t, values|
if values
t = if @reserve_time
Expand All @@ -85,55 +91,33 @@ def filter_with_time(tag, time, record)
t.nil? ? time : t
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)

if result_record.nil?
result_time = t
result_record = r
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
))
end
end
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
end

next unless @reserve_data
next unless result_record.nil?

result_time = time
result_record = handle_parsed(tag, record, time, {})
t = time
values = {}
end
yield(t, handle_parsed(tag, record, t, values))
end

return result_time, result_record
rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
router.emit_error_event(tag, time, record, e)
end
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0

raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
if @emit_invalid_record_to_error
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
else
return FAILED_RESULT
router.emit_error_event(tag, time, record, e)
end

if @replace_invalid_sequence
raw_value = raw_value.scrub(REPLACE_CHAR)
retry
end
end
end

private

def handle_parsed(tag, record, t, values)
if values && @inject_key_prefix
values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }]
Expand Down

0 comments on commit 90c8501

Please sign in to comment.