Skip to content

Commit

Permalink
Updating docs for automations
Browse files Browse the repository at this point in the history
  • Loading branch information
Taras Priadka committed Nov 29, 2023
1 parent c6fbde5 commit 5db3804
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 91 deletions.
165 changes: 85 additions & 80 deletions docs/source/automation/automation-usecase.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,88 +14,87 @@ This example requires another _target workflow_ which will get executes on every

1. Initialize a new workflow using `latch init test-workflow`.
2. Replace `__init__.py` and `task.py` with the following sample code.
```python
# __init__.py
```python
# __init__.py

from wf.task import task
from wf.task import task

from latch.resources.workflow import workflow
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.file import LatchFile
from latch.types.metadata import LatchAuthor, LatchMetadata, LatchParameter
from latch.resources.workflow import workflow
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.file import LatchFile
from latch.types.metadata import LatchAuthor, LatchMetadata, LatchParameter

metadata = LatchMetadata(
display_name="Target Workflow",
author=LatchAuthor(
name="Your Name",
),
parameters={
"input_directory": LatchParameter(
display_name="Input Directory",
batch_table_column=True, # Show this parameter in batched mode.
),
"output_directory": LatchParameter(
display_name="Output Directory",
batch_table_column=True, # Show this parameter in batched mode.
metadata = LatchMetadata(
display_name="Target Workflow",
author=LatchAuthor(
name="Your Name",
),
},
)

parameters={
"input_directory": LatchParameter(
display_name="Input Directory",
batch_table_column=True, # Show this parameter in batched mode.
),
"output_directory": LatchParameter(
display_name="Output Directory",
batch_table_column=True, # Show this parameter in batched mode.
),
},
)

@workflow(metadata)
def template_workflow(
input_directory: LatchDir, output_directory: LatchOutputDir
) -> LatchOutputDir:
return task(input_directory=input_directory, output_directory=output_directory)
```

```python
# task.py
@workflow(metadata)
def template_workflow(
input_directory: LatchDir, output_directory: LatchOutputDir
) -> LatchOutputDir:
return task(input_directory=input_directory, output_directory=output_directory)
```
```python
# task.py

import os
from logging import Logger
from urllib.parse import urljoin
import os
from logging import Logger
from urllib.parse import urljoin

from latch import message
from latch.resources.tasks import small_task
from latch.types.directory import LatchDir, LatchFile, LatchOutputDir
from latch.account import Account
from latch import message
from latch.resources.tasks import small_task
from latch.types.directory import LatchDir, LatchFile, LatchOutputDir
from latch.account import Account


log = Logger("wf.task")
log = Logger("wf.task")


@small_task
def task(input_directory: LatchDir, output_directory: LatchOutputDir) -> LatchOutputDir:
@small_task
def task(input_directory: LatchDir, output_directory: LatchOutputDir) -> LatchOutputDir:

# iterate through all directories of the child input directories using iterdir()
for file in input_directory.iterdir():
log.error(f"{file} {file.remote_path}") # note: `error` is used here since its the highest logging level
# iterate through all directories of the child input directories using iterdir()
for file in input_directory.iterdir():
log.error(f"{file} {file.remote_path}") # note: `error` is used here since its the highest logging level

return output_directory
return output_directory

```
```
3. Register the sample target workflow with Latch using `latch register --remote --yes test-workflow`.
4. Record the ID of your workflow on the sidebar which we will use later in the example.

![Workflow ID](../assets/automation/get-workflow-id.png)

5. You will need to pass the parameters into your target workflow from your automation. To obtain the JSON representation of the workflow inputs, navigate to a previous execution of your workflow. Select **Graph and Logs**, click on square box around the first task, and select **Inputs**. Copy the workflow parameters inside the `literal` object, and pass it to `params`.
![Workflow ID](../assets/automation/get-workflow-id.png)
5. Test the workflow by running it on Latch
6. You will need to pass the parameters into your target workflow from your automation. To obtain the JSON representation of the workflow inputs, navigate to a previous execution of your workflow. Select **Graph and Logs**, click on square box around the first task, and select **Inputs**. Copy the workflow parameters inside the `literal` object, and pass it to `params`.
\
i.e.
\
i.e.
```json
{
"literals": {
# copy everything inside the brackets
}
}
```
![Workflow Parameters](../assets/automation/get-workflow-parameters.png)
![Workflow Parameters](../assets/automation/get-workflow-parameters.png)


## 2: Create a New Registry Table

In this example, we record all processed child directories in the Registry Table to not reprocess directories when automation workflow is runs again. This example requires you to create a new table with no existing columns. The automation workflow will add a column `Processed directories` with the directory name of processed children.
In this example, we record all processed child directories in the Registry Table to not reprocess directories when automation workflow is runs again. This example requires you to create a new table with no existing columns. The automation workflow will add a column `Processed Directory` with the directory name of processed children.

To create a new table to be used with the automation:

Expand Down Expand Up @@ -176,51 +175,56 @@ metadata = LatchMetadata(
@workflow(metadata)
def automation_workflow(input_directory: LatchDir, automation_id: str) -> None:
output_directory = LatchOutputDir(
path="your path here" # fixme: change to latch path of the desired output directory
path="fixme" # fixme: change to remote path of desired output directory
)

# MODIFY THE WORKFLOW PARAMETERS BELOW
params = {
"input_directory": {
"scalar": {
"blob": {
"metadata": {"type": {"dimensionality": "MULTIPART"}},
"uri": input_directory.remote_path,
}
}
},
"output_directory": {
"scalar": {
"blob": {
"metadata": {"type": {"dimensionality": "MULTIPART"}},
"uri": output_directory.remote_path,
}
}
},
}
# MODIFY WORKFLOW PARAMETERS ABOVE

automation_task(
input_directory=input_directory,
output_directory=output_directory,
params=params,
target_wf_id="fixme", # fixme: change wf_id to the desired workflow id
table_id="fixme", # fixme: change table_id to the desired registry table
)
```

Change the parameters object in `automation.py` from [step 1.6](#1-create-the-target-workflow):
```python
# automation.py

...

params = {
"input_directory": {
"scalar": {
"blob": {
"metadata": {"type": {"dimensionality": "MULTIPART"}},
"uri": input_directory.remote_path,
}
}
},
"output_directory": {
"scalar": {
"blob": {
"metadata": {"type": {"dimensionality": "MULTIPART"}},
"uri": output_directory.remote_path,
}
}
},
}
...
```

**Usage Notes**:
* The `input_directory` refers to the child directory (i.e. the trigger directory) to be passed to the target workflow.
* The `output_directory` refers to directory where the output of the target workflow will be stored.


## 5. Configure Automation Logic
## 5. (Optional) Modify Automation Logic

The file `wf/automation.py` contains the logic that determines how an execution for the target workflow should be launched.

The `automation_task` defines the logic that is used to launch the workflow. The code below checks a registry table to see whether an output directory exists, and launches an execution for the target workflow if that is not the case.

Optionally, modify the function below to change the logic for launching target workflows.
Modify the function below to change the logic for launching target workflows.

```python
# automation.py
Expand All @@ -242,7 +246,7 @@ def automation_task(
output_directory: LatchOutputDir,
target_wf_id: str,
table_id: str,
params: dict,
params: Dict[T,T],
) -> None:
"""
Logic on how to process the input directory and launch the target workflows.
Expand All @@ -252,6 +256,8 @@ def automation_task(
automation_table = Table(table_id)
processed_directory_column = "Processed Directory"

# [PARAMS OMITTED]

# check if the provided table contains column `Processed Directory` and creates one if it isn't present
# we use Latch SDK to get the columns of the table and try to get the column by name
if automation_table.get_columns().get(processed_directory_column, None) is None:
Expand Down Expand Up @@ -329,4 +335,3 @@ Finally, select the automation workflow that you have just registered using the
To test your automation, go to the target directory that you have specified when creating automation, and create a couple of folders. Upload any files to the folders, and wait for the trigger timer to expire.

Go to **Worfklows** > **All Executions**. There should be 1 automation workflow execution, and a target workflow execution for each child in your target directory. Each target workflow should print out

18 changes: 7 additions & 11 deletions docs/source/automation/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,22 @@ Note: This document is a work in progress and is subject to change.

### Description

Automations allow you to automatically run workflows on top of folders in Latch Data when triggered by specific events such as when files are added to folders. Automations consist of a *trigger*, *automation workflow*.
Automations allow you to automatically run workflows on top of folders in Latch Data when triggered by specific events such as when files are added to folders. Automations consist of a [*trigger*](#trigger) and an [*automation workflow*](#automation-workflow).

<!-- * _Status_ specifies if automation is active or not. Use automation sidebar to toggle status of your automation. -->
Additionally, you can pause and resume automations by toggling status radio on the sidebar.

### Trigger

Automation trigger specifies the conditions after which the automation will run. It allows you to specify a target directory to watch, the _event_ which kicks off a workflow, and a _timer_.
Automation trigger specifies the conditions after which the automation will run(i.e. child got added to the target directory). It allows you to specify a target directory to watch, the [_event_](#trigger-event-types) which kicks off a workflow, and a [_timer_](#trigger-timer).

#### Trigger Event Types

Automation events consist of a type and an event itself. Type is a high level definition of event(i.e. data has been updated), and specific event specifies what triggers automation(i.e. child has been added).
> Note: currently, only child addition events are supported in automations.
> Currently, only child addition events are supported in automations.
Available events types:
*Available events:*

- _Data Update_ event type specifies when to run the automation if a data tree in Latch Data has been modified. Supported events for this type are:
- _Child Added_ event triggers if a new child has been added to the target directory at any depth. Automation will not run if the child has been modified.
- _Child Added_ event triggers if a new child has been added to the target directory at any depth. Automation will not run if the child has been modified or deleted.

#### Trigger Timer

Expand Down Expand Up @@ -63,8 +61,6 @@ def automation_workflow(input_directory: LatchDir, automation_id: str) -> None:
pass
```

#### Automation Workflow Example

See an [example](automation-usecase.md) of how we create an automation workflow which reads all children of the target directory and kicks off another workflow which runs processing on child directories.

## Creating an Automation
Expand All @@ -77,6 +73,6 @@ Next, select a folder where files/folders will be uploaded using the `Select Tar

Finally, select the [automation workflow](#automation-workflow) that you have registered with Latch.

Checkout the [example above](#automation-workflow-example) on how to create and register automation workflows.
Checkout an [example](automation-usecase.md) on how to create and register automation workflows.

![Create Automation Example](../assets/automation/create-automation-example.png)

0 comments on commit 5db3804

Please sign in to comment.