-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(controller) Emissary executor. (#4925)
Signed-off-by: Alex Collins <[email protected]>
- Loading branch information
Showing
26 changed files
with
873 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
package commands | ||
|
||
import ( | ||
"compress/gzip" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"os" | ||
"os/exec" | ||
"os/signal" | ||
"path/filepath" | ||
"strconv" | ||
"syscall" | ||
"time" | ||
|
||
log "github.com/sirupsen/logrus" | ||
"github.com/spf13/cobra" | ||
|
||
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" | ||
"github.com/argoproj/argo-workflows/v3/util/archive" | ||
"github.com/argoproj/argo-workflows/v3/workflow/common" | ||
"github.com/argoproj/argo-workflows/v3/workflow/util/path" | ||
) | ||
|
||
var ( | ||
varRunArgo = "/var/run/argo" | ||
containerName = os.Getenv(common.EnvVarContainerName) | ||
includeScriptOutput = os.Getenv(common.EnvVarIncludeScriptOutput) == "true" // capture stdout/stderr | ||
template = &wfv1.Template{} | ||
logger = log.WithField("argo", true) | ||
) | ||
|
||
func NewEmissaryCommand() *cobra.Command { | ||
return &cobra.Command{ | ||
Use: "emissary", | ||
SilenceUsage: true, // this prevents confusing usage message being printed when we SIGTERM | ||
RunE: func(cmd *cobra.Command, args []string) error { | ||
exitCode := 64 | ||
|
||
defer func() { | ||
err := ioutil.WriteFile(varRunArgo+"/ctr/"+containerName+"/exitcode", []byte(strconv.Itoa(exitCode)), 0600) | ||
if err != nil { | ||
logger.Error(fmt.Errorf("failed to write exit code: %w", err)) | ||
} | ||
}() | ||
|
||
// this also indicates we've started | ||
if err := os.MkdirAll(varRunArgo+"/ctr/"+containerName, 0700); err != nil { | ||
return fmt.Errorf("failed to create ctr directory: %w", err) | ||
} | ||
|
||
name, args := args[0], args[1:] | ||
|
||
signals := make(chan os.Signal, 1) | ||
defer close(signals) | ||
signal.Notify(signals) | ||
defer signal.Reset() | ||
go func() { | ||
for s := range signals { | ||
if s != syscall.SIGCHLD { | ||
_ = syscall.Kill(-os.Getpid(), s.(syscall.Signal)) | ||
} | ||
} | ||
}() | ||
|
||
data, err := ioutil.ReadFile(varRunArgo + "/template") | ||
if err != nil { | ||
return fmt.Errorf("failed to read template: %w", err) | ||
} | ||
|
||
if err := json.Unmarshal(data, template); err != nil { | ||
return fmt.Errorf("failed to unmarshal template: %w", err) | ||
} | ||
|
||
name, err = path.Search(name) | ||
if err != nil { | ||
return fmt.Errorf("failed to find name in PATH: %w", err) | ||
} | ||
|
||
command := exec.Command(name, args...) | ||
command.Env = os.Environ() | ||
command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} | ||
command.Stdout = os.Stdout | ||
command.Stderr = os.Stderr | ||
|
||
// this may not be that important an optimisation, except for very long logs we don't want to capture | ||
if includeScriptOutput { | ||
logger.Info("capturing script output") | ||
stdout, err := os.Create(varRunArgo + "/ctr/" + containerName + "/stdout") | ||
if err != nil { | ||
return fmt.Errorf("failed to open stdout: %w", err) | ||
} | ||
defer func() { _ = stdout.Close() }() | ||
command.Stdout = io.MultiWriter(os.Stdout, stdout) | ||
|
||
stderr, err := os.Create(varRunArgo + "/ctr/" + containerName + "/stderr") | ||
if err != nil { | ||
return fmt.Errorf("failed to open stderr: %w", err) | ||
} | ||
defer func() { _ = stderr.Close() }() | ||
command.Stderr = io.MultiWriter(os.Stderr, stderr) | ||
} | ||
|
||
if err := command.Start(); err != nil { | ||
return err | ||
} | ||
|
||
go func() { | ||
for { | ||
data, _ := ioutil.ReadFile(varRunArgo + "/ctr/" + containerName + "/signal") | ||
_ = os.Remove(varRunArgo + "/ctr/" + containerName + "/signal") | ||
s, _ := strconv.Atoi(string(data)) | ||
if s > 0 { | ||
_ = syscall.Kill(command.Process.Pid, syscall.Signal(s)) | ||
} | ||
time.Sleep(2 * time.Second) | ||
} | ||
}() | ||
|
||
cmdErr := command.Wait() | ||
|
||
if cmdErr == nil { | ||
exitCode = 0 | ||
} else if exitError, ok := cmdErr.(*exec.ExitError); ok { | ||
if exitError.ExitCode() >= 0 { | ||
exitCode = exitError.ExitCode() | ||
} else { | ||
exitCode = 137 // SIGTERM | ||
} | ||
} | ||
|
||
if containerName == common.MainContainerName { | ||
for _, x := range template.Outputs.Parameters { | ||
if x.ValueFrom != nil && x.ValueFrom.Path != "" { | ||
if err := saveParameter(x.ValueFrom.Path); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
for _, x := range template.Outputs.Artifacts { | ||
if x.Path != "" { | ||
if err := saveArtifact(x.Path); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} else { | ||
logger.Info("not saving outputs - not main container") | ||
} | ||
|
||
return cmdErr // this is the error returned from cmd.Wait(), which maybe an exitError | ||
}, | ||
} | ||
} | ||
|
||
func saveArtifact(srcPath string) error { | ||
if common.FindOverlappingVolume(template, srcPath) != nil { | ||
logger.Infof("no need to save artifact - on overlapping volume: %s", srcPath) | ||
return nil | ||
} | ||
if _, err := os.Stat(srcPath); os.IsNotExist(err) { // might be optional, so we ignore | ||
logger.WithError(err).Errorf("cannot save artifact %s", srcPath) | ||
return nil | ||
} | ||
dstPath := varRunArgo + "/outputs/artifacts/" + srcPath + ".tgz" | ||
logger.Infof("%s -> %s", srcPath, dstPath) | ||
z := filepath.Dir(dstPath) | ||
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------ | ||
return fmt.Errorf("failed to create directory %s: %w", z, err) | ||
} | ||
dst, err := os.Create(dstPath) | ||
if err != nil { | ||
return fmt.Errorf("failed to create destination %s: %w", dstPath, err) | ||
} | ||
defer func() { _ = dst.Close() }() | ||
if err = archive.TarGzToWriter(srcPath, gzip.DefaultCompression, dst); err != nil { | ||
return fmt.Errorf("failed to tarball the output %s to %s: %w", srcPath, dstPath, err) | ||
} | ||
if err = dst.Close(); err != nil { | ||
return fmt.Errorf("failed to close %s: %w", dstPath, err) | ||
} | ||
return nil | ||
} | ||
|
||
func saveParameter(srcPath string) error { | ||
if common.FindOverlappingVolume(template, srcPath) != nil { | ||
logger.Infof("no need to save parameter - on overlapping volume: %s", srcPath) | ||
return nil | ||
} | ||
src, err := os.Open(srcPath) | ||
if os.IsNotExist(err) { // might be optional, so we ignore | ||
logger.WithError(err).Errorf("cannot save parameter %s", srcPath) | ||
return nil | ||
} | ||
if err != nil { | ||
return fmt.Errorf("failed to open %s: %w", srcPath, err) | ||
} | ||
defer func() { _ = src.Close() }() | ||
dstPath := varRunArgo + "/outputs/parameters/" + srcPath | ||
logger.Infof("%s -> %s", srcPath, dstPath) | ||
z := filepath.Dir(dstPath) | ||
if err := os.MkdirAll(z, 0700); err != nil { // chmod rwx------ | ||
return fmt.Errorf("failed to create directory %s: %w", z, err) | ||
} | ||
dst, err := os.Create(dstPath) | ||
if err != nil { | ||
return fmt.Errorf("failed to create %s: %w", srcPath, err) | ||
} | ||
defer func() { _ = dst.Close() }() | ||
if _, err = io.Copy(dst, src); err != nil { | ||
return fmt.Errorf("failed to copy %s to %s: %w", srcPath, dstPath, err) | ||
} | ||
if err = dst.Close(); err != nil { | ||
return fmt.Errorf("failed to close %s: %w", dstPath, err) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package commands | ||
|
||
import ( | ||
"io/ioutil" | ||
"os" | ||
"os/exec" | ||
"path/filepath" | ||
"strconv" | ||
"sync" | ||
"syscall" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestEmissary(t *testing.T) { | ||
tmp, err := ioutil.TempDir("", "") | ||
assert.NoError(t, err) | ||
|
||
varRunArgo = tmp | ||
includeScriptOutput = true | ||
|
||
wd, err := os.Getwd() | ||
assert.NoError(t, err) | ||
|
||
x := filepath.Join(wd, "../../../dist/argosay") | ||
|
||
err = ioutil.WriteFile(varRunArgo+"/template", []byte(`{}`), 0600) | ||
assert.NoError(t, err) | ||
|
||
t.Run("Exit0", func(t *testing.T) { | ||
err := run(x, []string{"exit"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "0", string(data)) | ||
}) | ||
t.Run("Exit1", func(t *testing.T) { | ||
err := run(x, []string{"exit", "1"}) | ||
assert.Equal(t, 1, err.(*exec.ExitError).ExitCode()) | ||
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "1", string(data)) | ||
}) | ||
t.Run("Stdout", func(t *testing.T) { | ||
err := run(x, []string{"echo", "hello", "/dev/stdout"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stdout") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "hello", string(data)) | ||
}) | ||
t.Run("Stderr", func(t *testing.T) { | ||
err := run(x, []string{"echo", "hello", "/dev/stderr"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stderr") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "hello", string(data)) | ||
}) | ||
t.Run("Signal", func(t *testing.T) { | ||
for signal, message := range map[syscall.Signal]string{ | ||
syscall.SIGTERM: "terminated", | ||
syscall.SIGKILL: "killed", | ||
} { | ||
err := ioutil.WriteFile(varRunArgo+"/ctr/main/signal", []byte(strconv.Itoa(int(signal))), 0600) | ||
assert.NoError(t, err) | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
err := run(x, []string{"sleep", "5s"}) | ||
assert.EqualError(t, err, "signal: "+message) | ||
}() | ||
time.Sleep(time.Second) | ||
} | ||
}) | ||
t.Run("Artifact", func(t *testing.T) { | ||
err = ioutil.WriteFile(varRunArgo+"/template", []byte(` | ||
{ | ||
"outputs": { | ||
"artifacts": [ | ||
{"path": "/tmp/artifact"} | ||
] | ||
} | ||
} | ||
`), 0600) | ||
assert.NoError(t, err) | ||
err := run(x, []string{"echo", "hello", "/tmp/artifact"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz") | ||
assert.NoError(t, err) | ||
assert.NotEmpty(t, string(data)) // data is tgz format | ||
}) | ||
t.Run("Parameter", func(t *testing.T) { | ||
err = ioutil.WriteFile(varRunArgo+"/template", []byte(` | ||
{ | ||
"outputs": { | ||
"parameters": [ | ||
{ | ||
"valueFrom": {"path": "/tmp/parameter"} | ||
} | ||
] | ||
} | ||
} | ||
`), 0600) | ||
assert.NoError(t, err) | ||
err := run(x, []string{"echo", "hello", "/tmp/parameter"}) | ||
assert.NoError(t, err) | ||
data, err := ioutil.ReadFile(varRunArgo + "/outputs/parameters/tmp/parameter") | ||
assert.NoError(t, err) | ||
assert.Equal(t, "hello", string(data)) | ||
}) | ||
} | ||
|
||
func run(name string, args []string) error { | ||
cmd := NewEmissaryCommand() | ||
containerName = "main" | ||
return cmd.RunE(cmd, append([]string{name}, args...)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.