Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support <record> with dynamic parameter #7

Merged
merged 12 commits into from
Jan 8, 2016
9 changes: 7 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
language: ruby

rvm:
- 1.9.3
- 2.0.0
- 2.1.1
- 2.1
- 2.2.3
- 2.3.0
- ruby-head
- rbx

branches:
only:
- master

gemfile:
- Gemfile

matrix:
allow_failures:
- rvm: ruby-head
- rvm: rbx

before_install: gem update bundler
script: bundle exec rake test
1 change: 1 addition & 0 deletions fluent-plugin-record-modifier.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ Gem::Specification.new do |gem|
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
gem.add_dependency "fluent-mixin-config-placeholders", ">= 0.3.0"
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency("test-unit", ["~> 3.1.4"])
end
105 changes: 85 additions & 20 deletions lib/fluent/plugin/filter_record_modifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@ class RecordModifierFilter < Filter
config_param :char_encoding, :string, :default => nil
config_param :remove_keys, :string, :default => nil

include SetTagKeyMixin
include Fluent::Mixin::ConfigPlaceholders

BUILTIN_CONFIGURATIONS = %W(type @type log_level @log_level id @id include_tag_key tag_key char_encoding remove_keys)
BUILTIN_CONFIGURATIONS = %W(type @type log_level @log_level id @id char_encoding remove_keys)

def configure(conf)
super

if conf.has_key?('include_tag_key')
raise ConfigError, "include_tag_key and tag_key parameters are removed. Use 'tag ${tag}' in <record> section"
end

@map = {}
conf.each_pair { |k, v|
unless BUILTIN_CONFIGURATIONS.include?(k)
conf.has_key?(k)
@map[k] = v
$log.warn "top level definition is deprecated. Please put parameters inside <record>: '#{k} #{v}'"
@map[k] = DynamicExpander.new(k, v)
end
}

Expand All @@ -39,33 +43,46 @@ def configure(conf)
end
end

@has_tag_parts = false
conf.elements.select { |element| element.name == 'record' }.each do |element|
element.each_pair do |k, v|
element.has_key?(k) # to suppress unread configuration warning
@has_tag_parts = true if v.include?('tag_parts')
@map[k] = DynamicExpander.new(k, v)
end
end

if @remove_keys
@remove_keys = @remove_keys.split(',').map {|e| e.strip }
@remove_keys = @remove_keys.split(',').map { |e| e.strip }
end
end

def filter(tag, time, record)
filter_record(tag, time, record)
modify_record(record)
# Collect DynamicExpander related garbage instructions
GC.start
end

private

def modify_record(record)
@map.each_pair { |k, v|
record[k] = v
}
def filter_stream(tag, es)
new_es = MultiEventStream.new
tag_parts = @has_tag_parts ? tag.split('.') : nil

if @remove_keys
@remove_keys.each { |v|
record.delete(v)
es.each { |time, record|
@map.each_pair { |k, v|
record[k] = v.expand(tag, time, record, tag_parts)
}
end

record = change_encoding(record) if @char_encoding
record
if @remove_keys
@remove_keys.each { |v|
record.delete(v)
}
end

record = change_encoding(record) if @char_encoding
new_es.add(time, record)
}
new_es
end

private

def set_encoding(record)
record.each_pair { |k, v|
if v.is_a?(String)
Expand All @@ -82,5 +99,53 @@ def convert_encoding(record)
end
}
end

class DynamicExpander
def initialize(param_key, param_value)
if param_value.include?('${')
__str_eval_code__ = parse_parameter(param_value)

# Use class_eval with string instead of define_method for performance.
# It can't share instructions but this is 2x+ faster than define_method in filter case.
# Refer: http://tenderlovemaking.com/2013/03/03/dynamic_method_definitions.html
(class << self; self; end).class_eval <<-EORUBY, __FILE__, __LINE__ + 1
def expand(tag, time, record, tag_parts)
#{__str_eval_code__}
end
EORUBY
else
@param_value = param_value
end

begin
# check eval genarates wrong code or not
expand(nil, nil, nil, nil)
rescue SyntaxError
raise ConfigError, "Pass invalid syntax parameter : key = #{param_key}, value = #{param_value}"
rescue
# Ignore other runtime errors
end
end

# Default implementation for fixed value. This is overwritten when parameter contains '${xxx}' placeholder
def expand(tag, time, record, tag_parts)
@param_value
end

private

def parse_parameter(value)
num_placeholders = value.scan('${').size
if num_placeholders == 1
if value.start_with?('${') && value.end_with?('}')
return value[2..-2]
else
"\"#{value.gsub('${', '#{')}\""
end
else
"\"#{value.gsub('${', '#{')}\""
end
end
end
end if defined?(Filter)
end
19 changes: 11 additions & 8 deletions test/test_filter_record_modifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ def setup

CONFIG = %[
type record_modifier

gen_host ${hostname}
foo bar
include_tag_key
tag_key included_tag
remove_keys hoge

<record>
gen_host ${hostname}
foo bar
included_tag ${tag}
tag_wrap -${tag_parts[0]}-${tag_parts[1]}-
</record>
]

def create_driver(conf = CONFIG)
Expand All @@ -33,8 +35,9 @@ def test_configure
d = create_driver
map = d.instance.instance_variable_get(:@map)

assert_equal get_hostname, map['gen_host']
assert_equal 'bar', map['foo']
map.each_pair { |k, v|
assert v.is_a?(Fluent::RecordModifierFilter::DynamicExpander)
}
end

def test_format
Expand All @@ -45,7 +48,7 @@ def test_format
d.emit("a" => 2)
end

mapped = {'gen_host' => get_hostname, 'foo' => 'bar', 'included_tag' => @tag}
mapped = {'gen_host' => get_hostname, 'foo' => 'bar', 'included_tag' => @tag, 'tag_wrap' => "-#{@tag.split('.')[0]}-#{@tag.split('.')[1]}-"}
assert_equal [
{"a" => 1}.merge(mapped),
{"a" => 2}.merge(mapped),
Expand Down