From 47865d2d3b7aae6823e5fe461c0d5d2fc2d19b3e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 18 Jan 2022 23:04:52 +0800 Subject: [PATCH] Expose configured RawOutputPrefix during execution (#813) * Expose configured RawOutputPrefix during execution Signed-off-by: Kevin Su * Remove sdk_runnable.py and spark_task.py Signed-off-by: Kevin Su --- flytekit/bin/entrypoint.py | 1 + flytekit/core/context_manager.py | 14 ++++++++++++-- tests/flytekit/unit/core/test_type_hints.py | 1 + 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 754eab666d6..7bec83346b0 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -201,6 +201,7 @@ def setup_execution( ), logging=python_logging, tmp_dir=user_workspace_dir, + raw_output_prefix=ctx.file_access._raw_output_prefix, ) # TODO: Remove this check for flytekit 1.0 diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 1db5e11d5e4..a3ee0c2972e 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -159,6 +159,7 @@ class Builder(object): execution_id: str attrs: typing.Dict[str, typing.Any] working_dir: typing.Union[os.PathLike, utils.AutoDeletingTempDir] + raw_output_prefix: str def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.stats = current.stats if current else None @@ -167,6 +168,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.execution_id = current.execution_id if current else None self.logging = current.logging if current else None self.attrs = current._attrs if current else {} + self.raw_output_prefix = current.raw_output_prefix if current else None def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder: self.attrs[key] = v @@ -181,6 +183,7 @@ def build(self) -> ExecutionParameters: tmp_dir=self.working_dir, execution_id=self.execution_id, logging=self.logging, + raw_output_prefix=self.raw_output_prefix, **self.attrs, ) @@ -191,7 +194,7 @@ def new_builder(current: ExecutionParameters = None) -> Builder: def builder(self) -> Builder: return ExecutionParameters.Builder(current=self) - def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, **kwargs): + def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, raw_output_prefix, **kwargs): """ Args: execution_date: Date when the execution is running @@ -205,6 +208,7 @@ def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, **kwar self._working_directory = tmp_dir self._execution_id = execution_id self._logging = logging + self._raw_output_prefix = raw_output_prefix # AutoDeletingTempDir's should be used with a with block, which creates upon entry self._attrs = kwargs # It is safe to recreate the Secrets Manager @@ -226,6 +230,10 @@ def logging(self) -> _logging: """ return self._logging + @property + def raw_output_prefix(self) -> str: + return self._raw_output_prefix + @property def working_directory(self) -> utils.AutoDeletingTempDir: """ @@ -895,14 +903,16 @@ def initialize(): # Note we use the SdkWorkflowExecution object purely for formatting into the ex:project:domain:name format users # are already acquainted with + default_context = FlyteContext(file_access=default_local_file_access_provider) default_user_space_params = ExecutionParameters( execution_id=str(WorkflowExecutionIdentifier.promote_from_model(default_execution_id)), execution_date=_datetime.datetime.utcnow(), stats=mock_stats.MockStats(), logging=_logging, tmp_dir=user_space_path, + raw_output_prefix=default_context.file_access._raw_output_prefix, ) - default_context = FlyteContext(file_access=default_local_file_access_provider) + default_context = default_context.with_execution_state( default_context.new_execution_state().with_params(user_space_params=default_user_space_params) ).build() diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 4b926980227..600e90d74b5 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -53,6 +53,7 @@ def test_default_wf_params_works(): def my_task(a: int): wf_params = flytekit.current_context() assert wf_params.execution_id == "ex:local:local:local" + assert "/tmp/flyte/" in wf_params.raw_output_prefix my_task(a=3) assert context_manager.FlyteContextManager.size() == 1