Skip to content

Commit

Permalink
debug (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
grantleehoffman authored Sep 27, 2021
1 parent 4eb6b8f commit 6b137dd
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
6 changes: 5 additions & 1 deletion cli/cmd/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/spf13/cobra"
)

const (
loggingStartByte int64 = 0
)

// logsCmd represents the logs command
var logsCmd = &cobra.Command{
Use: "logs [workflow name]",
Expand All @@ -25,7 +29,7 @@ var logsCmd = &cobra.Command{
ctx := context.Background()
if streamLogs {
// This is a _very_ simple approach to streaming.
cobra.CheckErr(apiCl.StreamLogs(ctx, os.Stdout, workflowName, 0))
cobra.CheckErr(apiCl.StreamLogs(ctx, os.Stdout, workflowName, loggingStartByte))
} else {
resp, err := apiCl.GetLogs(ctx, workflowName)
if err != nil {
Expand Down
17 changes: 11 additions & 6 deletions cli/internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (c *Client) GetLogs(ctx context.Context, workflowName string) (responses.Ge
}

// StreamLogs streams the logs of a workflow starting after loggedBytes.
func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName string, loggedBytes int64) error {
func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName string, skippedLogBytes int64) error {
url := fmt.Sprintf("%s/workflows/%s/logstream", c.endpoint, workflowName)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
Expand All @@ -96,17 +97,21 @@ func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName strin
}

// discard reader bytes already logged
discardedWriter := &bytes.Buffer{}
if _, err := io.CopyN(discardedWriter, resp.Body, loggedBytes); err != nil {
if _, err := io.CopyN(ioutil.Discard, resp.Body, skippedLogBytes); err != nil {
return err
}
loggedBytes, err = io.Copy(w, resp.Body)
skippedLogBytes, err = io.Copy(w, resp.Body)
if err != nil {
// retry call if we receive the stream error
if strings.Contains(err.Error(), "stream error: stream ID 1; INTERNAL_ERROR") {
if strings.Contains(err.Error(), "INTERNAL_ERROR") {
// temporary debug message
_, err := fmt.Fprintf(w, "internal error found while copying: '%v'\n", err.Error())
if err != nil {
return err
}
time.Sleep(time.Second * 10)
// Restart log streaming
return c.StreamLogs(ctx, w, workflowName, loggedBytes)
return c.StreamLogs(ctx, w, workflowName, skippedLogBytes)
}
return fmt.Errorf("error reading response body. status code: %d, error: %w", resp.StatusCode, err)
}
Expand Down

0 comments on commit 6b137dd

Please sign in to comment.