diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml index 1b77cc8c4575..dffa3a746a12 100644 --- a/filebeat/etc/beat.full.yml +++ b/filebeat/etc/beat.full.yml @@ -179,8 +179,14 @@ filebeat.prospectors: # Closes the file handler as soon as the harvesters reaches then end of the file. # The file will be picked up again by the harvester at previous known state # after scan_frequency in case the file can still be discovered by the prospector. + # Note: Potential data loss if file is deleted / moved before picked up again after + # scan_frequency by prospector #close_eof: false + # Files for the modification data is older then clean_older the state from the registry is removed + # By default this is disabled. + #clean_older: 0 + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 1c1725e57f15..6773eebec378 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -181,6 +181,10 @@ filebeat.prospectors: # after scan_frequency in case the file can still be discovered by the prospector. #close_eof: false + # Files for the modification data is older then clean_older the state from the registry is removed + # By default this is disabled. + #clean_older: 0 + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 2c1ba908bc45..96b7c585ff09 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -15,7 +15,8 @@ type State struct { Finished bool `json:"-"` // harvester state Fileinfo os.FileInfo `json:"-"` // the file info FileStateOS StateOS - LastSeen time.Time `json:"last_seen"` + Timestamp time.Time `json:"timestamp"` + TTL time.Duration `json:"ttl"` } // NewState creates a new file state @@ -25,7 +26,8 @@ func NewState(fileInfo os.FileInfo, path string) State { Source: path, Finished: false, FileStateOS: GetOSState(fileInfo), - LastSeen: time.Now(), + Timestamp: time.Now(), + TTL: -1 * time.Second, // By default, state does have an infinit ttl } } @@ -47,7 +49,7 @@ func (s *States) Update(newState State) { defer s.mutex.Unlock() index, _ := s.findPrevious(newState) - newState.LastSeen = time.Now() + newState.Timestamp = time.Now() if index >= 0 { s.states[index] = newState @@ -81,25 +83,30 @@ func (s *States) findPrevious(newState State) (int, State) { } // Cleanup cleans up the state array. All states which are older then `older` are removed -func (s *States) Cleanup(older time.Duration) { +func (s *States) Cleanup() { + s.mutex.Lock() defer s.mutex.Unlock() - for i, state := range s.states { + currentTime := time.Now() + states := s.states[:0] - // File wasn't seen for longer then older -> remove state - if time.Since(state.LastSeen) > older { - logp.Debug("prospector", "State removed for %s because of older: %s", state.Source) - s.states = append(s.states[:i], s.states[i+1:]...) + for _, state := range s.states { + ttl := state.TTL + if ttl >= 0 && currentTime.Sub(state.Timestamp) > ttl { + logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl) + continue // drop state } + states = append(states, state) // in-place copy old state } - + s.states = states } // Count returns number of states func (s *States) Count() int { s.mutex.Lock() defer s.mutex.Unlock() + return len(s.states) } diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index 85f201b7ac05..ff2f7dd8b774 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -8,16 +8,12 @@ import ( cfg "github.com/elastic/beats/filebeat/config" ) -const ( - DefaultIgnoreOlder time.Duration = 0 - DefaultScanFrequency time.Duration = 10 * time.Second -) - var ( defaultConfig = prospectorConfig{ - IgnoreOlder: DefaultIgnoreOlder, - ScanFrequency: DefaultScanFrequency, + IgnoreOlder: 0, + ScanFrequency: 10 * time.Second, InputType: cfg.DefaultInputType, + CleanOlder: 0, } ) @@ -27,6 +23,7 @@ type prospectorConfig struct { Paths []string `config:"paths"` ScanFrequency time.Duration `config:"scan_frequency"` InputType string `config:"input_type"` + CleanOlder time.Duration `config:"clean_older" validate:"min=0"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 287f378c0ef5..73d1844d9968 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -105,6 +105,10 @@ func (p *Prospector) Run() { logp.Info("Prospector channel stopped") return case event := <-p.harvesterChan: + // Add ttl if cleanOlder is enabled + if p.config.CleanOlder > 0 { + event.FileState.TTL = p.config.CleanOlder + } select { case <-p.done: logp.Info("Prospector channel stopped") diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 02f27f4fe717..d9cab7e5afe7 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -12,8 +12,9 @@ import ( type ProspectorLog struct { Prospector *Prospector - lastscan time.Time config prospectorConfig + lastScan time.Time + lastClean time.Time } func NewProspectorLog(p *Prospector) (*ProspectorLog, error) { @@ -40,6 +41,7 @@ func (p *ProspectorLog) Init() { // Overwrite prospector states p.Prospector.states.SetStates(fileStates) + p.lastClean = time.Now() logp.Info("Previous states loaded: %v", p.Prospector.states.Count()) } @@ -48,10 +50,13 @@ func (p *ProspectorLog) Run() { logp.Debug("prospector", "Start next scan") p.scan() - // Only cleanup states if enabled - if p.config.IgnoreOlder != 0 { - p.Prospector.states.Cleanup(p.config.IgnoreOlder) + + // It is important that a first scan is run before cleanup to make sure all new states are read first + if p.config.CleanOlder > 0 { + p.Prospector.states.Cleanup() + logp.Debug("prospector", "Prospector states cleaned up.") } + p.lastScan = time.Now() } // getFiles returns all files which have to be harvested @@ -104,7 +109,7 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo { // Scan starts a scanGlob for each provided path/glob func (p *ProspectorLog) scan() { - newlastscan := time.Now() + newLastScan := time.Now() // TODO: Track harvesters to prevent any file from being harvested twice. Finished state could be delayed? // Now let's do one quick scan to pick up new files @@ -126,7 +131,7 @@ func (p *ProspectorLog) scan() { } } - p.lastscan = newlastscan + p.lastScan = newLastScan } // harvestNewFile harvest a new file @@ -145,10 +150,12 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S logp.Debug("prospector", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) + // TODO: check for ignore_older reached? or should that happen in scan already? + // No harvester is running for the file, start a new harvester // It is important here that only the size is checked and not modification time, as modification time could be incorrect on windows // https://blogs.technet.microsoft.com/asiasupp/2010/12/14/file-date-modified-property-are-not-updating-while-modifying-a-file-without-closing-it/ - if oldState.Finished && newState.Fileinfo.Size() > newState.Offset { + if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset { // Resume harvesting of an old file we've stopped harvesting from // This could also be an issue with force_close_older that a new harvester is started after each scan but not needed? // One problem with comparing modTime is that it is in seconds, and scans can happen more then once a second @@ -166,6 +173,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S // Update state because of file rotation h.SendStateUpdate() } else { + // TODO: improve logging depedent on what the exact reason is that harvesting does not continue // Nothing to do. Harvester is still running and file was not renamed logp.Debug("prospector", "No updates needed, file %s is already harvested.", newState.Source) } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 4d0d3c34eca2..ebd9274fdb33 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -132,8 +132,8 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool { logp.Info("Old registry states found: %v", len(oldStates)) counter := 0 for _, state := range oldStates { - // Makes time last_seen time of migration, as this is the best guess - state.LastSeen = time.Now() + // Makes timestamp time of migration, as this is the best guess + state.Timestamp = time.Now() states[counter] = state counter++ } @@ -180,8 +180,10 @@ func (r *Registrar) Run() { r.processEventStates(events) } - if e := r.writeRegistry(); e != nil { - logp.Err("Writing of registry returned error: %v. Continuing..", e) + r.states.Cleanup() + logp.Debug("registrar", "Registrar states cleaned up.") + if err := r.writeRegistry(); err != nil { + logp.Err("Writing of registry returned error: %v. Continuing...", err) } } } @@ -219,6 +221,7 @@ func (r *Registrar) writeRegistry() error { return err } + // First clean up states states := r.states.GetStates() encoder := json.NewEncoder(f) diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 5b6af299cf60..c8b6b05209f2 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -22,6 +22,8 @@ filebeat.prospectors: close_removed: {{close_removed}} close_renamed: {{close_renamed}} close_eof: {{close_eof}} + force_close_files: {{force_close_files}} + clean_older: {{clean_older}} {% if fields %} fields: diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index cb6372a53bf5..c6db2731d439 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -39,7 +39,7 @@ def get_registry_entry_by_path(self, path): if tmp_entry == None: tmp_entry = entry else: - if tmp_entry["last_seen"] < entry["last_seen"]: + if tmp_entry["timestamp"] < entry["timestamp"]: tmp_entry = entry return tmp_entry diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 95f90fed2982..cf808ad29f5a 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -636,13 +636,15 @@ def test_migration_non_windows(self): # Compare first entry oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Compare second entry oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Make sure the right number of entries is in @@ -686,15 +688,84 @@ def test_migration_windows(self): # Compare first entry oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Compare second entry oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["last_seen"] + del newJson["timestamp"] + del newJson["ttl"] assert newJson == oldJson # Make sure the right number of entries is in data = self.get_registry() assert len(data) == 2 + + + def test_clean_older(self): + """ + Checks that states are properly removed after clean_older + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + clean_older="4s", + ignoreOlder="2s", + closeOlder="0.2s", + scan_frequency="0.1s" + ) + + + os.mkdir(self.working_dir + "/log/") + testfile1 = self.working_dir + "/log/input1" + testfile2 = self.working_dir + "/log/input2" + testfile3 = self.working_dir + "/log/input3" + + with open(testfile1, 'w') as f: + f.write("first file\n") + + with open(testfile2, 'w') as f: + f.write("second file\n") + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_registry() + assert len(data) == 2 + + # Wait until states are removed from prospectors + self.wait_until( + lambda: self.log_contains_count( + "State removed for") == 2, + max_timeout=15) + + with open(testfile3, 'w') as f: + f.write("2\n") + + # Write new file to make sure registrar is flushed again + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=30) + + # Wait until states are removed from prospectors + self.wait_until( + lambda: self.log_contains_count( + "State removed for") == 4, + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check that the first to files were removed from the registry + data = self.get_registry() + assert len(data) == 1 + + # Make sure the last file in the registry is the correct one and has the correct offset + if os.name == "nt": + assert data[0]["offset"] == 3 + else: + assert data[0]["offset"] == 2 +