Skip to content

Commit

Permalink
Merge branch 'master' into auth-update
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored and Ketan Umare committed Feb 13, 2023
2 parents f3df707 + 8e5bf6c commit 3251e8b
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 61 deletions.
2 changes: 2 additions & 0 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def setup_execution(
if compressed_serialization_settings:
ss = SerializationSettings.from_transport(compressed_serialization_settings)
ssb = ss.new_builder()
ssb.project = exe_project
ssb.domain = exe_domain
ssb.version = tk_version
if dynamic_addl_distro:
ssb.fast_serialization_settings = FastSerializationSettings(
Expand Down
8 changes: 4 additions & 4 deletions flytekit/configuration/default_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ class DefaultImages(object):
"""

_DEFAULT_IMAGE_PREFIXES = {
PythonVersion.PYTHON_3_7: "ghcr.io/flyteorg/flytekit:py3.7-",
PythonVersion.PYTHON_3_8: "ghcr.io/flyteorg/flytekit:py3.8-",
PythonVersion.PYTHON_3_9: "ghcr.io/flyteorg/flytekit:py3.9-",
PythonVersion.PYTHON_3_10: "ghcr.io/flyteorg/flytekit:py3.10-",
PythonVersion.PYTHON_3_7: "cr.flyte.org/flyteorg/flytekit:py3.7-",
PythonVersion.PYTHON_3_8: "cr.flyte.org/flyteorg/flytekit:py3.8-",
PythonVersion.PYTHON_3_9: "cr.flyte.org/flyteorg/flytekit:py3.9-",
PythonVersion.PYTHON_3_10: "cr.flyte.org/flyteorg/flytekit:py3.10-",
}

@classmethod
Expand Down
23 changes: 18 additions & 5 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
# The cache returns None iff the key does not exist in the cache
if outputs_literal_map is None:
logger.info("Cache miss, task will be executed now")
outputs_literal_map = self.dispatch_execute(ctx, input_literal_map)
outputs_literal_map = self.sandbox_execute(ctx, input_literal_map)
# TODO: need `native_inputs`
LocalTaskCache.set(self.name, self.metadata.cache_version, input_literal_map, outputs_literal_map)
logger.info(
Expand All @@ -268,10 +268,10 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
else:
logger.info("Cache hit")
else:
es = ctx.execution_state
b = es.user_space_params.with_task_sandbox()
ctx = ctx.current_context().with_execution_state(es.with_params(user_space_params=b.build())).build()
outputs_literal_map = self.dispatch_execute(ctx, input_literal_map)
# This code should mirror the call to `sandbox_execute` in the above cache case.
# Code is simpler with duplication and less metaprogramming, but introduces regressions
# if one is changed and not the other.
outputs_literal_map = self.sandbox_execute(ctx, input_literal_map)
outputs_literals = outputs_literal_map.literals

# TODO maybe this is the part that should be done for local execution, we pass the outputs to some special
Expand Down Expand Up @@ -326,6 +326,19 @@ def get_config(self, settings: SerializationSettings) -> Optional[Dict[str, str]
"""
return None

def sandbox_execute(
self,
ctx: FlyteContext,
input_literal_map: _literal_models.LiteralMap,
) -> _literal_models.LiteralMap:
"""
Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.
"""
es = ctx.execution_state
b = es.user_space_params.with_task_sandbox()
ctx = ctx.current_context().with_execution_state(es.with_params(user_space_params=b.build())).build()
return self.dispatch_execute(ctx, input_literal_map)

@abstractmethod
def dispatch_execute(
self,
Expand Down
22 changes: 10 additions & 12 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def extract_value(
val_type: type,
flyte_literal_type: _type_models.LiteralType,
) -> _literal_models.Literal:

if isinstance(input_val, list):
lt = flyte_literal_type
python_type = val_type
Expand Down Expand Up @@ -142,17 +141,16 @@ def extract_value(


def get_primitive_val(prim: Primitive) -> Any:
if prim.integer:
return prim.integer
if prim.datetime:
return prim.datetime
if prim.boolean:
return prim.boolean
if prim.duration:
return prim.duration
if prim.string_value:
return prim.string_value
return prim.float_value
for value in [
prim.integer,
prim.float_value,
prim.string_value,
prim.boolean,
prim.datetime,
prim.duration,
]:
if value is not None:
return value


class ConjunctionOps(Enum):
Expand Down
2 changes: 1 addition & 1 deletion flytekit/types/schema/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class FlyteSchema(object):
"""
This is the main schema class that users should use.
"""
logger.warning("FlyteSchema is deprecated, use Structured Dataset instead.")

@classmethod
def columns(cls) -> typing.Dict[str, typing.Type]:
Expand Down Expand Up @@ -233,7 +234,6 @@ def __init__(
supported_mode: SchemaOpenMode = SchemaOpenMode.WRITE,
downloader: typing.Callable[[str, os.PathLike], None] = None,
):

if supported_mode == SchemaOpenMode.READ and remote_path is None:
raise ValueError("To create a FlyteSchema in read mode, remote_path is required")
if (
Expand Down
13 changes: 1 addition & 12 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
import sys

from setuptools import find_packages, setup # noqa

MIN_PYTHON_VERSION = (3, 7)
CURRENT_PYTHON = sys.version_info[:2]
if CURRENT_PYTHON < MIN_PYTHON_VERSION:
print(
f"Flytekit API is only supported for Python version is {MIN_PYTHON_VERSION}+. Detected you are on"
f" version {CURRENT_PYTHON}, installation will not proceed!"
)
sys.exit(-1)

extras_require = {}

__version__ = "0.0.0+develop"
Expand Down Expand Up @@ -92,7 +81,7 @@
"flytekit/bin/entrypoint.py",
],
license="apache2",
python_requires=">=3.7",
python_requires=">=3.7,<3.11",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
Expand Down
17 changes: 0 additions & 17 deletions tests/flytekit/unit/bin/test_python_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from flyteidl.core.errors_pb2 import ErrorDocument

from flytekit.bin.entrypoint import _dispatch_execute, normalize_inputs, setup_execution
from flytekit.configuration import Image, ImageConfig, SerializationSettings
from flytekit.core import context_manager
from flytekit.core.base_task import IgnoreOutputs
from flytekit.core.data_persistence import DiskPersistence
Expand Down Expand Up @@ -324,22 +323,6 @@ def test_setup_cloud_prefix():
assert isinstance(ctx.file_access._default_remote, GCSPersistence)


def test_persist_ss():
default_img = Image(name="default", fqn="test", tag="tag")
ss = SerializationSettings(
project="proj1",
domain="dom",
version="version123",
env=None,
image_config=ImageConfig(default_image=default_img, images=[default_img]),
)
ss_txt = ss.serialized_context
os.environ["_F_SS_C"] = ss_txt
with setup_execution("s3://", checkpoint_path=None, prev_checkpoint=None) as ctx:
assert ctx.serialization_settings.project == "proj1"
assert ctx.serialization_settings.domain == "dom"


def test_normalize_inputs():
assert normalize_inputs("{{.rawOutputDataPrefix}}", "{{.checkpointOutputPrefix}}", "{{.prevCheckpointPrefix}}") == (
None,
Expand Down
20 changes: 10 additions & 10 deletions tests/flytekit/unit/configuration/test_image_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
@pytest.mark.parametrize(
"python_version_enum, expected_image_string",
[
(PythonVersion.PYTHON_3_7, "ghcr.io/flyteorg/flytekit:py3.7-latest"),
(PythonVersion.PYTHON_3_8, "ghcr.io/flyteorg/flytekit:py3.8-latest"),
(PythonVersion.PYTHON_3_9, "ghcr.io/flyteorg/flytekit:py3.9-latest"),
(PythonVersion.PYTHON_3_10, "ghcr.io/flyteorg/flytekit:py3.10-latest"),
(PythonVersion.PYTHON_3_7, "cr.flyte.org/flyteorg/flytekit:py3.7-latest"),
(PythonVersion.PYTHON_3_8, "cr.flyte.org/flyteorg/flytekit:py3.8-latest"),
(PythonVersion.PYTHON_3_9, "cr.flyte.org/flyteorg/flytekit:py3.9-latest"),
(PythonVersion.PYTHON_3_10, "cr.flyte.org/flyteorg/flytekit:py3.10-latest"),
],
)
def test_defaults(python_version_enum, expected_image_string):
Expand All @@ -24,8 +24,8 @@ def test_defaults(python_version_enum, expected_image_string):
@pytest.mark.parametrize(
"python_version_enum, flytekit_version, expected_image_string",
[
(PythonVersion.PYTHON_3_7, "v0.32.0", "ghcr.io/flyteorg/flytekit:py3.7-0.32.0"),
(PythonVersion.PYTHON_3_8, "1.31.3", "ghcr.io/flyteorg/flytekit:py3.8-1.31.3"),
(PythonVersion.PYTHON_3_7, "v0.32.0", "cr.flyte.org/flyteorg/flytekit:py3.7-0.32.0"),
(PythonVersion.PYTHON_3_8, "1.31.3", "cr.flyte.org/flyteorg/flytekit:py3.8-1.31.3"),
],
)
def test_set_both(python_version_enum, flytekit_version, expected_image_string):
Expand All @@ -36,7 +36,7 @@ def test_image_config_auto():
x = ImageConfig.auto_default_image()
assert x.images[0].name == "default"
version_str = f"{sys.version_info.major}.{sys.version_info.minor}"
assert x.images[0].full == f"ghcr.io/flyteorg/flytekit:py{version_str}-latest"
assert x.images[0].full == f"cr.flyte.org/flyteorg/flytekit:py{version_str}-latest"


def test_image_from_flytectl_config():
Expand All @@ -56,7 +56,7 @@ def test_not_version(mock_sys):

def test_image_create():
with pytest.raises(ValueError):
ImageConfig.create_from("ghcr.io/im/g:latest")
ImageConfig.create_from("cr.flyte.org/im/g:latest")

ic = ImageConfig.from_images("ghcr.io/im/g:latest")
assert ic.default_image.fqn == "ghcr.io/im/g"
ic = ImageConfig.from_images("cr.flyte.org/im/g:latest")
assert ic.default_image.fqn == "cr.flyte.org/im/g"
19 changes: 19 additions & 0 deletions tests/flytekit/unit/core/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import flytekit
from flytekit.core.checkpointer import SyncCheckpoint
from flytekit.core.local_cache import LocalTaskCache


def test_sync_checkpoint_write(tmpdir):
Expand Down Expand Up @@ -123,5 +124,23 @@ def t1(n: int) -> int:
return n + 1


@flytekit.task(cache=True, cache_version="v0")
def t2(n: int) -> int:
ctx = flytekit.current_context()
cp = ctx.checkpoint
cp.write(bytes(n + 1))
return n + 1


@pytest.fixture(scope="function", autouse=True)
def setup():
LocalTaskCache.initialize()
LocalTaskCache.clear()


def test_checkpoint_task():
assert t1(n=5) == 6


def test_checkpoint_cached_task():
assert t2(n=5) == 6
16 changes: 16 additions & 0 deletions tests/flytekit/unit/core/test_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ def multiplier_2(my_input: float) -> float:
multiplier_2(my_input=10.0)


def test_condition_else_int():
@workflow
def multiplier_3(my_input: int) -> float:
return (
conditional("fractions")
.if_((my_input >= 0) & (my_input < 1.0))
.then(double(n=my_input))
.elif_((my_input > 1.0) & (my_input < 10.0))
.then(square(n=my_input))
.else_()
.fail("The input must be between 0 and 10")
)

assert multiplier_3(my_input=0) == 0


def test_condition_sub_workflows():
@task
def sum_div_sub(a: int, b: int) -> typing.NamedTuple("Outputs", sum=int, div=int, sub=int):
Expand Down

0 comments on commit 3251e8b

Please sign in to comment.