Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Add ports support to Kubernetes services resource.
Browse files Browse the repository at this point in the history
= Fill in RDS resource's port with service's port.
= Allow filtering services by ports.
= For multiple ports in a service, create multiple RDS resources, each with name <service>_<port>.

PiperOrigin-RevId: 318921004
  • Loading branch information
manugarg committed Jun 30, 2020
1 parent 382659d commit c003b5c
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 23 deletions.
2 changes: 1 addition & 1 deletion rds/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var SupportedFilters = struct {
RegexFilterKeys []string
LabelsFilter bool
}{
// Note: the port filter applies only to endpoints
// Note: the port filter applies only to endpoints and services.
[]string{"name", "namespace", "port"},
true,
}
Expand Down
73 changes: 57 additions & 16 deletions rds/kubernetes/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -83,22 +84,7 @@ func (lister *servicesLister) listResources(req *pb.ListResourcesRequest) ([]*pb
continue
}

res := &pb.Resource{
Name: proto.String(name),
Labels: svc.Metadata.Labels,
}

if req.GetIpConfig().GetIpType() == pb.IPConfig_PUBLIC {
// If there is no ingress IP, skip the resource.
if len(svc.Status.LoadBalancer.Ingress) == 0 {
continue
}
res.Ip = proto.String(svc.Status.LoadBalancer.Ingress[0].IP)
} else {
res.Ip = proto.String(svc.Spec.ClusterIP)
}

resources = append(resources, res)
resources = append(resources, svc.resources(allFilters.RegexFilters["port"], req.GetIpConfig().GetIpType(), lister.l)...)
}

lister.l.Infof("kubernetes.listResources: returning %d services", len(resources))
Expand All @@ -123,6 +109,61 @@ type serviceInfo struct {
}
}

func (si *serviceInfo) matchPorts(portFilter *filter.RegexFilter, l *logger.Logger) ([]int, map[int]string) {
ports, portNameMap := []int{}, make(map[int]string)
for _, port := range si.Spec.Ports {
// For unnamed ports, use port number.
portName := port.Name
if portName == "" {
portName = strconv.FormatInt(int64(port.Port), 10)
}

if portFilter != nil && !portFilter.Match(portName, l) {
continue
}
ports = append(ports, port.Port)
portNameMap[port.Port] = portName
}
return ports, portNameMap
}

// resources returns RDS resources corresponding to a service resource. Each
// service object can have multiple ports.
//
// a) If service has only 1 port or there is a port filter and only one port
// matches the port filter, we return only one RDS resource with same name as
// service name.
// b) If there are multiple ports, we create one RDS resource for each port and
// name each resource as: <service_name>_<port_name>
func (si *serviceInfo) resources(portFilter *filter.RegexFilter, reqIPType pb.IPConfig_IPType, l *logger.Logger) (resources []*pb.Resource) {
ports, portNameMap := si.matchPorts(portFilter, l)
for _, port := range ports {
resName := si.Metadata.Name
if len(ports) != 1 {
resName = fmt.Sprintf("%s_%s", si.Metadata.Name, portNameMap[port])
}

res := &pb.Resource{
Name: proto.String(resName),
Port: proto.Int32(int32(port)),
Labels: si.Metadata.Labels,
}

if reqIPType == pb.IPConfig_PUBLIC {
// If there is no ingress IP, skip the resource.
if len(si.Status.LoadBalancer.Ingress) == 0 {
continue
}
res.Ip = proto.String(si.Status.LoadBalancer.Ingress[0].IP)
} else {
res.Ip = proto.String(si.Spec.ClusterIP)
}

resources = append(resources, res)
}
return
}

