From 4ae89f274c44a21fc8c98ce75c6cd58c9fdd91bb Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Tue, 5 Jul 2016 22:42:43 +0200 Subject: [PATCH] Introduce clean_removed (#1922) --- CHANGELOG.asciidoc | 4 +- filebeat/etc/beat.full.yml | 6 +- filebeat/filebeat.full.yml | 6 +- filebeat/input/file/state.go | 2 +- filebeat/prospector/config.go | 2 + filebeat/prospector/prospector_log.go | 17 +++++- filebeat/tests/system/config/filebeat.yml.j2 | 1 + filebeat/tests/system/test_registrar.py | 58 ++++++++++++++++++++ 8 files changed, 90 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index fa4d4b1bf40b..67790306bb49 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -58,8 +58,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d *Filebeat* - Introduce close_removed and close_renamed harvester options {issue}1600[1600] -- Introduce close_eof harvester options {issue}1600[1600] - +- Introduce close_eof harvester option {issue}1600[1600] +- Add clean_removed config option {issue}1600[1600] *Winlogbeat* diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml index e9d4201c3189..8d1a0551efe7 100644 --- a/filebeat/etc/beat.full.yml +++ b/filebeat/etc/beat.full.yml @@ -170,7 +170,7 @@ filebeat.prospectors: # Note: Potential data loss if renamed file is not picked up by prospector. #close_renamed: false - # When enabling this option, a file handler is closed immidiately in case a file can't be found + # When enabling this option, a file handler is closed immediately in case a file can't be found # any more. In case the file shows up again later, harvesting will continue at the last known position # after scan_frequency. # Note: Potential data loss if file reading was not finished when file was removed. @@ -187,6 +187,10 @@ filebeat.prospectors: # By default this is disabled. #clean_older: 0 + # Removes the state for file which cannot be found on disk anymore immediately + #clean_removed: false + + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index f0a991044ecb..9bc8a54af26d 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -170,7 +170,7 @@ filebeat.prospectors: # Note: Potential data loss if renamed file is not picked up by prospector. #close_renamed: false - # When enabling this option, a file handler is closed immidiately in case a file can't be found + # When enabling this option, a file handler is closed immediately in case a file can't be found # any more. In case the file shows up again later, harvesting will continue at the last known position # after scan_frequency. # Note: Potential data loss if file reading was not finished when file was removed. @@ -187,6 +187,10 @@ filebeat.prospectors: # By default this is disabled. #clean_older: 0 + # Removes the state for file which cannot be found on disk anymore immediately + #clean_removed: false + + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 96b7c585ff09..c24fad881b57 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -93,7 +93,7 @@ func (s *States) Cleanup() { for _, state := range s.states { ttl := state.TTL - if ttl >= 0 && currentTime.Sub(state.Timestamp) > ttl { + if ttl == 0 || (ttl > 0 && currentTime.Sub(state.Timestamp) > ttl) { logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl) continue // drop state } diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index ff2f7dd8b774..6933491166ec 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -14,6 +14,7 @@ var ( ScanFrequency: 10 * time.Second, InputType: cfg.DefaultInputType, CleanOlder: 0, + CleanRemoved: false, } ) @@ -24,6 +25,7 @@ type prospectorConfig struct { ScanFrequency time.Duration `config:"scan_frequency"` InputType string `config:"input_type"` CleanOlder time.Duration `config:"clean_older" validate:"min=0"` + CleanRemoved bool `config:"clean_removed"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index d9cab7e5afe7..0910e873ef69 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -56,7 +56,21 @@ func (p *ProspectorLog) Run() { p.Prospector.states.Cleanup() logp.Debug("prospector", "Prospector states cleaned up.") } - p.lastScan = time.Now() + + // Cleanup of removed files will only happen after next scan. Otherwise it can happen that not all states + // were updated before cleanup is called + if p.config.CleanRemoved { + for _, state := range p.Prospector.states.GetStates() { + // os.Stat will return an error in case the file does not exist + _, err := os.Stat(state.Source) + if err != nil { + state.TTL = 0 + h, _ := p.Prospector.createHarvester(state) + h.SendStateUpdate() + logp.Debug("prospector", "Cleanup state for file as file removed: %s", state.Source) + } + } + } } // getFiles returns all files which have to be harvested @@ -131,6 +145,7 @@ func (p *ProspectorLog) scan() { } } + // Only update lastScan timestamp after scan is completed p.lastScan = newLastScan } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 62d876979b7a..0c70e9af2636 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -24,6 +24,7 @@ filebeat.prospectors: close_eof: {{close_eof}} force_close_files: {{force_close_files}} clean_older: {{clean_older}} + clean_removed: {{clean_removed}} {% if fields %} fields: diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index cf808ad29f5a..f5891edce311 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -769,3 +769,61 @@ def test_clean_older(self): else: assert data[0]["offset"] == 2 + + def test_clean_removed(self): + """ + Checks that files which were removed, the state is removed + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + scan_frequency="0.1s", + clean_removed=True, + close_removed=True + ) + + os.mkdir(self.working_dir + "/log/") + testfile1 = self.working_dir + "/log/input1" + testfile2 = self.working_dir + "/log/input2" + + with open(testfile1, 'w') as f: + f.write("file to be removed\n") + + with open(testfile2, 'w') as f: + f.write("2\n") + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_registry() + assert len(data) == 2 + + os.remove(testfile1) + + # Wait until states are removed from prospectors + self.wait_until( + lambda: self.log_contains( + "Cleanup state for file as file removed"), + max_timeout=15) + + # Add one more line to make sure registry is written + with open(testfile2, 'a') as f: + f.write("make sure registry is written\n") + + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + + filebeat.check_kill_and_wait() + + # Check that the first to files were removed from the registry + data = self.get_registry() + assert len(data) == 1 + + # Make sure the last file in the registry is the correct one and has the correct offset + if os.name == "nt": + assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + 2 + else: + assert data[0]["offset"] == len("make sure registry is written\n" + "2\n")