Refactoring of FileState loading for Prospector (#1558)
Previously, the file state was loaded from disk every time a new file was found. The state from the registry file is now loaded only once during startup; from then on, the prospector's internal in-memory state is used. This is more efficient and prevents race conditions.

This has one possible side effect: if a shared file system is not available during a restart and becomes available later, all of its files are read from the start again, because their state was never loaded. Only the state for files that are found during startup (rotated or not) is loaded.

This is related to #1022

Changes:
* Registry is only loaded once, during startup
* File stats are only read once per scan and then reused
* No state is written for empty files
* Additional tests were added to confirm these changes
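
For illustration, here is a minimal, stand-alone sketch of the new startup flow — not the actual Filebeat code: the registry file is read from disk exactly once, and every later scan consults only the in-memory copy. All names here (Registry, State, loadOnce) are illustrative assumptions.

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// State is a simplified per-file registry entry.
type State struct {
	Offset int64 `json:"offset"`
}

// Registry holds all file states in memory, keyed by file path.
type Registry struct {
	states map[string]State
}

// loadOnce reads the registry file a single time during startup.
func loadOnce(path string) (*Registry, error) {
	r := &Registry{states: map[string]State{}}
	f, err := os.Open(path)
	if err != nil {
		// No registry yet: start with an empty in-memory state.
		return r, nil
	}
	defer f.Close()
	if err := json.NewDecoder(f).Decode(&r.states); err != nil {
		return nil, err
	}
	return r, nil
}

// fetchState returns the stored offset, or 0 for unknown files.
func (r *Registry) fetchState(path string) int64 {
	return r.states[path].Offset
}

func main() {
	reg, err := loadOnce("registry")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Later scans never touch the disk again; they only read memory.
	fmt.Println("offset:", reg.fetchState("/var/log/syslog"))
}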
ruflin authored and tsg committed May 4, 2016
1 parent 51caa05 commit ca5972a
Showing 4 changed files with 158 additions and 86 deletions.
134 changes: 77 additions & 57 deletions filebeat/crawler/prospector_log.go
@@ -38,7 +38,23 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {

func (p *ProspectorLog) Init() {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)
p.scan()

logp.Info("Load previous states from registry into memory")

for path, fileinfo := range p.getFiles() {

// For each path found, check whether there is a previous state
offset := p.Prospector.registrar.fetchState(path, fileinfo)

// Offset found -> skip to previous state
if offset > 0 {
fs := input.NewFileStat(fileinfo, 0)
fs.Skip(offset)
p.harvesterStats[path] = *fs
}
}

logp.Info("Previous states loaded: %v", len(p.harvesterStats))
}

func (p *ProspectorLog) Run() {
@@ -60,54 +76,61 @@ func (p *ProspectorLog) Run() {

}

// Scan starts a scanGlob for each provided path/glob
func (p *ProspectorLog) scan() {

newlastscan := time.Now()

// getFiles returns all files that have to be harvested
// All globs are expanded, then directories and excluded files are removed
func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
// Now let's do one quick scan to pick up new files
for _, path := range p.config.Paths {
p.scanGlob(path)
}
p.lastscan = newlastscan
}

// Scans the specific path which can be a glob (/**/**/*.log)
// For all found files it is checked if a harvester should be started
func (p *ProspectorLog) scanGlob(glob string) {
paths := map[string]os.FileInfo{}

logp.Debug("prospector", "scan path %s", glob)
for _, glob := range p.config.Paths {
// Evaluate the path as a wildcards/shell glob
matches, err := filepath.Glob(glob)
if err != nil {
logp.Err("glob(%s) failed: %v", glob, err)
continue
}

// Evaluate the path as a wildcards/shell glob
matches, err := filepath.Glob(glob)
if err != nil {
logp.Debug("prospector", "glob(%s) failed: %v", glob, err)
return
// Check any matched files to see if we need to start a harvester
for _, file := range matches {
logp.Debug("prospector", "Check file for harvesting: %s", file)

// check if the file is in the exclude_files list
if p.isFileExcluded(file) {
logp.Debug("prospector", "Exclude file: %s", file)
continue
}

// Stat the file, following any symlinks.
fileinfo, err := os.Stat(file)
if err != nil {
logp.Debug("prospector", "stat(%s) failed: %s", file, err)
continue
}

if fileinfo.IsDir() {
logp.Debug("prospector", "Skipping directory: %s", file)
continue
}

paths[file] = fileinfo
}
}

p.missingFiles = map[string]os.FileInfo{}
return paths
}

// Check any matched files to see if we need to start a harvester
for _, file := range matches {
logp.Debug("prospector", "Check file for harvesting: %s", file)
// scan checks all files returned by getFiles and decides if a harvester has to be started
func (p *ProspectorLog) scan() {

// check if the file is in the exclude_files list
if p.isFileExcluded(file) {
logp.Debug("prospector", "Exclude file: %s", file)
continue
}
newlastscan := time.Now()

// Stat the file, following any symlinks.
fileinfo, err := os.Stat(file)
if err != nil {
logp.Debug("prospector", "stat(%s) failed: %s", file, err)
continue
}
p.missingFiles = map[string]os.FileInfo{}

if fileinfo.IsDir() {
logp.Debug("prospector", "Skipping directory: %s", file)
continue
}
// Now let's do one quick scan to pick up new files
for file, fileinfo := range p.getFiles() {

logp.Debug("prospector", "Check file for harvesting: %s", file)

newFile := input.NewFile(fileinfo)

@@ -140,6 +163,8 @@ func (p *ProspectorLog) scanGlob(glob string) {
// rotation/etc
p.harvesterStats[h.Path] = *h.Stat
}

p.lastscan = newlastscan
}

// Check if harvester for new file has to be started
@@ -148,31 +173,24 @@ func (p *ProspectorLog) checkNewFile(h *harvester.Harvester) {

logp.Debug("prospector", "Start harvesting unknown file: %s", h.Path)

// Call crawler if there exists a state for the given file
offset, resuming := p.Prospector.registrar.fetchState(h.Path, h.Stat.Fileinfo)

if p.checkOldFile(h) {

logp.Debug("prospector", "Fetching old state of file to resume: %s", h.Path)

// Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file
// This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting
// Once we detect changes again we can resume another harvester again - this keeps number of go routines to a minimum
if resuming {
logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", h.Path)
p.resumeHarvesting(h, offset)
} else {
// Old file, skip it, but push offset of file size so we start from the end if this file changes and needs picking up
logp.Debug("prospector", "Skipping file (older than ignore older of %v, %v): %s",
p.config.IgnoreOlderDuration,
time.Since(h.Stat.Fileinfo.ModTime()),
h.Path)
h.Stat.Skip(h.Stat.Fileinfo.Size())
}
logp.Debug("prospector", "Fetching old state of file to resume: %s", h.Path)

// Old file, skip it, but push offset of file size so we start from the end if this file changes and needs picking up
logp.Debug("prospector", "Skipping file (older than ignore older of %v, %v): %s",
p.config.IgnoreOlderDuration,
time.Since(h.Stat.Fileinfo.ModTime()),
h.Path)
h.Stat.Skip(h.Stat.Fileinfo.Size())

} else if previousFile, err := p.getPreviousFile(h.Path, h.Stat.Fileinfo); err == nil {
p.continueExistingFile(h, previousFile)
} else {
p.resumeHarvesting(h, offset)
p.resumeHarvesting(h, 0)
}
}

@@ -224,7 +242,6 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input

// Forget about the previous harvester and let it continue on the old file - so start a new channel to use with the new harvester
h.Stat.Ignore()
//h.SetOffset(0)

// Start a new harvester on the path
h.Start()
@@ -239,6 +256,9 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input
// Start a harvester on the path; a file was just modified and it doesn't have a harvester
// The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel
p.resumeHarvesting(h, <-h.Stat.Offset)
} else if h.Stat.Finished() {
logp.Debug("prospector", "Update state because of an unwritten offset update: %s", h.Path)
p.resumeHarvesting(h, <-h.Stat.Offset)
} else {
logp.Debug("prospector", "Not harvesting, file didn't change: %s", h.Path)
}
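The `<-h.Stat.Offset` reads above rely on a buffered channel acting as a one-slot mailbox for the last harvested offset. A minimal, stand-alone sketch of that pattern, with illustrative names (the real FileStat type is more involved):

package main

import "fmt"

func main() {
	// One-slot mailbox: a stopping harvester deposits its final offset,
	// and the next scan that decides to resume drains it.
	offset := make(chan int64, 1)

	// The harvester stops after hitting EOF plus a timeout and parks
	// its position in the channel.
	offset <- 1234

	// A later scan takes the offset and clears the channel in one read.
	resumeAt := <-offset
	fmt.Println("resuming at offset", resumeAt)
}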
36 changes: 11 additions & 25 deletions filebeat/crawler/registrar.go
@@ -143,36 +143,27 @@ func (r *Registrar) writeRegistry() error {
return SafeFileRotate(r.registryFile, tempfile)
}

func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bool) {
func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) int64 {

// Check if there is a state for this file
lastState, isFound := r.GetFileState(filePath)
if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil {

if isFound && input.IsSameFile(filePath, fileInfo) {
logp.Debug("registrar", "Same file as before found. Fetch the state and persist it.")
// We're resuming - throw the last state back downstream so we resave it
// And return the offset - also force harvest in case the file is old and we're about to skip it
return lastState.Offset, true
}
if previous != filePath {
// File has rotated between shutdown and startup
// We return last state downstream, with a modified event source with the new file name
// And return the offset - also force harvest in case the file is old and we're about to skip it
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)
}

if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil {
// File has rotated between shutdown and startup
// We return last state downstream, with a modified event source with the new file name
// And return the offset - also force harvest in case the file is old and we're about to skip it
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)
logp.Info("Previous state for file %s found", filePath)

lastState, _ := r.GetFileState(previous)
return lastState.Offset, true
}

if isFound {
logp.Info("Not resuming rotated file: %s", filePath)
return lastState.Offset
}

logp.Info("New file. Start reading from the beginning: %s", filePath)

// New file so just start from the beginning
return 0, false
return 0
}

// getPreviousFile checks in the registrar whether the newFile already exists under a different name
@@ -183,11 +174,6 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)

for oldFilePath, oldState := range r.getState() {

// Skip when the path is the same
if oldFilePath == newFilePath {
continue
}

// Compare states
if newState.IsSame(oldState.FileStateOS) {
logp.Info("Old file with new name found: %s is no %s", oldFilePath, newFilePath)
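getPreviousFile above recognizes a rotated file by its OS-level identity rather than its path. A stand-alone sketch of the same idea using the standard library's os.SameFile, which compares inode and device on Unix (the file names are hypothetical; the real code compares persisted FileStateOS entries):

package main

import (
	"fmt"
	"os"
)

func main() {
	// Simulate rotation: remember the file's identity, rename it, and
	// confirm that the identity survives the rename.
	before, err := os.Stat("app.log")
	if err != nil {
		fmt.Println("stat:", err)
		return
	}
	if err := os.Rename("app.log", "app.log.1"); err != nil {
		fmt.Println("rename:", err)
		return
	}
	after, err := os.Stat("app.log.1")
	if err != nil {
		fmt.Println("stat:", err)
		return
	}
	// Same inode and device: this is the file we harvested before,
	// so its stored offset can be carried over to the new path.
	fmt.Println("same underlying file:", os.SameFile(before, after))
}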
1 change: 1 addition & 0 deletions filebeat/harvester/log.go
@@ -224,6 +224,7 @@ func (h *Harvester) updateOffset(increment int64) {
}

func (h *Harvester) UpdateState() {
logp.Debug("Update state: %s, offset: %v", h.Path, h.offset)
h.sendEvent(h.createEvent())
}

73 changes: 69 additions & 4 deletions filebeat/tests/system/test_registrar.py
@@ -289,9 +289,76 @@ def test_rotating_file_inode(self):
assert len(data) == 2


def test_rotating_file_with_shutdown(self):
def test_restart_continue(self):
"""
Check that inodes are properly written during file rotation and shutdown
Check that file reading continues after a restart
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
scan_frequency="1s"
)

if os.name == "nt":
raise SkipTest

os.mkdir(self.working_dir + "/log/")
testfile = self.working_dir + "/log/input"

filebeat = self.start_beat()

with open(testfile, 'w') as f:
f.write("entry1\n")

self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)

# Wait a moment to make sure the registry is completely written
time.sleep(1)

data = self.get_registry()
assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"]

filebeat.check_kill_and_wait()

# Store first registry file
shutil.copyfile(self.working_dir + "/registry", self.working_dir + "/registry.first")

# Append file
with open(testfile, 'a') as f:
f.write("entry2\n")

filebeat = self.start_beat(output="filebeat2.log")

# Output file was rotated
self.wait_until(
lambda: self.output_has(lines=1, output_file="output/filebeat.1"),
max_timeout=10)

self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)

filebeat.check_kill_and_wait()

data = self.get_registry()

# Compare the file inode with the one stored in the registry
assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"]

# Check that only 1 file is part of the registry file
assert len(data) == 1

output = self.read_output()

# Check that the second run only published the newly appended entry
assert 1 == len(output)
assert output[0]["message"] == "entry2"


def test_rotating_file_with_restart(self):
"""
Check that inodes are properly written during file rotation and restart
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
@@ -376,5 +443,3 @@ def test_rotating_file_with_shutdown(self):

# Check that 2 files are part of the registrar file. The deleted file should never have been detected
assert len(data) == 2

