Skip to content

Commit

Permalink
Expose configured RawOutputPrefix during execution (flyteorg#813)
Browse files Browse the repository at this point in the history
* Expose configured RawOutputPrefix during execution

Signed-off-by: Kevin Su <[email protected]>

* Remove sdk_runnable.py and spark_task.py

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored and kennyworkman committed Feb 8, 2022
1 parent f95993a commit 47865d2
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
1 change: 1 addition & 0 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def setup_execution(
),
logging=python_logging,
tmp_dir=user_workspace_dir,
raw_output_prefix=ctx.file_access._raw_output_prefix,
)

# TODO: Remove this check for flytekit 1.0
Expand Down
14 changes: 12 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class Builder(object):
execution_id: str
attrs: typing.Dict[str, typing.Any]
working_dir: typing.Union[os.PathLike, utils.AutoDeletingTempDir]
raw_output_prefix: str

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -167,6 +168,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.execution_id = current.execution_id if current else None
self.logging = current.logging if current else None
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -181,6 +183,7 @@ def build(self) -> ExecutionParameters:
tmp_dir=self.working_dir,
execution_id=self.execution_id,
logging=self.logging,
raw_output_prefix=self.raw_output_prefix,
**self.attrs,
)

Expand All @@ -191,7 +194,7 @@ def new_builder(current: ExecutionParameters = None) -> Builder:
def builder(self) -> Builder:
return ExecutionParameters.Builder(current=self)

def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, **kwargs):
def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, raw_output_prefix, **kwargs):
"""
Args:
execution_date: Date when the execution is running
Expand All @@ -205,6 +208,7 @@ def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, **kwar
self._working_directory = tmp_dir
self._execution_id = execution_id
self._logging = logging
self._raw_output_prefix = raw_output_prefix
# AutoDeletingTempDir's should be used with a with block, which creates upon entry
self._attrs = kwargs
# It is safe to recreate the Secrets Manager
Expand All @@ -226,6 +230,10 @@ def logging(self) -> _logging:
"""
return self._logging

@property
def raw_output_prefix(self) -> str:
return self._raw_output_prefix

@property
def working_directory(self) -> utils.AutoDeletingTempDir:
"""
Expand Down Expand Up @@ -895,14 +903,16 @@ def initialize():

# Note we use the SdkWorkflowExecution object purely for formatting into the ex:project:domain:name format users
# are already acquainted with
default_context = FlyteContext(file_access=default_local_file_access_provider)
default_user_space_params = ExecutionParameters(
execution_id=str(WorkflowExecutionIdentifier.promote_from_model(default_execution_id)),
execution_date=_datetime.datetime.utcnow(),
stats=mock_stats.MockStats(),
logging=_logging,
tmp_dir=user_space_path,
raw_output_prefix=default_context.file_access._raw_output_prefix,
)
default_context = FlyteContext(file_access=default_local_file_access_provider)

default_context = default_context.with_execution_state(
default_context.new_execution_state().with_params(user_space_params=default_user_space_params)
).build()
Expand Down
1 change: 1 addition & 0 deletions tests/flytekit/unit/core/test_type_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def test_default_wf_params_works():
def my_task(a: int):
wf_params = flytekit.current_context()
assert wf_params.execution_id == "ex:local:local:local"
assert "/tmp/flyte/" in wf_params.raw_output_prefix

my_task(a=3)
assert context_manager.FlyteContextManager.size() == 1
Expand Down

0 comments on commit 47865d2

Please sign in to comment.