Skip to content

Commit

Permalink
Merge pull request #563 from elastic/exclude_files_filebeat
Browse files Browse the repository at this point in the history
Exclude files in filebeat
  • Loading branch information
tsg committed Dec 21, 2015
2 parents 1873d72 + eb541bc commit fb95077
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Topbeat*

*Filebeat*
- Add exclude_files configuration option {pull}563[563]

*Winlogbeat*

Expand Down
3 changes: 3 additions & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"os"
"path/filepath"
"regexp"
"time"

"github.com/elastic/beats/libbeat/cfgfile"
Expand Down Expand Up @@ -49,6 +50,8 @@ type ProspectorConfig struct {
ScanFrequency string `yaml:"scan_frequency"`
ScanFrequencyDuration time.Duration
Harvester HarvesterConfig `yaml:",inline"`
ExcludeFiles []string `yaml:"exclude_files"`
ExcludeFilesRegexp []*regexp.Regexp
}

type HarvesterConfig struct {
Expand Down
25 changes: 25 additions & 0 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (p *Prospector) setupProspectorConfig() error {
if err != nil {
return err
}
config.ExcludeFilesRegexp, err = harvester.InitRegexps(config.ExcludeFiles)
if err != nil {
return err
}

// Init File Stat list
p.prospectorList = make(map[string]harvester.FileStat)
Expand Down Expand Up @@ -219,11 +223,26 @@ func (p *Prospector) stdinRun(spoolChan chan *input.FileEvent) {
}
}

func (p *Prospector) isFileExcluded(file string) bool {

config := &p.ProspectorConfig

if len(config.ExcludeFilesRegexp) > 0 {

if harvester.MatchAnyRegexps(config.ExcludeFilesRegexp, file) {
return true
}
}

return false
}

// Scans the specific path which can be a glob (/**/**/*.log)
// For all found files it is checked if a harvester should be started
func (p *Prospector) scan(path string, output chan *input.FileEvent) {

logp.Debug("prospector", "scan path %s", path)
logp.Debug("prospector", "exclude_files: %s", p.ProspectorConfig.ExcludeFiles)
// Evaluate the path as a wildcards/shell glob
matches, err := filepath.Glob(path)
if err != nil {
Expand All @@ -237,6 +256,12 @@ func (p *Prospector) scan(path string, output chan *input.FileEvent) {
for _, file := range matches {
logp.Debug("prospector", "Check file for harvesting: %s", file)

// check if the file is in the exclude_files list
if p.isFileExcluded(file) {
logp.Debug("prospector", "Exclude file: %s", file)
continue
}

// Stat the file, following any symlinks.
fileinfo, err := os.Stat(file)

Expand Down
20 changes: 20 additions & 0 deletions filebeat/crawler/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,23 @@ func TestProspectorInitInputTypeWrong(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "log", prospector.ProspectorConfig.Harvester.InputType)
}

func TestProspectorFileExclude(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
ExcludeFiles: []string{"\\.gz$"},
Harvester: config.HarvesterConfig{
BufferSize: 0,
},
}

prospector := Prospector{
ProspectorConfig: prospectorConfig,
}

prospector.Init()

assert.True(t, prospector.isFileExcluded("/tmp/log/logw.gz"))
assert.False(t, prospector.isFileExcluded("/tmp/log/logw.log"))

}
10 changes: 10 additions & 0 deletions filebeat/docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ If both `include_lines` and `exclude_lines` are defined, then include_lines is c
exclude_lines: ["^DBG"]
-------------------------------------------------------------------------------------

===== exclude_files

A list of regular expressions to match the files to be ignored. By default no file is excluded.

[source,yaml]
-------------------------------------------------------------------------------------
exclude_files: [".gz$"]
-------------------------------------------------------------------------------------
To ignore all the files with the `gz` extension.

[[configuration-fields]]
===== fields

Expand Down
4 changes: 4 additions & 0 deletions filebeat/etc/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ filebeat:
# exclude_lines. By default, all the lines are exported.
# include_lines: ["^ERR", "^WARN"]

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
# exclude_files: [".gz$"]

# Optional additional fields. These field can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
Expand Down
4 changes: 4 additions & 0 deletions filebeat/etc/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ filebeat:
# exclude_lines. By default, all the lines are exported.
# include_lines: ["^ERR", "^WARN"]

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
# exclude_files: [".gz$"]

# Optional additional fields. These field can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
Expand Down
30 changes: 0 additions & 30 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"os"
"regexp"
"time"

"github.com/elastic/beats/filebeat/config"
Expand Down Expand Up @@ -332,35 +331,6 @@ func (h *Harvester) handleReadlineError(lastTimeRead time.Time, err error) error
func (h *Harvester) Stop() {
}

func InitRegexps(exprs []string) ([]*regexp.Regexp, error) {

result := []*regexp.Regexp{}

for _, exp := range exprs {

rexp, err := regexp.CompilePOSIX(exp)
if err != nil {
logp.Err("Fail to compile the regexp %s: %s", exp, err)
return nil, err
}
result = append(result, rexp)
}
return result, nil
}

func MatchAnyRegexps(regexps []*regexp.Regexp, text string) bool {

for _, rexp := range regexps {
if rexp.MatchString(text) {
// drop line
return true

}
}

return false
}

const maxConsecutiveEmptyReads = 100

// timedReader keeps track of last time bytes have been read from underlying
Expand Down
33 changes: 32 additions & 1 deletion filebeat/harvester/util.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package harvester

import (
"regexp"
"time"

"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/libbeat/logp"
"time"
)

// isLine checks if the given byte array is a line, means has a line ending \n
Expand Down Expand Up @@ -60,3 +62,32 @@ func readlineString(bytes []byte, size int) (string, int, error) {
s := string(bytes)[:len(bytes)-lineEndingChars(bytes)]
return s, size, nil
}

// InitRegexps initializes a list of compiled regular expressions.
func InitRegexps(exprs []string) ([]*regexp.Regexp, error) {

result := []*regexp.Regexp{}

for _, exp := range exprs {

rexp, err := regexp.CompilePOSIX(exp)
if err != nil {
logp.Err("Fail to compile the regexp %s: %s", exp, err)
return nil, err
}
result = append(result, rexp)
}
return result, nil
}

// MatchAnyRegexps checks if the text matches any of the regular expressions
func MatchAnyRegexps(regexps []*regexp.Regexp, text string) bool {

for _, rexp := range regexps {
if rexp.MatchString(text) {
return true
}
}

return false
}
19 changes: 19 additions & 0 deletions filebeat/harvester/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package harvester

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMatchAnyRegexps(t *testing.T) {

patterns := []string{"\\.gz$"}

regexps, err := InitRegexps(patterns)

assert.Nil(t, err)

assert.Equal(t, MatchAnyRegexps(regexps, "/var/log/log.gz"), true)

}
5 changes: 4 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ filebeat:
{% if exclude_lines %}
exclude_lines: {{exclude_lines}}
{% endif %}

{% if exclude_files %}
exclude_files: {{exclude_files}}
{% endif %}

spool_size:
idle_timeout: 0.1s
registry_file: {{ fb.working_dir + '/' }}{{ registryFile|default(".filebeat")}}
Expand Down
36 changes: 35 additions & 1 deletion filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def test_stdin(self):
lambda: self.output_has(lines=iterations1+iterations2),
max_timeout=15)


proc.kill_and_wait()

objs = self.read_output()
Expand Down Expand Up @@ -140,3 +139,38 @@ def test_rotating_ignore_older_larger_write_rate(self):
max_timeout=15)

proc.kill_and_wait()

def test_exclude_files(self):

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
exclude_files=[".gz$"]
)
os.mkdir(self.working_dir + "/log/")

testfile = self.working_dir + "/log/test.gz"
file = open(testfile, 'w')
file.write("line in gz file\n")
file.close()

testfile = self.working_dir + "/log/test.log"
file = open(testfile, 'w')
file.write("line in log file\n")
file.close()

filebeat = self.start_filebeat()

self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=15)

# TODO: Find better solution when filebeat did crawl the file
# Idea: Special flag to filebeat so that filebeat is only doing and
# crawl and then finishes
filebeat.kill_and_wait()

output = self.read_output()

# Check that output file has the same number of lines as the log file
assert 1 == len(output)
assert output[0]["message"] == "line in log file"

0 comments on commit fb95077

Please sign in to comment.