PythonJob supports a list of AiiDA nodes as output #190

Open
wants to merge 8 commits into base: main
10 changes: 9 additions & 1 deletion aiida_workgraph/calculations/python.py
@@ -259,7 +259,15 @@ def prepare_for_submission(self, folder: Folder) -> CalcInfo:
# TODO: should check this recursively
elif isinstance(value, (AttributeDict, dict)):
# if the value is an AttributeDict, use recursively
input_values[key] = {k: v.value for k, v in value.items()}
if len(value.keys()) > 0 and list(value.keys())[0].startswith(
"list_data_"
):
ndata = len(value.keys())
input_values[key] = [
value[f"list_data_{i}"].value for i in range(ndata)
]
else:
input_values[key] = {k: v.value for k, v in value.items()}
else:
raise ValueError(
f"Input data {value} is not supported. Only AiiDA data Node with a value attribute is allowed. "
6 changes: 6 additions & 0 deletions aiida_workgraph/calculations/python_parser.py
@@ -117,6 +117,12 @@ def serialize_output(self, result, output):
else:
serialized_result[key] = general_serializer(value)
return serialized_result
elif isinstance(result, list):
serialized_result = {}
for i, value in enumerate(result):
key = f"list_data_{i}"
serialized_result[key] = general_serializer(value)
return serialized_result
else:
self.exit_codes.ERROR_INVALID_OUTPUT
else:
5 changes: 5 additions & 0 deletions aiida_workgraph/decorator.py
@@ -274,6 +274,11 @@ def build_pythonjob_task(func: Callable) -> Task:
tdata = {"executor": PythonJob, "task_type": "CALCJOB"}
_, tdata_py = build_task_from_AiiDA(tdata)
tdata = deepcopy(func.tdata)
# if the function has var_kwargs, we need to change the input type to Namespace
if tdata["var_kwargs"]:
for input in tdata["inputs"]:
if input["name"] == tdata["var_kwargs"]:
input["identifier"] = "workgraph.namespace"
# merge the inputs and outputs from the PythonJob task to the function task
# skip the already existed inputs and outputs
inputs = tdata["inputs"]
3 changes: 0 additions & 3 deletions aiida_workgraph/engine/utils.py
@@ -1,4 +1,3 @@
from aiida_workgraph.orm.serializer import serialize_to_aiida_nodes
from aiida import orm
from aiida.common.extendeddicts import AttributeDict

@@ -91,8 +90,6 @@ def prepare_for_python_task(task: dict, kwargs: dict, var_kwargs: dict) -> dict:
)
# outputs
output_info = task["outputs"]
# serialize the kwargs into AiiDA Data
function_kwargs = serialize_to_aiida_nodes(function_kwargs)
# transfer the args to kwargs
inputs = {
"process_label": f"PythonJob<{task['name']}>",
30 changes: 19 additions & 11 deletions aiida_workgraph/utils/__init__.py
@@ -340,20 +340,28 @@ def serialize_pythonjob_properties(wgdata):
if not task["metadata"]["node_type"].upper() == "PYTHONJOB":
continue
# get the names kwargs for the PythonJob, which are the inputs before _wait
input_kwargs = []
for input in task["inputs"]:
if input["name"] == "_wait":
break
input_kwargs.append(input["name"])
for name in input_kwargs:
prop = task["properties"][name]
# if value is not None, not {}
if not (
prop["value"] is None
or isinstance(prop["value"], dict)
and prop["value"] == {}
):
prop["value"] = general_serializer(prop["value"])
prop = task["properties"][input["name"]]
if input["identifier"] == "workgraph.namespace":
if isinstance(prop["value"], list):
prop["value"] = {
f"list_data_{i}": general_serializer(v)
for i, v in enumerate(prop["value"])
}
elif isinstance(prop["value"], dict):
prop["value"] = {
k: general_serializer(v) for k, v in prop["value"].items()
}
else:
# if value is not None, not {}
if not (
prop["value"] is None
or isinstance(prop["value"], dict)
and prop["value"] == {}
):
prop["value"] = general_serializer(prop["value"])


def generate_bash_to_create_python_env(
2 changes: 2 additions & 0 deletions aiida_workgraph/web/backend/app/utils.py
@@ -17,6 +17,8 @@ def get_executor_source(tdata: Any) -> Tuple[bool, Optional[str]]:
source_code = "".join(source_lines)
return source_code
except (TypeError, OSError):
# If the function was defined inside a Jupyter notebook, retrieving its source
# raises OSError('source code not available'), so fall back to the stored source code.
source_code = tdata["executor"].get("function_source_code", "")
return source_code
else:
35 changes: 26 additions & 9 deletions docs/source/built-in/pythonjob.ipynb
@@ -1505,19 +1505,21 @@
"- **Querying**: The data in the namespace output is stored as an AiiDA data node, allowing for easy querying and retrieval.\n",
"- **Data Provenance**: When the data is used as input for subsequent tasks, the origin of data is tracked.\n",
"\n",
"### Example Use Case\n",
"\n",
"Consider a molecule adsorption calculation where the namespace output stores the surface slabs of the molecule adsorbed on different surface sites. The number of surface slabs can vary depending on the surface. These output surface slabs can be utilized as input to the next task to calculate the energy.\n",
"\n",
"### Defining Namespace Outputs\n",
"\n",
"To declare a namespace output, set the `identifier` to `workgraph.namespace` in the `outputs` parameter of the `@task` decorator. For example:\n",
"To declare a namespace output, set the `identifier` to `workgraph.namespace` in the `outputs` parameter of the `@task` decorator. Take the equation of state (EOS) calculation as an example. The namespace output stores the scaled structures, which can vary depending on the scale list.\n",
"\n",
"\n",
"```python\n",
"@task(outputs=[{\"name\": \"structures\", \"identifier\": \"workgraph.namespace\"}])\n",
"def generate_surface_slabs():\n",
" # Function logic to generate surface slabs\n",
" return {\"slab1\": slab_data1, \"slab2\": slab_data2}\n",
"@task.pythonjob(outputs=[{\"name\": \"structures\", \"identifier\": \"workgraph.namespace\"}])\n",
"def scaled_structures(structure: Atoms, scales: list) -> list[Atoms]:\n",
" structures = {}\n",
" for i in range(len(scales)):\n",
" scaled_structure = structure.copy()\n",
" scaled_structure.set_cell(scaled_structure.cell * scales[i], scale_atoms=True)\n",
" structures[f\"scaled_{i}\"] = scaled_structure\n",
" return structures\n",
"```\n",
"\n",
"\n",
@@ -1570,7 +1572,22 @@
" x=wg.tasks[\"myfunc\"].outputs[\"add_multiply.add\"],\n",
" )\n",
"```\n",
"### List as Namespace Output and Input (Experimental)\n",
"\n",
"`PythonJob` also supports using a list of AiiDA data nodes as the output and input. Internally, the list output and input will be transferred to a dictionary with a special key (starting with `list_data_{index}`, where the index starts from 0). Note that this will be handled internally by the `PythonJob`, so the user will not be aware of this.\n",
"\n",
"In the following example, we define a task that returns a list of `Atoms` objects:\n",
"\n",
"```python\n",
"@task.pythonjob(outputs=[{\"name\": \"structures\", \"identifier\": \"workgraph.namespace\"}])\n",
"def scaled_structures(structure: Atoms, scales: list) -> list[Atoms]:\n",
" structures = []\n",
" for scale in scales:\n",
" scaled_structure = structure.copy()\n",
" scaled_structure.set_cell(scaled_structure.cell * scale, scale_atoms=True)\n",
" structures.append(scaled_structure)\n",
" return structures\n",
"```\n",
"\n",
"## Second Real-world Workflow: Equation of state (EOS) WorkGraph\n",
"\n"
@@ -2418,7 +2435,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.0"
"version": "3.10.0"
},
"vscode": {
"interpreter": {
36 changes: 36 additions & 0 deletions tests/test_python.py
@@ -231,6 +231,42 @@ def myfunc3(x, y):
assert wg.tasks["myfunc3"].outputs["result"].value.value == 7


def test_PythonJob_namespace_list(fixture_localhost):
"""Test function with namespace output and input."""

# output namespace list
@task.pythonjob(
outputs=[
{
"name": "result",
"identifier": "workgraph.namespace",
},
]
)
def myfunc(x, y):
return [x + i for i in range(y)]

# task that uses the list as input
@task.pythonjob()
def myfunc2(x):
return sum(x)

#
wg = WorkGraph("test_namespace_outputs")
wg.add_task(myfunc, name="myfunc")
wg.add_task(myfunc2, name="myfunc2", x=wg.tasks["myfunc"].outputs["result"])
wg.run(
inputs={
"myfunc": {
"x": 1,
"y": 4,
"computer": "localhost",
}
},
)
assert wg.tasks["myfunc2"].outputs["result"].value.value == 10


def test_PythonJob_parent_folder(fixture_localhost):
"""Test function with parent folder."""
