Skip to content

Commit

Permalink
Support for complex attributes for log records in OTel mode
Browse files Browse the repository at this point in the history
According to the spec:

The log attribute model MUST support any type, a superset of standard Attribute, to preserve the semantics of structured attributes emitted by the applications. This field is optional.
  • Loading branch information
felixbarny committed Jan 3, 2025
1 parent e38d233 commit 6511c8a
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 27 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_complex-log-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support for complex attributes for log records in OTel mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: []

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
30 changes: 30 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,36 @@ func TestExporterLogs(t *testing.T) {
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("otel mode attribute complex value", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

logs := plog.NewLogs()
resourceLog := logs.ResourceLogs().AppendEmpty()
resourceLog.Resource().Attributes().PutEmptyMap("some.resource.attribute").PutEmptyMap("foo").PutStr("bar", "baz")
scopeLog := resourceLog.ScopeLogs().AppendEmpty()
scopeLog.Scope().Attributes().PutEmptyMap("some.scope.attribute").PutEmptyMap("foo").PutStr("bar", "baz")
logRecord := scopeLog.LogRecords().AppendEmpty()
logRecord.Attributes().PutEmptyMap("some.record.attribute").PutEmptyMap("foo").PutStr("bar", "baz")

mustSendLogs(t, exporter, logs)

rec.WaitItems(1)

assert.Len(t, rec.Items(), 1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"some.record.attribute":{"foo":{"bar":"baz"}}}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"some.scope.attribute.foo.bar":"baz"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"some.resource.attribute.foo.bar":"baz"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})
}

func TestExporterMetrics(t *testing.T) {
Expand Down
28 changes: 17 additions & 11 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document {
}

fields := make([]field, 0, am.Len())
fields = appendAttributeFields(fields, path, am)
fields = appendAttributeFields(fields, path, am, true)
return Document{fields: fields}
}

Expand Down Expand Up @@ -175,10 +175,16 @@ func (doc *Document) AddUInt(key string, value uint64) {
doc.Add(key, UIntValue(value))
}

// AddAttributes expands and flattens all key-value pairs from the input attribute map into
// AddFlattenedAttributes expands and flattens all key-value pairs from the input attribute map into
// the document.
func (doc *Document) AddAttributes(key string, attributes pcommon.Map) {
doc.fields = appendAttributeFields(doc.fields, key, attributes)
func (doc *Document) AddFlattenedAttributes(key string, attributes pcommon.Map) {
doc.AddAttributes(key, attributes, true)
}

// AddAttributes optionally expands and flattens all key-value pairs from the input attribute map into
// the document.
func (doc *Document) AddAttributes(key string, attributes pcommon.Map, flattenValues bool) {
doc.fields = appendAttributeFields(doc.fields, key, attributes, flattenValues)
}

// AddAttribute converts and adds a AttributeValue to the document. If the attribute represents a map,
Expand All @@ -188,7 +194,7 @@ func (doc *Document) AddAttribute(key string, attribute pcommon.Value) {
case pcommon.ValueTypeEmpty:
// do not add 'null'
case pcommon.ValueTypeMap:
doc.AddAttributes(key, attribute.Map())
doc.AddFlattenedAttributes(key, attribute.Map())
default:
doc.Add(key, ValueFromAttribute(attribute))
}
Expand All @@ -199,7 +205,7 @@ func (doc *Document) AddEvents(key string, events ptrace.SpanEventSlice) {
for i := 0; i < events.Len(); i++ {
e := events.At(i)
doc.AddTimestamp(flattenKey(key, e.Name()+".time"), e.Timestamp())
doc.AddAttributes(flattenKey(key, e.Name()), e.Attributes())
doc.AddFlattenedAttributes(flattenKey(key, e.Name()), e.Attributes())
}
}

Expand Down Expand Up @@ -584,21 +590,21 @@ func arrFromAttributes(aa pcommon.Slice) []Value {
return values
}

func appendAttributeFields(fields []field, path string, am pcommon.Map) []field {
func appendAttributeFields(fields []field, path string, am pcommon.Map, flattenValues bool) []field {
am.Range(func(k string, val pcommon.Value) bool {
fields = appendAttributeValue(fields, path, k, val)
fields = appendAttributeValue(fields, path, k, val, flattenValues)
return true
})
return fields
}

func appendAttributeValue(fields []field, path string, key string, attr pcommon.Value) []field {
func appendAttributeValue(fields []field, path string, key string, attr pcommon.Value, flattenValues bool) []field {
if attr.Type() == pcommon.ValueTypeEmpty {
return fields
}

if attr.Type() == pcommon.ValueTypeMap {
return appendAttributeFields(fields, flattenKey(path, key), attr.Map())
if flattenValues && attr.Type() == pcommon.ValueTypeMap {
return appendAttributeFields(fields, flattenKey(path, key), attr.Map(), true)
}

return append(fields, field{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestObjectModel_CreateMap(t *testing.T) {
m := pcommon.NewMap()
m.PutInt("i", 42)
m.PutStr("str", "test")
doc.AddAttributes("prefix", m)
doc.AddFlattenedAttributes("prefix", m)
return doc
},
want: Document{fields: []field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}},
Expand Down
26 changes: 13 additions & 13 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
document.AddInt("SeverityNumber", int64(record.SeverityNumber()))
document.AddAttribute("Body", record.Body())
m.encodeAttributes(&document, record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))
document.AddFlattenedAttributes("Resource", resource.Attributes())
document.AddFlattenedAttributes("Scope", scopeToAttributes(scope))

return document
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem
document.AddInt("severity_number", int64(record.SeverityNumber()))
document.AddInt("dropped_attributes_count", int64(record.DroppedAttributesCount()))

m.encodeAttributesOTelMode(&document, record.Attributes())
m.encodeAttributesOTelMode(&document, record.Attributes(), false)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

Expand Down Expand Up @@ -314,7 +314,7 @@ func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]obj
if document, ok = documents[hash]; !ok {
encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve)
document.AddTimestamp("@timestamp", dp.Timestamp())
document.AddAttributes("", dp.Attributes())
document.AddFlattenedAttributes("", dp.Attributes())
}

document.AddAttribute(metric.Name(), value)
Expand Down Expand Up @@ -342,7 +342,7 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob
}
document.AddString("unit", metric.Unit())

m.encodeAttributesOTelMode(&document, dp.Attributes())
m.encodeAttributesOTelMode(&document, dp.Attributes(), true)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
}
Expand Down Expand Up @@ -630,7 +630,7 @@ func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pco
document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal))
}

