Skip to content

flytekit v0.20.0 - Towards full portability and standardized flow.

Compare
Choose a tag to compare
@kumare3 kumare3 released this 02 Jul 05:02
· 1654 commits to master since this release
68cd7be

v0.20 Release

Core Authoring

from enum import Enum

class Color(Enum):
   RED = "red"
   BLUE = "blue"
   GREEN = "green"

@task
def foo(c: Color) -> str:
   return c.value
  • Support for Schema inference for Structured types - dataclass - usable in flytectl, Flyteconsole update coming soon
  • Use map with pod tasks (#510)
  • Max parallelism added to LaunchPlans: (#510)
    • Max parallelism allows to limit number of nodes to be executed concurrently within a workflow
max_parallelism_lp1 = launch_plan.LaunchPlan.get_or_create(
        workflow=wf,
        name="name",
        max_parallelism=max_parallelism,
    )
  • Node resource override (#523)
    Example shows you can override resources when you create a node in map task
@workflow
    def my_wf(a: typing.List[str]) -> typing.List[str]:
        mappy = map_task(t1)
        map_node = create_node(mappy, a=a).with_overrides(
            requests=Resources(cpu="1", mem="100"), limits=Resources(cpu="2", mem="200")
        )
        return map_node.o0

for regular task

@workflow
    def my_wf(a: str) -> str:
        return t1(a=a).with_overrides(
            requests=Resources(cpu="1", mem="100"), limits=Resources(cpu="2", mem="200")
        )
  • new package -> register Flow (#526). This flow is the preferred flow going forward. Also powers getting started etc
pyflyte --pkgs myapp.workflows package --image myapp:03eccc1cf101adbd8c4734dba865d3fdeb720aa7 -f --fast
  • Much better documentation

Control Plane

  • control_plane -> remote. This will not change now. moving in beta

Plugins

  • AWS Athena plugin: (#504)
athena_task = AthenaTask(
        name="flytekit.demo.athena_task.query",
        inputs=kwtypes(ds=str),
        task_config=AthenaConfig(database="mnist", catalog="my_catalog", workgroup="my_wg"),
        query_template="""
            insert overwrite directory '{{ .rawOutputDataPrefix }}' stored as parquet
            select *
            from blah
            where ds = '{{ .Inputs.ds }}'
        """,
        # the schema literal's backend uri will be equal to the value of .raw_output_data
        output_schema_type=FlyteSchema,
    )

Smaller Fixes

  • Destination Dir is automatically filled in for fast register. Improves portability