Skip to content

Commit

Permalink
Add new Python example that generates large output files (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
ojkoenig authored Nov 21, 2024
1 parent 21f0394 commit ec6272b
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 13 deletions.
49 changes: 49 additions & 0 deletions examples/python_large_output/evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (C) 2022 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import logging
import os
import sys
import time

log = logging.getLogger(__name__)


def main():
start_time = time.time()

file_name = "output.bin"
size = 1

log.info(f"Generating file {file_name} with size {size} GB")
GB1 = 1024 * 1024 * 1024 # 1GB
with open(file_name, "wb") as fout:
for i in range(size):
fout.write(os.urandom(GB1))
log.info(f"File {file_name} has been generated after {(time.time() - start_time):.2f} seconds")
return 0


if __name__ == "__main__":
logger = logging.getLogger()
logging.basicConfig(format="%(message)s", level=logging.DEBUG)
sys.exit(main())
70 changes: 70 additions & 0 deletions examples/python_large_output/exec_python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (C) 2022 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""
Simplistic execution script for Python.
Command formed: python <script_file> <input_file (optional)>
"""
import os

from ansys.rep.common.logging import log
from ansys.rep.evaluator.task_manager import ApplicationExecution


class PythonExecution(ApplicationExecution):
def execute(self):

log.info("Start Python execution script")

# Identify files
script_file = next((f for f in self.context.input_files if f["name"] == "script"), None)
assert script_file, "Python script file script missing"
inp_file = next((f for f in self.context.input_files if f["name"] == "inp"), None)
# assert inp_file, "Input file inp missing"

# Identify application
app_name = "Python"
app = next((a for a in self.context.software if a["name"] == app_name), None)
assert app, f"Cannot find app {app_name}"

# Add " around exe if needed for Windows
exe = app["executable"]
if " " in exe and not exe.startswith('"'):
exe = '"%s"' % exe

# Use properties from resource requirements
# None currently

# Pass env vars correctly
env = dict(os.environ)
env.update(self.context.environment)

# Form command
cmd = f"{exe} {script_file['path']}"
if inp_file:
cmd += f" {inp_file['path']}"

# Execute
self.run_and_capture_output(cmd, shell=True, env=env)

log.info("End Python execution script")
187 changes: 187 additions & 0 deletions examples/python_large_output/project_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright (C) 2022 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""
Python job that can create large output files
"""

import argparse
import logging
import os

from ansys.hps.client import Client, HPSError
from ansys.hps.client.jms import (
File,
IntParameterDefinition,
JmsApi,
Job,
JobDefinition,
ParameterMapping,
Project,
ProjectApi,
ResourceRequirements,
Software,
TaskDefinition,
)

log = logging.getLogger(__name__)


def main(client, use_exec_script, python_version=None) -> Project:
"""
Create project that runs a Python script to generate a large output file.
"""
log.debug("=== Project")
proj = Project(name="Python Large Output Files", priority=1, active=True)
jms_api = JmsApi(client)
proj = jms_api.create_project(proj, replace=True)
project_api = ProjectApi(client, proj.id)

log.debug("=== Files")
cwd = os.path.dirname(__file__)

files = [
File(
name="script",
evaluation_path="evaluate.py",
type="text/plain",
src=os.path.join(cwd, "evaluate.py"),
),
File(
name="output",
evaluation_path="output.bin",
type="application/octet-stream",
monitor=False,
collect=True,
),
]

if use_exec_script:
# Define and upload an exemplary exec script to run Python
files.append(
File(
name="exec_python",
evaluation_path="exec_python.py",
type="application/x-python-code",
src=os.path.join(cwd, "exec_python.py"),
)
)

files = project_api.create_files(files)
file_ids = {f.name: f.id for f in files}

log.debug("=== Job Definition with simulation workflow and parameters")
job_def = JobDefinition(name="JobDefinition.1", active=True)

# Input params
input_params = [
IntParameterDefinition(name="size", lower_limit=1, upper_limit=1000, default=1),
]
input_params = project_api.create_parameter_definitions(input_params)

mappings = [
ParameterMapping(
key_string="size",
tokenizer="=",
parameter_definition_id=input_params[0].id,
file_id=file_ids["script"],
),
]

output_params = []
# output_params = project_api.create_parameter_definitions(output_params)
# mappings.extend([])

mappings = project_api.create_parameter_mappings(mappings)

job_def.parameter_definition_ids = [o.id for o in input_params + output_params]
job_def.parameter_mapping_ids = [o.id for o in mappings]

task_def = TaskDefinition(
name="Python",
software_requirements=[Software(name="Python", version=python_version)],
execution_command="%executable% %file:script%",
resource_requirements=ResourceRequirements(num_cores=0.5),
execution_level=0,
input_file_ids=[file_ids["script"]],
output_file_ids=[file_ids["output"]],
)

if use_exec_script:
task_def.use_execution_script = True
task_def.execution_script_id = file_ids["exec_python"]

task_def = project_api.create_task_definitions([task_def])[0]
job_def.task_definition_ids = [task_def.id]

# Create job_definition in project
job_def = project_api.create_job_definitions([job_def])[0]

params = project_api.get_parameter_definitions(job_def.parameter_definition_ids)

log.debug("=== Jobs")
jobs = []

for size in [1, 5]:
jobs.append(
Job(
name=f"Job {size} GB",
values={"size": size},
eval_status="pending",
job_definition_id=job_def.id,
)
)
for size in [10, 25, 50, 100, 250]:
jobs.append(
Job(
name=f"Job {size} GB",
values={"size": size},
eval_status="inactive",
job_definition_id=job_def.id,
)
)

jobs = project_api.create_jobs(jobs)

log.info(f"Created project '{proj.name}', ID='{proj.id}'")

return proj


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-U", "--url", default="https://127.0.0.1:8443/hps")
parser.add_argument("-u", "--username", default="repuser")
parser.add_argument("-p", "--password", default="repuser")
parser.add_argument("-es", "--use-exec-script", default=False, action="store_true")
parser.add_argument("-v", "--python-version", default="3.10")
args = parser.parse_args()

logger = logging.getLogger()
logging.basicConfig(format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG)

client = Client(url=args.url, username=args.username, password=args.password)

try:
main(client, use_exec_script=args.use_exec_script, python_version=args.python_version)
except HPSError as e:
log.error(str(e))
6 changes: 4 additions & 2 deletions examples/python_linked_multi_process_step/project_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
log = logging.getLogger(__name__)


def main(client, num_task_definitions, num_jobs, start, inactive):
def main(client, num_task_definitions, num_jobs, start, inactive, python_version=None):
"""Create project with multiple dependent Python tasks and linked files in between."""
log.debug("=== Project")
proj = Project(
Expand Down Expand Up @@ -152,7 +152,7 @@ def main(client, num_task_definitions, num_jobs, start, inactive):
software_requirements=[
Software(
name="Python",
version="3.10",
version=python_version,
)
],
execution_command=cmd,
Expand Down Expand Up @@ -211,6 +211,7 @@ def main(client, num_task_definitions, num_jobs, start, inactive):
parser.add_argument(
"-i", "--inactive", action="store_true", default=False, help="Set project to inactive"
)
parser.add_argument("-v", "--python-version", default="3.10")

args = parser.parse_args()

Expand All @@ -223,6 +224,7 @@ def main(client, num_task_definitions, num_jobs, start, inactive):
num_task_definitions=args.num_task_definitions,
start=args.start,
inactive=args.inactive,
python_version=args.python_version,
)
except HPSError as e:
log.error(str(e))
5 changes: 4 additions & 1 deletion examples/python_multi_process_step/project_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def main(
change_job_tasks,
inactive,
sequential,
python_version=None,
) -> Project:
"""Python project implementing multiple steps and optional image generation."""
log.debug("=== Project")
Expand Down Expand Up @@ -227,7 +228,7 @@ def main(
TaskDefinition(
name=f"td{i}_py_eval",
software_requirements=[
Software(name="Python", version="3.10"),
Software(name="Python", version=python_version),
],
execution_command=cmd,
max_execution_time=duration * 1.5,
Expand Down Expand Up @@ -317,6 +318,7 @@ def main(
default=False,
help="Whether to evaluate all tasks of same exec level per job sequentially or in parallel",
)
parser.add_argument("-v", "--python-version", default="3.10")

args = parser.parse_args()

Expand All @@ -334,6 +336,7 @@ def main(
change_job_tasks=args.change_job_tasks,
inactive=args.inactive,
sequential=args.sequential,
python_version=args.python_version,
)
except HPSError as e:
log.error(str(e))
Loading

0 comments on commit ec6272b

Please sign in to comment.