Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new Python example that generates large output files #483

Merged
merged 7 commits into from
Nov 21, 2024
49 changes: 49 additions & 0 deletions examples/python_large_output/evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (C) 2022 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import logging
import os
import sys
import time

log = logging.getLogger(__name__)


def main():
start_time = time.time()

file_name = "output.bin"
size = 1

log.info(f"Generating file {file_name} with size {size} GB")
GB1 = 1024 * 1024 * 1024 # 1GB
with open(file_name, "wb") as fout:
for i in range(size):
fout.write(os.urandom(GB1))
log.info(f"File {file_name} has been generated after {(time.time() - start_time):.2f} seconds")
return 0


if __name__ == "__main__":
logger = logging.getLogger()
logging.basicConfig(format="%(message)s", level=logging.DEBUG)
sys.exit(main())
70 changes: 70 additions & 0 deletions examples/python_large_output/exec_python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (C) 2022 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""
Simplistic execution script for Python.

Command formed: python <script_file> <input_file (optional)>
"""
import os

from ansys.rep.common.logging import log
from ansys.rep.evaluator.task_manager import ApplicationExecution


class PythonExecution(ApplicationExecution):
def execute(self):

log.info("Start Python execution script")

# Identify files
script_file = next((f for f in self.context.input_files if f["name"] == "script"), None)
assert script_file, "Python script file script missing"
inp_file = next((f for f in self.context.input_files if f["name"] == "inp"), None)
# assert inp_file, "Input file inp missing"

# Identify application
app_name = "Python"
app = next((a for a in self.context.software if a["name"] == app_name), None)
assert app, f"Cannot find app {app_name}"

# Add " around exe if needed for Windows
exe = app["executable"]
if " " in exe and not exe.startswith('"'):
exe = '"%s"' % exe

# Use properties from resource requirements
# None currently

# Pass env vars correctly
env = dict(os.environ)
env.update(self.context.environment)

# Form command
cmd = f"{exe} {script_file['path']}"
if inp_file:
cmd += f" {inp_file['path']}"

# Execute
self.run_and_capture_output(cmd, shell=True, env=env)

log.info("End Python execution script")
187 changes: 187 additions & 0 deletions examples/python_large_output/project_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright (C) 2022 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""
Python job that can create large output files
"""

import argparse
import logging
import os

from ansys.hps.client import Client, HPSError
from ansys.hps.client.jms import (
File,
IntParameterDefinition,
JmsApi,
Job,
JobDefinition,
ParameterMapping,
Project,
ProjectApi,
ResourceRequirements,
Software,
TaskDefinition,
)

log = logging.getLogger(__name__)


def main(client, use_exec_script, python_version=None) -> Project:
"""
Create project that runs a Python script to generate a large output file.
"""
log.debug("=== Project")
proj = Project(name="Python Large Output Files", priority=1, active=True)
jms_api = JmsApi(client)
proj = jms_api.create_project(proj, replace=True)
project_api = ProjectApi(client, proj.id)

log.debug("=== Files")
cwd = os.path.dirname(__file__)

files = [
File(
name="script",
evaluation_path="evaluate.py",
type="text/plain",
src=os.path.join(cwd, "evaluate.py"),
),
File(
name="output",
evaluation_path="output.bin",
type="application/octet-stream",
monitor=False,
collect=True,
),
]

if use_exec_script:
# Define and upload an exemplary exec script to run Python
files.append(
File(
name="exec_python",
evaluation_path="exec_python.py",
type="application/x-python-code",
src=os.path.join(cwd, "exec_python.py"),
)
)

files = project_api.create_files(files)
file_ids = {f.name: f.id for f in files}

log.debug("=== Job Definition with simulation workflow and parameters")
job_def = JobDefinition(name="JobDefinition.1", active=True)

# Input params
input_params = [
IntParameterDefinition(name="size", lower_limit=1, upper_limit=1000, default=1),
]
input_params = project_api.create_parameter_definitions(input_params)

mappings = [
ParameterMapping(
key_string="size",
tokenizer="=",
parameter_definition_id=input_params[0].id,
file_id=file_ids["script"],
),
]

output_params = []
# output_params = project_api.create_parameter_definitions(output_params)
# mappings.extend([])

mappings = project_api.create_parameter_mappings(mappings)

