Skip to content

Commit

Permalink
Refactor pipelines commands into separate files, rename some options (#…
Browse files Browse the repository at this point in the history
…7769)

* Refactor pipelines commands into separate files, rename some options

**Pipelines is currently in closed beta, renaming these without
providing aliases _should_ have no impact**

Moving each of the subcommand handlers and options for each subcommand into its own
file. This also renames many of the parameters to be more specific.

The following parameters have been renamed:

| Previous Name | New Name |
| ---- | ---- |
| access-key-id | r2-access-key-id |
| secret-access-key | r2-secret-access-key |
| transform | transform-worker |
| r2 | r2-bucket |
| prefix | r2-prefix |
| binding | enable-worker-binding |
| http | enable-http |
| authentication | require-http-auth |
| filename | file-template |
| filepath | partition-template |

Adds the following new option for `create` and `update` commands:

```
--cors-origins           CORS origin allowlist for HTTP endpoint (use * for any origin)  [array]
```

Closes https://jira.cfdata.org/browse/PIPE-160.

* Fix non-exported type

* Add snapshot for create pipeline

* Add support for R2 staging, print banner for list cmd

* Refactor our verifyBucketAccess function

This also replaces the use of retryOnError with a for-loop as
retryOnError wasn't working as expected.

* Update CORS Origins when provided

Previously this was nested under the `enableHttp` block which doesn't
make sense if HTTP is already enabled and the user just wants to update
the authentication or CORS settings.

* Add test for pipeline cors update

* Fix default for partition template

* remove defaults and rely on API defaults instead

* Use proper capitalization for Workers and Pipelines

Also split the previous changeset into two.

* Apply suggestions from code review

Co-authored-by: Edmund Hung <[email protected]>

* Drop metrics calls, use chalk for CLI flag groups

Based on PR feedback, drop the metrics calls and use chalk.bold for CLI
flag groups. Snapshots were updated for tests.

* Update .changeset/short-kids-hunt.md

Co-authored-by: Edmund Hung <[email protected]>

---------

Co-authored-by: Edmund Hung <[email protected]>
Co-authored-by: emily-shen <[email protected]>
  • Loading branch information
3 people authored Feb 4, 2025
1 parent b9805d7 commit 6abe69c
Show file tree
Hide file tree
Showing 11 changed files with 952 additions and 533 deletions.
9 changes: 9 additions & 0 deletions .changeset/plenty-birds-melt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"wrangler": patch
---

Adds the following new option for `wrangler pipelines create` and `wrangler pipelines update` commands:

```
--cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array]
```
20 changes: 20 additions & 0 deletions .changeset/short-kids-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
"wrangler": patch
---

Rename wrangler pipelines <create|update> flags

The following parameters have been renamed:

