From 8bbc264244dc451968c62fcc2494eb704776177a Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 12 Apr 2016 09:41:55 +0200 Subject: [PATCH] Fix filebeat file rotation registrar issue #1281 When multiple files were rotated at the same time, the first rotation was overwriting the states for the other rotated files. This happened because the new file directly had new content inside and the state for all the other files was fetched from disk, resetting the changes which were in memory. The problem is now fixed by not writing the state to disk every time state changes are discovered by the registrar, as it is up to the prospector to decide if these changes are the most recent ones. Now changes get written to disk every time a scan is done. The downside of this solution is that the registrar is also updated even if no changes happened. * System test added to verify that issue is fixed * Closes #1281 * Refactor some variables for better sync handling and fix tests * Make offset private and only set it through method * Rename system tests to close_older as this is what is actually checked * Update registrar only when file was rotated or changed * Add test to check state between shutdown and rotation. Only update registry when changes happen. 
--- CHANGELOG.asciidoc | 1 + filebeat/beater/publish_test.go | 2 +- filebeat/crawler/crawler.go | 2 +- filebeat/crawler/prospector.go | 2 +- filebeat/crawler/prospector_log.go | 12 +- filebeat/crawler/registrar.go | 62 +++++++-- filebeat/harvester/filestat.go | 2 +- filebeat/harvester/harvester.go | 5 +- filebeat/harvester/harvester_test.go | 2 +- filebeat/harvester/log.go | 49 +++++-- filebeat/input/file.go | 6 +- filebeat/tests/system/test_prospector.py | 10 +- filebeat/tests/system/test_registrar.py | 165 +++++++++++++++++++++++ 13 files changed, 280 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f84b73f102fb..c9060be47ef2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d *Topbeat* *Filebeat* +- Improvements in registrar dealing with file rotation. {pull}1281[1281] *Winlogbeat* diff --git a/filebeat/beater/publish_test.go b/filebeat/beater/publish_test.go index 16d8aabce0f2..1194e1138a4f 100644 --- a/filebeat/beater/publish_test.go +++ b/filebeat/beater/publish_test.go @@ -22,7 +22,7 @@ func makeEvents(name string, n int) []*input.FileEvent { DocumentType: "log", Bytes: 100, Offset: int64(i), - Source: &name, + Source: name, } events = append(events, event) } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index f32fb368eca6..15961cc06131 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -56,7 +56,7 @@ func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan c go prospector.Run(&c.wg) } - logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.State)) + logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getStateCopy())) return nil } diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 7a29a107181e..762aec7e37ea 100644 --- 
a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -20,8 +20,8 @@ type Prospector struct { } type Prospectorer interface { - Run() Init() + Run() } func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar, channel chan *input.FileEvent) (*Prospector, error) { diff --git a/filebeat/crawler/prospector_log.go b/filebeat/crawler/prospector_log.go index 5094ccf0c766..4714cc8c69f3 100644 --- a/filebeat/crawler/prospector_log.go +++ b/filebeat/crawler/prospector_log.go @@ -228,6 +228,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input // Start a new harvester on the path h.Start() + p.Prospector.registrar.Persist <- h.GetState() + } // Keep the old file in missingFiles so we don't rescan it if it was renamed and we've not yet reached the new filename @@ -239,6 +241,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input // Start a harvester on the path; a file was just modified and it doesn't have a harvester // The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel p.resumeHarvesting(h, <-h.Stat.Return) + p.Prospector.registrar.Persist <- h.GetState() + } else { logp.Debug("prospector", "Not harvesting, file didn't change: %s", h.Path) } @@ -252,14 +256,20 @@ func (p *ProspectorLog) continueExistingFile(h *harvester.Harvester, previousFil lastinfo := p.prospectorList[previousFile] h.Stat.Continue(&lastinfo) + + // Update state because of file rotation + p.Prospector.registrar.Persist <- h.GetState() } // Start / resume harvester with a predefined offset func (p *ProspectorLog) resumeHarvesting(h *harvester.Harvester, offset int64) { logp.Debug("prospector", "Start / resuming harvester of file: %s", h.Path) - h.Offset = offset + h.SetOffset(offset) h.Start() + + // Update state because of file rotation + p.Prospector.registrar.Persist <- h.GetState() } // Check if the given file was renamed. 
If file is known but with different path, diff --git a/filebeat/crawler/registrar.go b/filebeat/crawler/registrar.go index f31e6a1309bf..04cd2f71c5e0 100644 --- a/filebeat/crawler/registrar.go +++ b/filebeat/crawler/registrar.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input" @@ -17,7 +18,8 @@ type Registrar struct { // Path to the Registry File registryFile string // Map with all file paths inside and the corresponding state - State map[string]*FileState + state map[string]*FileState + stateMutex sync.Mutex // Channel used by the prospector and crawler to send FileStates to be persisted Persist chan *input.FileState @@ -39,7 +41,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) { func (r *Registrar) Init() error { // Init state r.Persist = make(chan *FileState) - r.State = make(map[string]*FileState) + r.state = map[string]*FileState{} r.Channel = make(chan []*FileEvent, 1) // Set to default in case it is not set @@ -66,11 +68,13 @@ func (r *Registrar) Init() error { // loadState fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. 
func (r *Registrar) LoadState() { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() if existing, e := os.Open(r.registryFile); e == nil { defer existing.Close() logp.Info("Loading registrar data from %s", r.registryFile) decoder := json.NewDecoder(existing) - decoder.Decode(&r.State) + decoder.Decode(&r.state) } } @@ -87,8 +91,9 @@ func (r *Registrar) Run() { return // Treats new log files to persist with higher priority then new events case state := <-r.Persist: - r.State[*state.Source] = state - logp.Debug("prospector", "Registrar will re-save state for %s", *state.Source) + source := state.Source + r.setState(source, state) + logp.Debug("prospector", "Registrar will re-save state for %s", source) case events := <-r.Channel: r.processEvents(events) } @@ -111,7 +116,7 @@ func (r *Registrar) processEvents(events []*FileEvent) { continue } - r.State[*event.Source] = event.GetState() + r.setState(event.Source, event.GetState()) } } @@ -122,7 +127,7 @@ func (r *Registrar) Stop() { } func (r *Registrar) GetFileState(path string) (*FileState, bool) { - state, exist := r.State[path] + state, exist := r.getState(path) return state, exist } @@ -138,12 +143,14 @@ func (r *Registrar) writeRegistry() error { } encoder := json.NewEncoder(file) - encoder.Encode(r.State) + + state := r.getStateCopy() + encoder.Encode(state) // Directly close file because of windows file.Close() - logp.Info("Registry file updated. %d states written.", len(r.State)) + logp.Info("Registry file updated. %d states written.", len(state)) return SafeFileRotate(r.registryFile, tempfile) } @@ -157,7 +164,6 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo logp.Debug("registrar", "Same file as before found. 
Fetch the state and persist it.") // We're resuming - throw the last state back downstream so we resave it // And return the offset - also force harvest in case the file is old and we're about to skip it - r.Persist <- lastState return lastState.Offset, true } @@ -168,8 +174,7 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath) lastState, _ := r.GetFileState(previous) - lastState.Source = &filePath - r.Persist <- lastState + r.updateStateSource(lastState, filePath) return lastState.Offset, true } @@ -189,7 +194,7 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) newState := input.GetOSFileState(&newFileInfo) - for oldFilePath, oldState := range r.State { + for oldFilePath, oldState := range r.getStateCopy() { // Skipping when path the same if oldFilePath == newFilePath { @@ -205,3 +210,34 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) return "", fmt.Errorf("No previous file found") } + +func (r *Registrar) setState(path string, state *FileState) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + r.state[path] = state +} + +func (r *Registrar) getState(path string) (*FileState, bool) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + state, exist := r.state[path] + return state, exist +} + +func (r *Registrar) updateStateSource(state *FileState, path string) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + state.Source = path +} + +func (r *Registrar) getStateCopy() map[string]FileState { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + + copy := make(map[string]FileState) + for k, v := range r.state { + copy[k] = *v + } + + return copy +} diff --git a/filebeat/harvester/filestat.go b/filebeat/harvester/filestat.go index 7fc140b1d72b..3ecd6f6211b8 100644 --- a/filebeat/harvester/filestat.go +++ b/filebeat/harvester/filestat.go @@ -2,7 +2,7 @@ package 
harvester import "os" -// Contains statistic about file when it was last seend by the prospector +// Contains statistic about file when it was last seen by the prospector type FileStat struct { Fileinfo os.FileInfo /* the file info */ Return chan int64 /* the harvester will send an event with its offset when it closes */ diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 5d954eb5f73a..fcd86ebc1b9b 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -16,6 +16,7 @@ package harvester import ( "fmt" "regexp" + "sync" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" @@ -25,7 +26,8 @@ import ( type Harvester struct { Path string /* the file path to harvest */ Config *config.HarvesterConfig - Offset int64 + offset int64 + offsetLock sync.Mutex Stat *FileStat SpoolerChan chan *input.FileEvent encoding encoding.EncodingFactory @@ -67,6 +69,5 @@ func NewHarvester( func (h *Harvester) Start() { // Starts harvester and picks the right type. 
In case type is not set, set it to defeault (log) - go h.Harvest() } diff --git a/filebeat/harvester/harvester_test.go b/filebeat/harvester/harvester_test.go index cba49ecc287c..175969d9f8ca 100644 --- a/filebeat/harvester/harvester_test.go +++ b/filebeat/harvester/harvester_test.go @@ -15,7 +15,7 @@ func TestExampleTest(t *testing.T) { h := Harvester{ Path: "/var/log/", - Offset: 0, + offset: 0, } assert.Equal(t, "/var/log/", h.Path) diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index 84c724f45684..266446cb1175 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -61,7 +61,7 @@ func (h *Harvester) Harvest() { defer func() { // On completion, push offset so we can continue where we left off if we relaunch on the same file if h.Stat != nil { - h.Stat.Return <- h.Offset + h.Stat.Return <- h.GetOffset() } logp.Debug("harvester", "Stopping harvester for file: %s", h.Path) @@ -123,8 +123,8 @@ func (h *Harvester) Harvest() { logp.Info("File was truncated. 
Begin reading file from offset 0: %s", h.Path) - h.Offset = 0 - seeker.Seek(h.Offset, os.SEEK_SET) + h.SetOffset(0) + seeker.Seek(h.GetOffset(), os.SEEK_SET) continue } @@ -136,10 +136,10 @@ func (h *Harvester) Harvest() { event := &input.FileEvent{ EventMetadata: h.Config.EventMetadata, ReadTime: ts, - Source: &h.Path, + Source: h.Path, InputType: h.Config.InputType, DocumentType: h.Config.DocumentType, - Offset: h.Offset, + Offset: h.GetOffset(), Bytes: bytesRead, Text: &text, Fileinfo: &info, @@ -151,7 +151,7 @@ func (h *Harvester) Harvest() { } // Set Offset - h.Offset += int64(bytesRead) // Update offset if complete line has been processed + h.SetOffset(h.GetOffset() + int64(bytesRead)) // Update offset if complete line has been processed } } @@ -237,25 +237,52 @@ func (h *Harvester) openFile() (encoding.Encoding, error) { func (h *Harvester) initFileOffset(file *os.File) error { offset, err := file.Seek(0, os.SEEK_CUR) - if h.Offset > 0 { + if h.GetOffset() > 0 { // continue from last known offset logp.Debug("harvester", - "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.Offset, offset) - _, err = file.Seek(h.Offset, os.SEEK_SET) + "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.GetOffset(), offset) + _, err = file.Seek(h.GetOffset(), os.SEEK_SET) } else if h.Config.TailFiles { // tail file if file is new and tail_files config is set logp.Debug("harvester", "harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset) - h.Offset, err = file.Seek(0, os.SEEK_END) + offset, err = file.Seek(0, os.SEEK_END) + h.SetOffset(offset) } else { // get offset from file in case of encoding factory was // required to read some data. 
logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset) - h.Offset = offset + h.SetOffset(offset) } return err } + +// GetState returns current state of harvester +func (h *Harvester) GetState() *input.FileState { + + state := input.FileState{ + Source: h.Path, + Offset: h.GetOffset(), + FileStateOS: input.GetOSFileState(&h.Stat.Fileinfo), + } + + return &state +} + +func (h *Harvester) SetOffset(offset int64) { + h.offsetLock.Lock() + defer h.offsetLock.Unlock() + + h.offset = offset +} + +func (h *Harvester) GetOffset() int64 { + h.offsetLock.Lock() + defer h.offsetLock.Unlock() + + return h.offset +} diff --git a/filebeat/input/file.go b/filebeat/input/file.go index a6d0095b4578..c64692725ac8 100644 --- a/filebeat/input/file.go +++ b/filebeat/input/file.go @@ -20,7 +20,7 @@ type File struct { type FileEvent struct { common.EventMetadata ReadTime time.Time - Source *string + Source string InputType string DocumentType string Offset int64 @@ -32,8 +32,8 @@ type FileEvent struct { } type FileState struct { - Source *string `json:"source,omitempty"` - Offset int64 `json:"offset,omitempty"` + Source string `json:"source,omitempty"` + Offset int64 `json:"offset,omitempty"` FileStateOS *FileStateOS } diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index c3f12a05b328..9acf1a059d99 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -108,10 +108,10 @@ def test_stdin(self): objs = self.read_output() assert len(objs) == iterations1 + iterations2 - def test_rotating_ignore_older_larger_write_rate(self): + def test_rotating_close_older_larger_write_rate(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - ignoreOlder="1s", + ignoreOlder="10s", closeOlder="1s", scan_frequency="0.1s", ) @@ -174,10 +174,10 @@ def test_exclude_files(self): assert 1 == len(output) assert output[0]["message"] == "line in log file" - def 
test_rotating_ignore_older_low_write_rate(self): + def test_rotating_close_older_low_write_rate(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - ignoreOlder="1s", + ignoreOlder="10s", closeOlder="1s", scan_frequency="0.1s", ) @@ -209,7 +209,7 @@ def test_rotating_ignore_older_low_write_rate(self): os.rename(testfile, testfile + ".1") open(testfile, 'w').close() - # wait for file to be closed due to ignore_older + # wait for file to be closed due to close_older self.wait_until( lambda: self.log_contains( "Stopping harvester, closing file: {}\n".format(os.path.abspath(testfile))), diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 0d4af776395a..3cad178636b9 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -2,6 +2,9 @@ import os import platform +import time +import shutil +from nose.plugins.skip import Skip, SkipTest # Additional tests: to be implemented @@ -32,11 +35,18 @@ def test_registrar_file_content(self): file.close() filebeat = self.start_beat() + c = self.log_contains_count("states written") + self.wait_until( lambda: self.log_contains( "Processing 5 events"), max_timeout=15) + # Make sure states written appears one more time + self.wait_until( + lambda: self.log_contains("states written") > c, + max_timeout=10) + # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. 
self.wait_until( @@ -207,3 +217,158 @@ def test_data_path(self): filebeat.check_kill_and_wait() assert os.path.isfile(self.working_dir + "/datapath/registry") + + def test_rotating_file_inode(self): + """ + Check that inodes are properly written during file rotation + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + scan_frequency="1s" + ) + + if os.name == "nt": + raise SkipTest + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/input" + + filebeat = self.start_beat() + + with open(testfile, 'w') as f: + f.write("entry1\n") + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + data = self.get_registry() + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + + testfilerenamed1 = self.working_dir + "/log/input.1" + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("entry2\n") + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_registry() + + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Rotate log file, create a new empty one and remove it afterwards + testfilerenamed2 = self.working_dir + "/log/input.2" + os.rename(testfilerenamed1, testfilerenamed2) + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("") + + os.remove(testfilerenamed2) + + with open(testfile, 'w') as f: + f.write("entry3\n") + + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + + filebeat.check_kill_and_wait() + + data = self.get_registry() + + # Compare file inodes and the one in the registry + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == 
data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Check that 2 files are part of the registrar file. The deleted file should never have been detected + assert len(data) == 2 + + + def test_rotating_file_with_shutdown(self): + """ + Check that inodes are properly written during file rotation and shutdown + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + scan_frequency="1s" + ) + + if os.name == "nt": + raise SkipTest + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/input" + + filebeat = self.start_beat() + + with open(testfile, 'w') as f: + f.write("entry1\n") + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + data = self.get_registry() + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + + testfilerenamed1 = self.working_dir + "/log/input.1" + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("entry2\n") + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + data = self.get_registry() + + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + filebeat.check_kill_and_wait() + + # Store first registry file + shutil.copyfile(self.working_dir + "/registry", self.working_dir + "/registry.first") + + # Rotate log file, create a new empty one and remove it afterwards + testfilerenamed2 = self.working_dir + "/log/input.2" + os.rename(testfilerenamed1, testfilerenamed2) + os.rename(testfile, testfilerenamed1) + + with open(testfile, 'w') as f: + f.write("") + + os.remove(testfilerenamed2) + + with open(testfile, 'w') as f: + f.write("entry3\n") + + filebeat = self.start_beat(output="filebeat2.log") + + # Output file was rotated + self.wait_until( + lambda: self.output_has(lines=2, 
output_file="output/filebeat.1"), + max_timeout=10) + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + filebeat.check_kill_and_wait() + + data = self.get_registry() + + # Compare file inodes and the one in the registry + assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + + # Check that 2 files are part of the registrar file. The deleted file should never have been detected + assert len(data) == 2 + +