Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaner kubectl port-forward retry logic #2593

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/microservices/leeroy-app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "leeroooooy app!!\n")
fmt.Fprintf(w, "6\n")
}

func main() {
Expand Down
44 changes: 28 additions & 16 deletions integration/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,26 +267,38 @@ func TestDevPortForwardGKELoadBalancer(t *testing.T) {
}()

body := []byte{}
err = wait.PollImmediate(time.Millisecond*500, 5*time.Minute, func() (bool, error) {
e := <-entries
switch e.Event.GetEventType().(type) {
case *proto.Event_PortEvent:
if e.Event.GetPortEvent().ResourceName == "gke-loadbalancer" &&
e.Event.GetPortEvent().ResourceType == "service" {
port := e.Event.GetPortEvent().LocalPort
t.Logf("Detected service/gke-loadbalancer is forwarded to port %d", port)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d", port))
if err != nil {
t.Errorf("could not get service/gke-loadbalancer due to %s", err)
var port int32
timeout := time.After(1 * time.Minute)

portForwardEvent:
for {
select {
case <-timeout:
t.Errorf("timed out waiting for port forwarding event")
break portForwardEvent
case e := <-entries:
switch e.Event.GetEventType().(type) {
case *proto.Event_PortEvent:
if e.Event.GetPortEvent().ResourceName == "gke-loadbalancer" &&
e.Event.GetPortEvent().ResourceType == "service" {
port = e.Event.GetPortEvent().LocalPort
t.Logf("Detected service/gke-loadbalancer is forwarded to port %d", port)
break portForwardEvent
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
return true, err
default:
t.Logf("event received %v", e)
}
return false, nil
default:
}
}
err = wait.PollImmediate(time.Millisecond*500, 3*time.Minute, func() (bool, error) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d", port))
if err != nil {
t.Logf("could not get service/gke-loadbalancer due to %s", err)
return false, nil
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
return true, err
})

testutil.CheckErrorAndDeepEqual(t, false, err, string(body), "hello!!\n")
Expand Down
42 changes: 12 additions & 30 deletions pkg/skaffold/kubernetes/portforward/entry_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,14 @@ import (
"fmt"
"io"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
)

var (
// For testing
forwardingTimeoutTime = time.Minute
portForwardEvent = func(entry *portForwardEntry) {
portForwardEvent = func(entry *portForwardEntry) {
// TODO priyawadhwa@, change event API to accept ports of type int
event.PortForwarded(
int32(entry.localPort),
Expand Down Expand Up @@ -157,35 +152,27 @@ func NewEntryManager(out io.Writer, cli *kubectl.CLI) EntryManager {
output: out,
forwardedPorts: newForwardedPorts(),
forwardedResources: newForwardedResources(),
EntryForwarder: &KubectlForwarder{kubectl: cli},
EntryForwarder: &KubectlForwarder{kubectl: cli, out: out},
}
}

func (b *EntryManager) forwardPortForwardEntry(ctx context.Context, entry *portForwardEntry) error {
func (b *EntryManager) forwardPortForwardEntry(ctx context.Context, entry *portForwardEntry) {
// Check if this resource has already been forwarded
if _, ok := b.forwardedResources.Load(entry.key()); ok {
return nil
return
}
b.forwardedResources.Store(entry.key(), entry)
err := wait.PollImmediate(time.Second, forwardingTimeoutTime, func() (bool, error) {
if err := b.Forward(ctx, entry); err != nil {
return false, nil
}
return true, nil
})

go b.Monitor(entry, func() {
b.Retry(ctx, entry)
})

if err != nil {
return err
}

color.Default.Fprintln(b.output, fmt.Sprintf("Port forwarded %s/%s from remote port %d to local port %d", entry.resource.Type, entry.resource.Name, entry.resource.Port, entry.localPort))
b.Forward(ctx, entry)

color.Default.Fprintln(
b.output,
fmt.Sprintf("Port forwarding %s/%s from remote port %d to local port %d",
entry.resource.Type,
entry.resource.Name,
entry.resource.Port,
entry.localPort))
portForwardEvent(entry)
return nil
}

// Stop terminates all kubectl port-forward commands.
Expand All @@ -201,8 +188,3 @@ func (b *EntryManager) Terminate(p *portForwardEntry) {
b.forwardedPorts.Delete(p.localPort)
b.EntryForwarder.Terminate(p)
}

func (b *EntryManager) Retry(ctx context.Context, p *portForwardEntry) error {
b.Terminate(p)
return b.forwardPortForwardEntry(ctx, p)
}
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/entry_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestStop(t *testing.T) {
em.forwardedPorts.Store(9000, struct{}{})
em.forwardedPorts.Store(9001, struct{}{})

fakeForwarder := newTestForwarder(nil)
fakeForwarder := newTestForwarder()
fakeForwarder.forwardedResources = em.forwardedResources
em.EntryForwarder = fakeForwarder

Expand Down
155 changes: 93 additions & 62 deletions pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,79 +20,99 @@ import (
"bytes"
"context"
"fmt"
"net"
"io"
"os/exec"
"strings"
"time"

"github.com/pkg/errors"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
)

type EntryForwarder interface {
Forward(parentCtx context.Context, pfe *portForwardEntry) error
Forward(parentCtx context.Context, pfe *portForwardEntry)
Terminate(p *portForwardEntry)
Monitor(*portForwardEntry, func())
}

type KubectlForwarder struct {
kubectl *kubectl.CLI
out io.Writer
}

// Forward port-forwards a pod using kubectl port-forward
// It returns an error only if the process fails or was terminated by a signal other than SIGTERM
func (k *KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEntry) error {
ctx, cancel := context.WithCancel(parentCtx)
// when retrying a portforwarding entry, it might already have a context running
if pfe.cancel != nil {
pfe.cancel()
}
pfe.cancel = cancel

cmd := k.kubectl.Command(ctx,
"port-forward",
"--pod-running-timeout", "5s",
fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name),
fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port),
"--namespace", pfe.resource.Namespace,
)
pfe.logBuffer = &bytes.Buffer{}
cmd.Stdout = pfe.logBuffer
cmd.Stderr = pfe.logBuffer

if err := cmd.Start(); err != nil {
if errors.Cause(err) == context.Canceled {
return nil
// Forward port-forwards a pod using kubectl port-forward in the background
// It kills the command on errors in the kubectl port-forward log
// It restarts the command if it was not cancelled by skaffold
// It retries in case the port is taken
func (k *KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEntry) {
go k.forward(parentCtx, pfe)
}

func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEntry) {
var notifiedUser bool
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when reading this, it looks like this forward runs forever?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does, until the pfe gets cancelled (there are 3 return statements in the body, all around cancellation!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got lost in reading and saw only 1 return statement.
I still feel, all the 3 return statements are waiting for conditions that are

  1. pfe.terminated. - not sure when that happens
  2. & 3. user enter ^C in skaffold dev .

Will it keep trying forever untill above two conditions are satisfied?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, if things are going well, it is going to wait in cmd.Wait() as long as kubectl portforward is running ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but yes, it does retry unless it is an explicit cancel from the skaffold process...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this:

image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw this was generated on planttext.com :)

@startuml

title Port Forward State Model
[*] --> CheckForTermination
CheckForTermination -down-> Cancelled
CheckForTermination -down-> TryCmdStart
TryCmdStart -down-> Cancelled
TryCmdStart --> CheckForTermination
TryCmdStart -down-> LogMonitoring

Cancelled -down-> [*]
state "kubectl port-forward logs are monitored" as LogMonitoring {
  TryCmdWait -down-> Cancelled
  TryCmdWait -up-> CheckForTermination
}

@enduml

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish I could put labels on the arrows - but the key thing is non-cancellation error scenarios go back to start, cancellation exists, no error continues to next step.

if parentCtx.Err() == context.Canceled {
logrus.Debugf("port forwarding %v cancelled...", pfe)
return
}
if !util.IsPortFree(pfe.localPort) {
//assuming that Skaffold brokered ports don't overlap, this has to be an external process that started
//since the dev loop kicked off. We are notifying the user in the hope that they can fix it
color.Red.Fprintf(k.out, "failed to port forward %v, port %d is taken, retrying...\n", pfe, pfe.localPort)
notifiedUser = true
time.Sleep(5 * time.Second)
continue
}
return errors.Wrapf(err, "port forwarding %s/%s, port: %d to local port: %d, err: %s", pfe.resource.Type, pfe.resource.Name, pfe.resource.Port, pfe.localPort, pfe.logBuffer.String())
}

resultChan := make(chan error, 1)
go func() {
err := cmd.Wait()
if err != nil {
logrus.Debugf("port forwarding %v terminated: %s, output: %s", pfe, err, pfe.logBuffer.String())
resultChan <- err
if notifiedUser {
color.Green.Fprintf(k.out, "port forwarding %v recovered on port %d\n", pfe, pfe.localPort)
notifiedUser = false
}
}()

go func() {
err := wait.PollImmediate(200*time.Millisecond, 5*time.Second, func() (bool, error) {
// creating a listening port should not succeed
if ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, pfe.localPort)); err == nil {
ln.Close()
return false, nil

ctx, cancel := context.WithCancel(parentCtx)
// when retrying a portforwarding entry, it might already have a context running
if pfe.cancel != nil {
pfe.cancel()
}
pfe.cancel = cancel
cmd := k.kubectl.Command(ctx,
"port-forward",
"--pod-running-timeout", "1s",
fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name),
fmt.Sprintf("%d:%d", pfe.localPort, pfe.resource.Port),
"--namespace", pfe.resource.Namespace,
)
buf := &bytes.Buffer{}
cmd.Stdout = buf
cmd.Stderr = buf

if err := cmd.Start(); err != nil {
if ctx.Err() == context.Canceled {
logrus.Debugf("couldn't start %v due to context cancellation", pfe)
return
}
return true, nil
})
resultChan <- err
}()
//retry on exit at Start()
logrus.Debugf("error starting port forwarding %v: %s, output: %s", pfe, err, buf.String())
time.Sleep(500 * time.Millisecond)
continue
}

err := <-resultChan
return err
//kill kubectl on port forwarding error logs
go k.monitorErrorLogs(ctx, buf, cmd, pfe)

if err := cmd.Wait(); err != nil {
if ctx.Err() == context.Canceled {
logrus.Debugf("terminated %v due to context cancellation", pfe)
return
}
logrus.Debugf("port forwarding %v got terminated: %s, output: %s", pfe, err, buf.String())
time.Sleep(500 * time.Millisecond)
}
}
}

// Terminate terminates an existing kubectl port-forward command using SIGTERM
Expand All @@ -107,18 +127,29 @@ func (*KubectlForwarder) Terminate(p *portForwardEntry) {
// Monitor monitors the logs for a kubectl port forward command
// If it sees an error, it calls back to the EntryManager to
// retry the entire port forward operation.
func (*KubectlForwarder) Monitor(p *portForwardEntry, retryFunc func()) {
func (*KubectlForwarder) monitorErrorLogs(ctx context.Context, buf *bytes.Buffer, cmd *exec.Cmd, p *portForwardEntry) {
for {
time.Sleep(1 * time.Second)
s, _ := p.logBuffer.ReadString(byte('\n'))
if s != "" {
logrus.Tracef("[port-forward] %s", s)
if strings.Contains(s, "error forwarding port") || strings.Contains(s, "unable to forward") {
// kubectl is having an error. retry the command
logrus.Infof("retrying kubectl port-forward due to error: %s", s)
go retryFunc()
return
select {
case <-ctx.Done():
return
default:
time.Sleep(1 * time.Second)
s, _ := buf.ReadString(byte('\n'))
if s != "" {
logrus.Tracef("[port-forward] %s", s)

if strings.Contains(s, "error forwarding port") ||
strings.Contains(s, "unable to forward") ||
strings.Contains(s, "error upgrading connection") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to log a warning or something so the user knows the retry is happening and why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is there in trace mode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep thinking about what you mentioned: "error upgrading connection" is from port forwarding in client-go: https://github.com/kubernetes/client-go/blob/master/tools/portforward/portforward.go#L194 - I think it is fine to retry on it...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure Cloud Code doesn't show trace — it would be good for users to at least be able to discover that there was a failure.

I think restarting on error makes a lot of sense: it's essentially turning kubectl port-forward into a single-use attempt. Perhaps this code could look for the more general error header (portforward.go:.*error occurred) and log the error but suppress when one of these strings (known issues)?

// kubectl is having an error. retry the command
logrus.Tracef("killing port forwarding %v", p)
if err := cmd.Process.Kill(); err != nil {
logrus.Tracef("failed to kill port forwarding %v, err: %s", p, err)
balopat marked this conversation as resolved.
Show resolved Hide resolved
}
return
}
}
}

}
}
5 changes: 2 additions & 3 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,8 @@ func (p *WatchingPodForwarder) portForwardPod(ctx context.Context, pod *v1.Pod)
p.Terminate(prevEntry)
}
}
if err := p.forwardPortForwardEntry(ctx, entry); err != nil {
return err
}
p.forwardPortForwardEntry(ctx, entry)

}
}
return nil
Expand Down
Loading