Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for work queues #353

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a0de981
first pass of creating, updating, destroying work queues
nick-amplify Jan 7, 2025
90dfe30
adding work_queue and work_queues datasources and first tests
nick-amplify Jan 8, 2025
3ddd762
add work queues to provider
nick-amplify Jan 8, 2025
18660fb
adding new auto generated docs
nick-amplify Jan 8, 2025
4c7d224
adding build file to try custom provider
nick-amplify Jan 8, 2025
8003acf
updating tests using ephemeral workspace
nick-amplify Jan 8, 2025
50ee805
removing build file
nick-amplify Jan 8, 2025
8067a21
update tests and refs
nick-amplify Jan 13, 2025
2826763
update missed pool->queue refs
nick-amplify Jan 13, 2025
ac699c5
switch to TestCheckResourceAttr for known values
nick-amplify Jan 13, 2025
e73b6b5
add back in prefect resource
nick-amplify Jan 13, 2025
216951f
Update internal/provider/datasources/work_queue_test.go
nick-amplify Jan 13, 2025
181cc48
update terraform test fixture spaceing
nick-amplify Jan 13, 2025
249df7e
update terraform test fixture spaceing, more
nick-amplify Jan 13, 2025
77ac998
another pool->queue ref
nick-amplify Jan 13, 2025
627685f
another ref
nick-amplify Jan 13, 2025
b9d3890
another ref
nick-amplify Jan 13, 2025
a107e36
update import functionality
nick-amplify Jan 13, 2025
04e8bcc
updated work queue resource tests
nick-amplify Jan 13, 2025
6a00d99
update prefect work pool ephermal workspace id
nick-amplify Jan 14, 2025
9190ee3
use k8s.io/utils/ptr.To instead of custom func
nick-amplify Jan 14, 2025
a1885cb
update test ref
nick-amplify Jan 14, 2025
a3bc544
update terraform fixture spacing
nick-amplify Jan 14, 2025
cf61720
add workspace id
nick-amplify Jan 15, 2025
2212231
remove space
nick-amplify Jan 15, 2025
5970a4b
add sorting of queues by priority
nick-amplify Jan 16, 2025
ca08f6f
updating test checks
nick-amplify Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions docs/data-sources/work_queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "prefect_work_queue Data Source - prefect"
subcategory: ""
description: |-
Get information about an existing Work Queue by name.

Use this data source to obtain Work Queue-specific attributes.
---

# prefect_work_queue (Data Source)

Get information about an existing Work Queue by name.
<br>
Use this data source to obtain Work Queue-specific attributes.



<!-- schema generated by tfplugindocs -->
## Schema

### Optional

- `account_id` (String) Account ID (UUID), defaults to the account set in the provider
- `concurrency_limit` (Number) The concurrency limit applied to this work queue
- `description` (String) Description of the work queue
- `id` (String) Work queue ID (UUID)
- `name` (String) Name of the work queue
- `work_pool_name` (String) Name of the associated work pool
- `workspace_id` (String) Workspace ID (UUID), defaults to the workspace set in the provider

### Read-Only

- `created` (String) Date and time of the work queue creation in RFC 3339 format
- `is_paused` (Boolean) Whether this work queue is paused
- `priority` (Number) Priority of the work queue
- `updated` (String) Date and time that the work queue was last updated in RFC 3339 format
52 changes: 52 additions & 0 deletions docs/data-sources/work_queues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "prefect_work_queues Data Source - prefect"
subcategory: ""
description: |-
Get information about multiple Work Queues.

Use this data source to search for multiple Work Queues. Defaults to fetching all Work Queues in the Workspace.
---

# prefect_work_queues (Data Source)

Get information about multiple Work Queues.
<br>
Use this data source to search for multiple Work Queues. Defaults to fetching all Work Queues in the Workspace.



<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `work_pool_name` (String) Name of the associated work pool

### Optional

- `account_id` (String) Account ID (UUID), defaults to the account set in the provider
- `filter_any` (List of String) Work queue IDs (UUID) to search for (work queues with any matching UUID are returned)
- `workspace_id` (String) Workspace ID (UUID), defaults to the workspace set in the provider

### Read-Only

