Skip to content

Commit

Permalink
Fix an issue with caching from elastic#888.
Browse files Browse the repository at this point in the history
Add expvar metrics for the cache (hits/misses/size).
Add strict validation for top-level YAML keys in winlogbeat config.
  • Loading branch information
andrewkroh committed Mar 3, 2016
1 parent 02a9a3a commit a7d42cc
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]

*Winlogbeat*
- Add caching of event metadata handles and the system render context for the wineventlog API {pull}888[888]
- Improve config validation by checking for unknown top-level YAML keys. {pull}1100[1100]

==== Deprecated

Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (c *Cache) CleanUp() int {
return count
}

// Entries returns a copy of the non-expired elements in the cache.
// Entries returns a shallow copy of the non-expired elements in the cache.
func (c *Cache) Entries() map[Key]Value {
c.RLock()
defer c.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (eb *Winlogbeat) Config(b *beat.Beat) error {
}

// Validate configuration.
err = eb.config.Winlogbeat.Validate()
err = eb.config.Validate()
if err != nil {
return fmt.Errorf("Error validating configuration file. %v", err)
}
Expand Down
29 changes: 28 additions & 1 deletion winlogbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package config
import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -27,7 +28,33 @@ type Validator interface {

// Settings is the root of the Winlogbeat configuration data hierarchy.
type Settings struct {
Winlogbeat WinlogbeatConfig
Winlogbeat WinlogbeatConfig `config:"winlogbeat"`
All map[string]interface{} `config:",inline"`
}

// Validate validates the Settings data and returns an error describing
// all problems or nil if there are none.
func (s Settings) Validate() error {
validKeys := []string{"winlogbeat", "output", "shipper", "logging"}
sort.Strings(validKeys)

// Check for invalid top-level keys.
var errs multierror.Errors
for k, _ := range s.All {
k = strings.ToLower(k)
i := sort.SearchStrings(validKeys, k)
if i >= len(validKeys) || validKeys[i] != k {
errs = append(errs, fmt.Errorf("Invalid top-level key '%s' "+
"found. Valid keys are %s", k, strings.Join(validKeys, ", ")))
}
}

err := s.Winlogbeat.Validate()
if err != nil {
errs = append(errs, err)
}

return errs.Err()
}

// WinlogbeatConfig contains all of Winlogbeat configuration data.
Expand Down
12 changes: 12 additions & 0 deletions winlogbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ func TestConfigValidate(t *testing.T) {
},
"", // No Error
},
{
Settings{
WinlogbeatConfig{
EventLogs: []EventLogConfig{
{Name: "App"},
},
},
map[string]interface{}{"other": "value"},
},
"1 error: Invalid top-level key 'other' found. Valid keys are " +
"logging, output, shipper, winlogbeat",
},
{
WinlogbeatConfig{},
"1 error: At least one event log must be configured as part of " +
Expand Down
47 changes: 40 additions & 7 deletions winlogbeat/eventlog/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ package eventlog
// to event message files.

import (
"expvar"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/winlogbeat/sys/eventlogging"
)

// Stats for the message file caches.
var (
cacheStats = expvar.NewMap("msgFileCacheStats")
)

// Constants that control the cache behavior.
const (
expirationTimeout time.Duration = 2 * time.Minute
Expand All @@ -31,6 +37,11 @@ type messageFilesCache struct {
loader messageFileLoaderFunc
freer freeHandleFunc
eventLogName string

// Cache metrics.
hit func() // Increments number of cache hits.
miss func() // Increments number of cache misses.
size func() // Sets the current cache size.
}

// newHandleCache creates and returns a new handleCache that has been
Expand All @@ -39,14 +50,24 @@ type messageFilesCache struct {
func newMessageFilesCache(eventLogName string, loader messageFileLoaderFunc,
freer freeHandleFunc) *messageFilesCache {

size := &expvar.Int{}
cacheStats.Set(eventLogName+"Size", size)

hc := &messageFilesCache{
loader: loader,
freer: freer,
eventLogName: eventLogName,
hit: func() { cacheStats.Add(eventLogName+"Hits", 1) },
miss: func() { cacheStats.Add(eventLogName+"Misses", 1) },
}
hc.cache = common.NewCacheWithRemovalListener(expirationTimeout,
initialSize, hc.evictionHandler)
hc.cache.StartJanitor(janitorInterval)
hc.size = func() {
s := hc.cache.Size()
size.Set(int64(s))
debugf("messageFilesCache[%s] size=%d", hc.eventLogName, s)
}
return hc
}

Expand All @@ -57,6 +78,8 @@ func newMessageFilesCache(eventLogName string, loader messageFileLoaderFunc,
func (hc *messageFilesCache) get(sourceName string) eventlogging.MessageFiles {
v := hc.cache.Get(sourceName)
if v == nil {
hc.miss()

// Handle to event message file for sourceName is not cached. Attempt
// to load the Handles into the cache.
v = hc.loader(hc.eventLogName, sourceName)
Expand All @@ -65,11 +88,17 @@ func (hc *messageFilesCache) get(sourceName string) eventlogging.MessageFiles {
// check if a value was already loaded.
existing := hc.cache.PutIfAbsent(sourceName, v)
if existing != nil {
// A value was already loaded, so free the handles we created.
existingMessageFiles, _ := existing.(eventlogging.MessageFiles)
hc.freeHandles(existingMessageFiles)
return existingMessageFiles
// A value was already loaded, so free the handles we just created.
messageFiles, _ := v.(eventlogging.MessageFiles)
hc.freeHandles(messageFiles)

// Return the existing cached value.
messageFiles, _ = existing.(eventlogging.MessageFiles)
return messageFiles
}
hc.size()
} else {
hc.hit()
}

messageFiles, _ := v.(eventlogging.MessageFiles)
Expand All @@ -79,13 +108,16 @@ func (hc *messageFilesCache) get(sourceName string) eventlogging.MessageFiles {
// evictionHandler is the callback handler that receives notifications when
// a key-value pair is evicted from the messageFilesCache.
func (hc *messageFilesCache) evictionHandler(k common.Key, v common.Value) {
// Update the size on a different goroutine after the callback completes.
defer func() { go hc.size() }()

messageFiles, ok := v.(eventlogging.MessageFiles)
if !ok {
return
}

logp.Debug("eventlog", "Evicting messageFiles %+v for sourceName %v.",
messageFiles, k)
debugf("messageFilesCache[%s] Evicting messageFiles %+v for sourceName %v.",
hc.eventLogName, messageFiles, k)
hc.freeHandles(messageFiles)
}

Expand All @@ -95,7 +127,8 @@ func (hc *messageFilesCache) freeHandles(mf eventlogging.MessageFiles) {
for _, fh := range mf.Handles {
err := hc.freer(fh.Handle)
if err != nil {
logp.Warn("FreeLibrary error for handle %v", fh.Handle)
logp.Warn("messageFilesCache[%s] FreeLibrary error for handle %v",
hc.eventLogName, fh.Handle)
}
}
}
2 changes: 1 addition & 1 deletion winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (l *winEventLog) Close() error {
func newWinEventLog(c Config) (EventLog, error) {
eventMetadataHandle := func(providerName, sourceName string) eventlogging.MessageFiles {
mf := eventlogging.MessageFiles{SourceName: sourceName}
h, err := sys.OpenPublisherMetadata(0, providerName, 0)
h, err := sys.OpenPublisherMetadata(0, sourceName, 0)
if err != nil {
mf.Err = err
return mf
Expand Down

0 comments on commit a7d42cc

Please sign in to comment.