Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Kubernetes RDS: Use name and namespace as resource keys instead of ju…
Browse files Browse the repository at this point in the history
…st names.

In Kubernetes, resource may have the same name across namespaces. Currently, this can lead to a pretty bizarre behavior. See #436 (comment) for background.

PiperOrigin-RevId: 325143549
  • Loading branch information
manugarg committed Aug 6, 2020
1 parent a6cf40b commit 91bae43
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 117 deletions.
28 changes: 14 additions & 14 deletions rds/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type epLister struct {
kClient *client

mu sync.RWMutex // Mutex for names and cache
names []string
cache map[string]*epInfo
keys []resourceKey
cache map[resourceKey]*epInfo
l *logger.Logger
}

Expand Down Expand Up @@ -67,16 +67,16 @@ func (lister *epLister) listResources(req *pb.ListResourcesRequest) ([]*pb.Resou
lister.mu.RLock()
defer lister.mu.RUnlock()

for _, name := range lister.names {
if epName != "" && name != epName {
for _, key := range lister.keys {
if epName != "" && key.name != epName {
continue
}

if nameFilter != nil && !nameFilter.Match(name, lister.l) {
if nameFilter != nil && !nameFilter.Match(key.name, lister.l) {
continue
}

epi := lister.cache[name]
epi := lister.cache[key]
if nsFilter != nil && !nsFilter.Match(epi.Metadata.Namespace, lister.l) {
continue
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (epi *epInfo) resources(portFilter *filter.RegexFilter, l *logger.Logger) (
return
}

func parseEndpointsJSON(resp []byte) (names []string, endpoints map[string]*epInfo, err error) {
func parseEndpointsJSON(resp []byte) (keys []resourceKey, endpoints map[resourceKey]*epInfo, err error) {
var itemList struct {
Items []*epInfo
}
Expand All @@ -150,11 +150,11 @@ func parseEndpointsJSON(resp []byte) (names []string, endpoints map[string]*epIn
return
}

names = make([]string, len(itemList.Items))
endpoints = make(map[string]*epInfo)
keys = make([]resourceKey, len(itemList.Items))
endpoints = make(map[resourceKey]*epInfo)
for i, item := range itemList.Items {
names[i] = item.Metadata.Name
endpoints[item.Metadata.Name] = item
keys[i] = resourceKey{item.Metadata.Namespace, item.Metadata.Name}
endpoints[keys[i]] = item
}

return
Expand All @@ -166,16 +166,16 @@ func (lister *epLister) expand() {
lister.l.Warningf("epLister.expand(): error while getting endpoints list from API: %v", err)
}

names, endpoints, err := parseEndpointsJSON(resp)
keys, endpoints, err := parseEndpointsJSON(resp)
if err != nil {
lister.l.Warningf("epLister.expand(): error while parsing endpoints API response (%s): %v", string(resp), err)
}

lister.l.Infof("epLister.expand(): got %d endpoints", len(names))
lister.l.Infof("epLister.expand(): got %d endpoints", len(keys))

lister.mu.Lock()
defer lister.mu.Unlock()
lister.names = names
lister.keys = keys
lister.cache = endpoints
}

Expand Down
18 changes: 11 additions & 7 deletions rds/kubernetes/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@ func TestParseEndpoints(t *testing.T) {
if err != nil {
t.Fatalf("error reading test data file: %s", epListFile)
}
_, epByName, err := parseEndpointsJSON(data)
_, epByKey, err := parseEndpointsJSON(data)
if err != nil {
t.Fatalf("error reading test data file: %s", epListFile)
}

testNames := []string{"cloudprober", "cloudprober-test", "kubernetes"}
for _, testP := range testNames {
if epByName[testP] == nil {
t.Errorf("didn't get endpoints by the name: %s", testP)
testKeys := []resourceKey{
{"default", "cloudprober"},
{"default", "cloudprober-test"},
{"system", "kubernetes"},
}
for _, key := range testKeys {
if epByKey[key] == nil {
t.Errorf("didn't get endpoints for %+v", key)
}
}

for _, name := range testNames[:1] {
epi := epByName[name]
for _, key := range testKeys[:1] {
epi := epByKey[key]
if epi.Metadata.Labels["app"] != "cloudprober" {
t.Errorf("cloudprober endpoints app label: got=%s, want=cloudprober", epi.Metadata.Labels["app"])
}
Expand Down
4 changes: 4 additions & 0 deletions rds/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type kMetadata struct {
Labels map[string]string
}

type resourceKey struct {
namespace, name string
}

// ListResources returns the list of resources from the cache.
func (p *Provider) ListResources(req *pb.ListResourcesRequest) (*pb.ListResourcesResponse, error) {
tok := strings.SplitN(req.GetResourcePath(), "/", 2)
Expand Down
28 changes: 14 additions & 14 deletions rds/kubernetes/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type podsLister struct {
kClient *client

mu sync.RWMutex // Mutex for names and cache
names []string
cache map[string]*podInfo
keys []resourceKey
cache map[resourceKey]*podInfo
l *logger.Logger
}

Expand All @@ -59,12 +59,12 @@ func (pl *podsLister) listResources(req *pb.ListResourcesRequest) ([]*pb.Resourc
pl.mu.RLock()
defer pl.mu.RUnlock()

for _, name := range pl.names {
if nameFilter != nil && !nameFilter.Match(name, pl.l) {
for _, key := range pl.keys {
if nameFilter != nil && !nameFilter.Match(key.name, pl.l) {
continue
}

pod := pl.cache[name]
pod := pl.cache[key]
if nsFilter != nil && !nsFilter.Match(pod.Metadata.Namespace, pl.l) {
continue
}
Expand All @@ -73,7 +73,7 @@ func (pl *podsLister) listResources(req *pb.ListResourcesRequest) ([]*pb.Resourc
}

resources = append(resources, &pb.Resource{
Name: proto.String(name),
Name: proto.String(key.name),
Ip: proto.String(pod.Status.PodIP),
Labels: pod.Metadata.Labels,
})
Expand All @@ -91,7 +91,7 @@ type podInfo struct {
}
}

func parsePodsJSON(resp []byte) (names []string, pods map[string]*podInfo, err error) {
func parsePodsJSON(resp []byte) (keys []resourceKey, pods map[resourceKey]*podInfo, err error) {
var itemList struct {
Items []*podInfo
}
Expand All @@ -100,14 +100,14 @@ func parsePodsJSON(resp []byte) (names []string, pods map[string]*podInfo, err e
return
}

names = make([]string, len(itemList.Items))
pods = make(map[string]*podInfo)
keys = make([]resourceKey, len(itemList.Items))
pods = make(map[resourceKey]*podInfo)
for i, item := range itemList.Items {
if item.Status.Phase != "Running" {
continue
}
names[i] = item.Metadata.Name
pods[item.Metadata.Name] = item
keys[i] = resourceKey{item.Metadata.Namespace, item.Metadata.Name}
pods[keys[i]] = item
}

return
Expand All @@ -119,16 +119,16 @@ func (pl *podsLister) expand() {
pl.l.Warningf("podsLister.expand(): error while getting pods list from API: %v", err)
}

names, pods, err := parsePodsJSON(resp)
keys, pods, err := parsePodsJSON(resp)
if err != nil {
pl.l.Warningf("podsLister.expand(): error while parsing pods API response (%s): %v", string(resp), err)
}

pl.l.Infof("podsLister.expand(): got %d pods", len(names))
pl.l.Infof("podsLister.expand(): got %d pods", len(keys))

pl.mu.Lock()
defer pl.mu.Unlock()
pl.names = names
pl.keys = keys
pl.cache = pods
}

Expand Down
73 changes: 44 additions & 29 deletions rds/kubernetes/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,33 @@ import (
)

func testPodInfo(name, ns, ip string, labels map[string]string) *podInfo {
labels["namespace"] = ns
pi := &podInfo{Metadata: kMetadata{Name: name, Namespace: ns, Labels: labels}}
pi.Status.PodIP = ip
return pi
}

func TestListResources(t *testing.T) {
pl := &podsLister{}
pl.names = []string{"podA", "podB", "podC"}
pl.cache = map[string]*podInfo{
"podA": testPodInfo("podA", "nsAB", "10.1.1.1", map[string]string{"app": "appA"}),
"podB": testPodInfo("podB", "nsAB", "10.1.1.2", map[string]string{"app": "appB"}),
"podC": testPodInfo("podC", "nsC", "10.1.1.3", map[string]string{"app": "appC", "func": "web"}),
pl := &podsLister{
cache: make(map[resourceKey]*podInfo),
}
for _, pi := range []*podInfo{
testPodInfo("podA", "nsAB", "10.1.1.1", map[string]string{"app": "appA"}),
testPodInfo("podB", "nsAB", "10.1.1.2", map[string]string{"app": "appB"}),
testPodInfo("podC", "nsC", "10.1.1.3", map[string]string{"app": "appC", "func": "web"}),
testPodInfo("podC", "devC", "10.2.1.3", map[string]string{"app": "appC", "func": "web"}),
} {
key := resourceKey{pi.Metadata.Namespace, pi.Metadata.Name}
pl.keys = append(pl.keys, key)
pl.cache[key] = pi
}

tests := []struct {
desc string
nameFilter string
filters map[string]string
labelsFilter map[string]string
wantPods []string
wantPods []resourceKey
wantErr bool
}{
{
Expand All @@ -54,17 +61,22 @@ func TestListResources(t *testing.T) {
{
desc: "only name filter for podB and podC",
filters: map[string]string{"name": "pod(B|C)"},
wantPods: []string{"podB", "podC"},
wantPods: []resourceKey{{"nsAB", "podB"}, {"nsC", "podC"}, {"devC", "podC"}},
},
{
desc: "name filter for podB and podC, and namespace filter",
filters: map[string]string{"name": "pod(B|C)", "namespace": "ns.*"},
wantPods: []resourceKey{{"nsAB", "podB"}, {"nsC", "podC"}},
},
{
desc: "name and namespace filter for podB",
filters: map[string]string{"name": "pod(B|C)", "namespace": "nsAB"},
wantPods: []string{"podB"},
wantPods: []resourceKey{{"nsAB", "podB"}},
},
{
desc: "only namespace filter for podA and podB",
filters: map[string]string{"namespace": "nsAB"},
wantPods: []string{"podA", "podB"},
wantPods: []resourceKey{{"nsAB", "podA"}, {"nsAB", "podB"}},
},
}

Expand All @@ -83,13 +95,13 @@ func TestListResources(t *testing.T) {
return
}

var gotNames []string
var gotPods []resourceKey
for _, res := range results {
gotNames = append(gotNames, res.GetName())
gotPods = append(gotPods, resourceKey{res.GetLabels()["namespace"], res.GetName()})
}

if !reflect.DeepEqual(gotNames, test.wantPods) {
t.Errorf("pods.listResources: got=%v, expected=%v", gotNames, test.wantPods)
if !reflect.DeepEqual(gotPods, test.wantPods) {
t.Errorf("pods.listResources: got=%v, expected=%v", gotPods, test.wantPods)
}
})
}
Expand All @@ -102,28 +114,31 @@ func TestParseResourceList(t *testing.T) {
if err != nil {
t.Fatalf("error reading test data file: %s", podsListFile)
}
_, podsByName, err := parsePodsJSON(data)
_, podsByKey, err := parsePodsJSON(data)

if err != nil {
t.Fatalf("Error while parsing pods JSON data: %v", err)
}

cpPod := "cloudprober-54778d95f5-7hqtd"
if podsByName[cpPod] == nil {
t.Errorf("didn't get pod by the name: %s", cpPod)
for _, ns := range []string{"prod", "dev"} {
cpPodKey := resourceKey{ns, "cloudprober-54778d95f5-7hqtd"}
if podsByKey[cpPodKey] == nil {
t.Errorf("didn't get pod by the key: %+v", cpPodKey)
continue
}

if podsByKey[cpPodKey].Metadata.Labels["app"] != "cloudprober" {
t.Errorf("cloudprober pod app label: got=%s, want=cloudprober", podsByKey[cpPodKey].Metadata.Labels["app"])
}

cpPodIP := "10.28.0.3"
if podsByKey[cpPodKey].Status.PodIP != cpPodIP {
t.Errorf("cloudprober pod ip: got=%s, want=%s", podsByKey[cpPodKey].Status.PodIP, cpPodIP)
}
}

// Verify that we got the pending pod.
if podsByName["test"] != nil {
// Verify that we didn't the pending pod.
if podsByKey[resourceKey{"default", "test"}] != nil {
t.Error("got a non-running pod in the list: test")
}

if podsByName[cpPod].Metadata.Labels["app"] != "cloudprober" {
t.Errorf("cloudprober pod app label: got=%s, want=cloudprober", podsByName[cpPod].Metadata.Labels["app"])
}

cpPodIP := "10.28.0.3"
if podsByName[cpPod].Status.PodIP != cpPodIP {
t.Errorf("cloudprober pod ip: got=%s, want=%s", podsByName[cpPod].Status.PodIP, cpPodIP)
}
}
Loading

0 comments on commit 91bae43

Please sign in to comment.