Merge pull request #718 from ruflin/close_older
Introduction of close_older
tsg committed Jan 19, 2016
2 parents 368667a + 0b6d6e7 commit d096631
Showing 16 changed files with 156 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -21,6 +21,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
- Rename proc.cpu.user_p with proc.cpu.total_p as includes CPU time spent in kernel space {pull}631[631]

*Filebeat*
- The default for ignore_older is now infinite instead of 24h, which means ignore_older is disabled by default. Use close_older if you only want the file handlers to be closed.

*Winlogbeat*

@@ -66,6 +67,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]

*Filebeat*
- Add multiline support for combining multiple related lines into one event. {issue}461[461]
- Add close_older configuration option to complement ignore_older https://github.com/elastic/filebeat/issues/181[181]

*Winlogbeat*
- Added support for reading event logs using the Windows Event Log API {pull}576[576]
5 changes: 4 additions & 1 deletion filebeat/config/config.go
@@ -15,7 +15,8 @@ import (
// Defaults for config variables which are not set
const (
DefaultRegistryFile = ".filebeat"
DefaultIgnoreOlderDuration time.Duration = 24 * time.Hour
DefaultIgnoreOlderDuration time.Duration = 0
DefaultCloseOlderDuration time.Duration = 1 * time.Hour
DefaultScanFrequency time.Duration = 10 * time.Second
DefaultSpoolSize uint64 = 2048
DefaultIdleTimeout time.Duration = 5 * time.Second
@@ -47,6 +48,8 @@ type ProspectorConfig struct {
Input string
IgnoreOlder string `yaml:"ignore_older"`
IgnoreOlderDuration time.Duration
CloseOlder string `yaml:"close_older"`
CloseOlderDuration time.Duration
ScanFrequency string `yaml:"scan_frequency"`
ScanFrequencyDuration time.Duration
Harvester HarvesterConfig `yaml:",inline"`
3 changes: 2 additions & 1 deletion filebeat/config/config_test.go
@@ -34,7 +34,8 @@ func TestReadConfig(t *testing.T) {
assert.Equal(t, "log", prospectors[0].Input)
assert.Equal(t, 3, len(prospectors[0].Harvester.Fields))
assert.Equal(t, "1", prospectors[0].Harvester.Fields["review"])
assert.Equal(t, "24h", prospectors[0].IgnoreOlder)
assert.Equal(t, "0", prospectors[0].IgnoreOlder)
assert.Equal(t, "1h", prospectors[0].CloseOlder)
assert.Equal(t, "10s", prospectors[0].ScanFrequency)

assert.Equal(t, "stdin", prospectors[2].Input)
5 changes: 5 additions & 0 deletions filebeat/crawler/prospector.go
@@ -112,6 +112,11 @@ func (p *Prospector) setupProspectorConfig() error {
return err
}

config.CloseOlderDuration, err = getConfigDuration(config.CloseOlder, cfg.DefaultCloseOlderDuration, "close_older")
if err != nil {
return err
}

config.ScanFrequencyDuration, err = getConfigDuration(config.ScanFrequency, cfg.DefaultScanFrequency, "scan_frequency")
if err != nil {
return err
30 changes: 26 additions & 4 deletions filebeat/crawler/prospector_log.go
@@ -152,10 +152,7 @@ func (p ProspectorLog) checkNewFile(h *harvester.Harvester) {

logp.Debug("prospector", "Start harvesting unknown file: %s", h.Path)

// Check for unmodified time, but only if the file modification time is before the last scan started
// This ensures we don't skip genuine creations with dead times less than 10s
if h.Stat.Fileinfo.ModTime().Before(p.lastscan) &&
time.Since(h.Stat.Fileinfo.ModTime()) > p.config.IgnoreOlderDuration {
if p.checkOldFile(h) {

logp.Debug("prospector", "Fetching old state of file to resume: %s", h.Path)
// Call crawler if there exists a state for the given file
@@ -184,6 +181,30 @@ }
}
}

// checkOldFile returns true if the given file is currently not harvested
// and its last modification time is older than ignore_older
func (p ProspectorLog) checkOldFile(h *harvester.Harvester) bool {

// Resuming is never needed if ignore_older is disabled
if p.config.IgnoreOlderDuration == 0 {
return false
}

modTime := h.Stat.Fileinfo.ModTime()

// Make sure the modification time is before the last scan started so the file is not picked up twice
if !modTime.Before(p.lastscan) {
return false
}

// The file should only be checked if it is older than ignore_older
if time.Since(modTime) <= p.config.IgnoreOlderDuration {
return false
}

return true
}

// checkExistingFile checks if a harvester has to be started for an already known file
// For existing files the following options exist:
// * Last reading position is 0, no harvester has to be started as the old harvester is probably still busy
@@ -195,6 +216,7 @@ func (p ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input.

logp.Debug("prospector", "Update existing file for harvesting: %s", h.Path)

// We assume it is the same file, but verify, as it could have been rotated or replaced
if !oldFile.IsSameFile(newFile) {

logp.Debug("prospector", "File previously found: %s", h.Path)
17 changes: 17 additions & 0 deletions filebeat/crawler/prospector_test.go
@@ -93,6 +93,23 @@ func TestProspectorInitScanFrequency0(t *testing.T) {
assert.Equal(t, zero, prospector.ProspectorConfig.ScanFrequencyDuration)
}

func TestProspectorInitCloseOlder0(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
CloseOlder: "0",
}

prospector := Prospector{
ProspectorConfig: prospectorConfig,
}

prospector.Init()

var zero time.Duration = 0
// 0 expected
assert.Equal(t, zero, prospector.ProspectorConfig.CloseOlderDuration)
}

func TestProspectorInitInvalidScanFrequency(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
23 changes: 21 additions & 2 deletions filebeat/docs/configuration.asciidoc
@@ -126,8 +126,27 @@ If the custom field names conflict with other field names added by Filebeat, the
===== ignore_older

If this option is specified, Filebeat
ignores any files that were modified before the specified timespan.
You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 24h.
ignores any files that were modified before the specified timespan. This is disabled by default.

You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 0, which means the option is disabled.
Commenting out the config has the same effect as setting it to 0.

Files which fell under ignore_older and are updated again will resume from the offset
the file was at when it was last ignored by ignore_older. As an example:
a file was not modified for 90 hours and its offset is at 200. Now a new line is added and
the last modification date is updated. After scan_frequency detects the change, crawling
starts at offset 200. If the file already fell under ignore_older when Filebeat
was started, the first 200 bytes are never sent. If Filebeat was started earlier, the first 200
bytes were already sent and reading now continues at the old offset.
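
For illustration, here is a minimal, self-contained Go sketch of that decision, modeled on the checkOldFile logic added in prospector_log.go above. The names fileState and shouldIgnore are hypothetical and not part of Filebeat:

[source,go]
----
package main

import (
	"fmt"
	"time"
)

// fileState is a simplified stand-in for the state Filebeat keeps per file:
// the byte offset already read and the last modification time seen.
type fileState struct {
	Offset  int64
	ModTime time.Time
}

// shouldIgnore mirrors the ignore_older decision: a duration of 0 disables
// the option entirely, otherwise files whose last modification lies further
// in the past than the configured duration are skipped.
func shouldIgnore(st fileState, ignoreOlder time.Duration) bool {
	if ignoreOlder == 0 { // disabled, the new default
		return false
	}
	return time.Since(st.ModTime) > ignoreOlder
}

func main() {
	// A file untouched for 90 hours, read up to byte offset 200.
	st := fileState{Offset: 200, ModTime: time.Now().Add(-90 * time.Hour)}

	// With ignore_older: 72h the file is skipped; its offset stays at 200,
	// so a later update resumes reading at byte 200 rather than at the start.
	fmt.Println(shouldIgnore(st, 72*time.Hour)) // true

	// With the new default of 0 the file is never ignored.
	fmt.Println(shouldIgnore(st, 0)) // false
}
----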


===== close_older

After a file has not been modified for the duration of close_older, the file handle is closed.
After the file is closed, a new change is only detected after the next scan_frequency cycle,
instead of almost instantly.

You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 1h.
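
As a rough sketch of the mechanism (the actual check is the closeOlder comparison in harvester/reader.go further down in this diff; the helper name checkCloseOlder is hypothetical), the harvester tracks the time of the last successful read and gives up the handle once that age exceeds close_older:

[source,go]
----
package main

import (
	"errors"
	"fmt"
	"time"
)

// errInactive signals that nothing was read from the file for longer than
// close_older, so the harvester should stop and release the file handle.
var errInactive = errors.New("file inactive longer than close_older")

// checkCloseOlder compares the age of the last successful read against the
// configured close_older duration.
func checkCloseOlder(lastTimeRead time.Time, closeOlder time.Duration) error {
	if time.Since(lastTimeRead) > closeOlder {
		return errInactive
	}
	return nil
}

func main() {
	lastRead := time.Now().Add(-2 * time.Hour)

	// With the default close_older: 1h the handle would be closed; new
	// content is then only picked up on the next scan_frequency pass.
	fmt.Println(checkCloseOlder(lastRead, time.Hour))

	// With close_older: 3h the file stays open.
	fmt.Println(checkCloseOlder(lastRead, 3*time.Hour))
}
----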


===== scan_frequency
10 changes: 8 additions & 2 deletions filebeat/etc/beat.yml
@@ -56,9 +56,15 @@ filebeat:
# fields.
#fields_under_root: false

# Ignore files which were modified more then the defined timespan in the past
# Ignore files which were modified more than the defined timespan in the past.
# ignore_older is disabled by default (set to 0), so no files are ignored.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 24h
#ignore_older: 0

# close_older closes the file handler for files which were not modified
# for longer than close_older
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#close_older: 1h

# Type to be published in the 'type' field. For Elasticsearch output,
# the type defines the document type these entries should be stored
10 changes: 8 additions & 2 deletions filebeat/etc/filebeat.yml
@@ -56,9 +56,15 @@ filebeat:
# fields.
#fields_under_root: false

# Ignore files which were modified more then the defined timespan in the past
# Ignore files which were modified more than the defined timespan in the past.
# If all the files on your system must be read, you can set this value very large.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 24h
#ignore_older: 72h

# close_older closes the file handler for files which were not modified
# for longer than close_older
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#close_older: 1h

# Type to be published in the 'type' field. For Elasticsearch output,
# the type defines the document type these entries should be stored
2 changes: 1 addition & 1 deletion filebeat/harvester/log.go
@@ -118,7 +118,7 @@ func (h *Harvester) Harvest() {
config := h.Config
readerConfig := logFileReaderConfig{
forceClose: config.ForceCloseFiles,
maxInactive: h.ProspectorConfig.IgnoreOlderDuration,
closeOlder: h.ProspectorConfig.CloseOlderDuration,
backoffDuration: config.BackoffDuration,
maxBackoffDuration: config.MaxBackoffDuration,
backoffFactor: config.BackoffFactor,
2 changes: 1 addition & 1 deletion filebeat/harvester/log_test.go
@@ -57,7 +57,7 @@ func TestReadLine(t *testing.T) {
// Read only 10 bytes which is not the end of the file
codec, _ := encoding.Plain(file)
readConfig := logFileReaderConfig{
maxInactive: 500 * time.Millisecond,
closeOlder: 500 * time.Millisecond,
backoffDuration: 100 * time.Millisecond,
maxBackoffDuration: 1 * time.Second,
backoffFactor: 2,
4 changes: 2 additions & 2 deletions filebeat/harvester/reader.go
@@ -22,7 +22,7 @@ type logFileReader struct {

type logFileReaderConfig struct {
forceClose bool
maxInactive time.Duration
closeOlder time.Duration
backoffDuration time.Duration
maxBackoffDuration time.Duration
backoffFactor int
@@ -112,7 +112,7 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
}

age := time.Since(r.lastTimeRead)
if age > r.config.maxInactive {
if age > r.config.closeOlder {
// If the file hasn't changed for longer than closeOlder, the harvester stops
// and the file handle will be closed.
return n, errInactive
3 changes: 2 additions & 1 deletion filebeat/tests/files/config.yml
@@ -12,7 +12,8 @@ filebeat:
level: debug
review: 1
type: log
ignore_older: 24h
ignore_older: 0
close_older: 1h
scan_frequency: 10s
harvester_buffer_size: 5000
tail_files: false
3 changes: 2 additions & 1 deletion filebeat/tests/load/filebeat.yml
@@ -8,7 +8,8 @@ filebeat:
#fields:
# level: debug
# review: 1
ignore_older: 24h
ignore_older: 0
close_older: 1h
scan_frequency: 0s
harvester_buffer_size: 1000000

1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -13,6 +13,7 @@ filebeat:
input_type: {{input_type | default("log") }}
scan_frequency: {{scan_frequency | default("0.1s") }}
ignore_older: "{{ignoreOlder}}"
close_older: "{{closeOlder}}"
harvester_buffer_size:
encoding: {{encoding | default("utf-8") }}
tail_files: {{tailFiles}}
54 changes: 54 additions & 0 deletions filebeat/tests/system/test_prospector.py
@@ -179,6 +179,7 @@ def test_rotating_ignore_older_low_write_rate(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
ignoreOlder="1s",
closeOlder="1s",
scan_frequency="0.1s",
)

@@ -309,3 +310,56 @@ def test_files_added_late(self):
max_timeout=15)

filebeat.kill_and_wait()

def test_close_older(self):
"""
Test that close_older closes the file but reading
is picked up again after scan_frequency
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
ignoreOlder="1h",
closeOlder="1s",
scan_frequency="0.1s",
)

os.mkdir(self.working_dir + "/log/")
testfile = self.working_dir + "/log/test.log"

filebeat = self.start_filebeat(debug_selectors=['*'])

# wait for first "Start next scan" log message
self.wait_until(
lambda: self.log_contains(
"Start next scan"),
max_timeout=10)

lines = 0

# write first line
lines += 1
with open(testfile, 'a') as file:
file.write("Line {}\n".format(lines))

# wait for log to be read
self.wait_until(
lambda: self.output_has(lines=lines),
max_timeout=15)

# wait for file to be closed due to close_older
self.wait_until(
lambda: self.log_contains(
"Closing file: {}\n".format(os.path.abspath(testfile))),
max_timeout=10)

# write second line
lines += 1
with open(testfile, 'a') as file:
file.write("Line {}\n".format(lines))

self.wait_until(
# allow for events to be sent multiple times due to log rotation
lambda: self.output_count(lambda x: x >= lines),
max_timeout=5)

filebeat.kill_and_wait()