func parseServicesJSON(resp []byte) (names []string, services map[string]*serviceInfo, err error) {
var itemList struct {
Items []*serviceInfo
Expand Down
53 changes: 47 additions & 6 deletions rds/kubernetes/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,20 @@ import (
pb "github.com/google/cloudprober/rds/proto"
)

func testServiceInfo(name, ns, ip, publicIP string, labels map[string]string) *serviceInfo {
func testServiceInfo(name, ns, ip, publicIP string, labels map[string]string, ports []int) *serviceInfo {
si := &serviceInfo{Metadata: kMetadata{Name: name, Namespace: ns, Labels: labels}}
si.Spec.ClusterIP = ip

for _, port := range ports {
si.Spec.Ports = append(si.Spec.Ports, struct {
Name string
Port int
}{
Name: "",
Port: port,
})
}

if publicIP != "" {
si.Status.LoadBalancer.Ingress = []struct{ IP string }{
{
Expand All @@ -41,9 +51,9 @@ func TestListSvcResources(t *testing.T) {
sl := &servicesLister{}
sl.names = []string{"serviceA", "serviceB", "serviceC"}
sl.cache = map[string]*serviceInfo{
"serviceA": testServiceInfo("serviceA", "nsAB", "10.1.1.1", "", map[string]string{"app": "appA"}),
"serviceB": testServiceInfo("serviceB", "nsAB", "10.1.1.2", "192.16.16.199", map[string]string{"app": "appB"}),
"serviceC": testServiceInfo("serviceC", "nsC", "10.1.1.3", "192.16.16.200", map[string]string{"app": "appC", "func": "web"}),
"serviceA": testServiceInfo("serviceA", "nsAB", "10.1.1.1", "", map[string]string{"app": "appA"}, []int{9313, 9314}),
"serviceB": testServiceInfo("serviceB", "nsAB", "10.1.1.2", "192.16.16.199", map[string]string{"app": "appB"}, []int{443}),
"serviceC": testServiceInfo("serviceC", "nsC", "10.1.1.3", "192.16.16.200", map[string]string{"app": "appC", "func": "web"}, []int{3141}),
}

tests := []struct {
Expand All @@ -53,6 +63,7 @@ func TestListSvcResources(t *testing.T) {
labelsFilter map[string]string
wantServices []string
wantIPs []string
wantPorts []int32
wantPublicIPs []string
wantErr bool
}{
Expand All @@ -66,23 +77,34 @@ func TestListSvcResources(t *testing.T) {
filters: map[string]string{"name": "service(B|C)"},
wantServices: []string{"serviceB", "serviceC"},
wantIPs: []string{"10.1.1.2", "10.1.1.3"},
wantPorts: []int32{443, 3141},
},
{
desc: "only port filter for serviceA and serviceB's ports 9314 and 3141",
filters: map[string]string{"port": "314"},
wantServices: []string{"serviceA", "serviceC"},
wantIPs: []string{"10.1.1.1", "10.1.1.3"},
wantPorts: []int32{9314, 3141},
},
{
desc: "name and namespace filter for serviceB",
filters: map[string]string{"name": "service(B|C)", "namespace": "nsAB"},
wantServices: []string{"serviceB"},
wantIPs: []string{"10.1.1.2"},
wantPorts: []int32{443},
},
{
desc: "only namespace filter for serviceA and serviceB",
filters: map[string]string{"namespace": "nsAB"},
wantServices: []string{"serviceA", "serviceB"},
wantIPs: []string{"10.1.1.1", "10.1.1.2"},
wantServices: []string{"serviceA_9313", "serviceA_9314", "serviceB"},
wantIPs: []string{"10.1.1.1", "10.1.1.1", "10.1.1.2"},
wantPorts: []int32{9313, 9314, 443},
},
{
desc: "only services with public IPs",
wantServices: []string{"serviceB", "serviceC"},
wantPublicIPs: []string{"192.16.16.199", "192.16.16.200"},
wantPorts: []int32{443, 3141},
},
}

Expand Down Expand Up @@ -110,9 +132,11 @@ func TestListSvcResources(t *testing.T) {
}

var gotNames, gotIPs []string
var gotPorts []int32
for _, res := range results {
gotNames = append(gotNames, res.GetName())
gotIPs = append(gotIPs, res.GetIp())
gotPorts = append(gotPorts, res.GetPort())
}

if !reflect.DeepEqual(gotNames, test.wantServices) {
Expand All @@ -127,6 +151,10 @@ func TestListSvcResources(t *testing.T) {
if !reflect.DeepEqual(gotIPs, wantIPs) {
t.Errorf("services.listResources IPs: got=%v, expected=%v", gotIPs, wantIPs)
}

if !reflect.DeepEqual(gotPorts, test.wantPorts) {
t.Errorf("services.listResources Ports: got=%v, expected=%v", gotPorts, test.wantPorts)
}
})
}
}
Expand All @@ -147,23 +175,28 @@ func TestParseSvcResourceList(t *testing.T) {
services := map[string]struct {
ip string
publicIP string
ports []int
labels map[string]string
}{
"cloudprober": {
ip: "10.31.252.209",
ports: []int{9313},
labels: map[string]string{"app": "cloudprober"},
},
"cloudprober-rds": {
ip: "10.96.15.88",
publicIP: "192.88.99.199",
ports: []int{9314, 9313},
labels: map[string]string{"app": "cloudprober"},
},
"cloudprober-test": {
ip: "10.31.246.77",
ports: []int{9313},
labels: map[string]string{"app": "cloudprober"},
},
"kubernetes": {
ip: "10.31.240.1",
ports: []int{443},
labels: map[string]string{"component": "apiserver", "provider": "kubernetes"},
},
}
Expand All @@ -187,5 +220,13 @@ func TestParseSvcResourceList(t *testing.T) {
t.Errorf("%s service load balancer ip: got=%s, want=%s", name, servicesByName[name].Status.LoadBalancer.Ingress[0].IP, svc.publicIP)
}
}

var gotPorts []int
for _, port := range servicesByName[name].Spec.Ports {
gotPorts = append(gotPorts, port.Port)
}
if !reflect.DeepEqual(gotPorts, svc.ports) {
t.Errorf("%s service ports: got=%v, want=%v", name, gotPorts, svc.ports)
}
}
}

0 comments on commit c003b5c

Please sign in to comment.