Skip to content

Commit

Permalink
Add experimental symlink support (#2478)
Browse files Browse the repository at this point in the history
This implementation of symlinks takes a symlink and opens the original file. The only difference is that as Source the symlink path is reported. The advantage of this implementation is that everything related to file handling is identical with non symlink files as the original file is harvested. All close_* options work as expected. State information is stored for the original file, not the symlink itself. In case a symlink and original file are harvested, only one will be harvested as they share the same inode information.

* Add test to verify that symlinks are disabled by default
* Add test for symlink handling
* Improve error handling in harvester. Return proper error messages instead of logging it. Prevents too many log messages.
* Remove IsRegular file call as not needed anymore and leads to additional Stat calls
* Add documentation

See #2277 and #1767
  • Loading branch information
ruflin authored and tsg committed Sep 8, 2016
1 parent eb1dd17 commit c9260bf
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Update to Go 1.7. {pull}2306[2306]
- Log total non-zero internal metrics on shutdown. {pull}2349[2349]
- Add support for encrypted private key files by introducing `ssl.key_passphrase` setting. {pull}2330[2330]
- Add experimental symlink support with `symlinks` config {pull}2478[2478]

*Metricbeat*

Expand Down
12 changes: 12 additions & 0 deletions filebeat/docs/reference/configuration/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,18 @@ This option applies to files that Filebeat has not already processed. If you ran

NOTE: You can use this setting to avoid indexing old log lines when you run Filebeat on a set of log files for the first time. After the first run, we recommend disabling this option, or you risk losing lines during file rotation.

===== symlinks

experimental[]

The symlinks options allows to also harvest symlinks in addition to regular files. In case of symlinks, the file that is opened and read in the end is the original file even though the path of the symlink is reported.

It must be made sure, that if a symlink is defined for harvesting, the original is excluded. In case the symlink and original file are harvested by the same prospector, this will be detected and only one of the two files will be harvested which is the first one found by filebeat. In case the symlink and original are defined in two different prospectors, it can happen that both will be harvested at the same time, send duplicated data and will overwrite each other state.

This option can be useful if symlinks to the log files have additional meta data in the file name which can be processed in Logstash. This is for example the case for kubernetes log files.

As this option may lead to data loss, it is disabled by default.

===== backoff

The backoff options specify how aggressively Filebeat crawls new files for updates.
Expand Down
4 changes: 4 additions & 0 deletions filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ filebeat.prospectors:
# this can mean that the first entries of a new file are skipped.
#tail_files: false

# Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the
# original for harvesting but will report the symlink name as source.
#symlinks: false

# Backoff values define how aggressively filebeat crawls new files for updates
# The default values can be used in most cases. Backoff defines how long it is waited
# to check a file again after EOF is reached. Default is 1s which means the file
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ filebeat.prospectors:
# this can mean that the first entries of a new file are skipped.
#tail_files: false

# Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the
# original for harvesting but will report the symlink name as source.
#symlinks: false

# Backoff values define how aggressively filebeat crawls new files for updates
# The default values can be used in most cases. Backoff defines how long it is waited
# to check a file again after EOF is reached. Default is 1s which means the file
Expand Down
17 changes: 9 additions & 8 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"golang.org/x/text/transform"

"fmt"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/harvester/source"
Expand Down Expand Up @@ -163,8 +165,7 @@ func (h *Harvester) openFile() error {

f, err := file.ReadOpen(h.state.Source)
if err != nil {
logp.Err("Failed opening %s: %s", h.state.Source, err)
return err
return fmt.Errorf("Failed opening %s: %s", h.state.Source, err)
}

harvesterOpenFiles.Add(1)
Expand All @@ -182,16 +183,16 @@ func (h *Harvester) openFile() error {
}

func (h *Harvester) validateFile(f *os.File) error {
// Check we are not following a rabbit hole (symlinks, etc.)
if !file.IsRegular(f) {
return errors.New("Given file is not a regular file.")
}

info, err := f.Stat()
if err != nil {
logp.Err("Failed getting stats for file %s: %s", h.state.Source, err)
return err
return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err)
}

if !info.Mode().IsRegular() {
return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name())
}

// Compares the stat of the opened file to the state given by the prospector. Abort if not match.
if !os.SameFile(h.state.Fileinfo, info) {
return errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
Expand Down
25 changes: 0 additions & 25 deletions filebeat/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,6 @@ type File struct {
State *State
}

// Check that the file isn't a symlink, mode is regular or file is nil
func (f *File) IsRegular() bool {
if f.File == nil {
logp.Critical("Harvester: BUG: f arg is nil")
return false
}

info, e := f.File.Stat()
if e != nil {
logp.Err("File check fault: stat error: %s", e.Error())
return false
}

if !info.Mode().IsRegular() {
logp.Warn("Harvester: not a regular file: %q %s", info.Mode(), info.Name())
return false
}
return true
}

// Checks if the two files are the same.
func (f *File) IsSameFile(f2 *File) bool {
return os.SameFile(f.FileInfo, f2.FileInfo)
Expand All @@ -49,8 +29,3 @@ func IsSameFile(path string, info os.FileInfo) bool {

return os.SameFile(fileInfo, info)
}

func IsRegular(file *os.File) bool {
f := &File{File: file}
return f.IsRegular()
}
2 changes: 2 additions & 0 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
CleanInactive: 0,
CleanRemoved: false,
HarvesterLimit: 0,
Symlinks: false,
}
)

Expand All @@ -28,6 +29,7 @@ type prospectorConfig struct {
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
30 changes: 23 additions & 7 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
continue
}

OUTER:
// Check any matched files to see if we need to start a harvester
for _, file := range matches {

Expand All @@ -108,24 +109,39 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
continue
}

fileinfo, err := os.Lstat(file)
// Fetch Lstat File info to detected also symlinks
fileInfo, err := os.Lstat(file)
if err != nil {
logp.Debug("prospector", "stat(%s) failed: %s", file, err)
continue
}

// Check if file is symlink
if fileinfo.Mode()&os.ModeSymlink != 0 {
logp.Debug("prospector", "File %s skipped as it is a symlink.", file)
if fileInfo.IsDir() {
logp.Debug("prospector", "Skipping directory: %s", file)
continue
}

if fileinfo.IsDir() {
logp.Debug("prospector", "Skipping directory: %s", file)
isSymlink := fileInfo.Mode()&os.ModeSymlink > 0
if isSymlink && !p.config.Symlinks {
logp.Debug("prospector", "File %s skipped as it is a symlink.", file)
continue
}

paths[file] = fileinfo
// Fetch Stat file info which fetches the inode from the original and is used for comparison
fileInfo, err = os.Stat(file)

// If symlink is enabled, it is checked that original is not part of same prospector
// It original is harvested by other prospector, states will potentially overwrite each other
if p.config.Symlinks {
for _, finfo := range paths {
if os.SameFile(finfo, fileInfo) {
logp.Info("Same file found as symlink and original. Skipping file: %s", file)
continue OUTER
}
}
}

paths[file] = fileInfo
}
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ filebeat.prospectors:
clean_inactive: {{clean_inactive}}
clean_removed: {{clean_removed}}
harvester_limit: {{harvester_limit | default(0) }}
symlinks: {{symlinks}}

{% if fields %}
fields:
Expand Down
Loading

0 comments on commit c9260bf

Please sign in to comment.