Skip to content

Commit

Permalink
fix(k8sprocessor): fix metadata enrichment
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored and Dominik Rosiek committed Sep 14, 2022
1 parent 68720ae commit ac85539
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v0.57.2-sumo-1]

### Changed

- fix(k8sprocessor): fix metadata enrichment [#724]

[v0.57.2-sumo-1]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.57.2-sumo-0...v0.57.2-sumo-1
[#724]: https://github.com/SumoLogic/sumologic-otel-collector/pull/724

## [v0.57.2-sumo-0]

This release deprecates the following features, which will be removed in `v0.60.0`:
Expand Down
36 changes: 20 additions & 16 deletions pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,24 +490,28 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
c.m.Lock()
defer c.m.Unlock()

if pod.UID != "" {
c.Pods[PodIdentifier(pod.UID)] = newPod
}
if pod.Status.PodIP != "" {
// compare initial scheduled timestamp for existing pod and new pod with same IP
// and only replace old pod if scheduled time of new pod is newer? This should fix
// the case where scheduler has assigned the same IP to a new pod but update event for
// the old pod came in later.
if p, ok := c.Pods[PodIdentifier(pod.Status.PodIP)]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
return
}
}
c.Pods[PodIdentifier(pod.Status.PodIP)] = newPod
identifiers := []PodIdentifier{
PodIdentifier(pod.UID),
PodIdentifier(pod.Status.PodIP),
}
// Use pod_name.namespace_name identifier

if newPod.Name != "" && newPod.Namespace != "" {
c.Pods[generatePodIDFromName(newPod)] = newPod
identifiers = append(identifiers, generatePodIDFromName(newPod))
}

for _, identifier := range identifiers {
if identifier != "" {
// compare initial scheduled timestamp for existing pod and new pod with same identifier
// and only replace old pod if scheduled time of new pod is newer or equal.
// This should fix the case where scheduler has assigned the same attributes (like IP address)
// to a new pod but update event for the old pod came in later.
if p, ok := c.Pods[identifier]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
continue
}
}
c.Pods[identifier] = newPod
}
}
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,24 @@ func TestPodAddOutOfSync(t *testing.T) {

pod := &api_v1.Pod{}
pod.Name = "podA"
pod.Namespace = "namespace"
pod.Status.PodIP = "1.1.1.1"
startTime := meta_v1.NewTime(time.Now())
pod.Status.StartTime = &startTime
c.handlePodAdd(pod)
assert.Equal(t, len(c.Pods), 1)
assert.Equal(t, len(c.Pods), 2)
got := c.Pods["1.1.1.1"]
assert.Equal(t, got.Address, "1.1.1.1")
assert.Equal(t, got.Name, "podA")

pod2 := &api_v1.Pod{}
pod2.Name = "podB"
pod.Status.PodIP = "1.1.1.1"
pod2.Namespace = "namespace"
pod2.Status.PodIP = "1.1.1.1"
startTime2 := meta_v1.NewTime(time.Now().Add(-time.Second * 10))
pod.Status.StartTime = &startTime2
c.handlePodAdd(pod)
assert.Equal(t, len(c.Pods), 1)
pod2.Status.StartTime = &startTime2
c.handlePodAdd(pod2)
assert.Equal(t, len(c.Pods), 3)
got = c.Pods["1.1.1.1"]
assert.Equal(t, got.Address, "1.1.1.1")
assert.Equal(t, got.Name, "podA")
Expand Down

0 comments on commit ac85539

Please sign in to comment.