Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sumologicschemaprocessor): add squashing single values in nesting processor #881

Merged
merged 1 commit into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- feat(sumologicschemaprocessor): add nesting processor [#877]
- feat(sumologicschemaprocessor): add allowlist and denylist to nesting processor [#880]
- feat(sumologicschemaprocessor) allow aggregating attributes with given name patterns [#871]
- feat(sumologicschemaprocessor): add squashing single values in nesting processor [#881]

[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.67.0-sumo-0...main
[#877]: https://github.com/SumoLogic/sumologic-otel-collector/pull/877
[#880]: https://github.com/SumoLogic/sumologic-otel-collector/pull/880
[#871]: https://github.com/SumoLogic/sumologic-otel-collector/pull/871
[#881]: https://github.com/SumoLogic/sumologic-otel-collector/pull/881

## [v0.67.0-sumo-0]

Expand Down
6 changes: 6 additions & 0 deletions pkg/processor/sumologicschemaprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ processors:
# default = []
exclude: [<prefix>]

# If enabled, then maps that would have only one value will be squashed.
# For example,{"k8s": {"pods": {"a": "A", "b": "B"}}}
# will be squashed to {"k8s.pods": {"a": "A", "b": "B"}}
# default = false
squash_single_values: {true, false}

# Specifies if attributes matching given pattern should be mapped to a common key.
# See "Aggregating attributes" documentation chapter from this document.
# default = []
Expand Down
14 changes: 8 additions & 6 deletions pkg/processor/sumologicschemaprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ const (
defaultTranslateTelegrafAttributes = true

// Nesting processor default config
defaultNestingEnabled = false
defaultNestingSeparator = "."
defaultNestingEnabled = false
defaultNestingSeparator = "."
defaultNestingSquashSingleValues = false
)

var (
Expand All @@ -62,10 +63,11 @@ func createDefaultConfig() component.Config {
TranslateAttributes: defaultTranslateAttributes,
TranslateTelegrafAttributes: defaultTranslateTelegrafAttributes,
NestAttributes: &NestingProcessorConfig{
Separator: defaultNestingSeparator,
Enabled: defaultNestingEnabled,
Include: defaultNestingInclude,
Exclude: defaultNestingExclude,
Separator: defaultNestingSeparator,
Enabled: defaultNestingEnabled,
Include: defaultNestingInclude,
Exclude: defaultNestingExclude,
SquashSingleValues: defaultNestingSquashSingleValues,
},
AggregateAttributes: defaultAggregateAttributes,
}
Expand Down
45 changes: 25 additions & 20 deletions pkg/processor/sumologicschemaprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ func TestLoadConfig(t *testing.T) {
TranslateAttributes: true,
TranslateTelegrafAttributes: true,
NestAttributes: &NestingProcessorConfig{
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
SquashSingleValues: false,
},
AggregateAttributes: []aggregationPair{},
})
Expand All @@ -72,10 +73,11 @@ func TestLoadConfig(t *testing.T) {
TranslateAttributes: false,
TranslateTelegrafAttributes: true,
NestAttributes: &NestingProcessorConfig{
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
SquashSingleValues: false,
},
AggregateAttributes: []aggregationPair{},
})
Expand All @@ -92,10 +94,11 @@ func TestLoadConfig(t *testing.T) {
TranslateAttributes: true,
TranslateTelegrafAttributes: false,
NestAttributes: &NestingProcessorConfig{
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
SquashSingleValues: false,
},
AggregateAttributes: []aggregationPair{},
})
Expand All @@ -112,10 +115,11 @@ func TestLoadConfig(t *testing.T) {
TranslateAttributes: true,
TranslateTelegrafAttributes: true,
NestAttributes: &NestingProcessorConfig{
Enabled: true,
Separator: "!",
Include: []string{"blep"},
Exclude: []string{"nghu"},
Enabled: true,
Separator: "!",
Include: []string{"blep"},
Exclude: []string{"nghu"},
SquashSingleValues: true,
},
AggregateAttributes: []aggregationPair{},
})
Expand All @@ -132,10 +136,11 @@ func TestLoadConfig(t *testing.T) {
TranslateAttributes: true,
TranslateTelegrafAttributes: true,
NestAttributes: &NestingProcessorConfig{
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
Enabled: false,
Separator: ".",
Include: []string{},
Exclude: []string{},
SquashSingleValues: false,
},
AggregateAttributes: []aggregationPair{
{
Expand Down
100 changes: 88 additions & 12 deletions pkg/processor/sumologicschemaprocessor/nesting_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,28 @@ import (
)

type NestingProcessorConfig struct {
Separator string `mapstructure:"separator"`
Enabled bool `mapstructure:"enabled"`
Include []string `mapstructure:"include"`
Exclude []string `mapstructure:"exclude"`
Separator string `mapstructure:"separator"`
Enabled bool `mapstructure:"enabled"`
Include []string `mapstructure:"include"`
Exclude []string `mapstructure:"exclude"`
SquashSingleValues bool `mapstructure:"squash_single_values"`
}

type NestingProcessor struct {
separator string
enabled bool
allowlist []string
denylist []string
separator string
enabled bool
allowlist []string
denylist []string
squashSingleValues bool
}

func newNestingProcessor(config *NestingProcessorConfig) (*NestingProcessor, error) {
proc := &NestingProcessor{
separator: config.Separator,
enabled: config.Enabled,
allowlist: config.Include,
denylist: config.Exclude,
separator: config.Separator,
enabled: config.Enabled,
allowlist: config.Include,
denylist: config.Exclude,
squashSingleValues: config.SquashSingleValues,
}

return proc, nil
Expand Down Expand Up @@ -190,6 +193,10 @@ func (proc *NestingProcessor) processAttributes(attributes pcommon.Map) error {
return true
})

if proc.squashSingleValues {
newMap = proc.squash(newMap)
}

newMap.CopyTo(attributes)

return nil
Expand Down Expand Up @@ -223,6 +230,75 @@ func (proc *NestingProcessor) shouldTranslateKey(k string) bool {
return true
}

// Squashes maps that have single values, eg. map {"a": {"b": {"c": "C", "d": "D"}}}}
// gets squashes into {"a.b": {"c": "C", "d": "D"}}}
func (proc *NestingProcessor) squash(attributes pcommon.Map) pcommon.Map {
newMap := pcommon.NewValueMap()
attributes.CopyTo(newMap.Map())
key := proc.squashAttribute(newMap)

if key != "" {
retMap := pcommon.NewMap()
newMap.Map().CopyTo(retMap.PutEmptyMap(key))
return retMap
}

return newMap.Map()
}

// A function that squashes keys in a value.
// If this value contained a map with one element, it gets squished and its key gets returned.
//
// If this value contained a map with many elements, this function is called on these elements,
// and the key gets replaced if needed, "" is returned.
//
// Else, nothing happens and "" is returned.
func (proc *NestingProcessor) squashAttribute(value pcommon.Value) string {
if value.Type() == pcommon.ValueTypeMap {
m := value.Map()
if m.Len() == 1 {
// If the map contains only one key-value pair, squash it.
key := ""
val := pcommon.NewValueEmpty()
// This will iterate only over one value (the only one)
m.Range(func(k string, v pcommon.Value) bool {
keySuffix := proc.squashAttribute(v)
key = proc.squashKey(k, keySuffix)
val = v
return false
})

val.CopyTo(value)
return key
} else {
// This map doesn't get squashed, but its content might have keys replaced.
newMap := pcommon.NewMap()
m.Range(func(k string, v pcommon.Value) bool {
keySuffix := proc.squashAttribute(v)
// If "" was returned, the value was not a one-element map and did not get squashed.
if keySuffix == "" {
v.CopyTo(newMap.PutEmpty(k))
} else {
v.CopyTo(newMap.PutEmpty(proc.squashKey(k, keySuffix)))
}

return true
})
newMap.CopyTo(value.Map())
}
}

return ""
}

func (proc *NestingProcessor) squashKey(key string, keySuffix string) string {
if keySuffix == "" {
return key
} else {
return key + proc.separator + keySuffix
}
}

func (proc *NestingProcessor) isEnabled() bool {
return proc.enabled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,77 @@ func TestNestingAttributes(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
proc, err := newNestingProcessor(&NestingProcessorConfig{
Separator: ".", Enabled: true, Include: testCase.include, Exclude: testCase.exclude,
Separator: ".", Enabled: true, Include: testCase.include, Exclude: testCase.exclude, SquashSingleValues: false,
})
require.NoError(t, err)

attrs := mapToPcommonMap(testCase.input)
err = proc.processAttributes(attrs)
require.NoError(t, err)

expected := mapToPcommonMap(testCase.expected)

require.Equal(t, expected.AsRaw(), attrs.AsRaw())
})
}
}

func TestSquashing(t *testing.T) {
testCases := []struct {
name string
input map[string]pcommon.Value
expected map[string]pcommon.Value
}{
{
name: "squash from example",
input: map[string]pcommon.Value{
"k8s": mapToPcommonValue(map[string]pcommon.Value{
"pods": mapToPcommonValue(map[string]pcommon.Value{
"a": pcommon.NewValueStr("A"),
"b": pcommon.NewValueStr("B"),
}),
}),
},
expected: map[string]pcommon.Value{
"k8s.pods": mapToPcommonValue(map[string]pcommon.Value{
"a": pcommon.NewValueStr("A"),
"b": pcommon.NewValueStr("B"),
}),
},
},
{
name: "many-value maps with squashed keys",
input: map[string]pcommon.Value{
"k8s": mapToPcommonValue(map[string]pcommon.Value{
"pods": mapToPcommonValue(map[string]pcommon.Value{
"a": mapToPcommonValue(map[string]pcommon.Value{
"b": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("A"),
}),
}),
"b": pcommon.NewValueStr("B"),
}),
}),
"sumo": mapToPcommonValue(map[string]pcommon.Value{
"logic": mapToPcommonValue(map[string]pcommon.Value{
"schema": pcommon.NewValueStr("processor"),
}),
}),
},
expected: map[string]pcommon.Value{
"k8s.pods": mapToPcommonValue(map[string]pcommon.Value{
"a.b.c": pcommon.NewValueStr("A"),
"b": pcommon.NewValueStr("B"),
}),
"sumo.logic.schema": pcommon.NewValueStr("processor"),
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
proc, err := newNestingProcessor(&NestingProcessorConfig{
Separator: ".", Enabled: true, Include: []string{}, Exclude: []string{}, SquashSingleValues: true,
})
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ processors:
separator: "!"
include: ["blep"]
exclude: ["nghu"]
squash_single_values: true
sumologic_schema/aggregate-attributes:
aggregate_attributes:
- attribute: "attr1"
Expand Down