From 1cc46b633833f05d8dab386ca5642c4943286e04 Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 27 Jun 2016 12:46:29 +0200 Subject: [PATCH] Introduce close_eof harvester option --- CHANGELOG.asciidoc | 3 +- filebeat/etc/beat.full.yml | 5 +++ filebeat/filebeat.full.yml | 5 +++ filebeat/harvester/config.go | 4 +- filebeat/harvester/log.go | 26 ++++++------ filebeat/harvester/reader/log.go | 8 ++-- filebeat/tests/system/config/filebeat.yml.j2 | 1 + filebeat/tests/system/test_harvester.py | 42 ++++++++++++++++++++ 8 files changed, 74 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 2464a72e9a46..b30d180a019a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -53,7 +53,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d *Topbeat* *Filebeat* -- Introdce close_removed and close_renamed harvester options {issue}1600[1600] +- Introduce close_removed and close_renamed harvester options {issue}1600[1600] +- Introduce close_eof harvester option {issue}1600[1600] *Winlogbeat* diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml index b87c94a891dd..1b77cc8c4575 100644 --- a/filebeat/etc/beat.full.yml +++ b/filebeat/etc/beat.full.yml @@ -176,6 +176,11 @@ filebeat.prospectors: # Note: Potential data loss if file reading was not finished when file was removed. #close_removed: false + # Closes the file handler as soon as the harvester reaches the end of the file. + # The file will be picked up again by the harvester at the previous known state + # after scan_frequency in case the file can still be discovered by the prospector.
+ #close_eof: false + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 4ce79c3013b0..1c1725e57f15 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -176,6 +176,11 @@ filebeat.prospectors: # Note: Potential data loss if file reading was not finished when file was removed. #close_removed: false + # Closes the file handler as soon as the harvester reaches the end of the file. + # The file will be picked up again by the harvester at the previous known state + # after scan_frequency in case the file can still be discovered by the prospector. + #close_eof: false + #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go index f4fbf74681b9..f6189333e5ec 100644 --- a/filebeat/harvester/config.go +++ b/filebeat/harvester/config.go @@ -26,6 +26,7 @@ var ( MaxBytes: 10 * (1 << 20), // 10MB CloseRemoved: false, CloseRenamed: false, + CloseEOF: false, ForceCloseFiles: false, } ) @@ -43,6 +44,7 @@ type harvesterConfig struct { CloseOlder time.Duration `config:"close_older"` CloseRemoved bool `config:"close_removed"` CloseRenamed bool `config:"close_renamed"` + CloseEOF bool `config:"close_eof"` ForceCloseFiles bool `config:"force_close_files"` ExcludeLines []*regexp.Regexp `config:"exclude_lines"` IncludeLines []*regexp.Regexp `config:"include_lines"` @@ -53,7 +55,7 @@ type harvesterConfig struct { func (config *harvesterConfig) Validate() error { - // TODO: remove in 7.0 + // DEPRECATED: remove in 6.0 if config.ForceCloseFiles { config.CloseRemoved = true config.CloseRenamed = true diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index ed4307c8205e..51621490746c 100644 --- a/filebeat/harvester/log.go +++
b/filebeat/harvester/log.go @@ -6,6 +6,8 @@ import ( "golang.org/x/text/transform" + "io" + "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/elastic/beats/filebeat/harvester/processor" @@ -40,6 +42,7 @@ func (h *Harvester) Harvest() { CloseRemoved: cfg.CloseRemoved, CloseRenamed: cfg.CloseRenamed, CloseOlder: cfg.CloseOlder, + CloseEOF: cfg.CloseEOF, BackoffDuration: cfg.Backoff, MaxBackoffDuration: cfg.MaxBackoff, BackoffFactor: cfg.BackoffFactor, @@ -69,24 +72,19 @@ func (h *Harvester) Harvest() { // Partial lines return error and are only read on completion ts, text, bytesRead, jsonFields, err := readLine(processor) if err != nil { - - if err == reader.ErrFileTruncate { + switch err { + case reader.ErrFileTruncate: logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path) h.SetOffset(0) - return - } - - if err == reader.ErrRemoved { + case reader.ErrRemoved: logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path) - return + case reader.ErrRenamed: + logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.Path) + case io.EOF: + logp.Info("End of file reached: %s. 
Closing because close_eof is enabled.", h.Path) + default: + logp.Info("Read line error: %s", err) } - - if err == reader.ErrRenamed { - logp.Info("File was renamed: %s Closing because close_renamed is enabled.", h.Path) - return - } - - logp.Info("Read line error: %s", err) return } diff --git a/filebeat/harvester/reader/log.go b/filebeat/harvester/reader/log.go index 52d3075a33e5..ea973051bdb9 100644 --- a/filebeat/harvester/reader/log.go +++ b/filebeat/harvester/reader/log.go @@ -34,6 +34,7 @@ type LogFileReaderConfig struct { BackoffFactor int CloseRenamed bool CloseRemoved bool + CloseEOF bool } func NewLogFileReader( @@ -81,13 +82,12 @@ func (r *logFileReader) Read(buf []byte) (int, error) { return n, nil } - continuable := r.fs.Continuable() - if err == io.EOF && !continuable { - logp.Info("Reached end of file: %s", r.fs.Name()) + if err == io.EOF && r.config.CloseEOF { return n, err } - if err != io.EOF || !continuable { + // Stdin is not continuable + if err != io.EOF || !r.fs.Continuable() { logp.Err("Unexpected state reading from %s; error: %s", r.fs.Name(), err) return n, err } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index e0cc96ffd6d4..5b6af299cf60 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -21,6 +21,7 @@ filebeat.prospectors: max_backoff: 0.1s close_removed: {{close_removed}} close_renamed: {{close_renamed}} + close_eof: {{close_eof}} {% if fields %} fields: diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index 075ae31073f7..9b9adbede115 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -104,3 +104,45 @@ def test_close_removed(self): # Make sure the state for the file was persisted assert len(data) == 1 + + + def test_close_eof(self): + """ + Checks that a file is closed if eof is reached + """ + 
self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/test.log", + close_eof="true", + scan_frequency="0.1s" + ) + os.mkdir(self.working_dir + "/log/") + + testfile1 = self.working_dir + "/log/test.log" + file = open(testfile1, 'w') + + iterations1 = 5 + for n in range(0, iterations1): + file.write("rotation file") + file.write("\n") + + file.close() + + filebeat = self.start_beat() + + # Let it read the file + self.wait_until( + lambda: self.output_has(lines=iterations1), max_timeout=10) + + + # Wait until the harvester logs that it closed the file on EOF + self.wait_until( + lambda: self.log_contains( + "Closing because close_eof is enabled"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + data = self.get_registry() + + # Make sure the state for the file was persisted + assert len(data) == 1