diff --git a/cli/cmd/logs.go b/cli/cmd/logs.go index ef5dc0d4..18397725 100644 --- a/cli/cmd/logs.go +++ b/cli/cmd/logs.go @@ -11,6 +11,10 @@ import ( "github.com/spf13/cobra" ) +const ( + loggingStartByte int64 = 0 +) + // logsCmd represents the logs command var logsCmd = &cobra.Command{ Use: "logs [workflow name]", @@ -25,7 +29,7 @@ var logsCmd = &cobra.Command{ ctx := context.Background() if streamLogs { // This is a _very_ simple approach to streaming. - cobra.CheckErr(apiCl.StreamLogs(ctx, os.Stdout, workflowName, 0)) + cobra.CheckErr(apiCl.StreamLogs(ctx, os.Stdout, workflowName, loggingStartByte)) } else { resp, err := apiCl.GetLogs(ctx, workflowName) if err != nil { diff --git a/cli/internal/api/api.go b/cli/internal/api/api.go index c19d9aa8..71dcd8d9 100644 --- a/cli/internal/api/api.go +++ b/cli/internal/api/api.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "net/http" "strings" "time" @@ -77,7 +78,7 @@ func (c *Client) GetLogs(ctx context.Context, workflowName string) (responses.Ge } // StreamLogs streams the logs of a workflow starting after loggedBytes. -func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName string, loggedBytes int64) error { +func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName string, skippedLogBytes int64) error { url := fmt.Sprintf("%s/workflows/%s/logstream", c.endpoint, workflowName) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) @@ -96,17 +97,21 @@ func (c *Client) StreamLogs(ctx context.Context, w io.Writer, workflowName strin } // discard reader bytes already logged - discardedWriter := &bytes.Buffer{} - if _, err := io.CopyN(discardedWriter, resp.Body, loggedBytes); err != nil { + if _, err := io.CopyN(ioutil.Discard, resp.Body, skippedLogBytes); err != nil { return err } - loggedBytes, err = io.Copy(w, resp.Body) + skippedLogBytes, err = io.Copy(w, resp.Body) if err != nil { // retry call if we receive the stream error - if strings.Contains(err.Error(), "stream error: stream ID 1; INTERNAL_ERROR") { + if strings.Contains(err.Error(), "INTERNAL_ERROR") { + // temporary debug message + _, err := fmt.Fprintf(w, "internal error found while copying: '%v'\n", err.Error()) + if err != nil { + return err + } time.Sleep(time.Second * 10) // Restart log streaming - return c.StreamLogs(ctx, w, workflowName, loggedBytes) + return c.StreamLogs(ctx, w, workflowName, skippedLogBytes) } return fmt.Errorf("error reading response body. status code: %d, error: %w", resp.StatusCode, err) }