func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) {
func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map, flattenValues bool) {
attrsCopy := pcommon.NewMap() // Copy to avoid mutating original map
attributeMap.CopyTo(attrsCopy)
attrsCopy.RemoveIf(func(key string, val pcommon.Value) bool {
Expand All @@ -647,7 +647,7 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr
return false
})
mergeGeolocation(attrsCopy)
document.AddAttributes("attributes", attrsCopy)
document.AddAttributes("attributes", attrsCopy, flattenValues)
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
Expand Down Expand Up @@ -676,7 +676,7 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche
document.AddString("kind", span.Kind().String())
document.AddUInt("duration", uint64(span.EndTimestamp()-span.StartTimestamp()))

m.encodeAttributesOTelMode(&document, span.Attributes())
m.encodeAttributesOTelMode(&document, span.Attributes(), true)

document.AddInt("dropped_attributes_count", int64(span.DroppedAttributesCount()))
document.AddInt("dropped_events_count", int64(span.DroppedEventsCount()))
Expand Down Expand Up @@ -719,10 +719,10 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra
document.AddString("TraceStatusDescription", span.Status().Message())
document.AddString("Link", spanLinksToString(span.Links()))
m.encodeAttributes(&document, span.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddFlattenedAttributes("Resource", resource.Attributes())
m.encodeEvents(&document, span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))
document.AddFlattenedAttributes("Scope", scopeToAttributes(scope))
return document
}

Expand All @@ -739,7 +739,7 @@ func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaU
document.AddTraceID("trace_id", span.TraceID())
document.AddInt("dropped_attributes_count", int64(spanEvent.DroppedAttributesCount()))

m.encodeAttributesOTelMode(&document, spanEvent.Attributes())
m.encodeAttributesOTelMode(&document, spanEvent.Attributes(), true)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

Expand All @@ -751,7 +751,7 @@ func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes p
if m.mode == MappingRaw {
key = ""
}
document.AddAttributes(key, attributes)
document.AddFlattenedAttributes(key, attributes)
}

func (m *encodeModel) encodeEvents(document *objmodel.Document, events ptrace.SpanEventSlice) {
Expand Down Expand Up @@ -796,7 +796,7 @@ func encodeAttributesECSMode(document *objmodel.Document, attrs pcommon.Map, con
if len(conversionMap) == 0 {
// No conversions to be done; add all attributes at top level of
// document.
document.AddAttributes("", attrs)
document.AddFlattenedAttributes("", attrs)
return
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@ func TestEncodeAttributes(t *testing.T) {
mappingMode: MappingNone,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddAttributes("Attributes", attributes)
doc.AddFlattenedAttributes("Attributes", attributes)
return doc
},
},
"ecs": {
mappingMode: MappingECS,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddAttributes("Attributes", attributes)
doc.AddFlattenedAttributes("Attributes", attributes)
return doc
},
},
Expand Down

0 comments on commit 6511c8a

Please sign in to comment.