Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Nov 5, 2024
1 parent b5ec779 commit 12771f4
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 76 deletions.
26 changes: 13 additions & 13 deletions internal/runner/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type allocator struct {
// allocating jobs to runners.
client allocatorClient
// runners to allocate jobs to, keyed by runner ID
runners map[string]*RunnerMeta
runners map[resource.ID]*RunnerMeta
// jobs awaiting allocation to an runner, keyed by job ID
jobs map[JobSpec]*Job
jobs map[resource.ID]*Job
}

type allocatorClient interface {
Expand All @@ -34,8 +34,8 @@ type allocatorClient interface {
listRunners(ctx context.Context) ([]*RunnerMeta, error)
listJobs(ctx context.Context) ([]*Job, error)

allocateJob(ctx context.Context, spec JobSpec, runnerID resource.ID) (*Job, error)
reallocateJob(ctx context.Context, spec JobSpec, runnerID resource.ID) (*Job, error)
allocateJob(ctx context.Context, jobID, runnerID resource.ID) (*Job, error)
reallocateJob(ctx context.Context, jobID, runnerID resource.ID) (*Job, error)
}

// Start the allocator. Should be invoked in a go routine.
Expand Down Expand Up @@ -78,9 +78,9 @@ func (a *allocator) Start(ctx context.Context) error {
}
switch event.Type {
case pubsub.DeletedEvent:
delete(a.jobs, event.Payload.Spec)
delete(a.jobs, event.Payload.ID)
default:
a.jobs[event.Payload.Spec] = event.Payload
a.jobs[event.Payload.ID] = event.Payload
}
}
if err := a.allocate(ctx); err != nil {
Expand All @@ -90,13 +90,13 @@ func (a *allocator) Start(ctx context.Context) error {
}

func (a *allocator) seed(agents []*RunnerMeta, jobs []*Job) {
a.runners = make(map[string]*RunnerMeta, len(agents))
a.runners = make(map[resource.ID]*RunnerMeta, len(agents))
for _, runner := range agents {
a.runners[runner.ID] = runner
}
a.jobs = make(map[JobSpec]*Job, len(jobs))
a.jobs = make(map[resource.ID]*Job, len(jobs))
for _, job := range jobs {
a.jobs[job.Spec] = job
a.jobs[job.ID] = job
}
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (a *allocator) allocate(ctx context.Context) error {
case JobFinished, JobCanceled, JobErrored:
// job has completed: remove and adjust number of current jobs
// agents has
delete(a.jobs, job.Spec)
delete(a.jobs, job.ID)
a.runners[*job.RunnerID].CurrentJobs--
continue
default:
Expand Down Expand Up @@ -177,18 +177,18 @@ func (a *allocator) allocate(ctx context.Context) error {
)
if reallocate {
from := *job.RunnerID
updatedJob, err = a.client.reallocateJob(ctx, job.Spec, runner.ID)
updatedJob, err = a.client.reallocateJob(ctx, job.ID, runner.ID)
if err != nil {
return err
}
a.runners[from].CurrentJobs--
} else {
updatedJob, err = a.client.allocateJob(ctx, job.Spec, runner.ID)
updatedJob, err = a.client.allocateJob(ctx, job.ID, runner.ID)
if err != nil {
return err
}
}
a.jobs[job.Spec] = updatedJob
a.jobs[job.ID] = updatedJob
a.runners[runner.ID].CurrentJobs++
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/runner/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (a *api) registerAgent(w http.ResponseWriter, r *http.Request) {
func (a *api) getJobs(w http.ResponseWriter, r *http.Request) {
// retrieve runner, which contains ID of calling agent
runner, err := runnerFromContext(r.Context())
if err != nil || runner.ID == "" {
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
Expand All @@ -87,7 +87,7 @@ func (a *api) getJobs(w http.ResponseWriter, r *http.Request) {
func (a *api) updateAgentStatus(w http.ResponseWriter, r *http.Request) {
// retrieve runner, which contains ID of calling agent
runner, err := runnerFromContext(r.Context())
if err != nil || runner.ID == "" {
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
Expand Down
4 changes: 2 additions & 2 deletions internal/runner/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (a *agentCLI) agentTokenCommand() *cobra.Command {

func (a *agentCLI) agentTokenNewCommand() *cobra.Command {
var (
poolID resource.ID
poolID string
opts = CreateAgentTokenOptions{}
)
cmd := &cobra.Command{
Expand All @@ -61,7 +61,7 @@ func (a *agentCLI) agentTokenNewCommand() *cobra.Command {
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
_, token, err := a.CreateAgentToken(cmd.Context(), poolID, opts)
_, token, err := a.CreateAgentToken(cmd.Context(), resource.ParseID(poolID), opts)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/runner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type remoteClient struct {
*otfapi.Client

// agentID is the ID of the agent using the client
agentID *string
agentID *resource.ID
}

// newRequest constructs a new API request
Expand All @@ -37,7 +37,7 @@ func (c *remoteClient) newRequest(method, path string, v interface{}) (*retryabl
return nil, err
}
if c.agentID != nil {
req.Header.Add(runnerIDHeader, *c.agentID)
req.Header.Add(runnerIDHeader, c.agentID.String())
}
return req, err
}
Expand Down
63 changes: 31 additions & 32 deletions internal/runner/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type runnerMetaResult struct {

func (r runnerMetaResult) toRunnerMeta() *RunnerMeta {
meta := &RunnerMeta{
ID: r.RunnerID.String,
ID: resource.ParseID(r.RunnerID.String),
Name: r.Name.String,
Version: r.Version.String,
MaxJobs: int(r.MaxJobs.Int32),
Expand All @@ -45,7 +45,7 @@ func (r runnerMetaResult) toRunnerMeta() *RunnerMeta {
}
if r.AgentPool != nil {
meta.AgentPool = &RunnerMetaAgentPool{
ID: r.AgentPool.AgentPoolID.String,
ID: resource.ParseID(r.AgentPool.AgentPoolID.String),
Name: r.AgentPool.Name.String,
OrganizationName: r.AgentPool.OrganizationName.String,
}
Expand Down Expand Up @@ -158,31 +158,32 @@ func (db *db) deleteRunner(ctx context.Context, agentID resource.ID) error {
// jobs

type jobResult struct {
RunID pgtype.Text `json:"run_id"`
Phase pgtype.Text `json:"phase"`
Status pgtype.Text `json:"status"`
Signaled pgtype.Bool `json:"signaled"`
RunnerID pgtype.Text `json:"agent_id"`
AgentPoolID pgtype.Text `json:"agent_pool_id"`
WorkspaceID pgtype.Text `json:"workspace_id"`
OrganizationName pgtype.Text `json:"organization_name"`
JobID pgtype.Text
RunID pgtype.Text
Phase pgtype.Text
Status pgtype.Text
Signaled pgtype.Bool
RunnerID pgtype.Text
AgentPoolID pgtype.Text
WorkspaceID pgtype.Text
OrganizationName pgtype.Text
}

func (r jobResult) toJob() *Job {
job := &Job{
Spec: JobSpec{
RunID: r.RunID.String,
Phase: internal.PhaseType(r.Phase.String),
},
RunID: resource.ParseID(r.RunID.String),
Phase: internal.PhaseType(r.Phase.String),
Status: JobStatus(r.Status.String),
WorkspaceID: r.WorkspaceID.String,
WorkspaceID: resource.ParseID(r.WorkspaceID.String),
Organization: r.OrganizationName.String,
}
if r.RunnerID.Valid {
job.RunnerID = &r.RunnerID.String
runnerID := resource.ParseID(r.RunnerID.String)
job.RunnerID = &runnerID
}
if r.AgentPoolID.Valid {
job.AgentPoolID = &r.AgentPoolID.String
agentPoolID := resource.ParseID(r.AgentPoolID.String)
job.AgentPoolID = &agentPoolID
}
if r.Signaled.Valid {
job.Signaled = &r.Signaled.Bool
Expand All @@ -192,19 +193,20 @@ func (r jobResult) toJob() *Job {

func (db *db) createJob(ctx context.Context, job *Job) error {
err := db.Querier(ctx).InsertJob(ctx, sqlc.InsertJobParams{
RunID: sql.ID(job.Spec.RunID),
Phase: sql.String(string(job.Spec.Phase)),
JobID: sql.ID(job.ID),
RunID: sql.ID(job.RunID),
Phase: sql.String(string(job.Phase)),
Status: sql.String(string(job.Status)),
})
return sql.Error(err)
}

func (db *db) getAllocatedAndSignaledJobs(ctx context.Context, agentID resource.ID) ([]*Job, error) {
allocated, err := db.Querier(ctx).FindAllocatedJobs(ctx, sql.ID(agentID))
func (db *db) getAllocatedAndSignaledJobs(ctx context.Context, runnerID resource.ID) ([]*Job, error) {
allocated, err := db.Querier(ctx).FindAllocatedJobs(ctx, sql.ID(runnerID))
if err != nil {
return nil, sql.Error(err)
}
signaled, err := db.Querier(ctx).FindAndUpdateSignaledJobs(ctx, sql.ID(agentID))
signaled, err := db.Querier(ctx).FindAndUpdateSignaledJobs(ctx, sql.ID(runnerID))
if err != nil {
return nil, sql.Error(err)
}
Expand All @@ -218,11 +220,8 @@ func (db *db) getAllocatedAndSignaledJobs(ctx context.Context, agentID resource.
return jobs, nil
}

func (db *db) getJob(ctx context.Context, spec JobSpec) (*Job, error) {
result, err := db.Querier(ctx).FindJob(ctx, sqlc.FindJobParams{
RunID: sql.ID(spec.RunID),
Phase: sql.String(string(spec.Phase)),
})
func (db *db) getJob(ctx context.Context, jobID resource.ID) (*Job, error) {
result, err := db.Querier(ctx).FindJob(ctx, sql.ID(jobID))
if err != nil {
return nil, sql.Error(err)
}
Expand Down Expand Up @@ -258,7 +257,7 @@ func (db *db) updateJob(ctx context.Context, spec JobSpec, fn func(*Job) error)
_, err = q.UpdateJob(ctx, sqlc.UpdateJobParams{
Status: sql.String(string(job.Status)),
Signaled: sql.BoolPtr(job.Signaled),
RunnerID: sql.StringPtr(job.RunnerID),
RunnerID: sql.IDPtr(job.RunnerID),
RunID: result.RunID,
Phase: result.Phase,
})
Expand All @@ -284,10 +283,10 @@ type agentTokenRow struct {

func (row agentTokenRow) toAgentToken() *agentToken {
return &agentToken{
ID: row.AgentTokenID.String,
ID: resource.ParseID(row.AgentTokenID.String),
CreatedAt: row.CreatedAt.Time.UTC(),
Description: row.Description.String,
AgentPoolID: row.AgentPoolID.String,
AgentPoolID: resource.ParseID(row.AgentPoolID.String),
}
}

Expand Down Expand Up @@ -342,7 +341,7 @@ type poolresult struct {

func (r poolresult) toPool() *Pool {
return &Pool{
ID: r.AgentPoolID.String,
ID: resource.ParseID(r.AgentPoolID.String),
Name: r.Name.String,
CreatedAt: r.CreatedAt.Time.UTC(),
Organization: r.OrganizationName.String,
Expand Down Expand Up @@ -436,7 +435,7 @@ func (db *db) listPoolsByOrganization(ctx context.Context, organization string,
OrganizationName: sql.String(organization),
NameSubstring: sql.StringPtr(opts.NameSubstring),
AllowedWorkspaceName: sql.StringPtr(opts.AllowedWorkspaceName),
AllowedWorkspaceID: sql.StringPtr(opts.AllowedWorkspaceID),
AllowedWorkspaceID: sql.IDPtr(opts.AllowedWorkspaceID),
})
if err != nil {
return nil, sql.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/runner/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Job struct {
// ID of agent pool the job's workspace is assigned to use. If non-nil then
// the job is allocated to an agent runner belonging to the pool. If nil then
// the job is allocated to a server runner.
AgentPoolID *string `jsonapi:"attribute" json:"agent_pool_id"`
AgentPoolID *resource.ID `jsonapi:"attribute" json:"agent_pool_id"`
// Name of job's organization
Organization string `jsonapi:"attribute" json:"organization"`
// ID of job's workspace
Expand Down
2 changes: 1 addition & 1 deletion internal/runner/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type (
}

logsClient interface {
PutChunk(ctx context.Context, opts internal.PutChunkOptions) error
PutChunk(ctx context.Context, opts logs.PutChunkOptions) error
}

hostnameClient interface {
Expand Down
11 changes: 4 additions & 7 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,12 @@ func NewService(opts ServiceOptions) *Service {
opts.Logger,
opts.Listener,
"jobs",
func(ctx context.Context, specStr string, action sql.Action) (*Job, error) {
spec, err := jobSpecFromString(specStr)
if err != nil {
return nil, err
}
JobKind,
func(ctx context.Context, id resource.ID, action sql.Action) (*Job, error) {
if action == sql.DeleteAction {
return &Job{Spec: spec}, nil
return &Job{ID: id}, nil
}
return svc.db.getJob(ctx, spec)
return svc.db.getJob(ctx, id)
},
)
// Register with auth middleware the agent token kind and a means of
Expand Down
6 changes: 3 additions & 3 deletions internal/runner/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

const (
AgentTokenKind resource.Kind = "at"
JobTokenKind tokens.Kind = "job_token"
JobTokenKind resource.Kind = "jt"

defaultJobTokenExpiry = 60 * time.Minute
)
Expand All @@ -35,8 +35,8 @@ type (

func (a *agentToken) LogValue() slog.Value {
attrs := []slog.Attr{
slog.String("id", a.ID),
slog.String("agent_pool_id", string(a.AgentPoolID)),
slog.String("id", a.ID.String()),
slog.String("agent_pool_id", string(a.AgentPoolID.String())),
slog.String("description", a.Description),
}
return slog.GroupValue(attrs...)
Expand Down
22 changes: 22 additions & 0 deletions internal/sql/migrations/003_add_ids_to_jobs_and_log_chunks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,28 @@ ALTER TABLE jobs ADD COLUMN job_id TEXT;
UPDATE jobs SET job_id = 'job-' || substr(md5(random()::text), 0, 17);
ALTER TABLE jobs ADD PRIMARY KEY (job_id);

-- replace job event function to instead provide job_id in payload
CREATE OR REPLACE FUNCTION public.jobs_notify_event() RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
record RECORD;
notification JSON;
BEGIN
IF (TG_OP = 'DELETE') THEN
record = OLD;
ELSE
record = NEW;
END IF;
notification = json_build_object(
'table',TG_TABLE_NAME,
'action', TG_OP,
'id', record.job_id);
PERFORM pg_notify('events', notification::text);
RETURN NULL;
END;
$$;

-- Add job_id primary key to jobs table and populate with random identifiers.
ALTER TABLE logs DROP chunk_id;
ALTER TABLE logs ADD chunk_id TEXT;
Expand Down
Loading

0 comments on commit 12771f4

Please sign in to comment.