Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filebeat async publisher support #782

Merged
merged 1 commit into from
Jan 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Filebeat*
- Add multiline support for combining multiple related lines into one event. {issue}461[461]
- Add close_older configuration option to complete ignore_older https://github.com/elastic/filebeat/issues/181[181]
- Add experimental option to enable filebeat publisher pipeline to operate asynchonrously {pull}782[782]

*Winlogbeat*
- Added support for reading event logs using the Windows Event Log API {pull}576[576]
Expand Down
26 changes: 3 additions & 23 deletions filebeat/beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"

cfg "github.com/elastic/beats/filebeat/config"
. "github.com/elastic/beats/filebeat/crawler"
Expand Down Expand Up @@ -93,7 +91,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Publishes event to output
go Publish(b, fb)
pub := newPublisher(fb.FbConfig.Filebeat.PublishAsync,
fb.publisherChan, fb.registrar.Channel, b.Events)
pub.Start()

// Blocks progressing
select {
Expand Down Expand Up @@ -123,23 +123,3 @@ func (fb *Filebeat) Stop() {
// Stop Filebeat
close(fb.done)
}

func Publish(beat *beat.Beat, fb *Filebeat) {
logp.Info("Start sending events to output")

// Receives events from spool during flush
for events := range fb.publisherChan {

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
pubEvents = append(pubEvents, event.ToMapStr())
}

beat.Events.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)

logp.Info("Events sent: %d", len(events))

// Tell the registrar that we've successfully sent these events
fb.registrar.Channel <- events
}
}
223 changes: 223 additions & 0 deletions filebeat/beat/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package beat

import (
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

type logPublisher interface {
Start()
Stop()
}

type syncLogPublisher struct {
client publisher.Client
in, out chan []*input.FileEvent

done chan struct{}
wg sync.WaitGroup
}

type asyncLogPublisher struct {
client publisher.Client
in, out chan []*input.FileEvent

// list of in-flight batches
active batchList
failing bool

done chan struct{}
wg sync.WaitGroup
}

// eventsBatch is used to store sorted list of actively published log lines.
// Implements `outputs.Signalerr` interface for marking batch as finished
type eventsBatch struct {
next *eventsBatch
flag int32
events []*input.FileEvent
}

type batchList struct {
head, tail *eventsBatch
}

const (
defaultGCTimeout = 1 * time.Second
)

const (
batchSuccess int32 = 1
batchFailed int32 = 2
)

func newPublisher(
async bool,
in, out chan []*input.FileEvent,
client publisher.Client,
) logPublisher {
if async {
return newAsyncLogPublisher(in, out, client)
}
return newSyncLogPublisher(in, out, client)
}

func newSyncLogPublisher(
in, out chan []*input.FileEvent,
client publisher.Client,
) *syncLogPublisher {
return &syncLogPublisher{
in: in,
out: out,
client: client,
done: make(chan struct{}),
}
}

func (p *syncLogPublisher) Start() {
p.wg.Add(1)
go func() {
defer p.wg.Done()

logp.Info("Start sending events to output")

for {
var events []*input.FileEvent
select {
case <-p.done:
return
case events = <-p.in:
}

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
pubEvents = append(pubEvents, event.ToMapStr())
}

p.client.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)
logp.Info("Events sent: %d", len(events))

// Tell the registrar that we've successfully sent these events
select {
case <-p.done:
return
case p.out <- events:
}
}
}()
}

func (p *syncLogPublisher) Stop() {
close(p.done)
p.wg.Wait()
}

func newAsyncLogPublisher(
in, out chan []*input.FileEvent,
client publisher.Client,
) *asyncLogPublisher {
return &asyncLogPublisher{
in: in,
out: out,
client: client,
done: make(chan struct{}),
}
}

func (p *asyncLogPublisher) Start() {
p.wg.Add(1)
go func() {
defer p.wg.Done()

logp.Info("Start sending events to output")

// short gc timer, in case no logs are received from spooler the queued
// bulkEvents can still be cleaned up and forwarded to the registrar
ticker := time.NewTicker(defaultGCTimeout)

for {
select {
case <-p.done:
return
case events := <-p.in:
pubEvents := make([]common.MapStr, len(events))
for i, event := range events {
pubEvents[i] = event.ToMapStr()
}

batch := &eventsBatch{
flag: 0,
events: events,
}
p.client.PublishEvents(pubEvents,
publisher.Signal(batch), publisher.Guaranteed)

p.active.append(batch)
case <-ticker.C:
}

p.collect()
}
}()
}

