Skip to content

Commit

Permalink
fix(k8sprocessor): fix metadata enrichment (#725)
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Rosiek <[email protected]>

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored and Dominik Rosiek committed Sep 14, 2022
1 parent a917452 commit 7696758
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 22 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [v0.54.0-sumo-1]

### Released 2022-09-15

### Changed

- fix(k8sprocessor): fix metadata enrichment [#725]

[v0.54.0-sumo-1]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.54.0-sumo-0...v0.54.0-sumo-1
[#725]: https://github.com/SumoLogic/sumologic-otel-collector/pull/725

## [v0.54.0-sumo-0]

### Released 2022-07-04

### Added

Expand Down
36 changes: 20 additions & 16 deletions pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,24 +485,28 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
c.m.Lock()
defer c.m.Unlock()

if pod.UID != "" {
c.Pods[PodIdentifier(pod.UID)] = newPod
}
if pod.Status.PodIP != "" {
// compare initial scheduled timestamp for existing pod and new pod with same IP
// and only replace old pod if scheduled time of new pod is newer? This should fix
// the case where scheduler has assigned the same IP to a new pod but update event for
// the old pod came in later.
if p, ok := c.Pods[PodIdentifier(pod.Status.PodIP)]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
return
}
}
c.Pods[PodIdentifier(pod.Status.PodIP)] = newPod
identifiers := []PodIdentifier{
PodIdentifier(pod.UID),
PodIdentifier(pod.Status.PodIP),
}
// Use pod_name.namespace_name identifier

if newPod.Name != "" && newPod.Namespace != "" {
c.Pods[generatePodIDFromName(newPod)] = newPod
identifiers = append(identifiers, generatePodIDFromName(newPod))
}

for _, identifier := range identifiers {
if identifier != "" {
// compare initial scheduled timestamp for existing pod and new pod with same identifier
// and only replace old pod if scheduled time of new pod is newer or equal.
// This should fix the case where scheduler has assigned the same attributes (like IP address)
// to a new pod but update event for the old pod came in later.
if p, ok := c.Pods[identifier]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
continue
}
}
c.Pods[identifier] = newPod
}
}
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,24 @@ func TestPodAddOutOfSync(t *testing.T) {

pod := &api_v1.Pod{}
pod.Name = "podA"
pod.Namespace = "namespace"
pod.Status.PodIP = "1.1.1.1"
startTime := meta_v1.NewTime(time.Now())
pod.Status.StartTime = &startTime
c.handlePodAdd(pod)
assert.Equal(t, len(c.Pods), 1)
assert.Equal(t, len(c.Pods), 2)
got := c.Pods["1.1.1.1"]
assert.Equal(t, got.Address, "1.1.1.1")
assert.Equal(t, got.Name, "podA")

pod2 := &api_v1.Pod{}
pod2.Name = "podB"
pod.Status.PodIP = "1.1.1.1"
pod2.Namespace = "namespace"
pod2.Status.PodIP = "1.1.1.1"
startTime2 := meta_v1.NewTime(time.Now().Add(-time.Second * 10))
pod.Status.StartTime = &startTime2
c.handlePodAdd(pod)
assert.Equal(t, len(c.Pods), 1)
pod2.Status.StartTime = &startTime2
c.handlePodAdd(pod2)
assert.Equal(t, len(c.Pods), 3)
got = c.Pods["1.1.1.1"]
assert.Equal(t, got.Address, "1.1.1.1")
assert.Equal(t, got.Name, "podA")
Expand Down

0 comments on commit 7696758

Please sign in to comment.