Skip to content

Commit

Permalink
add ability to lookup named ports
Browse files Browse the repository at this point in the history
  • Loading branch information
tzneal committed Dec 3, 2021
1 parent fb80806 commit 7215b04
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 17 deletions.
5 changes: 3 additions & 2 deletions cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ easy construction of the configuration.`,
}
cfg := model.Config{}
includeAll, _ := cmd.Flags().GetBool("all")
pl := model.NewPortLookup(cs)
for _, svc := range svcList.Items {
// skip kube-system services by default
if skipByDefault(svc) && !includeAll {
continue
}
cfg.Supplant = append(cfg.Supplant, model.MapSupplantService(svc))
cfg.External = append(cfg.External, model.MapExternalService(svc))
cfg.Supplant = append(cfg.Supplant, model.MapSupplantService(pl, svc))
cfg.External = append(cfg.External, model.MapExternalService(pl, svc))
}

writeConfig(cfg, args[0])
Expand Down
16 changes: 7 additions & 9 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ inside the cluster as described by the configuration file.`,
cfg := readConfig(inputFile)
for _, supplantSvc := range cfg.Supplant {
if !supplantSvc.Enabled {
printWarn("skipping disabled supplanted service %s", supplantSvc.Name)
continue
}
key := svcKey{supplantSvc.Namespace, supplantSvc.Name}
Expand Down Expand Up @@ -112,7 +111,7 @@ inside the cluster as described by the configuration file.`,
svc.Spec.Ports = append(svc.Spec.Ports, newPort)
printList("%s:%d is now the endpoint for %s:%d", ip, port.ListenPort, supplantSvc.Name, port.Port)
}
appendLabel(&svc.ObjectMeta, "supplant", "true")
appendAnnotation(&svc.ObjectMeta, "supplant", "true")

// delete the existing service
err = cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
Expand All @@ -132,6 +131,7 @@ inside the cluster as described by the configuration file.`,

// delete the existing endpoint
endpoints := cs.CoreV1().Endpoints(svc.Namespace)

err = endpoints.Delete(ctx, svc.Name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
log.Fatalf("error deleting endpoint %s", svc.Name)
Expand All @@ -146,12 +146,11 @@ inside the cluster as described by the configuration file.`,
}}}},
}

appendLabel(&ep.ObjectMeta, "supplant", "true")
appendAnnotation(&ep.ObjectMeta, "supplant", "true")

for _, port := range supplantSvc.Ports {
ep.Subsets[0].Ports = append(ep.Subsets[0].Ports, v1.EndpointPort{
Port: port.ListenPort,
Name: port.Name,
})
}
_, err = endpoints.Create(ctx, ep, metav1.CreateOptions{})
Expand All @@ -170,7 +169,6 @@ inside the cluster as described by the configuration file.`,
var portForwards []kube.PortForwarder
for _, externalSvc := range cfg.External {
if !externalSvc.Enabled {
printWarn("skipping disabled external service %s", externalSvc.Name)
continue
}
for _, port := range externalSvc.Ports {
Expand Down Expand Up @@ -282,9 +280,9 @@ func getOutboundIP() (net.IP, error) {
return localAddr.IP, nil
}

func appendLabel(meta *metav1.ObjectMeta, key string, value string) {
if meta.Labels == nil {
meta.Labels = map[string]string{}
func appendAnnotation(meta *metav1.ObjectMeta, key string, value string) {
if meta.Annotations == nil {
meta.Annotations = map[string]string{}
}
meta.Labels[key] = value
meta.Annotations[key] = value
}
54 changes: 48 additions & 6 deletions model/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package model

import (
"context"
"fmt"
"log"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
)

type Config struct {
Expand Down Expand Up @@ -38,7 +43,19 @@ type ExternalPortConfig struct {
LocalPort int32
}

func MapSupplantService(svc v1.Service) SupplantService {
type PortLookup struct {
cs *kubernetes.Clientset
cache map[string]int32
}

func NewPortLookup(cs *kubernetes.Clientset) *PortLookup {
return &PortLookup{
cs: cs,
cache: make(map[string]int32),
}
}

func MapSupplantService(pl *PortLookup, svc v1.Service) SupplantService {
ret := SupplantService{
Name: svc.Name,
Namespace: svc.Namespace,
Expand All @@ -49,13 +66,13 @@ func MapSupplantService(svc v1.Service) SupplantService {
Name: port.Name,
Port: port.Port,
Protocol: port.Protocol,
ListenPort: decodePort(port.TargetPort),
ListenPort: pl.decodePort(svc, port.TargetPort),
})
}
return ret
}

func MapExternalService(svc v1.Service) ExternalService {
func MapExternalService(pl *PortLookup, svc v1.Service) ExternalService {
ret := ExternalService{
Name: svc.Name,
Namespace: svc.Namespace,
Expand All @@ -64,18 +81,43 @@ func MapExternalService(svc v1.Service) ExternalService {
for _, port := range svc.Spec.Ports {
ret.Ports = append(ret.Ports, ExternalPortConfig{
Name: port.Name,
TargetPort: decodePort(port.TargetPort),
TargetPort: pl.decodePort(svc, port.TargetPort),
Protocol: port.Protocol,
LocalPort: 0,
})
}
return ret
}

func decodePort(port intstr.IntOrString) int32 {
func (pl *PortLookup) decodePort(svc v1.Service, port intstr.IntOrString) int32 {
if port.Type == intstr.Int {
return port.IntVal
}
log.Fatalf("TODO: support parsing port names")
key := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)
if port, ok := pl.cache[key]; ok {
return port
}

ctx := context.Background()
listOpts := metav1.ListOptions{
LabelSelector: labels.FormatLabels(svc.Spec.Selector),
}

pods, err := pl.cs.CoreV1().Pods("").List(ctx, listOpts)
if err != nil {
log.Fatalf("error looking up named port %s: %s", port.StrVal, err)
}
for _, pod := range pods.Items {
for _, container := range pod.Spec.Containers {
for _, cport := range container.Ports {
if cport.Name == port.StrVal {
pl.cache[key] = cport.ContainerPort
return cport.ContainerPort
}
}
}
}

log.Fatalf("unable to find named port %s for service %s", port.StrVal, svc.Name)
return -1
}

0 comments on commit 7215b04

Please sign in to comment.