Skip to content

Commit

Permalink
in_http: support Parser yield
Browse files Browse the repository at this point in the history
in_http didn't support yield of Parser.
The specification assumed that Parser could return Array.
However, this is wrong. Parser shouldn't return Array.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Apr 30, 2024
1 parent 0f96420 commit 356040c
Showing 1 changed file with 15 additions and 52 deletions.
67 changes: 15 additions & 52 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,54 +203,24 @@ def on_request(path_info, params)
begin
path = path_info[1..-1] # remove /
tag = path.split('/').join('.')
record_time, record = parse_params(params)

# Skip nil record
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
if @respond_with_empty_img
return RESPONSE_IMG
else
if @use_204_response
return RESPONSE_204
else
return RESPONSE_200
end
mes = Fluent::MultiEventStream.new
parse_params(params) do |record_time, record|
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
next
end
end

mes = nil
# Support batched requests
if record.is_a?(Array)
mes = Fluent::MultiEventStream.new
record.each do |single_record|
add_params_to_record(single_record, params)

if param_time = params['time']
param_time = param_time.to_f
single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
elsif @custom_parser
single_time = @custom_parser.parse_time(single_record)
single_time, single_record = @custom_parser.convert_values(single_time, single_record)
else
single_time = convert_time_field(single_record)
end

mes.add(single_time, single_record)
end
else
add_params_to_record(record, params)

time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
if record_time.nil?
convert_time_field(record)
else
record_time
end
record_time.nil? ? convert_time_field(record) : record_time
end

mes.add(time, record)
end
rescue => e
if @dump_error_log
Expand All @@ -261,11 +231,7 @@ def on_request(path_info, params)

# TODO server error
begin
if mes
router.emit_stream(tag, mes)
else
router.emit(tag, time, record)
end
router.emit_stream(tag, mes) unless mes.empty?
rescue => e
if @dump_error_log
log.error "failed to emit data", error: e
Expand Down Expand Up @@ -308,31 +274,28 @@ def on_server_connect(conn)
def parse_params_default(params)
if msgpack = params['msgpack']
@parser_msgpack.parse(msgpack) do |_time, record|
return nil, record
yield nil, record
end
elsif js = params['json']
@parser_json.parse(js) do |_time, record|
return nil, record
yield nil, record
end
elsif ndjson = params['ndjson']
events = []
ndjson.split(/\r?\n/).each do |js|
@parser_json.parse(js) do |_time, record|
events.push(record)
yield nil, record
end
end
return nil, events
else
raise "'json', 'ndjson' or 'msgpack' parameter is required"
end
end

def parse_params_with_parser(params)
if content = params[EVENT_RECORD_PARAMETER]
@custom_parser.parse(content) { |time, record|
raise "Received event is not #{@format_name}: #{content}" if record.nil?
return time, record
}
@custom_parser.parse(content) do |time, record|
yield time, record
end
else
raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
end
Expand Down

0 comments on commit 356040c

Please sign in to comment.