Skip to content

Commit

Permalink
fix(k8sprocessor): fix metadata enrichment (#725)
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Rosiek <[email protected]>

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored Sep 14, 2022
1 parent 68720ae commit bc8bf9e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v0.57.2-sumo-1]

### Released 2022-09-14

### Changed

- fix(k8sprocessor): fix metadata enrichment [#725]

[v0.57.2-sumo-1]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.57.2-sumo-0...v0.57.2-sumo-1
[#725]: https://github.com/SumoLogic/sumologic-otel-collector/pull/725

## [v0.57.2-sumo-0]

### Released 2022-08-12

This release deprecates the following features, which will be removed in `v0.60.0`:

- feat(sumologicexporter): deprecate source templates ([upgrade guide][upgrade_guide_v0_57_0_deprecate_source_templates])
Expand All @@ -17,6 +30,7 @@ This release deprecates the following features, which will be removed in `v0.60.
- feat: define stability levels for components [#701]
- chore: upgrade OpenTelemetry Core to v0.57.2 [#699]

[v0.57.2-sumo-0]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.56.0-sumo-0...v0.57.2-sumo-0
[upgrade_guide_v0_57_0_deprecate_source_templates]: ./docs/Upgrading.md#sumologic-exporter-drop-support-for-source-headers
[#699]: https://github.com/SumoLogic/sumologic-otel-collector/pull/699/
[#701]: https://github.com/SumoLogic/sumologic-otel-collector/pull/701/
Expand Down
36 changes: 20 additions & 16 deletions pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,24 +490,28 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
c.m.Lock()
defer c.m.Unlock()

if pod.UID != "" {
c.Pods[PodIdentifier(pod.UID)] = newPod
}
if pod.Status.PodIP != "" {
// compare initial scheduled timestamp for existing pod and new pod with same IP
// and only replace old pod if scheduled time of new pod is newer? This should fix
// the case where scheduler has assigned the same IP to a new pod but update event for
// the old pod came in later.
if p, ok := c.Pods[PodIdentifier(pod.Status.PodIP)]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
return
}
}
c.Pods[PodIdentifier(pod.Status.PodIP)] = newPod
identifiers := []PodIdentifier{
PodIdentifier(pod.UID),
PodIdentifier(pod.Status.PodIP),
}
// Use pod_name.namespace_name identifier

if newPod.Name != "" && newPod.Namespace != "" {
c.Pods[generatePodIDFromName(newPod)] = newPod
identifiers = append(identifiers, generatePodIDFromName(newPod))
}

for _, identifier := range identifiers {
if identifier != "" {
// compare initial scheduled timestamp for existing pod and new pod with same identifier
// and only replace old pod if scheduled time of new pod is newer or equal.
// This should fix the case where scheduler has assigned the same attributes (like IP address)
// to a new pod but update event for the old pod came in later.
if p, ok := c.Pods[identifier]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
continue
}
}
c.Pods[identifier] = newPod
}
}
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,24 @@ func TestPodAddOutOfSync(t *testing.T) {

pod := &api_v1.Pod{}
pod.Name = "podA"
pod.Namespace = "namespace"
pod.Status.PodIP = "1.1.1.1"
startTime := meta_v1.NewTime(time.Now())
pod.Status.StartTime = &startTime
c.handlePodAdd(pod)
assert.Equal(t, len(c.Pods), 1)
assert.Equal(t, len(c.Pods), 2)
got := c.Pods["1.1.1.1"]
assert.Equal(t, got.Address, "1.1.1.1")
assert.Equal(t, got.Name, "podA")

pod2 := &api_v1.Pod{}
pod2.Name = "podB"
pod.Status.PodIP = "1.1.1.1"
pod2.Namespace = "namespace"
pod2.Status.PodIP = "1.1.1.1"
startTime2 := meta_v1.NewTime(time.Now().Add(-time.Second * 10))
pod.Status.StartTime = &startTime2
c.handlePodAdd(pod)
assert.Equal(t, len(c.Pods), 1)
pod2.Status.StartTime = &startTime2
c.handlePodAdd(pod2)
assert.Equal(t, len(c.Pods), 3)
got = c.Pods["1.1.1.1"]
assert.Equal(t, got.Address, "1.1.1.1")
assert.Equal(t, got.Name, "podA")
Expand Down

0 comments on commit bc8bf9e

Please sign in to comment.