func (p *asyncLogPublisher) Stop() {
close(p.done)
p.wg.Wait()
}

// collect collects finished bulk-Events in order and forward processed batches
// to registrar. Reports to registrar are guaranteed to be in same order
// as bulk-Events have been received by the spooler
func (p *asyncLogPublisher) collect() bool {
for batch := p.active.head; batch != nil; batch = batch.next {
state := atomic.LoadInt32(&batch.flag)
if state == 0 && !p.failing {
break
}

// remove batch from active list
p.active.head = batch.next
if batch.next == nil {
p.active.tail = nil
}

// Batches get marked as failed, if publisher pipeline is shutting down
// In this case we do not want to send any more batches to the registrar
if state == batchFailed {
p.failing = true
}

if p.failing {
// if in failing state keep cleaning up queue
continue
}

// Tell the registrar that we've successfully sent these events
select {
case <-p.done:
return false
case p.out <- batch.events:
}
}
return true
}

func (b *eventsBatch) Completed() { atomic.StoreInt32(&b.flag, batchSuccess) }
func (b *eventsBatch) Failed() { atomic.StoreInt32(&b.flag, batchFailed) }

func (l *batchList) append(b *eventsBatch) {
if l.head == nil {
l.head = b
} else {
l.tail.next = b
}
b.next = nil
l.tail = b
}
79 changes: 79 additions & 0 deletions filebeat/beat/publish_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package beat

import (
"fmt"
"testing"
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"github.com/stretchr/testify/assert"
)

func makeEvents(name string, n int) []*input.FileEvent {
var events []*input.FileEvent
for i := 0; i < n; i++ {
event := &input.FileEvent{
ReadTime: time.Now(),
InputType: "log",
DocumentType: "log",
Bytes: 100,
Offset: int64(i),
Source: &name,
}
events = append(events, event)
}
return events
}

func TestPublisherModes(t *testing.T) {
tests := []struct {
title string
async bool
order []int
}{
{"sync", false, []int{1, 2, 3, 4, 5, 6}},
{"async ordered signal", true, []int{1, 2, 3, 4, 5, 6}},
{"async out of order signal", true, []int{5, 2, 3, 1, 4, 6}},
}

for i, test := range tests {
t.Logf("run publisher test (%v): %v", i, test.title)

pubChan := make(chan []*input.FileEvent, len(test.order)+1)
regChan := make(chan []*input.FileEvent, len(test.order)+1)
client := publisher.ExtChanClient{make(chan publisher.PublishMessage)}

pub := newPublisher(test.async, pubChan, regChan, client)
pub.Start()

var events [][]*input.FileEvent
for i := range test.order {
tmp := makeEvents(fmt.Sprintf("msg: %v", i), 1)
pubChan <- tmp
events = append(events, tmp)
}

var msgs []publisher.PublishMessage
for _ = range test.order {
m := <-client.Channel
msgs = append(msgs, m)
}

for _, i := range test.order {
outputs.SignalCompleted(msgs[i-1].Context.Signal)
}

var regEvents [][]*input.FileEvent
for _ = range test.order {
regEvents = append(regEvents, <-regChan)
}
pub.Stop()

// validate order
for i := range events {
assert.Equal(t, events[i], regEvents[i])
}
}
}
1 change: 1 addition & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
type FilebeatConfig struct {
Prospectors []ProspectorConfig
SpoolSize uint64 `yaml:"spool_size"`
PublishAsync bool `yaml:"publish_async"`
IdleTimeout string `yaml:"idle_timeout"`
IdleTimeoutDuration time.Duration
RegistryFile string `yaml:"registry_file"`
Expand Down
7 changes: 7 additions & 0 deletions filebeat/docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,13 @@ filebeat:
-------------------------------------------------------------------------------------


===== publish_async

If enabled, the publisher pipeline in filebeat operates in async mode preparing
a new batch of lines while waiting for ACK. This option can improve load-balancing
throughput at the cost of increased memory usage. The default value is false.


===== idle_timeout

A duration string that specifies how often the spooler is flushed. After the
Expand Down
3 changes: 3 additions & 0 deletions filebeat/etc/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ filebeat:
# Event count spool threshold - forces network flush if exceeded
#spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is experimental, I would probably not put it in the config here. Yes I know, somehow breaks our rule so far.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the experimental.

#publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# Flush even though spool_size is not reached.
#idle_timeout: 5s
Expand Down
Loading