diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4706950fa268..73e99fed8b6d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha2...master[Check the HEAD d *Topbeat* *Filebeat* +- The registry format was changed to an array instead of dict. The migration to the new format will happen automatically at the first startup. {pull}1703[1703] *Winlogbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 98d52749ba20..47f1beca0a88 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -71,7 +71,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } // Load the previous log file locations now, for use in prospector - fb.registrar.LoadState() + err = fb.registrar.LoadState() + if err != nil { + logp.Err("Error loading state: %v", err) + return err + } // Init and Start spooler: Harvesters dump events into the spooler. fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan) diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index e0b46a18242c..d4c6277f5e3f 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -36,10 +36,13 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu logp.Info("Loading Prospectors: %v", len(prospectorConfigs)) + // Get existing states + states := *c.Registrar.state + // Prospect the globs/paths given on the command line and launch harvesters for _, prospectorConfig := range prospectorConfigs { - prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan) + prospector, err := NewProspector(prospectorConfig, states, eventChan) if err != nil { return fmt.Errorf("Error in initing prospector: %s", err) } @@ -60,10 +63,9 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu logp.Debug("crawler", "Starting prospector %v", id) prospector.Run() }(i, p) - } - logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getState())) + logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count()) return nil } diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 11480e7def6b..e624a57e73be 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -17,7 +17,6 @@ type Prospector struct { prospectorer Prospectorer spoolerChan chan *input.FileEvent harvesterChan chan *input.FileEvent - registrar *Registrar done chan struct{} states *input.States wg sync.WaitGroup @@ -28,14 +27,13 @@ type Prospectorer interface { Run() } -func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *input.FileEvent) (*Prospector, error) { +func NewProspector(cfg *common.Config, states input.States, spoolerChan chan *input.FileEvent) (*Prospector, error) { prospector := &Prospector{ config: defaultConfig, - registrar: registrar, spoolerChan: spoolerChan, harvesterChan: make(chan *input.FileEvent), done: make(chan struct{}), - states: input.NewStates(), + states: &states, wg: sync.WaitGroup{}, } @@ -44,7 +42,6 @@ func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *i } err := prospector.Init() - if err != nil { return nil, err } @@ -72,15 +69,13 @@ func (p *Prospector) Init() error { switch p.config.Harvester.InputType { case cfg.StdinInputType: prospectorer, err = NewProspectorStdin(p) - prospectorer.Init() case cfg.LogInputType: prospectorer, err = NewProspectorLog(p) - prospectorer.Init() - default: return fmt.Errorf("Invalid prospector type: %v", p.config.Harvester.InputType) } + prospectorer.Init() p.prospectorer = prospectorer return nil diff --git a/filebeat/crawler/prospector_log.go b/filebeat/crawler/prospector_log.go index eeebae364ef2..281aa5c8b7b0 100644 --- a/filebeat/crawler/prospector_log.go +++ b/filebeat/crawler/prospector_log.go @@ -30,24 +30,17 @@ func (p *ProspectorLog) Init() { logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles) logp.Info("Load previous states from registry into memory") + fileStates := p.Prospector.states.GetStates() - // Load the initial state from the registry - for path, fileinfo := range p.getFiles() { - - // Check for each path found, if there is a previous state - offset := p.Prospector.registrar.fetchState(path, fileinfo) - - // Offset found -> skip to previous state - if offset > 0 { - state := input.NewFileState(fileinfo, path) - state.Offset = offset - // Make sure new harvester is started for all states - state.Finished = true - // Prospector must update all states as it has to detect also file rotation - p.Prospector.states.Update(state) - } + // Make sure all states are set as finished + for key, state := range fileStates { + state.Finished = true + fileStates[key] = state } + // Overwrite prospector states + p.Prospector.states.SetStates(fileStates) + logp.Info("Previous states loaded: %v", p.Prospector.states.Count()) } diff --git a/filebeat/crawler/prospector_test.go b/filebeat/crawler/prospector_test.go index ba660e396a2e..7c8efeb48f7e 100644 --- a/filebeat/crawler/prospector_test.go +++ b/filebeat/crawler/prospector_test.go @@ -15,7 +15,7 @@ import ( func TestProspectorDefaultConfigs(t *testing.T) { - prospector, err := NewProspector(common.NewConfig(), nil, nil) + prospector, err := NewProspector(common.NewConfig(), *input.NewStates(), nil) assert.NoError(t, err) // Default values expected diff --git a/filebeat/crawler/registrar.go b/filebeat/crawler/registrar.go index fdc5a406d2f6..f7f5ebb41958 100644 --- a/filebeat/crawler/registrar.go +++ b/filebeat/crawler/registrar.go @@ -7,6 +7,8 @@ import ( "path/filepath" "sync" + "time" + cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input" . "github.com/elastic/beats/filebeat/input" @@ -17,9 +19,8 @@ import ( type Registrar struct { Channel chan []*FileEvent done chan struct{} - registryFile string // Path to the Registry File - state map[string]FileState // Map with all file paths inside and the corresponding state - stateMutex sync.Mutex + registryFile string // Path to the Registry File + state *input.States // Map with all file paths inside and the corresponding state wg sync.WaitGroup } @@ -28,7 +29,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) { r := &Registrar{ registryFile: registryFile, done: make(chan struct{}), - state: map[string]FileState{}, + state: input.NewStates(), Channel: make(chan []*FileEvent, 1), wg: sync.WaitGroup{}, } @@ -63,14 +64,84 @@ func (r *Registrar) Init() error { // loadState fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. -func (r *Registrar) LoadState() { - if existing, e := os.Open(r.registryFile); e == nil { - defer existing.Close() +func (r *Registrar) LoadState() error { + + // Check if files exists + _, err := os.Stat(r.registryFile) + if err != nil && !os.IsNotExist(err) { + return err + } + + // Error means no file found + if err != nil { + logp.Info("No registry file found under: %s. Creating a new registry file.", r.registryFile) + return nil + } + + file, err := os.Open(r.registryFile) + if err != nil { + return err + } + + defer file.Close() + + logp.Info("Loading registrar data from %s", r.registryFile) + + // DEPRECATED: This should be removed in 6.0 + oldStates := r.loadAndConvertOldState(file) + if oldStates { + return nil + } + + decoder := json.NewDecoder(file) + states := []input.FileState{} + decoder.Decode(&states) + + r.state.SetStates(states) + logp.Info("States Loaded from registrar: %+v", len(states)) + + return nil +} + +// loadAndConvertOldState loads the old state file and converts it to the new state +// This is designed so it can be easily removed in later versions +func (r *Registrar) loadAndConvertOldState(file *os.File) bool { + // Make sure file reader is reset afterwards + defer file.Seek(0, 0) + + decoder := json.NewDecoder(file) + oldStates := map[string]FileState{} + err := decoder.Decode(&oldStates) + + if err != nil { + logp.Debug("registrar", "Error decoding old state: %+v", err) + return false + } + + // No old states found -> probably already new format + if oldStates == nil { + return false + } - logp.Info("Loading registrar data from %s", r.registryFile) - decoder := json.NewDecoder(existing) - decoder.Decode(&r.state) + // Convert old states to new states + states := make([]input.FileState, len(oldStates)) + logp.Info("Old registry states found: %v", len(oldStates)) + counter := 0 + for _, state := range oldStates { + // Makes time last_seen time of migration, as this is the best guess + state.LastSeen = time.Now() + states[counter] = state + counter++ } + + r.state.SetStates(states) + + // Rewrite registry in new format + r.writeRegistry() + + logp.Info("Old states converted to new states and written to registrar: %v", len(oldStates)) + + return true } func (r *Registrar) Start() { @@ -112,22 +183,17 @@ func (r *Registrar) processEventStates(events []*FileEvent) { if event.InputType == cfg.StdinInputType { continue } - - r.setState(event.Source, event.FileState) + r.state.Update(event.FileState) } } +// Stop stops the registry. It waits until Run function finished. func (r *Registrar) Stop() { logp.Info("Stopping Registrar") close(r.done) r.wg.Wait() } -func (r *Registrar) GetFileState(path string) (FileState, bool) { - state, exist := r.getStateEntry(path) - return state, exist -} - // writeRegistry writes the new json registry file to disk. func (r *Registrar) writeRegistry() error { logp.Debug("registrar", "Write registry file: %s", r.registryFile) @@ -139,84 +205,15 @@ func (r *Registrar) writeRegistry() error { return e } - encoder := json.NewEncoder(file) + states := r.state.GetStates() - state := r.getState() - encoder.Encode(state) + encoder := json.NewEncoder(file) + encoder.Encode(states) // Directly close file because of windows file.Close() - logp.Info("Registry file updated. %d states written.", len(state)) + logp.Info("Registry file updated. %d states written.", len(states)) return SafeFileRotate(r.registryFile, tempfile) } - -func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) int64 { - - if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil { - - if previous != filePath { - // File has rotated between shutdown and startup - // We return last state downstream, with a modified event source with the new file name - // And return the offset - also force harvest in case the file is old and we're about to skip it - logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath) - } - - logp.Info("Previous state for file %s found", filePath) - - lastState, _ := r.GetFileState(previous) - return lastState.Offset - } - - logp.Info("New file. Start reading from the beginning: %s", filePath) - - // New file so just start from the beginning - return 0 -} - -// getPreviousFile checks in the registrar if there is the newFile already exist with a different name -// In case an old file is found, the path to the file is returned, if not, an error is returned -func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) (string, error) { - - newState := input.GetOSFileState(newFileInfo) - - for oldFilePath, oldState := range r.getState() { - - // Compare states - if newState.IsSame(oldState.FileStateOS) { - logp.Info("Old file with new name found: %s -> %s", oldFilePath, newFilePath) - return oldFilePath, nil - } - } - - return "", fmt.Errorf("No previous file found") -} - -func (r *Registrar) setState(path string, state FileState) { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - - r.state[path] = state -} - -func (r *Registrar) getStateEntry(path string) (FileState, bool) { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - - state, exist := r.state[path] - return state, exist -} - -func (r *Registrar) getState() map[string]FileState { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - - copy := make(map[string]FileState) - - for k, v := range r.state { - copy[k] = v - } - - return copy -} diff --git a/filebeat/input/state.go b/filebeat/input/state.go index 106fd8559172..ae3af93d6b17 100644 --- a/filebeat/input/state.go +++ b/filebeat/input/state.go @@ -15,6 +15,7 @@ type FileState struct { Finished bool `json:"-"` // harvester state Fileinfo os.FileInfo `json:"-"` // the file info FileStateOS FileStateOS + LastSeen time.Time `json:"last_seen"` } // NewFileState creates a new file state @@ -24,6 +25,7 @@ func NewFileState(fileInfo os.FileInfo, path string) FileState { Source: path, Finished: false, FileStateOS: GetOSFileState(fileInfo), + LastSeen: time.Now(), } } @@ -45,6 +47,7 @@ func (s *States) Update(newState FileState) { defer s.mutex.Unlock() index, oldState := s.findPrevious(newState) + newState.LastSeen = time.Now() if index >= 0 { s.states[index] = newState @@ -84,10 +87,9 @@ func (s *States) Cleanup(older time.Duration) { defer s.mutex.Unlock() for i, state := range s.states { - // File is older then ignore_older -> remove state - modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > older { + // File wasn't seen for longer then older -> remove state + if time.Since(state.LastSeen) > older { logp.Debug("prospector", "State removed for %s because of older: %s", state.Source) s.states = append(s.states[:i], s.states[i+1:]...) } @@ -101,3 +103,25 @@ func (s *States) Count() int { defer s.mutex.Unlock() return len(s.states) } + +// Returns a copy of the file states +func (s *States) GetStates() []FileState { + + s.mutex.Lock() + defer s.mutex.Unlock() + + copy := make([]FileState, len(s.states)) + + for i := range s.states { + copy[i] = s.states[i] + } + + return copy +} + +// SetStates overwrites all internal states with the given states array +func (s *States) SetStates(states []FileState) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.states = states +} diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index a584cb0e2342..cb6372a53bf5 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -21,3 +21,26 @@ def get_registry(self): with open(dotFilebeat) as file: return json.load(file) + + def get_registry_entry_by_path(self, path): + """ + Fetches the registry file and checks if an entry for the given path exists + If the path exists, the state for the given path is returned + If a path exists multiple times (which is possible because of file rotation) + the most recent version is returned + """ + registry = self.get_registry() + + tmp_entry = None + + # Checks all entries and returns the most recent one + for entry in registry: + if entry["source"] == path: + if tmp_entry == None: + tmp_entry = entry + else: + if tmp_entry["last_seen"] < entry["last_seen"]: + tmp_entry = entry + + return tmp_entry + diff --git a/filebeat/tests/system/test_crawler.py b/filebeat/tests/system/test_crawler.py index 3e1995860cd6..62c648220a5d 100644 --- a/filebeat/tests/system/test_crawler.py +++ b/filebeat/tests/system/test_crawler.py @@ -318,8 +318,8 @@ def test_file_disappear_appear(self): data = self.get_registry() # Make sure new file was picked up. As it has the same file name, - # only one entry exists - assert len(data) == 1 + # one entry for the new file and one for the old should exist + assert len(data) == 2 # Make sure output has 11 entries, the new file was started # from scratch @@ -376,8 +376,8 @@ def test_force_close(self): data = self.get_registry() # Make sure new file was picked up. As it has the same file name, - # only one entry exists - assert len(data) == 1 + # one entry for the new and one for the old should exist + assert len(data) == 2 # Make sure output has 11 entries, the new file was started # from scratch diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 56e7bc7e3f3c..95f90fed2982 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -4,6 +4,7 @@ import platform import time import shutil +import json from nose.plugins.skip import Skip, SkipTest @@ -59,7 +60,7 @@ def test_registrar_file_content(self): assert len(data) == 1 logFileAbsPath = os.path.abspath(testfile) - record = data[logFileAbsPath] + record = self.get_registry_entry_by_path(logFileAbsPath) self.assertDictContainsSubset({ "source": logFileAbsPath, @@ -192,8 +193,12 @@ def test_rotating_file(self): data = self.get_registry() # Make sure the offsets are correctly set - data[os.path.abspath(testfile)]["offset"] = 10 - data[os.path.abspath(testfilerenamed)]["offset"] = 9 + if os.name == "nt": + assert self.get_registry_entry_by_path(os.path.abspath(testfile))["offset"] == 11 + assert self.get_registry_entry_by_path(os.path.abspath(testfilerenamed))["offset"] == 10 + else: + assert self.get_registry_entry_by_path(os.path.abspath(testfile))["offset"] == 10 + assert self.get_registry_entry_by_path(os.path.abspath(testfilerenamed))["offset"] == 9 # Check that 2 files are port of the registrar file assert len(data) == 2 @@ -240,7 +245,7 @@ def test_rotating_file_inode(self): max_timeout=10) data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] testfilerenamed1 = self.working_dir + "/log/input.1" os.rename(testfile, testfilerenamed1) @@ -257,8 +262,8 @@ def test_rotating_file_inode(self): data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] # Rotate log file, create a new empty one and remove it afterwards testfilerenamed2 = self.working_dir + "/log/input.2" @@ -282,11 +287,11 @@ def test_rotating_file_inode(self): data = self.get_registry() # Compare file inodes and the one in the registry - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] - # Check that 2 files are part of the registrar file. The deleted file should never have been detected - assert len(data) == 2 + # Check that 3 files are part of the registrar file. The deleted file should never have been detected, but the rotated one should be in + assert len(data) == 3 def test_restart_continue(self): @@ -316,8 +321,7 @@ def test_restart_continue(self): # Wait a momemt to make sure registry is completely written time.sleep(1) - data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] filebeat.check_kill_and_wait() @@ -344,7 +348,7 @@ def test_restart_continue(self): data = self.get_registry() # Compare file inodes and the one in the registry - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] # Check that 1 files are part of the registrar file. The deleted file should never have been detected assert len(data) == 1 @@ -384,7 +388,7 @@ def test_rotating_file_with_restart(self): time.sleep(1) data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] testfilerenamed1 = self.working_dir + "/log/input.1" os.rename(testfile, testfilerenamed1) @@ -401,8 +405,8 @@ def test_rotating_file_with_restart(self): data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] filebeat.check_kill_and_wait() @@ -438,11 +442,11 @@ def test_rotating_file_with_restart(self): data = self.get_registry() # Compare file inodes and the one in the registry - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] - # Check that 2 files are part of the registrar file. The deleted file should never have been detected - assert len(data) == 2 + # Check that 3 files are part of the registrar file. The deleted file should never have been detected, but the rotated one should be in + assert len(data) == 3 def test_state_after_rotation(self): """ @@ -478,11 +482,11 @@ def test_state_after_rotation(self): # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 9 - assert data[os.path.abspath(testfile2)]["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 else: - assert data[os.path.abspath(testfile1)]["offset"] == 8 - assert data[os.path.abspath(testfile2)]["offset"] == 7 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 7 # Rotate files and remove old one os.rename(testfile2, testfile3) @@ -508,18 +512,14 @@ def test_state_after_rotation(self): time.sleep(5) filebeat.kill_and_wait() - - data = self.get_registry() - # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 10 - assert data[os.path.abspath(testfile2)]["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 10 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 9 else: - assert data[os.path.abspath(testfile1)]["offset"] == 9 - assert data[os.path.abspath(testfile2)]["offset"] == 8 - + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 def test_state_after_rotation_ignore_older(self): @@ -561,9 +561,9 @@ def test_state_after_rotation_ignore_older(self): # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 else: - assert data[os.path.abspath(testfile1)]["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 8 # Rotate files and remove old one os.rename(testfile2, testfile3) @@ -589,14 +589,112 @@ def test_state_after_rotation_ignore_older(self): time.sleep(5) filebeat.kill_and_wait() - - data = self.get_registry() - # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 10 - assert data[os.path.abspath(testfile2)]["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 10 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 9 else: - assert data[os.path.abspath(testfile1)]["offset"] == 9 - assert data[os.path.abspath(testfile2)]["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 + + + def test_migration_non_windows(self): + """ + Tests if migration from old filebeat registry to new format works + """ + + if os.name == "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2 + + def test_migration_windows(self): + """ + Tests if migration from old filebeat registry to new format works + """ + + if os.name != "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2