diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f84b73f102fb..c9060be47ef2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d *Topbeat* *Filebeat* +- Improvements in registrar dealing with file rotation. {pull}1281[1281] *Winlogbeat* diff --git a/filebeat/beater/publish_test.go b/filebeat/beater/publish_test.go index 16d8aabce0f2..1194e1138a4f 100644 --- a/filebeat/beater/publish_test.go +++ b/filebeat/beater/publish_test.go @@ -22,7 +22,7 @@ func makeEvents(name string, n int) []*input.FileEvent { DocumentType: "log", Bytes: 100, Offset: int64(i), - Source: &name, + Source: name, } events = append(events, event) } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index f32fb368eca6..15961cc06131 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -56,7 +56,7 @@ func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan c go prospector.Run(&c.wg) } - logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.State)) + logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getStateCopy())) return nil } diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 7a29a107181e..762aec7e37ea 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -20,8 +20,8 @@ type Prospector struct { } type Prospectorer interface { - Run() Init() + Run() } func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar, channel chan *input.FileEvent) (*Prospector, error) { diff --git a/filebeat/crawler/prospector_log.go b/filebeat/crawler/prospector_log.go index 5094ccf0c766..4714cc8c69f3 100644 --- a/filebeat/crawler/prospector_log.go +++ b/filebeat/crawler/prospector_log.go @@ -228,6 +228,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input // Start a new harvester on the path h.Start() + p.Prospector.registrar.Persist <- h.GetState() + } // Keep the old file in missingFiles so we don't rescan it if it was renamed and we've not yet reached the new filename @@ -239,6 +241,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input // Start a harvester on the path; a file was just modified and it doesn't have a harvester // The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel p.resumeHarvesting(h, <-h.Stat.Return) + p.Prospector.registrar.Persist <- h.GetState() + } else { logp.Debug("prospector", "Not harvesting, file didn't change: %s", h.Path) } @@ -252,14 +256,20 @@ func (p *ProspectorLog) continueExistingFile(h *harvester.Harvester, previousFil lastinfo := p.prospectorList[previousFile] h.Stat.Continue(&lastinfo) + + // Update state because of file rotation + p.Prospector.registrar.Persist <- h.GetState() } // Start / resume harvester with a predefined offset func (p *ProspectorLog) resumeHarvesting(h *harvester.Harvester, offset int64) { logp.Debug("prospector", "Start / resuming harvester of file: %s", h.Path) - h.Offset = offset + h.SetOffset(offset) h.Start() + + // Update state because of file rotation + p.Prospector.registrar.Persist <- h.GetState() } // Check if the given file was renamed. If file is known but with different path, diff --git a/filebeat/crawler/registrar.go b/filebeat/crawler/registrar.go index f31e6a1309bf..04cd2f71c5e0 100644 --- a/filebeat/crawler/registrar.go +++ b/filebeat/crawler/registrar.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input" @@ -17,7 +18,8 @@ type Registrar struct { // Path to the Registry File registryFile string // Map with all file paths inside and the corresponding state - State map[string]*FileState + state map[string]*FileState + stateMutex sync.Mutex // Channel used by the prospector and crawler to send FileStates to be persisted Persist chan *input.FileState @@ -39,7 +41,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) { func (r *Registrar) Init() error { // Init state r.Persist = make(chan *FileState) - r.State = make(map[string]*FileState) + r.state = map[string]*FileState{} r.Channel = make(chan []*FileEvent, 1) // Set to default in case it is not set @@ -66,11 +68,13 @@ func (r *Registrar) Init() error { // loadState fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. func (r *Registrar) LoadState() { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() if existing, e := os.Open(r.registryFile); e == nil { defer existing.Close() logp.Info("Loading registrar data from %s", r.registryFile) decoder := json.NewDecoder(existing) - decoder.Decode(&r.State) + decoder.Decode(&r.state) } } @@ -87,8 +91,9 @@ func (r *Registrar) Run() { return // Treats new log files to persist with higher priority then new events case state := <-r.Persist: - r.State[*state.Source] = state - logp.Debug("prospector", "Registrar will re-save state for %s", *state.Source) + source := state.Source + r.setState(source, state) + logp.Debug("prospector", "Registrar will re-save state for %s", source) case events := <-r.Channel: r.processEvents(events) } @@ -111,7 +116,7 @@ func (r *Registrar) processEvents(events []*FileEvent) { continue } - r.State[*event.Source] = event.GetState() + r.setState(event.Source, event.GetState()) } } @@ -122,7 +127,7 @@ func (r *Registrar) Stop() { } func (r *Registrar) GetFileState(path string) (*FileState, bool) { - state, exist := r.State[path] + state, exist := r.getState(path) return state, exist } @@ -138,12 +143,14 @@ func (r *Registrar) writeRegistry() error { } encoder := json.NewEncoder(file) - encoder.Encode(r.State) + + state := r.getStateCopy() + encoder.Encode(state) // Directly close file because of windows file.Close() - logp.Info("Registry file updated. %d states written.", len(r.State)) + logp.Info("Registry file updated. %d states written.", len(state)) return SafeFileRotate(r.registryFile, tempfile) } @@ -157,7 +164,6 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo logp.Debug("registrar", "Same file as before found. Fetch the state and persist it.") // We're resuming - throw the last state back downstream so we resave it // And return the offset - also force harvest in case the file is old and we're about to skip it - r.Persist <- lastState return lastState.Offset, true } @@ -168,8 +174,7 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath) lastState, _ := r.GetFileState(previous) - lastState.Source = &filePath - r.Persist <- lastState + r.updateStateSource(lastState, filePath) return lastState.Offset, true } @@ -189,7 +194,7 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) newState := input.GetOSFileState(&newFileInfo) - for oldFilePath, oldState := range r.State { + for oldFilePath, oldState := range r.getStateCopy() { // Skipping when path the same if oldFilePath == newFilePath { @@ -205,3 +210,34 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) return "", fmt.Errorf("No previous file found") } + +func (r *Registrar) setState(path string, state *FileState) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + r.state[path] = state +} + +func (r *Registrar) getState(path string) (*FileState, bool) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + state, exist := r.state[path] + return state, exist +} + +func (r *Registrar) updateStateSource(state *FileState, path string) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + state.Source = path +} + +func (r *Registrar) getStateCopy() map[string]FileState { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + + copy := make(map[string]FileState) + for k, v := range r.state { + copy[k] = *v + } + + return copy +} diff --git a/filebeat/harvester/filestat.go b/filebeat/harvester/filestat.go index 7fc140b1d72b..3ecd6f6211b8 100644 --- a/filebeat/harvester/filestat.go +++ b/filebeat/harvester/filestat.go @@ -2,7 +2,7 @@ package harvester import "os" -// Contains statistic about file when it was last seend by the prospector +// Contains statistic about file when it was last seen by the prospector type FileStat struct { Fileinfo os.FileInfo /* the file info */ Return chan int64 /* the harvester will send an event with its offset when it closes */ diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 5d954eb5f73a..fcd86ebc1b9b 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -16,6 +16,7 @@ package harvester import ( "fmt" "regexp" + "sync" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" @@ -25,7 +26,8 @@ import ( type Harvester struct { Path string /* the file path to harvest */ Config *config.HarvesterConfig - Offset int64 + offset int64 + offsetLock sync.Mutex Stat *FileStat SpoolerChan chan *input.FileEvent encoding encoding.EncodingFactory @@ -67,6 +69,5 @@ func NewHarvester( func (h *Harvester) Start() { // Starts harvester and picks the right type. In case type is not set, set it to defeault (log) - go h.Harvest() } diff --git a/filebeat/harvester/harvester_test.go b/filebeat/harvester/harvester_test.go index cba49ecc287c..175969d9f8ca 100644 --- a/filebeat/harvester/harvester_test.go +++ b/filebeat/harvester/harvester_test.go @@ -15,7 +15,7 @@ func TestExampleTest(t *testing.T) { h := Harvester{ Path: "/var/log/", - Offset: 0, + offset: 0, } assert.Equal(t, "/var/log/", h.Path) diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index 84c724f45684..266446cb1175 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -61,7 +61,7 @@ func (h *Harvester) Harvest() { defer func() { // On completion, push offset so we can continue where we left off if we relaunch on the same file if h.Stat != nil { - h.Stat.Return <- h.Offset + h.Stat.Return <- h.GetOffset() } logp.Debug("harvester", "Stopping harvester for file: %s", h.Path) @@ -123,8 +123,8 @@ func (h *Harvester) Harvest() { logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path) - h.Offset = 0 - seeker.Seek(h.Offset, os.SEEK_SET) + h.SetOffset(0) + seeker.Seek(h.GetOffset(), os.SEEK_SET) continue } @@ -136,10 +136,10 @@ func (h *Harvester) Harvest() { event := &input.FileEvent{ EventMetadata: h.Config.EventMetadata, ReadTime: ts, - Source: &h.Path, + Source: h.Path, InputType: h.Config.InputType, DocumentType: h.Config.DocumentType, - Offset: h.Offset, + Offset: h.GetOffset(), Bytes: bytesRead, Text: &text, Fileinfo: &info, @@ -151,7 +151,7 @@ func (h *Harvester) Harvest() { } // Set Offset - h.Offset += int64(bytesRead) // Update offset if complete line has been processed + h.SetOffset(h.GetOffset() + int64(bytesRead)) // Update offset if complete line has been processed } } @@ -237,25 +237,52 @@ func (h *Harvester) openFile() (encoding.Encoding, error) { func (h *Harvester) initFileOffset(file *os.File) error { offset, err := file.Seek(0, os.SEEK_CUR) - if h.Offset > 0 { + if h.GetOffset() > 0 { // continue from last known offset logp.Debug("harvester", - "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.Offset, offset) - _, err = file.Seek(h.Offset, os.SEEK_SET) + "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.GetOffset(), offset) + _, err = file.Seek(h.GetOffset(), os.SEEK_SET) } else if h.Config.TailFiles { // tail file if file is new and tail_files config is set logp.Debug("harvester", "harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset) - h.Offset, err = file.Seek(0, os.SEEK_END) + offset, err = file.Seek(0, os.SEEK_END) + h.SetOffset(offset) } else { // get offset from file in case of encoding factory was // required to read some data. logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset) - h.Offset = offset + h.SetOffset(offset) } return err } + +// GetState returns current state of harvester +func (h *Harvester) GetState() *input.FileState { + + state := input.FileState{ + Source: h.Path, + Offset: h.GetOffset(), + FileStateOS: input.GetOSFileState(&h.Stat.Fileinfo), + } + + return &state +} + +func (h *Harvester) SetOffset(offset int64) { + h.offsetLock.Lock() + defer h.offsetLock.Unlock() + + h.offset = offset +} + +func (h *Harvester) GetOffset() int64 { + h.offsetLock.Lock() + defer h.offsetLock.Unlock() + + return h.offset +} diff --git a/filebeat/input/file.go b/filebeat/input/file.go index a6d0095b4578..c64692725ac8 100644 --- a/filebeat/input/file.go +++ b/filebeat/input/file.go @@ -20,7 +20,7 @@ type File struct { type FileEvent struct { common.EventMetadata ReadTime time.Time - Source *string + Source string InputType string DocumentType string Offset int64 @@ -32,8 +32,8 @@ type FileEvent struct { } type FileState struct { - Source *string `json:"source,omitempty"` - Offset int64 `json:"offset,omitempty"` + Source string `json:"source,omitempty"` + Offset int64 `json:"offset,omitempty"` FileStateOS *FileStateOS } diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index c3f12a05b328..9acf1a059d99 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -108,10 +108,10 @@ def test_stdin(self): objs = self.read_output() assert len(objs) == iterations1 + iterations2 - def test_rotating_ignore_older_larger_write_rate(self): + def test_rotating_close_older_larger_write_rate(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - ignoreOlder="1s", + ignoreOlder="10s", closeOlder="1s", scan_frequency="0.1s", ) @@ -174,10 +174,10 @@ def test_exclude_files(self): assert 1 == len(output) assert output[0]["message"] == "line in log file" - def test_rotating_ignore_older_low_write_rate(self): + def test_rotating_close_older_low_write_rate(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - ignoreOlder="1s", + ignoreOlder="10s", closeOlder="1s", scan_frequency="0.1s", ) @@ -209,7 +209,7 @@ def test_rotating_ignore_older_low_write_rate(self): os.rename(testfile, testfile + ".1") open(testfile, 'w').close() - # wait for file to be closed due to ignore_older + # wait for file to be closed due to close_older self.wait_until( lambda: self.log_contains( "Stopping harvester, closing file: {}\n".format(os.path.abspath(testfile))), diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 0d4af776395a..3cad178636b9 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -2,6 +2,9 @@ import os import platform +import time +import shutil +from nose.plugins.skip import Skip, SkipTest # Additional tests: to be implemented @@ -32,11 +35,18 @@ def test_registrar_file_content(self): file.close() filebeat = self.start_beat() + c = self.log_contains_count("states written") + self.wait_until( lambda: self.log_contains( "Processing 5 events"), max_timeout=15) + # Make sure states written appears one more time + self.wait_until( + lambda: self.log_contains("states written") > c, + max_timeout=10) + # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( @@ -207,3 +217,158 @@ def test_data_path(self): filebeat.check_kill_and_wait() assert os.path.isfile(self.working_dir + "/datapath/registry") + + def test_rotating_file_inode(self): + """ + Check that inodes are properly written during file rotation + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + scan_frequency="1s" + ) + + if os.name == "nt": + raise SkipTest + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/input" + + filebeat = self.start_beat() + + with open(testfile, 'w') as f: + f.write("entry1\n") + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + data = self.get_registry() + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + + testfilerenamed1 = self.working_dir + "/log/input.1" + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("entry2\n") + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_registry() + + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Rotate log file, create a new empty one and remove it afterwards + testfilerenamed2 = self.working_dir + "/log/input.2" + os.rename(testfilerenamed1, testfilerenamed2) + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("") + + os.remove(testfilerenamed2) + + with open(testfile, 'w') as f: + f.write("entry3\n") + + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + + filebeat.check_kill_and_wait() + + data = self.get_registry() + + # Compare file inodes and the one in the registry + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Check that 2 files are part of the registrar file. The deleted file should never have been detected + assert len(data) == 2 + + + def test_rotating_file_with_shutdown(self): + """ + Check that inodes are properly written during file rotation and shutdown + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + scan_frequency="1s" + ) + + if os.name == "nt": + raise SkipTest + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/input" + + filebeat = self.start_beat() + + with open(testfile, 'w') as f: + f.write("entry1\n") + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + data = self.get_registry() + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + + testfilerenamed1 = self.working_dir + "/log/input.1" + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("entry2\n") + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_registry() + + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + filebeat.check_kill_and_wait() + + # Store first registry file + shutil.copyfile(self.working_dir + "/registry", self.working_dir + "/registry.first") + + # Rotate log file, create a new empty one and remove it afterwards + testfilerenamed2 = self.working_dir + "/log/input.2" + os.rename(testfilerenamed1, testfilerenamed2) + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("") + + os.remove(testfilerenamed2) + + with open(testfile, 'w') as f: + f.write("entry3\n") + + filebeat = self.start_beat(output="filebeat2.log") + + # Output file was rotated + self.wait_until( + lambda: self.output_has(lines=2, output_file="output/filebeat.1"), + max_timeout=10) + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + filebeat.check_kill_and_wait() + + data = self.get_registry() + + # Compare file inodes and the one in the registry + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Check that 2 files are part of the registrar file. The deleted file should never have been detected + assert len(data) == 2 + +