job_def.parameter_definition_ids = [o.id for o in input_params + output_params]
job_def.parameter_mapping_ids = [o.id for o in mappings]

task_def = TaskDefinition(
name="Python",
software_requirements=[Software(name="Python", version=python_version)],
execution_command="%executable% %file:script%",
resource_requirements=ResourceRequirements(num_cores=0.5),
execution_level=0,
input_file_ids=[file_ids["script"]],
output_file_ids=[file_ids["output"]],
)

if use_exec_script:
task_def.use_execution_script = True
task_def.execution_script_id = file_ids["exec_python"]

task_def = project_api.create_task_definitions([task_def])[0]
job_def.task_definition_ids = [task_def.id]

# Create job_definition in project
job_def = project_api.create_job_definitions([job_def])[0]

params = project_api.get_parameter_definitions(job_def.parameter_definition_ids)

log.debug("=== Jobs")
jobs = []

for size in [1, 5]:
jobs.append(
Job(
name=f"Job {size} GB",
values={"size": size},
eval_status="pending",
job_definition_id=job_def.id,
)
)
for size in [10, 25, 50, 100, 250]:
jobs.append(
Job(
name=f"Job {size} GB",
values={"size": size},
eval_status="inactive",
job_definition_id=job_def.id,
)
)

jobs = project_api.create_jobs(jobs)

log.info(f"Created project '{proj.name}', ID='{proj.id}'")

return proj


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-U", "--url", default="https://127.0.0.1:8443/hps")
parser.add_argument("-u", "--username", default="repuser")
parser.add_argument("-p", "--password", default="repuser")
parser.add_argument("-es", "--use-exec-script", default=False, action="store_true")
parser.add_argument("-v", "--python-version", default="3.10")
args = parser.parse_args()

logger = logging.getLogger()
logging.basicConfig(format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG)

client = Client(url=args.url, username=args.username, password=args.password)

try:
main(client, use_exec_script=args.use_exec_script, python_version=args.python_version)
except HPSError as e:
log.error(str(e))
6 changes: 4 additions & 2 deletions examples/python_linked_multi_process_step/project_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
log = logging.getLogger(__name__)


def main(client, num_task_definitions, num_jobs, start, inactive):
def main(client, num_task_definitions, num_jobs, start, inactive, python_version=None):
"""Create project with multiple dependent Python tasks and linked files in between."""
log.debug("=== Project")
proj = Project(
Expand Down Expand Up @@ -152,7 +152,7 @@ def main(client, num_task_definitions, num_jobs, start, inactive):
software_requirements=[
Software(
name="Python",
version="3.10",
version=python_version,
)
],
execution_command=cmd,
Expand Down Expand Up @@ -211,6 +211,7 @@ def main(client, num_task_definitions, num_jobs, start, inactive):
parser.add_argument(
"-i", "--inactive", action="store_true", default=False, help="Set project to inactive"
)
parser.add_argument("-v", "--python-version", default="3.10")

args = parser.parse_args()

Expand All @@ -223,6 +224,7 @@ def main(client, num_task_definitions, num_jobs, start, inactive):
num_task_definitions=args.num_task_definitions,
start=args.start,
inactive=args.inactive,
python_version=args.python_version,
)
except HPSError as e:
log.error(str(e))
5 changes: 4 additions & 1 deletion examples/python_multi_process_step/project_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def main(
change_job_tasks,
inactive,
sequential,
python_version=None,
) -> Project:
"""Python project implementing multiple steps and optional image generation."""
log.debug("=== Project")
Expand Down Expand Up @@ -227,7 +228,7 @@ def main(
TaskDefinition(
name=f"td{i}_py_eval",
software_requirements=[
Software(name="Python", version="3.10"),
Software(name="Python", version=python_version),
],
execution_command=cmd,
max_execution_time=duration * 1.5,
Expand Down Expand Up @@ -317,6 +318,7 @@ def main(
default=False,
help="Whether to evaluate all tasks of same exec level per job sequentially or in parallel",
)
parser.add_argument("-v", "--python-version", default="3.10")

args = parser.parse_args()

Expand All @@ -334,6 +336,7 @@ def main(
change_job_tasks=args.change_job_tasks,
inactive=args.inactive,
sequential=args.sequential,
python_version=args.python_version,
)
except HPSError as e:
log.error(str(e))
Loading
Loading