Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add base sensor #1773

Closed
wants to merge 7 commits into from
Closed

Add base sensor #1773

wants to merge 7 commits into from

Conversation

pingsutw
Copy link
Member

@pingsutw pingsutw commented Aug 1, 2023

TL;DR

  • Add a BaseSensor clas. People can inherit BaseSensor to implement their sensors.
  • Add a file sensor into flytekit.core, which uses fsspec to check if file exists

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

import asyncio
from typing import Optional

import fsspec
import grpc
from fsspec.utils import get_protocol

from flytekit.configuration import DataConfig
from flytekit.core.data_persistence import s3_setup_args
from flytekit.core.sensor_task import FileSensorConfig
from flytekit.extend.backend.base_agent import AgentRegistry
from flytekit.extend.backend.base_sensor import SensorBase
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


class FileSensor(SensorBase):
    def __init__(self):
        super().__init__(task_type="file_sensor")

    async def poke(self, cfg: FileSensorConfig) -> bool:
        path = cfg.path
        protocol = get_protocol(path)
        kwargs = {}
        if get_protocol(path):
            kwargs = s3_setup_args(DataConfig.auto().s3, anonymous=False)
        fs = fsspec.filesystem(protocol, **kwargs)
        return await asyncio.to_thread(fs.exists, path)

    async def extract(
        self, context: grpc.ServicerContext, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None
    ) -> FileSensorConfig:
        return FileSensorConfig(**task_template.custom)


AgentRegistry.register(FileSensor())

Tracking Issue

https://github.com/flyteorg/flyte/issues/

Follow-up issue

NA

pingsutw added 2 commits July 31, 2023 16:53
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants