Skip to content

Commit

Permalink
Add device filter for disk scraper (#1378)
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington authored Jul 17, 2020
1 parent 9a08d4d commit 6297c1a
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,24 @@

package diskscraper

import "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
import (
"go.opentelemetry.io/collector/internal/processor/filterset"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

// Config relating to Disk Metric Scraper.
type Config struct {
internal.ConfigSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Include specifies a filter on the devices that should be included from the generated metrics.
// Exclude specifies a filter on the devices that should be excluded from the generated metrics.
// If neither `include` or `exclude` are set, metrics will be generated for all devices.
Include MatchConfig `mapstructure:"include"`
Exclude MatchConfig `mapstructure:"exclude"`
}

type MatchConfig struct {
filterset.Config `mapstructure:",squash"`

Devices []string `mapstructure:"devices"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,49 @@ package diskscraper

import (
"context"
"fmt"
"time"

"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/host"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
)

// scraper for Disk Metrics
type scraper struct {
config *Config
startTime pdata.TimestampUnixNano
includeFS filterset.FilterSet
excludeFS filterset.FilterSet

// for mocking
bootTime func() (uint64, error)
ioCounters func(names ...string) (map[string]disk.IOCountersStat, error)
}

// newDiskScraper creates a Disk Scraper
func newDiskScraper(_ context.Context, cfg *Config) *scraper {
return &scraper{config: cfg, bootTime: host.BootTime, ioCounters: disk.IOCounters}
func newDiskScraper(_ context.Context, cfg *Config) (*scraper, error) {
scraper := &scraper{config: cfg, bootTime: host.BootTime, ioCounters: disk.IOCounters}

var err error

if len(cfg.Include.Devices) > 0 {
scraper.includeFS, err = filterset.CreateFilterSet(cfg.Include.Devices, &cfg.Include.Config)
if err != nil {
return nil, fmt.Errorf("error creating device include filters: %w", err)
}
}

if len(cfg.Exclude.Devices) > 0 {
scraper.excludeFS, err = filterset.CreateFilterSet(cfg.Exclude.Devices, &cfg.Exclude.Config)
if err != nil {
return nil, fmt.Errorf("error creating device exclude filters: %w", err)
}
}

return scraper, nil
}

// Initialize
Expand Down Expand Up @@ -66,11 +88,17 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) {
return metrics, err
}

metrics.Resize(3 + systemSpecificMetricsLen)
initializeDiskIOMetric(metrics.At(0), s.startTime, ioCounters)
initializeDiskOpsMetric(metrics.At(1), s.startTime, ioCounters)
initializeDiskTimeMetric(metrics.At(2), s.startTime, ioCounters)
appendSystemSpecificMetrics(metrics, 3, s.startTime, ioCounters)
// filter devices by name
ioCounters = s.filterByDevice(ioCounters)

if len(ioCounters) > 0 {
metrics.Resize(3 + systemSpecificMetricsLen)
initializeDiskIOMetric(metrics.At(0), s.startTime, ioCounters)
initializeDiskOpsMetric(metrics.At(1), s.startTime, ioCounters)
initializeDiskTimeMetric(metrics.At(2), s.startTime, ioCounters)
appendSystemSpecificMetrics(metrics, 3, s.startTime, ioCounters)
}

return metrics, nil
}

Expand Down Expand Up @@ -124,3 +152,21 @@ func initializeDataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.Timesta
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetValue(value)
}

func (s *scraper) filterByDevice(ioCounters map[string]disk.IOCountersStat) map[string]disk.IOCountersStat {
if s.includeFS == nil && s.excludeFS == nil {
return ioCounters
}

for device := range ioCounters {
if !s.includeDevice(device) {
delete(ioCounters, device)
}
}
return ioCounters
}

func (s *scraper) includeDevice(deviceName string) bool {
return (s.includeFS == nil || s.includeFS.Matches(deviceName)) &&
(s.excludeFS == nil || !s.excludeFS.Matches(deviceName))
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ func TestScrapeMetrics_Others(t *testing.T) {

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scraper := newDiskScraper(context.Background(), &Config{})
scraper, err := newDiskScraper(context.Background(), &Config{})
require.NoError(t, err, "Failed to create disk scraper: %v", err)

if test.bootTimeFunc != nil {
scraper.bootTime = test.bootTimeFunc
}
if test.ioCountersFunc != nil {
scraper.ioCounters = test.ioCountersFunc
}

err := scraper.Initialize(context.Background())
err = scraper.Initialize(context.Background())
if test.initializationErr != "" {
assert.EqualError(t, err, test.initializationErr)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,75 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

func TestScrapeMetrics(t *testing.T) {
scraper := newDiskScraper(context.Background(), &Config{})
type testCase struct {
name string
config Config
expectMetrics bool
newErrRegex string
}

err := scraper.Initialize(context.Background())
require.NoError(t, err, "Failed to initialize disk scraper: %v", err)
defer func() { assert.NoError(t, scraper.Close(context.Background())) }()
testCases := []testCase{
{
name: "Standard",
expectMetrics: true,
},
{
name: "Include Filter that matches nothing",
config: Config{Include: MatchConfig{filterset.Config{MatchType: "strict"}, []string{"@*^#&*$^#)"}}},
expectMetrics: false,
},
{
name: "Invalid Include Filter",
config: Config{Include: MatchConfig{Devices: []string{"test"}}},
newErrRegex: "^error creating device include filters:",
},
{
name: "Invalid Exclude Filter",
config: Config{Exclude: MatchConfig{Devices: []string{"test"}}},
newErrRegex: "^error creating device exclude filters:",
},
}

metrics, err := scraper.ScrapeMetrics(context.Background())
require.NoError(t, err, "Failed to scrape metrics: %v", err)
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scraper, err := newDiskScraper(context.Background(), &test.config)
if test.newErrRegex != "" {
require.Error(t, err)
require.Regexp(t, test.newErrRegex, err)
return
}
require.NoError(t, err, "Failed to create disk scraper: %v", err)

assert.GreaterOrEqual(t, metrics.Len(), 2)
err = scraper.Initialize(context.Background())
require.NoError(t, err, "Failed to initialize disk scraper: %v", err)
defer func() { assert.NoError(t, scraper.Close(context.Background())) }()

assertDiskMetricValid(t, metrics.At(0), diskIODescriptor, 0)
assertDiskMetricValid(t, metrics.At(1), diskOpsDescriptor, 0)
metrics, err := scraper.ScrapeMetrics(context.Background())
require.NoError(t, err, "Failed to scrape metrics: %v", err)

if runtime.GOOS != "windows" {
assertDiskMetricValid(t, metrics.At(2), diskTimeDescriptor, 0)
}
if !test.expectMetrics {
assert.Equal(t, 0, metrics.Len())
return
}

assert.GreaterOrEqual(t, metrics.Len(), 2)

assertDiskMetricValid(t, metrics.At(0), diskIODescriptor, 0)
assertDiskMetricValid(t, metrics.At(1), diskOpsDescriptor, 0)

if runtime.GOOS != "windows" {
assertDiskMetricValid(t, metrics.At(2), diskTimeDescriptor, 0)
}

if runtime.GOOS == "linux" {
assertDiskMetricValid(t, metrics.At(3), diskMergedDescriptor, 0)
if runtime.GOOS == "linux" {
assertDiskMetricValid(t, metrics.At(3), diskMergedDescriptor, 0)
}
})
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package diskscraper

import (
"context"
"fmt"
"reflect"
"time"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/windows/pdh"
)

Expand All @@ -33,10 +35,11 @@ const (

// scraper for Disk Metrics
type scraper struct {
config *Config

config *Config
startTime pdata.TimestampUnixNano
prevScrapeTime time.Time
includeFS filterset.FilterSet
excludeFS filterset.FilterSet

diskReadBytesPerSecCounter pdh.PerfCounterScraper
diskWriteBytesPerSecCounter pdh.PerfCounterScraper
Expand Down Expand Up @@ -65,12 +68,30 @@ type value struct {
}

// newDiskScraper creates a Disk Scraper
func newDiskScraper(_ context.Context, cfg *Config) *scraper {
return &scraper{
func newDiskScraper(_ context.Context, cfg *Config) (*scraper, error) {
scraper := &scraper{
config: cfg,
cumulativeDiskIO: map[string]*value{},
cumulativeDiskOps: map[string]*value{},
}

var err error

if len(cfg.Include.Devices) > 0 {
scraper.includeFS, err = filterset.CreateFilterSet(cfg.Include.Devices, &cfg.Include.Config)
if err != nil {
return nil, fmt.Errorf("error creating device include filters: %w", err)
}
}

if len(cfg.Exclude.Devices) > 0 {
scraper.excludeFS, err = filterset.CreateFilterSet(cfg.Exclude.Devices, &cfg.Exclude.Config)
if err != nil {
return nil, fmt.Errorf("error creating device exclude filters: %w", err)
}
}

return scraper, nil
}

// Initialize
Expand Down Expand Up @@ -173,11 +194,19 @@ func (s *scraper) scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, duratio
}

for _, diskReadBytesPerSec := range diskReadBytesPerSecValues {
s.cumulativeDiskIO.getOrAdd(diskReadBytesPerSec.InstanceName).read += (diskReadBytesPerSec.Value * durationSinceLastScraped)
if s.includeDevice(diskReadBytesPerSec.InstanceName) {
s.cumulativeDiskIO.getOrAdd(diskReadBytesPerSec.InstanceName).read += (diskReadBytesPerSec.Value * durationSinceLastScraped)
}
}

for _, diskWriteBytesPerSec := range diskWriteBytesPerSecValues {
s.cumulativeDiskIO.getOrAdd(diskWriteBytesPerSec.InstanceName).write += (diskWriteBytesPerSec.Value * durationSinceLastScraped)
if s.includeDevice(diskWriteBytesPerSec.InstanceName) {
s.cumulativeDiskIO.getOrAdd(diskWriteBytesPerSec.InstanceName).write += (diskWriteBytesPerSec.Value * durationSinceLastScraped)
}
}

if len(s.cumulativeDiskIO) == 0 {
return nil
}

idx := metrics.Len()
Expand Down Expand Up @@ -212,11 +241,19 @@ func (s *scraper) scrapeAndAppendDiskOpsMetric(metrics pdata.MetricSlice, durati
}

for _, diskReadsPerSec := range diskReadsPerSecValues {
s.cumulativeDiskOps.getOrAdd(diskReadsPerSec.InstanceName).read += (diskReadsPerSec.Value * durationSinceLastScraped)
if s.includeDevice(diskReadsPerSec.InstanceName) {
s.cumulativeDiskOps.getOrAdd(diskReadsPerSec.InstanceName).read += (diskReadsPerSec.Value * durationSinceLastScraped)
}
}

for _, diskWritesPerSec := range diskWritesPerSecValues {
s.cumulativeDiskOps.getOrAdd(diskWritesPerSec.InstanceName).write += (diskWritesPerSec.Value * durationSinceLastScraped)
if s.includeDevice(diskWritesPerSec.InstanceName) {
s.cumulativeDiskOps.getOrAdd(diskWritesPerSec.InstanceName).write += (diskWritesPerSec.Value * durationSinceLastScraped)
}
}

if len(s.cumulativeDiskIO) == 0 {
return nil
}

idx := metrics.Len()
Expand Down Expand Up @@ -247,3 +284,8 @@ func initializeDataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.Timesta
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetValue(value)
}

func (s *scraper) includeDevice(deviceName string) bool {
return (s.includeFS == nil || s.includeFS.Matches(deviceName)) &&
(s.excludeFS == nil || !s.excludeFS.Matches(deviceName))
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ func TestScrapeMetrics_Error(t *testing.T) {

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scraper := newDiskScraper(context.Background(), &Config{})
scraper, err := newDiskScraper(context.Background(), &Config{})
require.NoError(t, err, "Failed to create disk scraper: %v", err)

err := scraper.Initialize(context.Background())
err = scraper.Initialize(context.Background())
require.NoError(t, err, "Failed to initialize disk scraper: %v", err)
defer func() { assert.NoError(t, scraper.Close(context.Background())) }()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (f *Factory) CreateMetricsScraper(
_ *zap.Logger,
config internal.Config,
) (internal.Scraper, error) {
cfg := config.(*Config)
return obsreportscraper.WrapScraper(newDiskScraper(ctx, cfg), TypeStr), nil
scraper, err := newDiskScraper(ctx, config.(*Config))
if err != nil {
return nil, err
}

return obsreportscraper.WrapScraper(scraper, TypeStr), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,12 @@ func TestCreateMetricsScraper(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, scraper)
}

func TestCreateMetricsScraper_Error(t *testing.T) {
factory := &Factory{}
cfg := &Config{Include: MatchConfig{Devices: []string{""}}}

_, err := factory.CreateMetricsScraper(context.Background(), zap.NewNop(), cfg)

assert.Error(t, err)
}

0 comments on commit 6297c1a

Please sign in to comment.