From 12771f4c4719cec111391cbe541d1a85e0e9c333 Mon Sep 17 00:00:00 2001 From: Louis Garman Date: Tue, 5 Nov 2024 21:23:31 +0000 Subject: [PATCH] wip --- internal/runner/allocator.go | 26 ++++---- internal/runner/api.go | 4 +- internal/runner/cli.go | 4 +- internal/runner/client.go | 4 +- internal/runner/db.go | 63 +++++++++---------- internal/runner/job.go | 2 +- internal/runner/operation.go | 2 +- internal/runner/service.go | 11 ++-- internal/runner/tokens.go | 6 +- .../003_add_ids_to_jobs_and_log_chunks.sql | 22 +++++++ internal/sql/queries/job.sql | 10 ++- internal/sql/sqlc/job.sql.go | 39 ++++++++---- 12 files changed, 117 insertions(+), 76 deletions(-) diff --git a/internal/runner/allocator.go b/internal/runner/allocator.go index cfb6cb183..ffa1a056b 100644 --- a/internal/runner/allocator.go +++ b/internal/runner/allocator.go @@ -22,9 +22,9 @@ type allocator struct { // allocating jobs to runners. client allocatorClient // runners to allocate jobs to, keyed by runner ID - runners map[string]*RunnerMeta + runners map[resource.ID]*RunnerMeta // jobs awaiting allocation to an runner, keyed by job ID - jobs map[JobSpec]*Job + jobs map[resource.ID]*Job } type allocatorClient interface { @@ -34,8 +34,8 @@ type allocatorClient interface { listRunners(ctx context.Context) ([]*RunnerMeta, error) listJobs(ctx context.Context) ([]*Job, error) - allocateJob(ctx context.Context, spec JobSpec, runnerID resource.ID) (*Job, error) - reallocateJob(ctx context.Context, spec JobSpec, runnerID resource.ID) (*Job, error) + allocateJob(ctx context.Context, jobID, runnerID resource.ID) (*Job, error) + reallocateJob(ctx context.Context, jobID, runnerID resource.ID) (*Job, error) } // Start the allocator. Should be invoked in a go routine. @@ -78,9 +78,9 @@ func (a *allocator) Start(ctx context.Context) error { } switch event.Type { case pubsub.DeletedEvent: - delete(a.jobs, event.Payload.Spec) + delete(a.jobs, event.Payload.ID) default: - a.jobs[event.Payload.Spec] = event.Payload + a.jobs[event.Payload.ID] = event.Payload } } if err := a.allocate(ctx); err != nil { @@ -90,13 +90,13 @@ func (a *allocator) Start(ctx context.Context) error { } func (a *allocator) seed(agents []*RunnerMeta, jobs []*Job) { - a.runners = make(map[string]*RunnerMeta, len(agents)) + a.runners = make(map[resource.ID]*RunnerMeta, len(agents)) for _, runner := range agents { a.runners[runner.ID] = runner } - a.jobs = make(map[JobSpec]*Job, len(jobs)) + a.jobs = make(map[resource.ID]*Job, len(jobs)) for _, job := range jobs { - a.jobs[job.Spec] = job + a.jobs[job.ID] = job } } @@ -124,7 +124,7 @@ func (a *allocator) allocate(ctx context.Context) error { case JobFinished, JobCanceled, JobErrored: // job has completed: remove and adjust number of current jobs // agents has - delete(a.jobs, job.Spec) + delete(a.jobs, job.ID) a.runners[*job.RunnerID].CurrentJobs-- continue default: @@ -177,18 +177,18 @@ func (a *allocator) allocate(ctx context.Context) error { ) if reallocate { from := *job.RunnerID - updatedJob, err = a.client.reallocateJob(ctx, job.Spec, runner.ID) + updatedJob, err = a.client.reallocateJob(ctx, job.ID, runner.ID) if err != nil { return err } a.runners[from].CurrentJobs-- } else { - updatedJob, err = a.client.allocateJob(ctx, job.Spec, runner.ID) + updatedJob, err = a.client.allocateJob(ctx, job.ID, runner.ID) if err != nil { return err } } - a.jobs[job.Spec] = updatedJob + a.jobs[job.ID] = updatedJob a.runners[runner.ID].CurrentJobs++ } return nil diff --git a/internal/runner/api.go b/internal/runner/api.go index bf257764f..bc3b5a3b0 100644 --- a/internal/runner/api.go +++ b/internal/runner/api.go @@ -69,7 +69,7 @@ func (a *api) registerAgent(w http.ResponseWriter, r *http.Request) { func (a *api) getJobs(w http.ResponseWriter, r *http.Request) { // retrieve runner, which contains ID of calling agent runner, err := runnerFromContext(r.Context()) - if err != nil || runner.ID == "" { + if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } @@ -87,7 +87,7 @@ func (a *api) getJobs(w http.ResponseWriter, r *http.Request) { func (a *api) updateAgentStatus(w http.ResponseWriter, r *http.Request) { // retrieve runner, which contains ID of calling agent runner, err := runnerFromContext(r.Context()) - if err != nil || runner.ID == "" { + if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } diff --git a/internal/runner/cli.go b/internal/runner/cli.go index 1009b616a..f8c3e469d 100644 --- a/internal/runner/cli.go +++ b/internal/runner/cli.go @@ -52,7 +52,7 @@ func (a *agentCLI) agentTokenCommand() *cobra.Command { func (a *agentCLI) agentTokenNewCommand() *cobra.Command { var ( - poolID resource.ID + poolID string opts = CreateAgentTokenOptions{} ) cmd := &cobra.Command{ @@ -61,7 +61,7 @@ func (a *agentCLI) agentTokenNewCommand() *cobra.Command { SilenceUsage: true, SilenceErrors: true, RunE: func(cmd *cobra.Command, args []string) error { - _, token, err := a.CreateAgentToken(cmd.Context(), poolID, opts) + _, token, err := a.CreateAgentToken(cmd.Context(), resource.ParseID(poolID), opts) if err != nil { return err } diff --git a/internal/runner/client.go b/internal/runner/client.go index 03924d69b..98b082462 100644 --- a/internal/runner/client.go +++ b/internal/runner/client.go @@ -27,7 +27,7 @@ type remoteClient struct { *otfapi.Client // agentID is the ID of the agent using the client - agentID *string + agentID *resource.ID } // newRequest constructs a new API request @@ -37,7 +37,7 @@ func (c *remoteClient) newRequest(method, path string, v interface{}) (*retryabl return nil, err } if c.agentID != nil { - req.Header.Add(runnerIDHeader, *c.agentID) + req.Header.Add(runnerIDHeader, c.agentID.String()) } return req, err } diff --git a/internal/runner/db.go b/internal/runner/db.go index 714540cd7..c95e36e65 100644 --- a/internal/runner/db.go +++ b/internal/runner/db.go @@ -33,7 +33,7 @@ type runnerMetaResult struct { func (r runnerMetaResult) toRunnerMeta() *RunnerMeta { meta := &RunnerMeta{ - ID: r.RunnerID.String, + ID: resource.ParseID(r.RunnerID.String), Name: r.Name.String, Version: r.Version.String, MaxJobs: int(r.MaxJobs.Int32), @@ -45,7 +45,7 @@ func (r runnerMetaResult) toRunnerMeta() *RunnerMeta { } if r.AgentPool != nil { meta.AgentPool = &RunnerMetaAgentPool{ - ID: r.AgentPool.AgentPoolID.String, + ID: resource.ParseID(r.AgentPool.AgentPoolID.String), Name: r.AgentPool.Name.String, OrganizationName: r.AgentPool.OrganizationName.String, } @@ -158,31 +158,32 @@ func (db *db) deleteRunner(ctx context.Context, agentID resource.ID) error { // jobs type jobResult struct { - RunID pgtype.Text `json:"run_id"` - Phase pgtype.Text `json:"phase"` - Status pgtype.Text `json:"status"` - Signaled pgtype.Bool `json:"signaled"` - RunnerID pgtype.Text `json:"agent_id"` - AgentPoolID pgtype.Text `json:"agent_pool_id"` - WorkspaceID pgtype.Text `json:"workspace_id"` - OrganizationName pgtype.Text `json:"organization_name"` + JobID pgtype.Text + RunID pgtype.Text + Phase pgtype.Text + Status pgtype.Text + Signaled pgtype.Bool + RunnerID pgtype.Text + AgentPoolID pgtype.Text + WorkspaceID pgtype.Text + OrganizationName pgtype.Text } func (r jobResult) toJob() *Job { job := &Job{ - Spec: JobSpec{ - RunID: r.RunID.String, - Phase: internal.PhaseType(r.Phase.String), - }, + RunID: resource.ParseID(r.RunID.String), + Phase: internal.PhaseType(r.Phase.String), Status: JobStatus(r.Status.String), - WorkspaceID: r.WorkspaceID.String, + WorkspaceID: resource.ParseID(r.WorkspaceID.String), Organization: r.OrganizationName.String, } if r.RunnerID.Valid { - job.RunnerID = &r.RunnerID.String + runnerID := resource.ParseID(r.RunnerID.String) + job.RunnerID = &runnerID } if r.AgentPoolID.Valid { - job.AgentPoolID = &r.AgentPoolID.String + agentPoolID := resource.ParseID(r.AgentPoolID.String) + job.AgentPoolID = &agentPoolID } if r.Signaled.Valid { job.Signaled = &r.Signaled.Bool @@ -192,19 +193,20 @@ func (r jobResult) toJob() *Job { func (db *db) createJob(ctx context.Context, job *Job) error { err := db.Querier(ctx).InsertJob(ctx, sqlc.InsertJobParams{ - RunID: sql.ID(job.Spec.RunID), - Phase: sql.String(string(job.Spec.Phase)), + JobID: sql.ID(job.ID), + RunID: sql.ID(job.RunID), + Phase: sql.String(string(job.Phase)), Status: sql.String(string(job.Status)), }) return sql.Error(err) } -func (db *db) getAllocatedAndSignaledJobs(ctx context.Context, agentID resource.ID) ([]*Job, error) { - allocated, err := db.Querier(ctx).FindAllocatedJobs(ctx, sql.ID(agentID)) +func (db *db) getAllocatedAndSignaledJobs(ctx context.Context, runnerID resource.ID) ([]*Job, error) { + allocated, err := db.Querier(ctx).FindAllocatedJobs(ctx, sql.ID(runnerID)) if err != nil { return nil, sql.Error(err) } - signaled, err := db.Querier(ctx).FindAndUpdateSignaledJobs(ctx, sql.ID(agentID)) + signaled, err := db.Querier(ctx).FindAndUpdateSignaledJobs(ctx, sql.ID(runnerID)) if err != nil { return nil, sql.Error(err) } @@ -218,11 +220,8 @@ func (db *db) getAllocatedAndSignaledJobs(ctx context.Context, agentID resource. return jobs, nil } -func (db *db) getJob(ctx context.Context, spec JobSpec) (*Job, error) { - result, err := db.Querier(ctx).FindJob(ctx, sqlc.FindJobParams{ - RunID: sql.ID(spec.RunID), - Phase: sql.String(string(spec.Phase)), - }) +func (db *db) getJob(ctx context.Context, jobID resource.ID) (*Job, error) { + result, err := db.Querier(ctx).FindJob(ctx, sql.ID(jobID)) if err != nil { return nil, sql.Error(err) } @@ -258,7 +257,7 @@ func (db *db) updateJob(ctx context.Context, spec JobSpec, fn func(*Job) error) _, err = q.UpdateJob(ctx, sqlc.UpdateJobParams{ Status: sql.String(string(job.Status)), Signaled: sql.BoolPtr(job.Signaled), - RunnerID: sql.StringPtr(job.RunnerID), + RunnerID: sql.IDPtr(job.RunnerID), RunID: result.RunID, Phase: result.Phase, }) @@ -284,10 +283,10 @@ type agentTokenRow struct { func (row agentTokenRow) toAgentToken() *agentToken { return &agentToken{ - ID: row.AgentTokenID.String, + ID: resource.ParseID(row.AgentTokenID.String), CreatedAt: row.CreatedAt.Time.UTC(), Description: row.Description.String, - AgentPoolID: row.AgentPoolID.String, + AgentPoolID: resource.ParseID(row.AgentPoolID.String), } } @@ -342,7 +341,7 @@ type poolresult struct { func (r poolresult) toPool() *Pool { return &Pool{ - ID: r.AgentPoolID.String, + ID: resource.ParseID(r.AgentPoolID.String), Name: r.Name.String, CreatedAt: r.CreatedAt.Time.UTC(), Organization: r.OrganizationName.String, @@ -436,7 +435,7 @@ func (db *db) listPoolsByOrganization(ctx context.Context, organization string, OrganizationName: sql.String(organization), NameSubstring: sql.StringPtr(opts.NameSubstring), AllowedWorkspaceName: sql.StringPtr(opts.AllowedWorkspaceName), - AllowedWorkspaceID: sql.StringPtr(opts.AllowedWorkspaceID), + AllowedWorkspaceID: sql.IDPtr(opts.AllowedWorkspaceID), }) if err != nil { return nil, sql.Error(err) diff --git a/internal/runner/job.go b/internal/runner/job.go index 216d0f2a5..fc345988b 100644 --- a/internal/runner/job.go +++ b/internal/runner/job.go @@ -51,7 +51,7 @@ type Job struct { // ID of agent pool the job's workspace is assigned to use. If non-nil then // the job is allocated to an agent runner belonging to the pool. If nil then // the job is allocated to a server runner. - AgentPoolID *string `jsonapi:"attribute" json:"agent_pool_id"` + AgentPoolID *resource.ID `jsonapi:"attribute" json:"agent_pool_id"` // Name of job's organization Organization string `jsonapi:"attribute" json:"organization"` // ID of job's workspace diff --git a/internal/runner/operation.go b/internal/runner/operation.go index 9422b0553..71d6c4a26 100644 --- a/internal/runner/operation.go +++ b/internal/runner/operation.go @@ -136,7 +136,7 @@ type ( } logsClient interface { - PutChunk(ctx context.Context, opts internal.PutChunkOptions) error + PutChunk(ctx context.Context, opts logs.PutChunkOptions) error } hostnameClient interface { diff --git a/internal/runner/service.go b/internal/runner/service.go index 2ddf2a06c..fe02f647a 100644 --- a/internal/runner/service.go +++ b/internal/runner/service.go @@ -117,15 +117,12 @@ func NewService(opts ServiceOptions) *Service { opts.Logger, opts.Listener, "jobs", - func(ctx context.Context, specStr string, action sql.Action) (*Job, error) { - spec, err := jobSpecFromString(specStr) - if err != nil { - return nil, err - } + JobKind, + func(ctx context.Context, id resource.ID, action sql.Action) (*Job, error) { if action == sql.DeleteAction { - return &Job{Spec: spec}, nil + return &Job{ID: id}, nil } - return svc.db.getJob(ctx, spec) + return svc.db.getJob(ctx, id) }, ) // Register with auth middleware the agent token kind and a means of diff --git a/internal/runner/tokens.go b/internal/runner/tokens.go index d7fb0775d..326b39911 100644 --- a/internal/runner/tokens.go +++ b/internal/runner/tokens.go @@ -12,7 +12,7 @@ import ( const ( AgentTokenKind resource.Kind = "at" - JobTokenKind tokens.Kind = "job_token" + JobTokenKind resource.Kind = "jt" defaultJobTokenExpiry = 60 * time.Minute ) @@ -35,8 +35,8 @@ type ( func (a *agentToken) LogValue() slog.Value { attrs := []slog.Attr{ - slog.String("id", a.ID), - slog.String("agent_pool_id", string(a.AgentPoolID)), + slog.String("id", a.ID.String()), + slog.String("agent_pool_id", string(a.AgentPoolID.String())), slog.String("description", a.Description), } return slog.GroupValue(attrs...) diff --git a/internal/sql/migrations/003_add_ids_to_jobs_and_log_chunks.sql b/internal/sql/migrations/003_add_ids_to_jobs_and_log_chunks.sql index b49b8340b..204d1e226 100644 --- a/internal/sql/migrations/003_add_ids_to_jobs_and_log_chunks.sql +++ b/internal/sql/migrations/003_add_ids_to_jobs_and_log_chunks.sql @@ -3,6 +3,28 @@ ALTER TABLE jobs ADD COLUMN job_id TEXT; UPDATE jobs SET job_id = 'job-' || substr(md5(random()::text), 0, 17); ALTER TABLE jobs ADD PRIMARY KEY (job_id); +-- replace job event function to instead provide job_id in payload +CREATE OR REPLACE FUNCTION public.jobs_notify_event() RETURNS trigger + LANGUAGE plpgsql + AS $$ +DECLARE + record RECORD; + notification JSON; +BEGIN + IF (TG_OP = 'DELETE') THEN + record = OLD; + ELSE + record = NEW; + END IF; + notification = json_build_object( + 'table',TG_TABLE_NAME, + 'action', TG_OP, + 'id', record.job_id); + PERFORM pg_notify('events', notification::text); + RETURN NULL; +END; +$$; + -- Add job_id primary key to jobs table and populate with random identifiers. ALTER TABLE logs DROP chunk_id; ALTER TABLE logs ADD chunk_id TEXT; diff --git a/internal/sql/queries/job.sql b/internal/sql/queries/job.sql index 89093dbab..945b21396 100644 --- a/internal/sql/queries/job.sql +++ b/internal/sql/queries/job.sql @@ -1,9 +1,11 @@ -- name: InsertJob :exec INSERT INTO jobs ( + job_id, run_id, phase, status ) VALUES ( + sqlc.arg('job_id'), sqlc.arg('run_id'), sqlc.arg('phase'), sqlc.arg('status') @@ -11,6 +13,7 @@ INSERT INTO jobs ( -- name: FindJobs :many SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -26,6 +29,7 @@ JOIN workspaces w USING (workspace_id) -- name: FindJob :one SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -37,12 +41,12 @@ SELECT FROM jobs j JOIN runs r USING (run_id) JOIN workspaces w USING (workspace_id) -WHERE j.run_id = sqlc.arg('run_id') -AND phase = sqlc.arg('phase') +WHERE j.job_id = sqlc.arg('job_id') ; -- name: FindJobForUpdate :one SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -61,6 +65,7 @@ FOR UPDATE OF j -- name: FindAllocatedJobs :many SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -87,6 +92,7 @@ AND j.runner_id = sqlc.arg('runner_id') AND j.status = 'running' AND j.signaled IS NOT NULL RETURNING + j.job_id, j.run_id, j.phase, j.status, diff --git a/internal/sql/sqlc/job.sql.go b/internal/sql/sqlc/job.sql.go index 5cc6b74af..c1bd08396 100644 --- a/internal/sql/sqlc/job.sql.go +++ b/internal/sql/sqlc/job.sql.go @@ -13,6 +13,7 @@ import ( const findAllocatedJobs = `-- name: FindAllocatedJobs :many SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -29,6 +30,7 @@ AND j.status = 'allocated' ` type FindAllocatedJobsRow struct { + JobID pgtype.Text RunID pgtype.Text Phase pgtype.Text Status pgtype.Text @@ -49,6 +51,7 @@ func (q *Queries) FindAllocatedJobs(ctx context.Context, runnerID pgtype.Text) ( for rows.Next() { var i FindAllocatedJobsRow if err := rows.Scan( + &i.JobID, &i.RunID, &i.Phase, &i.Status, @@ -78,6 +81,7 @@ AND j.runner_id = $1 AND j.status = 'running' AND j.signaled IS NOT NULL RETURNING + j.job_id, j.run_id, j.phase, j.status, @@ -89,6 +93,7 @@ RETURNING ` type FindAndUpdateSignaledJobsRow struct { + JobID pgtype.Text RunID pgtype.Text Phase pgtype.Text Status pgtype.Text @@ -110,6 +115,7 @@ func (q *Queries) FindAndUpdateSignaledJobs(ctx context.Context, runnerID pgtype for rows.Next() { var i FindAndUpdateSignaledJobsRow if err := rows.Scan( + &i.JobID, &i.RunID, &i.Phase, &i.Status, @@ -131,6 +137,7 @@ func (q *Queries) FindAndUpdateSignaledJobs(ctx context.Context, runnerID pgtype const findJob = `-- name: FindJob :one SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -142,16 +149,11 @@ SELECT FROM jobs j JOIN runs r USING (run_id) JOIN workspaces w USING (workspace_id) -WHERE j.run_id = $1 -AND phase = $2 +WHERE j.job_id = $1 ` -type FindJobParams struct { - RunID pgtype.Text - Phase pgtype.Text -} - type FindJobRow struct { + JobID pgtype.Text RunID pgtype.Text Phase pgtype.Text Status pgtype.Text @@ -162,10 +164,11 @@ type FindJobRow struct { OrganizationName pgtype.Text } -func (q *Queries) FindJob(ctx context.Context, arg FindJobParams) (FindJobRow, error) { - row := q.db.QueryRow(ctx, findJob, arg.RunID, arg.Phase) +func (q *Queries) FindJob(ctx context.Context, jobID pgtype.Text) (FindJobRow, error) { + row := q.db.QueryRow(ctx, findJob, jobID) var i FindJobRow err := row.Scan( + &i.JobID, &i.RunID, &i.Phase, &i.Status, @@ -180,6 +183,7 @@ func (q *Queries) FindJob(ctx context.Context, arg FindJobParams) (FindJobRow, e const findJobForUpdate = `-- name: FindJobForUpdate :one SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -202,6 +206,7 @@ type FindJobForUpdateParams struct { } type FindJobForUpdateRow struct { + JobID pgtype.Text RunID pgtype.Text Phase pgtype.Text Status pgtype.Text @@ -216,6 +221,7 @@ func (q *Queries) FindJobForUpdate(ctx context.Context, arg FindJobForUpdatePara row := q.db.QueryRow(ctx, findJobForUpdate, arg.RunID, arg.Phase) var i FindJobForUpdateRow err := row.Scan( + &i.JobID, &i.RunID, &i.Phase, &i.Status, @@ -230,6 +236,7 @@ func (q *Queries) FindJobForUpdate(ctx context.Context, arg FindJobForUpdatePara const findJobs = `-- name: FindJobs :many SELECT + j.job_id, j.run_id, j.phase, j.status, @@ -244,6 +251,7 @@ JOIN workspaces w USING (workspace_id) ` type FindJobsRow struct { + JobID pgtype.Text RunID pgtype.Text Phase pgtype.Text Status pgtype.Text @@ -264,6 +272,7 @@ func (q *Queries) FindJobs(ctx context.Context) ([]FindJobsRow, error) { for rows.Next() { var i FindJobsRow if err := rows.Scan( + &i.JobID, &i.RunID, &i.Phase, &i.Status, @@ -285,24 +294,32 @@ func (q *Queries) FindJobs(ctx context.Context) ([]FindJobsRow, error) { const insertJob = `-- name: InsertJob :exec INSERT INTO jobs ( + job_id, run_id, phase, status ) VALUES ( $1, $2, - $3 + $3, + $4 ) ` type InsertJobParams struct { + JobID pgtype.Text RunID pgtype.Text Phase pgtype.Text Status pgtype.Text } func (q *Queries) InsertJob(ctx context.Context, arg InsertJobParams) error { - _, err := q.db.Exec(ctx, insertJob, arg.RunID, arg.Phase, arg.Status) + _, err := q.db.Exec(ctx, insertJob, + arg.JobID, + arg.RunID, + arg.Phase, + arg.Status, + ) return err }