| Previous Name | New Name |
| ----------------- | --------------------- |
| access-key-id | r2-access-key-id |
| secret-access-key | r2-secret-access-key |
| transform | transform-worker |
| r2 | r2-bucket |
| prefix | r2-prefix |
| binding | enable-worker-binding |
| http | enable-http |
| authentication | require-http-auth |
| filename | file-template |
| filepath | partition-template |
132 changes: 91 additions & 41 deletions packages/wrangler/src/__tests__/pipelines.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { http, HttpResponse } from "msw";
import { describe, expect, it } from "vitest";
import { normalizeOutput } from "../../e2e/helpers/normalize";
import { __testSkipDelays } from "../pipelines";
import { endEventLoop } from "./helpers/end-event-loop";
Expand Down Expand Up @@ -255,11 +256,11 @@ describe("pipelines", () => {
"wrangler pipelines
COMMANDS
wrangler pipelines create <pipeline> Create a new pipeline
wrangler pipelines list List current pipelines
wrangler pipelines show <pipeline> Show a pipeline configuration
wrangler pipelines update <pipeline> Update a pipeline
wrangler pipelines delete <pipeline> Delete a pipeline
wrangler pipelines create <pipeline> Create a new Pipeline
wrangler pipelines list List current Pipelines
wrangler pipelines show <pipeline> Show a Pipeline configuration
wrangler pipelines update <pipeline> Update a Pipeline
wrangler pipelines delete <pipeline> Delete a Pipeline
GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
Expand All @@ -278,52 +279,75 @@ describe("pipelines", () => {
expect(std.out).toMatchInlineSnapshot(`
"wrangler pipelines create <pipeline>
Create a new pipeline
Create a new Pipeline
POSITIONALS
pipeline The name of the new pipeline [string] [required]
Source settings
--enable-worker-binding Send data from a Worker to a Pipeline using a Binding [boolean] [default: true]
--enable-http Generate an endpoint to ingest data via HTTP [boolean] [default: true]
--require-http-auth Require Cloudflare API Token for HTTPS endpoint authentication [boolean] [default: false]
--cors-origins CORS origin allowlist for HTTP endpoint (use * for any origin) [array]
Batch hints
--batch-max-mb Maximum batch size in megabytes before flushing [number]
--batch-max-rows Maximum number of rows per batch before flushing [number]
--batch-max-seconds Maximum age of batch in seconds before flushing [number]
Transformations
--transform-worker Pipeline transform Worker and entrypoint (<worker>.<entrypoint>) [string]
Destination settings
--r2-bucket Destination R2 bucket name [string] [required]
--r2-access-key-id R2 service Access Key ID for authentication. Leave empty for OAuth confirmation. [string]
--r2-secret-access-key R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation. [string]
--r2-prefix Prefix for storing files in the destination bucket [string] [default: \\"\\"]
--compression Compression format for output files [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"] [default: \\"gzip\\"]
--file-template Template for individual file names (must include \${slug}) [string]
--partition-template Path template for partitioned files in the bucket. If not specified, the default will be used [string]
GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]
OPTIONS
--secret-access-key The R2 service token Access Key to write data [string]
--access-key-id The R2 service token Secret Key to write data [string]
--batch-max-mb The approximate maximum size (in megabytes) for each batch before flushing (range: 1 - 100) [number]
--batch-max-rows The approximate maximum number of rows in a batch before flushing (range: 100 - 1000000) [number]
--batch-max-seconds The approximate maximum age (in seconds) of a batch before flushing (range: 1 - 300) [number]
--transform The worker and entrypoint of the PipelineTransform implementation in the format \\"worker.entrypoint\\"
Default: No transformation worker [string]
--compression Sets the compression format of output files
Default: gzip [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"]
--prefix Optional base path to store files in the destination bucket
Default: (none) [string]
--filepath The path to store partitioned files in the destination bucket
Default: event_date=\${date}/hr=\${hr} [string]
--filename The name of each unique file in the bucket. Must contain \\"\${slug}\\". File extension is optional
Default: \${slug}\${extension} [string]
--binding Enable Worker binding to this pipeline [boolean] [default: true]
--http Enable HTTPS endpoint to send data to this pipeline [boolean] [default: true]
--authentication Require authentication (Cloudflare API Token) to send data to the HTTPS endpoint [boolean] [default: false]
--r2 Destination R2 bucket name [string] [required]"
-v, --version Show version number [boolean]"
`);
});

it("should create a pipeline with explicit credentials", async () => {
const requests = mockCreateRequest("my-pipeline");
await runWrangler(
"pipelines create my-pipeline --r2 test-bucket --access-key-id my-key --secret-access-key my-secret"
"pipelines create my-pipeline --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret"
);
expect(requests.count).toEqual(1);
expect(std.out).toMatchInlineSnapshot(`
"🌀 Creating Pipeline named \\"my-pipeline\\"
✅ Successfully created Pipeline \\"my-pipeline\\" with id 0001
🎉 You can now send data to your Pipeline!
To start interacting with this Pipeline from a Worker, open your Worker’s config file and add the following binding configuration:
{
\\"pipelines\\": [
{
\\"pipeline\\": \\"my-pipeline\\",
\\"binding\\": \\"PIPELINE\\"
}
]
}
Send data to your Pipeline's HTTP endpoint:
curl \\"foo\\" -d '[{\\"foo\\": \\"bar\\"}]'
"
`);
});

it("should fail a missing bucket", async () => {
const requests = mockCreateR2TokenFailure("bad-bucket");
await expect(
runWrangler("pipelines create bad-pipeline --r2 bad-bucket")
runWrangler("pipelines create bad-pipeline --r2-bucket bad-bucket")
).rejects.toThrowError();

await endEventLoop();
Expand All @@ -338,7 +362,7 @@ describe("pipelines", () => {
it("should create a pipeline with auth", async () => {
const requests = mockCreateRequest("my-pipeline");
await runWrangler(
"pipelines create my-pipeline --authentication --r2 test-bucket --access-key-id my-key --secret-access-key my-secret"
"pipelines create my-pipeline --require-http-auth --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret"
);
expect(requests.count).toEqual(1);

Expand All @@ -352,7 +376,7 @@ describe("pipelines", () => {
it("should create a pipeline without http", async () => {
const requests = mockCreateRequest("my-pipeline");
await runWrangler(
"pipelines create my-pipeline --http=false --r2 test-bucket --access-key-id my-key --secret-access-key my-secret"
"pipelines create my-pipeline --enable-http=false --r2-bucket test-bucket --r2-access-key-id my-key --r2-secret-access-key my-secret"
);
expect(requests.count).toEqual(1);

Expand Down Expand Up @@ -390,7 +414,7 @@ describe("pipelines", () => {

expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"Retrieving config for pipeline \\"foo\\".
"Retrieving config for Pipeline \\"foo\\".
{
\\"id\\": \\"0001\\",
\\"version\\": 1,
Expand Down Expand Up @@ -438,7 +462,7 @@ describe("pipelines", () => {

expect(std.err).toMatchInlineSnapshot(`""`);
expect(normalizeOutput(std.out)).toMatchInlineSnapshot(`
"Retrieving config for pipeline \\"bad-pipeline\\".
"Retrieving config for Pipeline \\"bad-pipeline\\".
X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/pipelines/bad-pipeline) failed.
Pipeline does not exist [code: 1000]
If you think this is a bug, please open an issue at:
Expand Down Expand Up @@ -475,7 +499,7 @@ describe("pipelines", () => {
const updateReq = mockUpdateRequest(update.name, update);

await runWrangler(
"pipelines update my-pipeline --r2 new-bucket --access-key-id service-token-id --secret-access-key my-secret-access-key"
"pipelines update my-pipeline --r2-bucket new-bucket --r2-access-key-id service-token-id --r2-secret-access-key my-secret-access-key"
);

expect(updateReq.count).toEqual(1);
Expand All @@ -495,7 +519,7 @@ describe("pipelines", () => {
const updateReq = mockUpdateRequest(update.name, update);

await runWrangler(
"pipelines update my-pipeline --r2 new-bucket --access-key-id new-key --secret-access-key new-secret"
"pipelines update my-pipeline --r2-bucket new-bucket --r2-access-key-id new-key --r2-secret-access-key new-secret"
);

expect(updateReq.count).toEqual(1);
Expand All @@ -516,7 +540,7 @@ describe("pipelines", () => {
const updateReq = mockUpdateRequest(update.name, update);

await runWrangler(
"pipelines update my-pipeline --binding=false --http --authentication"
"pipelines update my-pipeline --enable-worker-binding=false --enable-http --require-http-auth"
);

expect(updateReq.count).toEqual(1);
Expand All @@ -527,14 +551,40 @@ describe("pipelines", () => {
);
});

it("should update a pipeline cors headers", async () => {
const pipeline: Pipeline = samplePipeline;
mockShowRequest(pipeline.name, pipeline);

const update = JSON.parse(JSON.stringify(pipeline));
update.source = [
{
type: "http",
format: "json",
authenticated: true,
},
];
const updateReq = mockUpdateRequest(update.name, update);

await runWrangler(
"pipelines update my-pipeline --enable-worker-binding=false --enable-http --cors-origins http://localhost:8787"
);

expect(updateReq.count).toEqual(1);
expect(updateReq.body?.source.length).toEqual(1);
expect(updateReq.body?.source[0].type).toEqual("http");
expect((updateReq.body?.source[0] as HttpSource).cors?.origins).toEqual([
"http://localhost:8787",
]);
});

it("should fail a missing pipeline", async () => {
const requests = mockShowRequest("bad-pipeline", null, 404, {
code: 1000,
message: "Pipeline does not exist",
});
await expect(
runWrangler(
"pipelines update bad-pipeline --r2 new-bucket --access-key-id new-key --secret-access-key new-secret"
"pipelines update bad-pipeline --r2-bucket new-bucket --r2-access-key-id new-key --r2-secret-access-key new-secret"
)
).rejects.toThrowError();

Expand All @@ -558,8 +608,8 @@ describe("pipelines", () => {

expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"Deleting pipeline foo.
Deleted pipeline foo."
"Deleting Pipeline foo.
Deleted Pipeline foo."
`);
expect(requests.count).toEqual(1);
});
Expand All @@ -577,7 +627,7 @@ describe("pipelines", () => {

expect(std.err).toMatchInlineSnapshot(`""`);
expect(normalizeOutput(std.out)).toMatchInlineSnapshot(`
"Deleting pipeline bad-pipeline.
"Deleting Pipeline bad-pipeline.
X [ERROR] A request to the Cloudflare API (/accounts/some-account-id/pipelines/bad-pipeline) failed.
Pipeline does not exist [code: 1000]
If you think this is a bug, please open an issue at:
Expand Down
Loading

0 comments on commit 6abe69c

Please sign in to comment.