diff --git a/filebeat/config/config.go b/filebeat/config/config.go
index d78453673fe1..8504c8769134 100644
--- a/filebeat/config/config.go
+++ b/filebeat/config/config.go
@@ -63,7 +63,9 @@ type HarvesterConfig struct {
 	BackoffFactor      int    `yaml:"backoff_factor"`
 	MaxBackoff         string `yaml:"max_backoff"`
 	MaxBackoffDuration time.Duration
-	ForceCloseFiles    bool   `yaml:"force_close_files"`
+	ForceCloseFiles    bool     `yaml:"force_close_files"`
+	ExcludeLines       []string `yaml:"exclude_lines"`
+	IncludeLines       []string `yaml:"include_lines"`
 }
 
 const (
diff --git a/filebeat/docs/configuration.asciidoc b/filebeat/docs/configuration.asciidoc
index 0013390bbd56..862209962165 100644
--- a/filebeat/docs/configuration.asciidoc
+++ b/filebeat/docs/configuration.asciidoc
@@ -65,6 +65,36 @@ One of the following input types:
 
 The value that you specify here is used as the `input_type` for each event published to Logstash and Elasticsearch.
 
+===== exclude_lines
+
+A list of regular expressions used to match the lines that you want to drop. Any line that matches a regular
+expression in the list is dropped. By default, no lines are dropped.
+
+[source,yaml]
+-------------------------------------------------------------------------------------
+exclude_lines: ["^DBG"]
+-------------------------------------------------------------------------------------
+This example drops the lines that start with "DBG".
+
+===== include_lines
+
+A list of regular expressions used to match the lines that you want to export. Only the lines that match a regular expression in the list are exported. By default, all lines are exported.
+
+[source,yaml]
+-------------------------------------------------------------------------------------
+include_lines: ["^ERR", "^WARN"]
+-------------------------------------------------------------------------------------
+This example exports only the lines that start with "ERR" or "WARN".
+
+Note::
+If both `include_lines` and `exclude_lines` are defined, `include_lines` is evaluated first. For example, to export all the Apache log lines except the debugging messages (DBGs), you can use:
+
+[source,yaml]
+-------------------------------------------------------------------------------------
+ include_lines: ["apache"]
+ exclude_lines: ["^DBG"]
+-------------------------------------------------------------------------------------
+
 [[configuration-fields]]
 ===== fields
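For orientation, here is a minimal sketch (not part of the patch) of how these two options land in the new `HarvesterConfig` fields, assuming `gopkg.in/yaml.v2` as a stand-in for the config parser; the `harvesterConfig` name below is hypothetical:

[source,go]
-------------------------------------------------------------------------------------
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// harvesterConfig mirrors only the two fields this patch adds.
type harvesterConfig struct {
	ExcludeLines []string `yaml:"exclude_lines"`
	IncludeLines []string `yaml:"include_lines"`
}

func main() {
	raw := []byte("exclude_lines: [\"^DBG\"]\ninclude_lines: [\"^ERR\", \"^WARN\"]\n")

	var cfg harvesterConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.IncludeLines, cfg.ExcludeLines) // [^ERR ^WARN] [^DBG]
}
-------------------------------------------------------------------------------------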
diff --git a/filebeat/etc/beat.yml b/filebeat/etc/beat.yml
index 32c8ad05463d..6eaabc87da0c 100644
--- a/filebeat/etc/beat.yml
+++ b/filebeat/etc/beat.yml
@@ -30,6 +30,16 @@ filebeat:
       # * stdin: Reads the standard in
       input_type: log
 
+      # Exclude lines. A list of regular expressions to match the lines that
+      # you want to drop. Lines matching any regexp in the list are dropped.
+      # include_lines is evaluated before exclude_lines. By default, no lines are dropped.
+      # exclude_lines: ["^DBG"]
+
+      # Include lines. A list of regular expressions to match the lines that
+      # you want to export. Lines matching any regexp in the list are exported.
+      # include_lines is evaluated before exclude_lines. By default, all lines are exported.
+      # include_lines: ["^ERR", "^WARN"]
+
       # Optional additional fields. These field can be freely picked
       # to add additional information to the crawled log files for filtering
       #fields:
diff --git a/filebeat/etc/filebeat.yml b/filebeat/etc/filebeat.yml
index 9f94a1ffe9cf..d7d2a1650ffa 100644
--- a/filebeat/etc/filebeat.yml
+++ b/filebeat/etc/filebeat.yml
@@ -30,6 +30,16 @@ filebeat:
       # * stdin: Reads the standard in
       input_type: log
 
+      # Exclude lines. A list of regular expressions to match the lines that
+      # you want to drop. Lines matching any regexp in the list are dropped.
+      # include_lines is evaluated before exclude_lines. By default, no lines are dropped.
+      # exclude_lines: ["^DBG"]
+
+      # Include lines. A list of regular expressions to match the lines that
+      # you want to export. Lines matching any regexp in the list are exported.
+      # include_lines is evaluated before exclude_lines. By default, all lines are exported.
+      # include_lines: ["^ERR", "^WARN"]
+
       # Optional additional fields. These field can be freely picked
       # to add additional information to the crawled log files for filtering
       #fields:
@@ -163,9 +173,6 @@ output:
     # Optional HTTP Path
     #path: "/elasticsearch"
 
-    # Proxy server URL
-    # proxy_url: http://proxy:3128
-
     # The number of times a particular Elasticsearch index operation is attempted. If
     # the indexing operation doesn't succeed after this many retries, the events are
     # dropped. The default is 3.
diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go
index 430961541fcc..d63e348ecbf4 100644
--- a/filebeat/harvester/harvester.go
+++ b/filebeat/harvester/harvester.go
@@ -16,6 +16,7 @@ package harvester
 import (
 	"io"
 	"os"
+	"regexp"
 	"time"
 
 	"github.com/elastic/beats/filebeat/config"
@@ -24,15 +25,17 @@ import (
 )
 
 type Harvester struct {
-	Path             string /* the file path to harvest */
-	ProspectorConfig config.ProspectorConfig
-	Config           *config.HarvesterConfig
-	Offset           int64
-	Stat             *FileStat
-	SpoolerChan      chan *input.FileEvent
-	encoding         encoding.EncodingFactory
-	file             FileSource /* the file being watched */
-	backoff          time.Duration
+	Path               string /* the file path to harvest */
+	ProspectorConfig   config.ProspectorConfig
+	Config             *config.HarvesterConfig
+	Offset             int64
+	Stat               *FileStat
+	SpoolerChan        chan *input.FileEvent
+	encoding           encoding.EncodingFactory
+	file               FileSource /* the file being watched */
+	backoff            time.Duration
+	ExcludeLinesRegexp []*regexp.Regexp
+	IncludeLinesRegexp []*regexp.Regexp
 }
 
 // Contains statistic about file when it was last seend by the prospector
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index 136fab35a6ec..58410bd009eb 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"regexp"
 	"time"
 
 	"golang.org/x/text/transform"
@@ -22,6 +23,7 @@ func NewHarvester(
 	stat *FileStat,
 	spooler chan *input.FileEvent,
 ) (*Harvester, error) {
+	var err error
 	encoding, ok := encoding.FindEncoding(cfg.Encoding)
 	if !ok || encoding == nil {
 		return nil, fmt.Errorf("unknown encoding('%v')", cfg.Encoding)
@@ -36,6 +38,14 @@ func NewHarvester(
 		encoding:         encoding,
 		backoff:          prospectorCfg.Harvester.BackoffDuration,
 	}
+	h.ExcludeLinesRegexp, err = InitRegexps(cfg.ExcludeLines)
+	if err != nil {
+		return h, err
+	}
+	h.IncludeLinesRegexp, err = InitRegexps(cfg.IncludeLines)
+	if err != nil {
+		return h, err
+	}
 	return h, nil
 }
@@ -104,24 +114,50 @@ func (h *Harvester) Harvest() {
 		// Reset Backoff
 		h.backoff = h.Config.BackoffDuration
 
-		// Sends text to spooler
-		event := &input.FileEvent{
-			ReadTime:     lastReadTime,
-			Source:       &h.Path,
-			InputType:    h.Config.InputType,
-			DocumentType: h.Config.DocumentType,
-			Offset:       h.Offset,
-			Bytes:        bytesRead,
-			Text:         &text,
-			Fields:       &h.Config.Fields,
-			Fileinfo:     &info,
+		if h.shouldExportLine(text) {
+
+			// Sends text to spooler
+			event := &input.FileEvent{
+				ReadTime:     lastReadTime,
+				Source:       &h.Path,
+				InputType:    h.Config.InputType,
+				DocumentType: h.Config.DocumentType,
+				Offset:       h.Offset,
+				Bytes:        bytesRead,
+				Text:         &text,
+				Fields:       &h.Config.Fields,
+				Fileinfo:     &info,
+			}
+
+			event.SetFieldsUnderRoot(h.Config.FieldsUnderRoot)
+			h.SpoolerChan <- event // ship the new event downstream
 		}
+
 		// Set Offset
 		h.Offset += int64(bytesRead) // Update offset if complete line has been processed
+	}
+}
 
-		event.SetFieldsUnderRoot(h.Config.FieldsUnderRoot)
-		h.SpoolerChan <- event // ship the new event downstream
+// shouldExportLine decides if the line is exported or not based on
+// the include_lines and exclude_lines options.
+func (h *Harvester) shouldExportLine(line string) bool {
+	if len(h.IncludeLinesRegexp) > 0 {
+		if !MatchAnyRegexps(h.IncludeLinesRegexp, line) {
+			// drop line
+			logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line)
+			return false
+		}
 	}
+	if len(h.ExcludeLinesRegexp) > 0 {
+		if MatchAnyRegexps(h.ExcludeLinesRegexp, line) {
+			// drop line
+			logp.Debug("harvester", "Drop line as it matches one of the exclude patterns %s", line)
+			return false
+		}
+	}
+
+	return true
+}
 
 // backOff checks the backoff variable and sleeps for the given time
@@ -297,6 +333,35 @@ func (h *Harvester) handleReadlineError(lastTimeRead time.Time, err error) error
 func (h *Harvester) Stop() {
 }
 
+func InitRegexps(exprs []string) ([]*regexp.Regexp, error) {
+
+	result := []*regexp.Regexp{}
+
+	for _, exp := range exprs {
+
+		rexp, err := regexp.CompilePOSIX(exp)
+		if err != nil {
+			logp.Err("Failed to compile the regexp %s: %s", exp, err)
+			return nil, err
+		}
+		result = append(result, rexp)
+	}
+	return result, nil
+}
+
+func MatchAnyRegexps(regexps []*regexp.Regexp, text string) bool {
+
+	for _, rexp := range regexps {
+		if rexp.MatchString(text) {
+			return true
+		}
+	}
+
+	return false
+}
+
 const maxConsecutiveEmptyReads = 100
 
 // timedReader keeps track of last time bytes have been read from underlying
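To make the filtering semantics concrete: include_lines is applied first, exclude_lines second, and (as the Harvest change above shows) the offset advances even for dropped lines, so a filtered line is never read again. Below is a minimal, self-contained sketch of the same logic using only the standard library; `compile`, `matchAny`, and `shouldExport` are illustrative stand-ins for `InitRegexps`, `MatchAnyRegexps`, and `shouldExportLine`, not part of the patch:

[source,go]
-------------------------------------------------------------------------------------
package main

import (
	"fmt"
	"regexp"
)

// compile mirrors InitRegexps: every pattern is compiled in POSIX mode.
func compile(exprs []string) []*regexp.Regexp {
	result := make([]*regexp.Regexp, 0, len(exprs))
	for _, exp := range exprs {
		result = append(result, regexp.MustCompilePOSIX(exp))
	}
	return result
}

// matchAny mirrors MatchAnyRegexps: true if any regexp matches the text.
func matchAny(regexps []*regexp.Regexp, text string) bool {
	for _, rexp := range regexps {
		if rexp.MatchString(text) {
			return true
		}
	}
	return false
}

// shouldExport mirrors shouldExportLine: include_lines is checked first,
// exclude_lines second; an empty list means "no constraint".
func shouldExport(include, exclude []*regexp.Regexp, line string) bool {
	if len(include) > 0 && !matchAny(include, line) {
		return false
	}
	if len(exclude) > 0 && matchAny(exclude, line) {
		return false
	}
	return true
}

func main() {
	include := compile([]string{"apache"})
	exclude := compile([]string{"^DBG"})

	for _, line := range []string{
		"DBG: apache debug message",  // passes include, then exclude drops it
		"ERR: apache error message",  // passes include, survives exclude
		"ERR: a plain error message", // never passes include
	} {
		fmt.Println(shouldExport(include, exclude, line), line)
	}
}
-------------------------------------------------------------------------------------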
diff --git a/filebeat/harvester/log_test.go b/filebeat/harvester/log_test.go
index f889e51bac51..8ccf28b269ef 100644
--- a/filebeat/harvester/log_test.go
+++ b/filebeat/harvester/log_test.go
@@ -111,3 +111,30 @@ func TestLineEndingChars(t *testing.T) {
 	line = []byte("NR ending \n\r")
 	assert.Equal(t, 0, lineEndingChars(line))
 }
+
+func TestExcludeLine(t *testing.T) {
+
+	regexps, err := InitRegexps([]string{"^DBG"})
+
+	assert.Nil(t, err)
+
+	assert.True(t, MatchAnyRegexps(regexps, "DBG: a debug message"))
+	assert.False(t, MatchAnyRegexps(regexps, "ERR: an error message"))
+}
+
+func TestIncludeLine(t *testing.T) {
+
+	regexps, err := InitRegexps([]string{"^ERR", "^WARN"})
+
+	assert.Nil(t, err)
+
+	assert.False(t, MatchAnyRegexps(regexps, "DBG: a debug message"))
+	assert.True(t, MatchAnyRegexps(regexps, "ERR: an error message"))
+	assert.True(t, MatchAnyRegexps(regexps, "WARNING: a simple warning message"))
+}
+
+func TestInitRegexp(t *testing.T) {
+
+	_, err := InitRegexps([]string{"((((("})
+	assert.NotNil(t, err)
+}
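TestInitRegexp above leans on a property of `regexp.CompilePOSIX` worth spelling out: invalid patterns are rejected when the harvester starts, and matching follows POSIX leftmost-longest semantics rather than Go's default leftmost-first. A standard-library-only illustration (not part of the patch):

[source,go]
-------------------------------------------------------------------------------------
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Unbalanced parentheses fail to compile; InitRegexps surfaces this as a
	// startup error instead of a silently broken filter.
	if _, err := regexp.CompilePOSIX("((((("); err != nil {
		fmt.Println("rejected:", err)
	}

	// POSIX mode is leftmost-longest: for "a|ab" on "ab" it returns the
	// longer alternative, while the default (Perl-like) mode returns "a".
	fmt.Println(regexp.MustCompilePOSIX("a|ab").FindString("ab")) // "ab"
	fmt.Println(regexp.MustCompile("a|ab").FindString("ab"))      // "a"
}
-------------------------------------------------------------------------------------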
diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2
index ba50a6018ad1..6e1093f3e77a 100644
--- a/filebeat/tests/system/config/filebeat.yml.j2
+++ b/filebeat/tests/system/config/filebeat.yml.j2
@@ -23,6 +23,13 @@ filebeat:
       {% endfor %}
       {% endif %}
       fields_under_root: {{"true" if fieldsUnderRoot else "false"}}
+      {% if include_lines %}
+      include_lines: {{include_lines}}
+      {% endif %}
+      {% if exclude_lines %}
+      exclude_lines: {{exclude_lines}}
+      {% endif %}
+
   spool_size:
   idle_timeout: 0.1s
   registry_file: {{ fb.working_dir + '/' }}{{ registryFile|default(".filebeat")}}
diff --git a/filebeat/tests/system/test_crawler.py b/filebeat/tests/system/test_crawler.py
index f9ce38c69730..ac91cf94b787 100644
--- a/filebeat/tests/system/test_crawler.py
+++ b/filebeat/tests/system/test_crawler.py
@@ -5,7 +5,6 @@
 import codecs
 import os
 import time
-import unittest
 
 
 # Additional tests to be added:
@@ -157,7 +156,6 @@ def test_partial_line(self):
 
         filebeat.kill_and_wait()
 
-
     def test_file_renaming(self):
         """
         Makes sure that when a file is renamed, the content is not read again.
         """
@@ -675,3 +673,167 @@ def test_encodings(self):
         for _, _, text in encodings:
             assert text in lines
             assert text + " 2" in lines
+
+    def test_include_lines(self):
+        """
+        Checks that only the log lines matching include_lines are exported.
+        """
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            include_lines=["^ERR", "^WARN"]
+        )
+        os.mkdir(self.working_dir + "/log/")
+
+        testfile = self.working_dir + "/log/test.log"
+        file = open(testfile, 'w')
+
+        iterations = 20
+        for n in range(0, iterations):
+            file.write("DBG: a simple debug message" + str(n))
+            file.write("\n")
+            file.write("ERR: a simple error message" + str(n))
+            file.write("\n")
+            file.write("WARNING: a simple warning message" + str(n))
+            file.write("\n")
+
+        file.close()
+
+        filebeat = self.start_filebeat()
+
+        self.wait_until(
+            lambda: self.output_has(40),
+            max_timeout=15)
+
+        # TODO: Find a better way to detect that filebeat has crawled the file.
+        # Idea: a special flag so that filebeat does a single crawl and then exits.
+        filebeat.kill_and_wait()
+
+        output = self.read_output()
+
+        # Only the ERR and WARNING lines should have been exported
+        assert iterations * 2 == len(output)
+
+    def test_default_include_exclude_lines(self):
+        """
+        Checks that all log lines are exported by default when neither
+        include_lines nor exclude_lines is set.
+        """
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*"
+        )
+        os.mkdir(self.working_dir + "/log/")
+
+        testfile = self.working_dir + "/log/test.log"
+        file = open(testfile, 'w')
+
+        iterations = 20
+        for n in range(0, iterations):
+            file.write("DBG: a simple debug message" + str(n))
+            file.write("\n")
+            file.write("ERR: a simple error message" + str(n))
+            file.write("\n")
+            file.write("WARNING: a simple warning message" + str(n))
+            file.write("\n")
+
+        file.close()
+
+        filebeat = self.start_filebeat()
+
+        self.wait_until(
+            lambda: self.output_has(60),
+            max_timeout=15)
+
+        # TODO: Find a better way to detect that filebeat has crawled the file.
+        # Idea: a special flag so that filebeat does a single crawl and then exits.
+        filebeat.kill_and_wait()
+
+        output = self.read_output()
+
+        # Check that the output has the same number of lines as the log file
+        assert iterations * 3 == len(output)
+
+    def test_exclude_lines(self):
+        """
+        Checks that the lines matching the exclude_lines regexps are dropped.
+        """
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            exclude_lines=["^DBG"]
+        )
+        os.mkdir(self.working_dir + "/log/")
+
+        testfile = self.working_dir + "/log/test.log"
+        file = open(testfile, 'w')
+
+        iterations = 20
+        for n in range(0, iterations):
+            file.write("DBG: a simple debug message" + str(n))
+            file.write("\n")
+            file.write("ERR: a simple error message" + str(n))
+            file.write("\n")
+            file.write("WARNING: a simple warning message" + str(n))
+            file.write("\n")
+
+        file.close()
+
+        filebeat = self.start_filebeat()
+
+        self.wait_until(
+            lambda: self.output_has(40),
+            max_timeout=15)
+
+        # TODO: Find a better way to detect that filebeat has crawled the file.
+        # Idea: a special flag so that filebeat does a single crawl and then exits.
+        filebeat.kill_and_wait()
+
+        output = self.read_output()
+
+        # The DBG lines should have been dropped
+        assert iterations * 2 == len(output)
+
+    def test_include_exclude_lines(self):
+        """
+        Checks that include_lines is applied before exclude_lines.
+        """
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            exclude_lines=["^DBG"],
+            include_lines=["apache"]
+        )
+        os.mkdir(self.working_dir + "/log/")
+
+        testfile = self.working_dir + "/log/test.log"
+        file = open(testfile, 'w')
+
+        iterations = 20
+        for n in range(0, iterations):
+            file.write("DBG: a simple debug message" + str(n))
+            file.write("\n")
+            file.write("ERR: apache simple error message" + str(n))
+            file.write("\n")
+            file.write("ERR: a simple warning message" + str(n))
+            file.write("\n")
+
+        file.close()
+
+        filebeat = self.start_filebeat()
+
+        self.wait_until(
+            lambda: self.output_has(20),
+            max_timeout=15)
+
+        # TODO: Find a better way to detect that filebeat has crawled the file.
+        # Idea: a special flag so that filebeat does a single crawl and then exits.
+        filebeat.kill_and_wait()
+
+        output = self.read_output()
+
+        # Only the lines containing "apache" and not starting with "DBG"
+        # should have been exported
+        assert iterations == len(output)