Skip to content

Commit

Permalink
add --ephemeral mode for persistent worker
Browse files Browse the repository at this point in the history
When passing `--ephemeral` to `cirrus worker run`, the worker will
accept one task and then exit the process once the task completed.

This can be used inside ephemeral VMs which should be shutdown after
each task. A user has to take care of cleaning up after the worker
has finished.
  • Loading branch information
0xB10C committed Nov 26, 2024
1 parent 09deaef commit 98c9305
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 5 deletions.
13 changes: 13 additions & 0 deletions PERSISTENT-WORKERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ Note that persistent worker's name should be unique within a pool.

Note that by default a persistent worker has the privileges of the user that invoked it. Read more [about isolation](#isolation) below to learn how to limit or extend persistent worker privileges.

### Ephemeral Mode

The worker can be started in ephemeral mode with:

```
cirrus worker run --token <poll registration token> --ephemeral
```

After having completed a single task, the worker process will exit. This can be used in ephemeral setups where the enviroment should be cleaned after each run.

Note that users of `--ephemeral` need to take care of cleaning the enviroment and caching build inputs themselves.

## Configuration

Path to the YAML configuration can be specified via the `--file` (or `-f` for short version) command-line flag.
Expand All @@ -38,6 +50,7 @@ Example configuration:
token: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

name: "MacMini-Rack-1-Slot-2"
ephemeral: false

labels:
connected-device: iPhone12ProMax
Expand Down
13 changes: 11 additions & 2 deletions internal/commands/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
var ErrConfiguration = errors.New("configuration error")

type Config struct {
Name string `yaml:"name"`
Token string `yaml:"token"`
Name string `yaml:"name"`
Token string `yaml:"token"`
Ephemeral bool `yaml:"ephemeral"`

Labels map[string]string `yaml:"labels"`
Resources map[string]float64 `yaml:"resources"`
Expand Down Expand Up @@ -67,6 +68,7 @@ var (
labels map[string]string
resources map[string]string
rpcEndpoint string
ephemeral bool
)

func attachFlags(cmd *cobra.Command) {
Expand All @@ -88,13 +90,16 @@ func attachFlags(cmd *cobra.Command) {
"additional resources to use (e.g. --resources devices=2)")
cmd.PersistentFlags().StringVar(&rpcEndpoint, "rpc-endpoint", upstream.DefaultRPCEndpoint,
"RPC endpoint address")
cmd.PersistentFlags().BoolVar(&ephemeral, "ephemeral", false,
"run a single task and then exit the process")
}

func parseConfig(path string) (*Config, error) {
// Instantiate a default configuration
config := Config{
Name: name,
Token: token,
Ephemeral: ephemeral,
Labels: labels,
Resources: map[string]float64{},
RPC: ConfigRPC{
Expand Down Expand Up @@ -231,6 +236,10 @@ func buildWorker(output io.Writer) (*worker.Worker, error) {
opts = append(opts, worker.WithTartPrePull(config.TartPrePull))
}

if ephemeral {
opts = append(opts, worker.WithEphemeral(ephemeral))
}

// Instantiate worker
return worker.New(opts...)
}
9 changes: 9 additions & 0 deletions internal/commands/worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ func TestRestrictForceSoftnet(t *testing.T) {
require.NoError(t, err)

require.True(t, config.Security.AllowedIsolations.Tart.ForceSoftnet)
// ephemeral is not set, it should default to false
require.False(t, config.Ephemeral)
}

func TestEphemeral(t *testing.T) {
config, err := parseConfig(filepath.Join("testdata", "ephemeral.yml"))
require.NoError(t, err)

require.True(t, config.Ephemeral)
}
4 changes: 4 additions & 0 deletions internal/commands/worker/testdata/ephemeral.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
token: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

name: "ephemeral-runner"
ephemeral: true
6 changes: 6 additions & 0 deletions internal/worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ func WithTartPrePull(tartPrePull []string) Option {
e.tartPrePull = tartPrePull
}
}

func WithEphemeral(ephemeral bool) Option {
return func(e *Worker) {
e.ephemeral = ephemeral
}
}
6 changes: 4 additions & 2 deletions internal/worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (worker *Worker) startTask(
ctx context.Context,
upstream *upstreampkg.Upstream,
agentAwareTask *api.PollResponse_AgentAwareTask,
) {
taskID := agentAwareTask.TaskId
) (taskID string) {
taskID = agentAwareTask.TaskId
if taskID == "" {
taskID = fmt.Sprintf("%d", agentAwareTask.OldTaskId)
}
Expand Down Expand Up @@ -75,6 +75,8 @@ func (worker *Worker) startTask(
taskID, agentAwareTask.ClientSecret, agentAwareTask.ServerSecret)

worker.logger.Infof("started task %s", taskID)

return

Check failure on line 79 in internal/worker/task.go

View check run for this annotation

Cirrus CI / Lint

internal/worker/task.go#L79

naked return in func `startTask` with 46 lines of code (nakedret)
}

func (worker *Worker) getInstance(
Expand Down
23 changes: 22 additions & 1 deletion internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Worker struct {
standbyInstance abstract.Instance

tartPrePull []string

// In ephemeral mode, only run a single task and then exit
ephemeral bool
ephemeralTaskID string
}

func New(opts ...Option) (*Worker, error) {
Expand All @@ -73,6 +77,8 @@ func New(opts ...Option) (*Worker, error) {

logger: logrus.New(),
echelonLogger: echelon.NewLogger(echelon.TraceLevel, renderers.NewSimpleRenderer(os.Stdout, nil)),

ephemeralTaskID: "",
}

// Apply options
Expand Down Expand Up @@ -309,7 +315,15 @@ func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstream
}

for _, taskToStart := range response.TasksToStart {
worker.startTask(ctx, upstream, taskToStart)
// don't start a new task in ephemeral mode if one is running
if worker.ephemeral && worker.ephemeralTaskID != "" {
break
}
taskID := worker.startTask(ctx, upstream, taskToStart)
if worker.ephemeral {
worker.ephemeralTaskID = taskID
break
}
}

if response.Shutdown {
Expand All @@ -319,6 +333,13 @@ func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstream
return ErrShutdown
}

if worker.ephemeral && worker.ephemeralTaskID != "" {
if _, ok := worker.tasks.Load(worker.ephemeralTaskID); !ok {
worker.logger.Infof("In ephemeral mode: task %s completed, terminating...", worker.ephemeralTaskID)
return ErrShutdown
}
}

return nil
}

Expand Down

0 comments on commit 98c9305

Please sign in to comment.