Skip to content

Commit

Permalink
Use default serialization settings if serialization_settings is None (#…
Browse files Browse the repository at this point in the history
…2367)

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Jan Fiedler <[email protected]>
  • Loading branch information
pingsutw authored and fiedlerNr9 committed Jul 25, 2024
1 parent 1b2d5c3 commit 4c98139
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
2 changes: 1 addition & 1 deletion flytekit/models/admin/task_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(
def phase(self):
"""
Enum value from flytekit.models.core.execution.TaskExecutionPhase
:rtype: int
:rtype: flytekit.models.core.execution.TaskExecutionPhase
"""
return self._phase

Expand Down
24 changes: 15 additions & 9 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,15 +759,16 @@ async def _serialize_and_register(
for entity, cp_entity in cp_task_entity_map.items():
tasks.append(
loop.run_in_executor(
None, functools.partial(self.raw_register, cp_entity, settings, version, og_entity=entity)
None,
functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity),
)
)
ident = []
ident.extend(await asyncio.gather(*tasks))
# serial register
cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items()))
for entity, cp_entity in cp_other_entities.items():
ident.append(self.raw_register(cp_entity, settings, version, og_entity=entity))
ident.append(self.raw_register(cp_entity, serialization_settings, version, og_entity=entity))
return ident[-1]

def register_task(
Expand All @@ -793,6 +794,8 @@ def register_task(
serialization_settings = SerializationSettings(
image_config=ImageConfig.auto_default_image(),
source_root=project_root,
project=self.default_project,
domain=self.default_domain,
)

ident = asyncio.run(
Expand Down Expand Up @@ -825,13 +828,16 @@ def register_workflow(
:param options: Additional execution options that can be configured for the default launchplan
:return:
"""
ident = self._resolve_identifier(ResourceType.WORKFLOW, entity.name, version, serialization_settings)
if serialization_settings:
b = serialization_settings.new_builder()
b.project = ident.project
b.domain = ident.domain
b.version = ident.version
serialization_settings = b.build()
if serialization_settings is None:
_, _, _, module_file = extract_task_module(entity)
project_root = _find_project_root(module_file)
serialization_settings = SerializationSettings(
image_config=ImageConfig.auto_default_image(),
source_root=project_root,
project=self.default_project,
domain=self.default_domain,
)
self._resolve_identifier(ResourceType.WORKFLOW, entity.name, version, serialization_settings)
ident = asyncio.run(
self._serialize_and_register(entity, serialization_settings, version, options, default_launch_plan)
)
Expand Down

0 comments on commit 4c98139

Please sign in to comment.