Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use default serialization settings if serialization_settings is None #2367

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flytekit/models/admin/task_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(
def phase(self):
"""
Enum value from flytekit.models.core.execution.TaskExecutionPhase
:rtype: int
:rtype: flytekit.models.core.execution.TaskExecutionPhase
"""
return self._phase

Expand Down
24 changes: 15 additions & 9 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,15 +759,16 @@ async def _serialize_and_register(
for entity, cp_entity in cp_task_entity_map.items():
tasks.append(
loop.run_in_executor(
None, functools.partial(self.raw_register, cp_entity, settings, version, og_entity=entity)
None,
functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity),
)
)
ident = []
ident.extend(await asyncio.gather(*tasks))
# serial register
cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items()))
for entity, cp_entity in cp_other_entities.items():
ident.append(self.raw_register(cp_entity, settings, version, og_entity=entity))
ident.append(self.raw_register(cp_entity, serialization_settings, version, og_entity=entity))
return ident[-1]

def register_task(
Expand All @@ -793,6 +794,8 @@ def register_task(
serialization_settings = SerializationSettings(
image_config=ImageConfig.auto_default_image(),
source_root=project_root,
project=self.default_project,
domain=self.default_domain,
)

ident = asyncio.run(
Expand Down Expand Up @@ -825,13 +828,16 @@ def register_workflow(
:param options: Additional execution options that can be configured for the default launchplan
:return:
"""
ident = self._resolve_identifier(ResourceType.WORKFLOW, entity.name, version, serialization_settings)
if serialization_settings:
b = serialization_settings.new_builder()
b.project = ident.project
b.domain = ident.domain
b.version = ident.version
serialization_settings = b.build()
if serialization_settings is None:
_, _, _, module_file = extract_task_module(entity)
project_root = _find_project_root(module_file)
serialization_settings = SerializationSettings(
image_config=ImageConfig.auto_default_image(),
source_root=project_root,
project=self.default_project,
domain=self.default_domain,
)
self._resolve_identifier(ResourceType.WORKFLOW, entity.name, version, serialization_settings)
ident = asyncio.run(
self._serialize_and_register(entity, serialization_settings, version, options, default_launch_plan)
)
Expand Down
Loading