Skip to content

Commit

Permalink
Introduce clean_removed (#1922)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin authored and Steffen Siering committed Jul 5, 2016
1 parent 2f3885f commit 4ae89f2
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 6 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d

*Filebeat*
- Introduce close_removed and close_renamed harvester options {issue}1600[1600]
- Introduce close_eof harvester options {issue}1600[1600]

- Introduce close_eof harvester option {issue}1600[1600]
- Add clean_removed config option {issue}1600[1600]

*Winlogbeat*

Expand Down
6 changes: 5 additions & 1 deletion filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ filebeat.prospectors:
# Note: Potential data loss if renamed file is not picked up by prospector.
#close_renamed: false

# When enabling this option, a file handler is closed immidiately in case a file can't be found
# When enabling this option, a file handler is closed immediately in case a file can't be found
# any more. In case the file shows up again later, harvesting will continue at the last known position
# after scan_frequency.
# Note: Potential data loss if file reading was not finished when file was removed.
Expand All @@ -187,6 +187,10 @@ filebeat.prospectors:
# By default this is disabled.
#clean_older: 0

# Immediately removes the state of files which can no longer be found on disk
#clean_removed: false


#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
6 changes: 5 additions & 1 deletion filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ filebeat.prospectors:
# Note: Potential data loss if renamed file is not picked up by prospector.
#close_renamed: false

# When enabling this option, a file handler is closed immidiately in case a file can't be found
# When enabling this option, a file handler is closed immediately in case a file can't be found
# any more. In case the file shows up again later, harvesting will continue at the last known position
# after scan_frequency.
# Note: Potential data loss if file reading was not finished when file was removed.
Expand All @@ -187,6 +187,10 @@ filebeat.prospectors:
# By default this is disabled.
#clean_older: 0

# Immediately removes the state of files which can no longer be found on disk
#clean_removed: false


#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *States) Cleanup() {

for _, state := range s.states {
ttl := state.TTL
if ttl >= 0 && currentTime.Sub(state.Timestamp) > ttl {
if ttl == 0 || (ttl > 0 && currentTime.Sub(state.Timestamp) > ttl) {
logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl)
continue // drop state
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanOlder: 0,
CleanRemoved: false,
}
)

Expand All @@ -24,6 +25,7 @@ type prospectorConfig struct {
ScanFrequency time.Duration `config:"scan_frequency"`
InputType string `config:"input_type"`
CleanOlder time.Duration `config:"clean_older" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
17 changes: 16 additions & 1 deletion filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,21 @@ func (p *ProspectorLog) Run() {
p.Prospector.states.Cleanup()
logp.Debug("prospector", "Prospector states cleaned up.")
}
p.lastScan = time.Now()

// Cleanup of removed files will only happen after next scan. Otherwise it can happen that not all states
// were updated before cleanup is called
if p.config.CleanRemoved {
for _, state := range p.Prospector.states.GetStates() {
// os.Stat will return an error in case the file does not exist
_, err := os.Stat(state.Source)
if err != nil {
state.TTL = 0
h, _ := p.Prospector.createHarvester(state)
h.SendStateUpdate()
logp.Debug("prospector", "Cleanup state for file as file removed: %s", state.Source)
}
}
}
}

// getFiles returns all files which have to be harvested
Expand Down Expand Up @@ -131,6 +145,7 @@ func (p *ProspectorLog) scan() {
}
}

// Only update lastScan timestamp after scan is completed
p.lastScan = newLastScan
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ filebeat.prospectors:
close_eof: {{close_eof}}
force_close_files: {{force_close_files}}
clean_older: {{clean_older}}
clean_removed: {{clean_removed}}

{% if fields %}
fields:
Expand Down
58 changes: 58 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,3 +769,61 @@ def test_clean_older(self):
else:
assert data[0]["offset"] == 2


def test_clean_removed(self):
    """
    Verifies that the registry state of a file is dropped once that file
    has been deleted from disk while clean_removed is enabled.
    """
    self.render_config_template(
        path=os.path.abspath(self.working_dir) + "/log/input*",
        scan_frequency="0.1s",
        clean_removed=True,
        close_removed=True
    )

    log_dir = self.working_dir + "/log/"
    os.mkdir(log_dir)
    removed_file = log_dir + "input1"
    kept_file = log_dir + "input2"

    with open(removed_file, 'w') as handle:
        handle.write("file to be removed\n")

    with open(kept_file, 'w') as handle:
        handle.write("2\n")

    filebeat = self.start_beat()

    # Both files must have been harvested before anything is deleted
    self.wait_until(
        lambda: self.output_has(lines=2),
        max_timeout=10)

    registry = self.get_registry()
    assert len(registry) == 2

    os.remove(removed_file)

    # The prospector logs this message once it drops the state of the
    # deleted file
    self.wait_until(
        lambda: self.log_contains(
            "Cleanup state for file as file removed"),
        max_timeout=15)

    # Appending another line forces a fresh registry write
    with open(kept_file, 'a') as handle:
        handle.write("make sure registry is written\n")

    self.wait_until(
        lambda: self.output_has(lines=3),
        max_timeout=10)

    filebeat.check_kill_and_wait()

    # Only the entry of the file that still exists may remain
    registry = self.get_registry()
    assert len(registry) == 1

    # The remaining entry must point at the end of the surviving file;
    # on Windows each of the two lines carries one extra byte
    expected_offset = len("make sure registry is written\n" + "2\n")
    if os.name == "nt":
        expected_offset += 2
    assert registry[0]["offset"] == expected_offset

0 comments on commit 4ae89f2

Please sign in to comment.