- `work_queues` (Attributes List) Work queues returned by the server (see [below for nested schema](#nestedatt--work_queues))

<a id="nestedatt--work_queues"></a>
### Nested Schema for `work_queues`

Optional:

- `concurrency_limit` (Number) The concurrency limit applied to this work queue
- `description` (String) Description of the work queue
- `id` (String) Work queue ID (UUID)
- `name` (String) Name of the work queue
- `work_pool_name` (String) Name of the associated work pool

Read-Only:

- `created` (String) Date and time of the work queue creation in RFC 3339 format
- `is_paused` (Boolean) Whether this work queue is paused
- `priority` (Number) Priority of the work queue
- `updated` (String) Date and time that the work queue was last updated in RFC 3339 format
39 changes: 39 additions & 0 deletions docs/resources/work_queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "prefect_work_queue Resource - prefect"
subcategory: ""
description: |-
The resource work_queue represents a Prefect Work Queue. Work Queues are used to configure and manage job execution queues in Prefect.
Work Queues can be associated with a work pool and have configurations like concurrency limits
---

# prefect_work_queue (Resource)

The resource `work_queue` represents a Prefect Work Queue. Work Queues are used to configure and manage job execution queues in Prefect.

Work Queues can be associated with a work pool and have configurations like concurrency limits



<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `name` (String) Name of the work queue
- `work_pool_name` (String) The name of the work pool associated with this work queue

### Optional

- `account_id` (String) Account ID (UUID), defaults to the account set in the provider
- `concurrency_limit` (Number) The concurrency limit applied to this work queue
- `description` (String) Description of the work queue
- `is_paused` (Boolean) Whether this work queue is paused
- `priority` (Number) The priority of this work queue
- `workspace_id` (String) Workspace ID (UUID), defaults to the workspace set in the provider. In Prefect Cloud, either the `work_pool` resource or the provider's `workspace_id` must be set.

### Read-Only

- `created` (String) Timestamp of when the resource was created (RFC3339)
- `id` (String) Work queue ID (UUID)
- `updated` (String) Timestamp of when the resource was updated (RFC3339)
1 change: 1 addition & 0 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type PrefectClient interface {
WorkspaceAccess(accountID uuid.UUID, workspaceID uuid.UUID) (WorkspaceAccessClient, error)
WorkspaceRoles(accountID uuid.UUID) (WorkspaceRolesClient, error)
WorkPools(accountID uuid.UUID, workspaceID uuid.UUID) (WorkPoolsClient, error)
WorkQueues(accountID uuid.UUID, workspaceID uuid.UUID, workPoolName string) (WorkQueuesClient, error)
Variables(accountID uuid.UUID, workspaceID uuid.UUID) (VariablesClient, error)
ServiceAccounts(accountID uuid.UUID) (ServiceAccountsClient, error)
Webhooks(accountID, workspaceID uuid.UUID) (WebhooksClient, error)
Expand Down
50 changes: 50 additions & 0 deletions internal/api/work_queues.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package api

import (
"context"

"github.com/google/uuid"
)

// WorkQueuesClient is a client for working with work queues.
type WorkQueuesClient interface {
Create(ctx context.Context, data WorkQueueCreate) (*WorkQueue, error)
List(ctx context.Context, filter WorkQueueFilter) ([]*WorkQueue, error)
Get(ctx context.Context, name string) (*WorkQueue, error)
Update(ctx context.Context, name string, data WorkQueueUpdate) error
Delete(ctx context.Context, name string) error
}

// WorkQueue is a representation of a work queue.
type WorkQueue struct {
BaseModel
Name string `json:"name"`
WorkPoolName string `json:"work_pool_name"`
Description *string `json:"description"`
IsPaused bool `json:"is_paused"`
ConcurrencyLimit *int64 `json:"concurrency_limit"`
Priority *int64 `json:"priority"`
QueueID uuid.UUID `json:"queue_id"`
}

// WorkQueueCreate is a subset of WorkQueue used when creating queues.
type WorkQueueCreate struct {
Name string `json:"name"`
Description *string `json:"description"`
IsPaused *bool `json:"is_paused"`
ConcurrencyLimit *int64 `json:"concurrency_limit"`
Priority *int64 `json:"priority"`
}

// WorkQueueUpdate is a subset of WorkQueue used when updating queues.
type WorkQueueUpdate struct {
Description *string `json:"description"`
IsPaused *bool `json:"is_paused"`
ConcurrencyLimit *int64 `json:"concurrency_limit"`
Priority *int64 `json:"priority"`
}

// WorkQueueFilter defines filters when searching for work queues.
type WorkQueueFilter struct {
Any []uuid.UUID `json:"any_"`
}
136 changes: 136 additions & 0 deletions internal/client/work_queues.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package client

import (
"context"
"fmt"
"net/http"

"github.com/google/uuid"
"github.com/prefecthq/terraform-provider-prefect/internal/api"
)

var _ = api.WorkQueuesClient(&WorkQueuesClient{})

// WorkQueuesClient is a client for working with work queues.
type WorkQueuesClient struct {
hc *http.Client
apiKey string
routePrefix string
}

// WorkQueues returns a WorkQueuesClient.
//
//nolint:ireturn // required to support PrefectClient mocking
func (c *Client) WorkQueues(accountID uuid.UUID, workspaceID uuid.UUID, workPoolName string) (api.WorkQueuesClient, error) {
if accountID == uuid.Nil {
accountID = c.defaultAccountID
}

if workspaceID == uuid.Nil {
workspaceID = c.defaultWorkspaceID
}

if err := validateCloudEndpoint(c.endpoint, accountID, workspaceID); err != nil {
return nil, err
}

route := fmt.Sprintf("work_pools/%s/queues", workPoolName)

return &WorkQueuesClient{
hc: c.hc,
apiKey: c.apiKey,
routePrefix: getWorkspaceScopedURL(c.endpoint, accountID, workspaceID, route),
}, nil
}

// Create returns details for a new work queue.
func (c *WorkQueuesClient) Create(ctx context.Context, data api.WorkQueueCreate) (*api.WorkQueue, error) {
cfg := requestConfig{
method: http.MethodPost,
url: c.routePrefix + "/",
body: &data,
successCodes: successCodesStatusCreated,
apiKey: c.apiKey,
}

var queue api.WorkQueue
if err := requestWithDecodeResponse(ctx, c.hc, cfg, &queue); err != nil {
return nil, fmt.Errorf("failed to create work queue: %w", err)
}

return &queue, nil
}

// List returns a list of work queues matching filter criteria.
func (c *WorkQueuesClient) List(ctx context.Context, filter api.WorkQueueFilter) ([]*api.WorkQueue, error) {
cfg := requestConfig{
method: http.MethodPost,
url: c.routePrefix + "/filter",
body: &filter,
successCodes: successCodesStatusOK,
apiKey: c.apiKey,
}

var queues []*api.WorkQueue
if err := requestWithDecodeResponse(ctx, c.hc, cfg, &queues); err != nil {
return nil, fmt.Errorf("failed to list work queues: %w", err)
}

return queues, nil
}

// Get returns details for a work queue by name.
func (c *WorkQueuesClient) Get(ctx context.Context, name string) (*api.WorkQueue, error) {
cfg := requestConfig{
method: http.MethodGet,
url: c.routePrefix + "/" + name,
successCodes: successCodesStatusOK,
body: http.NoBody,
apiKey: c.apiKey,
}

var queue api.WorkQueue
if err := requestWithDecodeResponse(ctx, c.hc, cfg, &queue); err != nil {
return nil, fmt.Errorf("failed to get work queue: %w", err)
}

return &queue, nil
}

// Update modifies an existing work queue by name.
func (c *WorkQueuesClient) Update(ctx context.Context, name string, data api.WorkQueueUpdate) error {
cfg := requestConfig{
method: http.MethodPatch,
url: c.routePrefix + "/" + name,
body: &data,
successCodes: successCodesStatusOKOrNoContent,
apiKey: c.apiKey,
}

resp, err := request(ctx, c.hc, cfg)
if err != nil {
return fmt.Errorf("failed to update work queue: %w", err)
}
defer resp.Body.Close()

return nil
}

// Delete removes a work queue by name.
func (c *WorkQueuesClient) Delete(ctx context.Context, name string) error {
cfg := requestConfig{
method: http.MethodDelete,
url: c.routePrefix + "/" + name,
successCodes: successCodesStatusOKOrNoContent,
body: http.NoBody,
apiKey: c.apiKey,
}

resp, err := request(ctx, c.hc, cfg)
if err != nil {
return fmt.Errorf("failed to delete work queue: %w", err)
}
defer resp.Body.Close()

return nil
}
Loading
Loading