From c003b5cbd7db790dfd346bfaf33643234ce0d027 Mon Sep 17 00:00:00 2001 From: Manu Garg Date: Mon, 29 Jun 2020 16:50:25 -0700 Subject: [PATCH] Add ports support to Kubernetes services resource. = Fill in RDS resource's port with service's port. = Allow filtering services by ports. = For multiple ports in a service, create multiple RDS resources, each with name _. PiperOrigin-RevId: 318921004 --- rds/kubernetes/kubernetes.go | 2 +- rds/kubernetes/services.go | 73 +++++++++++++++++++++++++-------- rds/kubernetes/services_test.go | 53 +++++++++++++++++++++--- 3 files changed, 105 insertions(+), 23 deletions(-) diff --git a/rds/kubernetes/kubernetes.go b/rds/kubernetes/kubernetes.go index b05791ca..c61c227e 100644 --- a/rds/kubernetes/kubernetes.go +++ b/rds/kubernetes/kubernetes.go @@ -76,7 +76,7 @@ var SupportedFilters = struct { RegexFilterKeys []string LabelsFilter bool }{ - // Note: the port filter applies only to endpoints + // Note: the port filter applies only to endpoints and services. []string{"name", "namespace", "port"}, true, } diff --git a/rds/kubernetes/services.go b/rds/kubernetes/services.go index 42923691..a9469d57 100644 --- a/rds/kubernetes/services.go +++ b/rds/kubernetes/services.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "math/rand" + "strconv" "strings" "sync" "time" @@ -83,22 +84,7 @@ func (lister *servicesLister) listResources(req *pb.ListResourcesRequest) ([]*pb continue } - res := &pb.Resource{ - Name: proto.String(name), - Labels: svc.Metadata.Labels, - } - - if req.GetIpConfig().GetIpType() == pb.IPConfig_PUBLIC { - // If there is no ingress IP, skip the resource. - if len(svc.Status.LoadBalancer.Ingress) == 0 { - continue - } - res.Ip = proto.String(svc.Status.LoadBalancer.Ingress[0].IP) - } else { - res.Ip = proto.String(svc.Spec.ClusterIP) - } - - resources = append(resources, res) + resources = append(resources, svc.resources(allFilters.RegexFilters["port"], req.GetIpConfig().GetIpType(), lister.l)...) } lister.l.Infof("kubernetes.listResources: returning %d services", len(resources)) @@ -123,6 +109,61 @@ type serviceInfo struct { } } +func (si *serviceInfo) matchPorts(portFilter *filter.RegexFilter, l *logger.Logger) ([]int, map[int]string) { + ports, portNameMap := []int{}, make(map[int]string) + for _, port := range si.Spec.Ports { + // For unnamed ports, use port number. + portName := port.Name + if portName == "" { + portName = strconv.FormatInt(int64(port.Port), 10) + } + + if portFilter != nil && !portFilter.Match(portName, l) { + continue + } + ports = append(ports, port.Port) + portNameMap[port.Port] = portName + } + return ports, portNameMap +} + +// resources returns RDS resources corresponding to a service resource. Each +// service object can have multiple ports. +// +// a) If service has only 1 port or there is a port filter and only one port +// matches the port filter, we return only one RDS resource with same name as +// service name. +// b) If there are multiple ports, we create one RDS resource for each port and +// name each resource as: _ +func (si *serviceInfo) resources(portFilter *filter.RegexFilter, reqIPType pb.IPConfig_IPType, l *logger.Logger) (resources []*pb.Resource) { + ports, portNameMap := si.matchPorts(portFilter, l) + for _, port := range ports { + resName := si.Metadata.Name + if len(ports) != 1 { + resName = fmt.Sprintf("%s_%s", si.Metadata.Name, portNameMap[port]) + } + + res := &pb.Resource{ + Name: proto.String(resName), + Port: proto.Int32(int32(port)), + Labels: si.Metadata.Labels, + } + + if reqIPType == pb.IPConfig_PUBLIC { + // If there is no ingress IP, skip the resource. + if len(si.Status.LoadBalancer.Ingress) == 0 { + continue + } + res.Ip = proto.String(si.Status.LoadBalancer.Ingress[0].IP) + } else { + res.Ip = proto.String(si.Spec.ClusterIP) + } + + resources = append(resources, res) + } + return +} + func parseServicesJSON(resp []byte) (names []string, services map[string]*serviceInfo, err error) { var itemList struct { Items []*serviceInfo diff --git a/rds/kubernetes/services_test.go b/rds/kubernetes/services_test.go index fed60601..5d24d603 100644 --- a/rds/kubernetes/services_test.go +++ b/rds/kubernetes/services_test.go @@ -23,10 +23,20 @@ import ( pb "github.com/google/cloudprober/rds/proto" ) -func testServiceInfo(name, ns, ip, publicIP string, labels map[string]string) *serviceInfo { +func testServiceInfo(name, ns, ip, publicIP string, labels map[string]string, ports []int) *serviceInfo { si := &serviceInfo{Metadata: kMetadata{Name: name, Namespace: ns, Labels: labels}} si.Spec.ClusterIP = ip + for _, port := range ports { + si.Spec.Ports = append(si.Spec.Ports, struct { + Name string + Port int + }{ + Name: "", + Port: port, + }) + } + if publicIP != "" { si.Status.LoadBalancer.Ingress = []struct{ IP string }{ { @@ -41,9 +51,9 @@ func TestListSvcResources(t *testing.T) { sl := &servicesLister{} sl.names = []string{"serviceA", "serviceB", "serviceC"} sl.cache = map[string]*serviceInfo{ - "serviceA": testServiceInfo("serviceA", "nsAB", "10.1.1.1", "", map[string]string{"app": "appA"}), - "serviceB": testServiceInfo("serviceB", "nsAB", "10.1.1.2", "192.16.16.199", map[string]string{"app": "appB"}), - "serviceC": testServiceInfo("serviceC", "nsC", "10.1.1.3", "192.16.16.200", map[string]string{"app": "appC", "func": "web"}), + "serviceA": testServiceInfo("serviceA", "nsAB", "10.1.1.1", "", map[string]string{"app": "appA"}, []int{9313, 9314}), + "serviceB": testServiceInfo("serviceB", "nsAB", "10.1.1.2", "192.16.16.199", map[string]string{"app": "appB"}, []int{443}), + "serviceC": testServiceInfo("serviceC", "nsC", "10.1.1.3", "192.16.16.200", map[string]string{"app": "appC", "func": "web"}, []int{3141}), } tests := []struct { @@ -53,6 +63,7 @@ func TestListSvcResources(t *testing.T) { labelsFilter map[string]string wantServices []string wantIPs []string + wantPorts []int32 wantPublicIPs []string wantErr bool }{ @@ -66,23 +77,34 @@ func TestListSvcResources(t *testing.T) { filters: map[string]string{"name": "service(B|C)"}, wantServices: []string{"serviceB", "serviceC"}, wantIPs: []string{"10.1.1.2", "10.1.1.3"}, + wantPorts: []int32{443, 3141}, + }, + { + desc: "only port filter for serviceA and serviceB's ports 9314 and 3141", + filters: map[string]string{"port": "314"}, + wantServices: []string{"serviceA", "serviceC"}, + wantIPs: []string{"10.1.1.1", "10.1.1.3"}, + wantPorts: []int32{9314, 3141}, }, { desc: "name and namespace filter for serviceB", filters: map[string]string{"name": "service(B|C)", "namespace": "nsAB"}, wantServices: []string{"serviceB"}, wantIPs: []string{"10.1.1.2"}, + wantPorts: []int32{443}, }, { desc: "only namespace filter for serviceA and serviceB", filters: map[string]string{"namespace": "nsAB"}, - wantServices: []string{"serviceA", "serviceB"}, - wantIPs: []string{"10.1.1.1", "10.1.1.2"}, + wantServices: []string{"serviceA_9313", "serviceA_9314", "serviceB"}, + wantIPs: []string{"10.1.1.1", "10.1.1.1", "10.1.1.2"}, + wantPorts: []int32{9313, 9314, 443}, }, { desc: "only services with public IPs", wantServices: []string{"serviceB", "serviceC"}, wantPublicIPs: []string{"192.16.16.199", "192.16.16.200"}, + wantPorts: []int32{443, 3141}, }, } @@ -110,9 +132,11 @@ func TestListSvcResources(t *testing.T) { } var gotNames, gotIPs []string + var gotPorts []int32 for _, res := range results { gotNames = append(gotNames, res.GetName()) gotIPs = append(gotIPs, res.GetIp()) + gotPorts = append(gotPorts, res.GetPort()) } if !reflect.DeepEqual(gotNames, test.wantServices) { @@ -127,6 +151,10 @@ func TestListSvcResources(t *testing.T) { if !reflect.DeepEqual(gotIPs, wantIPs) { t.Errorf("services.listResources IPs: got=%v, expected=%v", gotIPs, wantIPs) } + + if !reflect.DeepEqual(gotPorts, test.wantPorts) { + t.Errorf("services.listResources Ports: got=%v, expected=%v", gotPorts, test.wantPorts) + } }) } } @@ -147,23 +175,28 @@ func TestParseSvcResourceList(t *testing.T) { services := map[string]struct { ip string publicIP string + ports []int labels map[string]string }{ "cloudprober": { ip: "10.31.252.209", + ports: []int{9313}, labels: map[string]string{"app": "cloudprober"}, }, "cloudprober-rds": { ip: "10.96.15.88", publicIP: "192.88.99.199", + ports: []int{9314, 9313}, labels: map[string]string{"app": "cloudprober"}, }, "cloudprober-test": { ip: "10.31.246.77", + ports: []int{9313}, labels: map[string]string{"app": "cloudprober"}, }, "kubernetes": { ip: "10.31.240.1", + ports: []int{443}, labels: map[string]string{"component": "apiserver", "provider": "kubernetes"}, }, } @@ -187,5 +220,13 @@ func TestParseSvcResourceList(t *testing.T) { t.Errorf("%s service load balancer ip: got=%s, want=%s", name, servicesByName[name].Status.LoadBalancer.Ingress[0].IP, svc.publicIP) } } + + var gotPorts []int + for _, port := range servicesByName[name].Spec.Ports { + gotPorts = append(gotPorts, port.Port) + } + if !reflect.DeepEqual(gotPorts, svc.ports) { + t.Errorf("%s service ports: got=%v, want=%v", name, gotPorts, svc.ports) + } } }