feat: add support for work queues #353
base: main
Conversation
Sure thing, looks like the same or similar result: https://github.com/PrefectHQ/terraform-provider-prefect/actions/runs/12777563602/job/35620349342
I think to start, we need this:

diff --git a/internal/provider/resources/work_queue_test.go b/internal/provider/resources/work_queue_test.go
index c30cbbb..efa80f2 100644
--- a/internal/provider/resources/work_queue_test.go
+++ b/internal/provider/resources/work_queue_test.go
@@ -39,6 +39,7 @@ resource "prefect_work_queue" "%s" {
 	work_pool_name = prefect_work_pool.%s.name
 	priority = %d
 	description = "%s"
+	workspace_id = prefect_workspace.test.id
 }
`, workspace, workPoolName, workPoolName, poolType, paused, baseJobTemplate, workQueueName, workQueueName, workPoolName, priority, description)

I made that change locally and tested it, and it looks like the URL that's created for the API call was malformed. It wasn't substituting correctly; note the %20 in the URL.
@mitchnielsen I added it in. I haven't figured out the /%20 in the tests yet. For my local manual test, I can create this:

resource "prefect_work_queue" "example_work_queue" {
  name           = "first-work-pool-test"
  work_pool_name = "test"
  workspace_id   = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}

and see the URL as this: (screenshot). Is there anything you may be doing differently on your end that I'm missing?
Aha, finally found it:

diff --git a/internal/provider/resources/work_queue_test.go b/internal/provider/resources/work_queue_test.go
index efa80f2..31043b3 100644
--- a/internal/provider/resources/work_queue_test.go
+++ b/internal/provider/resources/work_queue_test.go
@@ -302,7 +302,7 @@ func getWorkQueueImportStateID(workQueueResourceName string, workPoolName string
 	}
 	workQueueName := workQueueResource.Primary.Attributes["name"]
-	return fmt.Sprintf("%s, %s,%s", workspaceID, workPoolName, workQueueName), nil
+	return fmt.Sprintf("%s,%s,%s", workspaceID, workPoolName, workQueueName), nil
 }
}

@mitchnielsen ah, good eye! Thank you! I just added a commit removing that space.
resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.1.priority"),
resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.1.description"),
resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.2.name", "test-queue-2"),
resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.2.priority", "3"),
So I noticed that CI failed on this test, but it passed for me when I triggered it from my laptop. I triggered it a couple more times and got a failure. So I think this one is flaky because the order of the resources is not always the same.
My first thought here is to refactor this test to use a custom check function that accepts a list of work queue objects, and iterates through them to ensure that each object you expect to exist is present. This would get around any expectations around the order of the objects.
That said, I was looking for other examples of this in our codebase and I saw that automations tests seem to be able to refer to specific items in the list and make assertions there.
@parkedwards - did you run into any ordering problems there? I'll look into this a bit more in the meantime
Hmm - the underlying API response might have non-deterministic ordering behavior; it wouldn't be the first time we ran into this. If that's the case, then I think .WorkQueue may not have consistent ordering.
Ok yeah - so I did run into something like this on the Automation resource, and ended up swapping the list's type to a SetAttribute (instead of a ListAttribute). The original issue was that I would get an inconsistent value error, because the API would return randomly sorted lists.
https://github.com/PrefectHQ/terraform-provider-prefect/pull/339/files
I wonder if you might be able to solve this by swapping to a SetNestedAttribute for work_queues:
https://developer.hashicorp.com/terraform/plugin/framework/handling-data/attributes/set-nested
Update - doing some testing on the POST /work_pools/<name>/queues/filter endpoint, I seem to be getting consistent ordering on the queue response...
Mm, that's good to know. Maybe TF itself isn't retaining the order based on the attribute schema
@mitchnielsen Can you put this through the test suite again this week?
For sure, here's the output: https://github.com/PrefectHQ/terraform-provider-prefect/actions/runs/12812637385/job/35724765884?pr=353
I can take a look some time early this week and try to help out
Been looking into this today, will follow up if I find something that works.
I think I found at least one potential source of inconsistency here.
When you don't specify a priority, Prefect appears to set one for you. If I've already created 2 work queues with a priority set, then the one without a priority automatically gets priority 3 (n + 1).
However, if the work queue without a priority is created after the first work queue, for example, it would get priority 2.
This is mostly fine, but our tests will expect a particular priority value. We can't consistently determine that because the API appears to create the work queues in varying order.
On that note, here's the help text on priority from the UI:
Priority specifies how Prefect prioritizes the delivery of work — more precisely, execution of flow runs among worker pools. Priority must be a unique, positive integer. Lower numbers indicate higher pool priorities. For example, 1 is the highest priority queue and its flow run execution takes precedence over any lower-priority pool. 10 is a lower priority queue than 1, and 100 is lower priority than both 1 and 10. If you specify a priority that already exists, the new priority supersedes any previously set priority on a worker pool.
Because you can have two matching priorities, it probably isn't ideal to sort based on priority. In fact, I've instead (locally) changed from a ListAttribute to a SetAttribute and I seem to get a consistent order without needing to sort at all. I can see that the order was the same before and after sorting and storing the value in the state. However, the tests don't seem to maintain that order. I don't know where exactly in the chain things get rearranged, but it seems to only happen in the tests and not necessarily in the state itself.
Either way, promising a certain order is probably not very foolproof and users might be better off finding work queues by name (or some other combination of fields) in their Terraform manifests. After some research, it sounds like there's no fixed pattern for providers and it's up to each implementation to decide whether or not to promise a certain order. If there are differing opinions here, please share 🙏🏼
I've started adjusting the tests to instead ensure that all expected work queues are simply present in the result, rather than expecting that they're in the same position every time. I'll share the code here soon.
Here's what I have locally. Tests have been passing consistently.
At a high level, this:
- Changes the work_queues field from types.List to types.Set for some ordering consistency (we could arguably 'promise' ordering, but I'm currently leaning against it and hoping folks find specific work queues by name or some other combination of fields)
- Replaces the individual field checks with a function that will loop through expected work queues and make sure they're all present (so order is irrelevant)
Click to expand diff
diff --git a/internal/provider/datasources/work_queue_test.go b/internal/provider/datasources/work_queue_test.go
index fba155d..2f9af26 100644
--- a/internal/provider/datasources/work_queue_test.go
+++ b/internal/provider/datasources/work_queue_test.go
@@ -1,11 +1,16 @@
package datasources_test
import (
+ "context"
"fmt"
"testing"
+ "github.com/google/uuid"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
+ "github.com/hashicorp/terraform-plugin-testing/terraform"
+ "github.com/prefecthq/terraform-provider-prefect/internal/api"
"github.com/prefecthq/terraform-provider-prefect/internal/testutils"
+ "k8s.io/utils/ptr"
)
func fixtureAccSingleWorkQueue(
@@ -74,7 +79,7 @@ resource "prefect_work_queue" "test_queue2" {
data "prefect_work_queues" "test" {
work_pool_name = prefect_work_pool.test_multi.name
- workspace_id = prefect_workspace.test.id
+ workspace_id = prefect_workspace.test.id
depends_on = [
prefect_work_pool.test_multi,
prefect_work_queue.test_queue1,
@@ -90,7 +95,9 @@ func TestAccDatasource_work_queue(t *testing.T) {
singleWorkQueueDatasourceName := "data.prefect_work_queue.test"
multipleWorkQueueDatasourceName := "data.prefect_work_queues.test"
workspace := testutils.NewEphemeralWorkspace()
-
+
+ workQueues := []*api.WorkQueue{}
+
resource.ParallelTest(t, resource.TestCase{
ProtoV6ProviderFactories: testutils.TestAccProtoV6ProviderFactories,
PreCheck: func() { testutils.AccTestPreCheck(t) },
@@ -112,26 +119,124 @@ func TestAccDatasource_work_queue(t *testing.T) {
Config: fixtureAccMultipleWorkQueue(workspace.Resource, "test-pool-multi", "test-queue", "test-queue-2"),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.#", "3"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.0.name", "test-queue"), // We added this as priority 1, so it will have the highest
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.0.id"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.0.created"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.0.created"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.0.updated"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.0.is_paused", "false"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.0.priority", "1"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.0.description"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.1.name", "default"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.1.priority", "2"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.1.id"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.1.created"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.1.updated"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.1.is_paused"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.1.priority", "2"),
- resource.TestCheckResourceAttrSet(multipleWorkQueueDatasourceName, "work_queues.1.description"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.2.name", "test-queue-2"),
- resource.TestCheckResourceAttr(multipleWorkQueueDatasourceName, "work_queues.2.priority", "3"),
+ testAccCheckworkQueueExists("prefect_work_pool.test_multi", &workQueues),
+ testAccCheckWorkQueueValues(&workQueues, expectedWorkQueues),
),
},
},
})
}
+
+// testAccCheckworkQueueExists is a Custom Check Function that
+// verifies that the API object was created correctly.
+func testAccCheckworkQueueExists(workPoolResourceName string, workQueues *[]*api.WorkQueue) resource.TestCheckFunc {
+ return func(s *terraform.State) error {
+ workPoolResource, exists := s.RootModule().Resources[workPoolResourceName]
+ if !exists {
+ return fmt.Errorf("Resource not found in state: %s", workPoolResourceName)
+ }
+
+ workPoolName := workPoolResource.Primary.Attributes["name"]
+
+ workspaceResource, exists := s.RootModule().Resources[testutils.WorkspaceResourceName]
+ if !exists {
+ return fmt.Errorf("Resource not found in state: %s", testutils.WorkspaceResourceName)
+ }
+
+ workspaceID, _ := uuid.Parse(workspaceResource.Primary.ID)
+
+ // Initialize the client with the associated workspaceID
+ // NOTE: the accountID is inherited by the one set in the test environment
+ c, err := testutils.NewTestClient()
+ if err != nil {
+ return fmt.Errorf("error creating new test client: %w", err)
+ }
+
+ workQueuesClient, err := c.WorkQueues(uuid.Nil, workspaceID, workPoolName)
+ if err != nil {
+ return fmt.Errorf("error creating new work queues client: %w", err)
+ }
+
+ fetchedWorkQueues, err := workQueuesClient.List(context.Background(), api.WorkQueueFilter{})
+ if err != nil {
+ return fmt.Errorf("error fetching workQueues: %w", err)
+ }
+
+ if len(fetchedWorkQueues) == 0 {
+ return fmt.Errorf("unable to list any work queues for work pool %s", workPoolName)
+ }
+
+ *workQueues = append(*workQueues, fetchedWorkQueues...)
+
+ return nil
+ }
+}
+
+var expectedWorkQueues = []*api.WorkQueue{
+ {
+ Name: "default",
+ Priority: ptr.To(int64(2)),
+ Description: ptr.To("The work pool's default queue."),
+ IsPaused: false,
+ },
+ {
+ Name: "test-queue",
+ Priority: ptr.To(int64(1)),
+ Description: ptr.To("my work queue"),
+ IsPaused: false,
+ },
+ {
+ Name: "test-queue-2",
+ Description: ptr.To(""),
+ IsPaused: false,
+ // Intentionally not setting Priority.
+ },
+}
+
+// testAccCheckWorkQueueValues is a Custom Check Function that
+// verifies that the API object matches the expected values.
+func testAccCheckWorkQueueValues(fetched *[]*api.WorkQueue, expected []*api.WorkQueue) resource.TestCheckFunc {
+ return func(_ *terraform.State) error {
+ if len(*fetched) != len(expected) {
+ return fmt.Errorf("Expected work queues to be %d, got %d", len(expected), len(*fetched))
+ }
+
+ for _, expectedWorkQueue := range expected {
+ found := false
+
+ for _, fetchedWorkQueue := range *fetched {
+ if fetchedWorkQueue.Name != expectedWorkQueue.Name {
+ continue
+ }
+
+ // Mark the work queue as 'found', and check each of the other fields to ensure they match.
+ found = true
+ name := expectedWorkQueue.Name
+
+ if *fetchedWorkQueue.Description != *expectedWorkQueue.Description {
+ return fmt.Errorf("Expected work queue '%s' description to be %s, got %s", name, *expectedWorkQueue.Description, *fetchedWorkQueue.Description)
+ }
+
+ if fetchedWorkQueue.IsPaused != expectedWorkQueue.IsPaused {
+ return fmt.Errorf("Expected work queue '%s' is paused to be %t, got %t", name, expectedWorkQueue.IsPaused, fetchedWorkQueue.IsPaused)
+ }
+
+ // Priority is special because if one is not configured, it will get a value based on the priority of the most recently created work queue.
+ // Because of this, let's only check for matching priority when we configure one.
+ if expectedWorkQueue.Priority != nil {
+ if *fetchedWorkQueue.Priority != *expectedWorkQueue.Priority {
+ return fmt.Errorf("Expected work queue '%s' priority to be %d, got %d", name, *expectedWorkQueue.Priority, *fetchedWorkQueue.Priority)
+ }
+ }
+
+ break
+ }
+
+ if !found {
+ return fmt.Errorf("Expected to find work queue '%s' by name", expectedWorkQueue.Name)
+ }
+ }
+
+ return nil
+ }
+}
diff --git a/internal/provider/datasources/work_queues.go b/internal/provider/datasources/work_queues.go
index a594f9d..4c37327 100644
--- a/internal/provider/datasources/work_queues.go
+++ b/internal/provider/datasources/work_queues.go
@@ -2,8 +2,7 @@ package datasources
import (
"context"
- "sort"
-
+
"github.com/hashicorp/terraform-plugin-framework/attr"
"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
@@ -28,7 +27,7 @@ type WorkQueuesSourceModel struct {
WorkPoolName types.String `tfsdk:"work_pool_name"`
FilterAny types.List `tfsdk:"filter_any"`
- WorkQueues types.List `tfsdk:"work_queues"`
+ WorkQueues types.Set `tfsdk:"work_queues"`
}
// NewWorkQueuesDataSource returns a new WorkQueuesDataSource.
@@ -83,7 +82,7 @@ Use this data source to search for multiple Work Queues. Defaults to fetching al
Optional: true,
Description: "Work queue IDs (UUID) to search for (work queues with any matching UUID are returned)",
},
- "work_queues": schema.ListNestedAttribute{
+ "work_queues": schema.SetNestedAttribute{
Computed: true,
Description: "Work queues returned by the server",
NestedObject: schema.NestedAttributeObject{
@@ -125,20 +124,6 @@ func (d *WorkQueuesDataSource) Read(ctx context.Context, req datasource.ReadRequ
return
}
- // Sort the queues by priority
- sort.SliceStable(queues, func(i, j int) bool {
- if queues[i].Priority == nil && queues[j].Priority == nil {
- return false
- }
- if queues[i].Priority == nil {
- return false
- }
- if queues[j].Priority == nil {
- return true
- }
- return *queues[i].Priority < *queues[j].Priority
- })
-
attributeTypes := map[string]attr.Type{
"id": customtypes.UUIDType{},
"created": customtypes.TimestampType{},
@@ -177,13 +162,13 @@ func (d *WorkQueuesDataSource) Read(ctx context.Context, req datasource.ReadRequ
}
// Set the final list value to be returned
- list, diag := types.ListValue(types.ObjectType{AttrTypes: attributeTypes}, queueObjects)
+ set, diag := types.SetValueFrom(ctx, types.ObjectType{AttrTypes: attributeTypes}, queueObjects)
resp.Diagnostics.Append(diag...)
if resp.Diagnostics.HasError() {
return
}
- model.WorkQueues = list
+ model.WorkQueues = set
resp.Diagnostics.Append(resp.State.Set(ctx, &model)...)
if resp.Diagnostics.HasError() {
Sorry, just a bit of cheerleading: I'm about to create a bunch of queues manually and am really looking forward to this feature! 📣😄
Addresses #131.

I've copied a lot of functionality from the WorkPool implementation in order to support CRUD and import of the resource, as well as a datasource for Prefect work queues.