Skip to content

Commit

Permalink
Merge branch 'jean/oss-5963-deadlock-when-deloying-flow-with-dependen…
Browse files Browse the repository at this point in the history
…cies-that' of https://github.com/PrefectHQ/prefect into jean/oss-5963-deadlock-when-deloying-flow-with-dependencies-that
  • Loading branch information
jeanluciano committed Jan 16, 2025
2 parents d61af8a + bff1ee5 commit c570973
Show file tree
Hide file tree
Showing 144 changed files with 1,792 additions and 849 deletions.
61 changes: 1 addition & 60 deletions .github/workflows/static-analysis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,63 +66,4 @@ jobs:

- name: Run pre-commit
run: |
pre-commit run --show-diff-on-failure --color=always --all-files
type-completeness-check:
name: Type completeness check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
fetch-depth: 0

- name: Set up uv
uses: astral-sh/setup-uv@v5
with:
python-version: "3.12"

- name: Calculate type completeness score
id: calculate_current_score
run: |
# `pyright` will exit with a non-zero status code if it finds any issues,
# so we need to explicitly ignore the exit code with `|| true`.
uv tool run --with-editable . pyright --verifytypes prefect --ignoreexternal --outputjson > prefect-analysis.json || true
SCORE=$(jq -r '.typeCompleteness.completenessScore' prefect-analysis.json)
echo "current_score=$SCORE" >> $GITHUB_OUTPUT
- name: Checkout base branch
run: |
git checkout ${{ github.base_ref }}
- name: Calculate base branch score
id: calculate_base_score
run: |
uv tool run --with-editable . pyright --verifytypes prefect --ignoreexternal --outputjson > prefect-analysis-base.json || true
BASE_SCORE=$(jq -r '.typeCompleteness.completenessScore' prefect-analysis-base.json)
echo "base_score=$BASE_SCORE" >> $GITHUB_OUTPUT
- name: Compare scores
run: |
CURRENT_SCORE=$(echo ${{ steps.calculate_current_score.outputs.current_score }})
BASE_SCORE=$(echo ${{ steps.calculate_base_score.outputs.base_score }})
if (( $(echo "$BASE_SCORE > $CURRENT_SCORE" | bc -l) )); then
echo "::notice title=Type Completeness Check::We noticed a decrease in type coverage with these changes. Check workflow summary for more details."
echo "### ℹ️ Type Completeness Check" >> $GITHUB_STEP_SUMMARY
echo "We noticed a decrease in type coverage with these changes. To maintain our codebase quality, we aim to keep or improve type coverage with each change." >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Need help? Ping @desertaxle or @zzstoatzz for assistance!" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Here's what changed:" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
uv run scripts/pyright_diff.py prefect-analysis-base.json prefect-analysis.json >> $GITHUB_STEP_SUMMARY
SCORE_DIFF=$(echo "$BASE_SCORE - $CURRENT_SCORE" | bc -l)
if (( $(echo "$SCORE_DIFF > 0.001" | bc -l) )); then
exit 1
fi
elif (( $(echo "$BASE_SCORE < $CURRENT_SCORE" | bc -l) )); then
echo "🎉 Great work! The type coverage has improved with these changes" >> $GITHUB_STEP_SUMMARY
else
echo "✅ Type coverage maintained" >> $GITHUB_STEP_SUMMARY
fi
pre-commit run --show-diff-on-failure --color=always --all-files
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ repos:
)$
- repo: local
hooks:
- id: type-completeness-check
name: Type Completeness Check
language: system
entry: uv run --with pyright pyright --ignoreexternal --verifytypes prefect
pass_filenames: false
- id: generate-mintlify-openapi-docs
name: Generating OpenAPI docs for Mintlify
language: system
Expand Down
35 changes: 33 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,28 @@
<img src="https://img.shields.io/badge/youtube-watch_videos-red.svg?color=0052FF&labelColor=090422&logo=youtube" /></a>
</p>

<p align="center">
<a href="https://docs.prefect.io/v3/get-started/index?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none">
Installation
</a>
·
<a href="https://docs.prefect.io/v3/get-started/quickstart?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none">
Quickstart
</a>
·
<a href="https://docs.prefect.io/v3/develop/index?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none">
Build workflows
</a>
·
<a href="https://docs.prefect.io/v3/deploy/index?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none">
Deploy workflows
</a>
·
<a href="https://app.prefect.cloud/?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none">
Prefect Cloud
</a>
</p>

# Prefect

Prefect is a workflow orchestration framework for building data pipelines in Python.
Expand All @@ -26,6 +48,11 @@ With just a few lines of code, data teams can confidently automate any data proc

Workflow activity is tracked and can be monitored with a self-hosted [Prefect server](https://docs.prefect.io/latest/manage/self-host/?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none) instance or managed [Prefect Cloud](https://www.prefect.io/cloud-vs-oss?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none) dashboard.

> [!TIP]
> Prefect flows can handle retries, dependencies, and even complex branching logic
>
> [Check our docs](https://docs.prefect.io/v3/get-started/index?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none) or see the example below to learn more!
## Getting started

Prefect requires Python 3.9 or later. To [install the latest or upgrade to the latest version of Prefect](https://docs.prefect.io/get-started/install), run the following command:
Expand Down Expand Up @@ -79,8 +106,12 @@ if __name__ == "__main__":
You now have a process running locally that is looking for scheduled deployments!
Additionally you can run your workflow manually from the UI or CLI. You can even run deployments in response to [events](https://docs.prefect.io/latest/automate/?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none).

> [!NOTE]
> To explore different infrastructure options for your workflows, check out the [deployment documentation](https://docs.prefect.io/v3/deploy).
> [!TIP]
> Where to go next - check out our [documentation](https://docs.prefect.io/v3/get-started/index?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none) to learn more about:
> - [Deploying flows to production environments](https://docs.prefect.io/v3/deploy?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none)
> - [Adding error handling and retries](https://docs.prefect.io/v3/develop/write-tasks#retries?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none)
> - [Integrating with your existing tools](https://docs.prefect.io/integrations/integrations?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none)
> - [Setting up team collaboration features](https://docs.prefect.io/v3/manage/cloud/manage-users/manage-teams#manage-teams?utm_source=oss&utm_medium=oss&utm_campaign=oss_gh_repo&utm_term=none&utm_content=none)

## Prefect Cloud
Expand Down
13 changes: 11 additions & 2 deletions docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -9355,7 +9355,10 @@
"description": "Successful Response",
"content": {
"application/json": {
"schema": {}
"schema": {
"type": "object",
"title": "Response Validate Obj Ui Schemas Validate Post"
}
}
}
},
Expand Down Expand Up @@ -9727,7 +9730,10 @@
"description": "Successful Response",
"content": {
"application/json": {
"schema": {}
"schema": {
"type": "string",
"title": "Response Hello Hello Get"
}
}
}
},
Expand Down Expand Up @@ -22594,6 +22600,9 @@
"description": "An ORM representation of task run data."
},
"TaskRunCount": {
"additionalProperties": {
"type": "integer"
},
"type": "object"
},
"TaskRunCreate": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@
from google.api_core.client_options import ClientOptions
from googleapiclient import discovery
from googleapiclient.discovery import Resource
from jsonpatch import JsonPatch
from pydantic import Field, field_validator

from prefect.logging.loggers import PrefectLogAdapter
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.utilities.dockerutils import get_prefect_image_name
from prefect.utilities.pydantic import JsonPatch
from prefect.workers.base import (
BaseJobConfiguration,
BaseVariables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
# noinspection PyProtectedMember
from googleapiclient.discovery import Resource
from googleapiclient.errors import HttpError
from jsonpatch import JsonPatch
from pydantic import Field, PrivateAttr, field_validator

from prefect.logging.loggers import PrefectLogAdapter
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.utilities.dockerutils import get_prefect_image_name
from prefect.utilities.pydantic import JsonPatch
from prefect.workers.base import (
BaseJobConfiguration,
BaseVariables,
Expand Down
2 changes: 1 addition & 1 deletion src/integrations/prefect-gcp/prefect_gcp/workers/vertex.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
from uuid import uuid4

import anyio
from jsonpatch import JsonPatch
from pydantic import Field, field_validator
from slugify import slugify

from prefect.logging.loggers import PrefectLogAdapter
from prefect.utilities.pydantic import JsonPatch
from prefect.workers.base import (
BaseJobConfiguration,
BaseVariables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import aiohttp
import anyio.abc
import kubernetes_asyncio
from jsonpatch import JsonPatch
from kubernetes_asyncio import config
from kubernetes_asyncio.client import (
ApiClient,
Expand Down Expand Up @@ -148,7 +149,6 @@
from prefect.server.schemas.core import Flow
from prefect.server.schemas.responses import DeploymentResponse
from prefect.utilities.dockerutils import get_prefect_image_name
from prefect.utilities.pydantic import JsonPatch
from prefect.utilities.templating import find_placeholders
from prefect.utilities.timeout import timeout_async
from prefect.workers.base import (
Expand Down
10 changes: 6 additions & 4 deletions src/prefect/_internal/schemas/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
This will be subject to consolidation and refactoring over the next few months.
"""

from __future__ import annotations

import os
import re
import urllib.parse
Expand Down Expand Up @@ -627,18 +629,18 @@ def validate_name_present_on_nonanonymous_blocks(values: M) -> M:


@overload
def validate_command(v: str) -> Path:
def validate_working_dir(v: str) -> Path:
...


@overload
def validate_command(v: None) -> None:
def validate_working_dir(v: None) -> None:
...


def validate_command(v: Optional[str]) -> Optional[Path]:
def validate_working_dir(v: Optional[Path | str]) -> Optional[Path]:
"""Make sure that the working directory is formatted for the current platform."""
if v is not None:
if isinstance(v, str):
return relative_path_to_current_platform(v)
return v

Expand Down
5 changes: 4 additions & 1 deletion src/prefect/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.context import get_task_and_flow_run_ids

logger = get_logger("artifacts")
if TYPE_CHECKING:
import logging

logger: "logging.Logger" = get_logger("artifacts")

if TYPE_CHECKING:
from prefect.client.orchestration import PrefectClient
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/automations.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def create(self: Self) -> Self:
self.id = client.create_automation(automation=automation)
return self

async def aupdate(self: Self):
async def aupdate(self: Self) -> None:
"""
Updates an existing automation.
Expand Down
7 changes: 5 additions & 2 deletions src/prefect/blocks/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Union,
)

from typing_extensions import Self, TypeAlias
from typing_extensions import TYPE_CHECKING, Self, TypeAlias

from prefect.blocks.core import Block
from prefect.exceptions import MissingContextError
Expand All @@ -26,7 +26,10 @@
if sys.version_info >= (3, 12):
LoggingAdapter = logging.LoggerAdapter[logging.Logger]
else:
LoggingAdapter = logging.LoggerAdapter
if TYPE_CHECKING:
LoggingAdapter = logging.LoggerAdapter[logging.Logger]
else:
LoggingAdapter = logging.LoggerAdapter

LoggerOrAdapter: TypeAlias = Union[Logger, LoggingAdapter]

Expand Down
4 changes: 2 additions & 2 deletions src/prefect/cli/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from prefect.settings import PREFECT_UI_URL
from prefect.utilities.asyncutils import run_sync_in_worker_thread

dashboard_app = PrefectTyper(
dashboard_app: PrefectTyper = PrefectTyper(
name="dashboard",
help="Commands for interacting with the Prefect UI.",
)
app.add_typer(dashboard_app)


@dashboard_app.command()
async def open():
async def open() -> None:
"""
Open the Prefect UI in the browser.
"""
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/cli/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
Note that many of these commands require extra dependencies (such as npm and MkDocs)
to function properly.
"""
dev_app = PrefectTyper(
dev_app: PrefectTyper = PrefectTyper(
name="dev", short_help="Internal Prefect development.", help=DEV_HELP
)
app.add_typer(dev_app)


def exit_with_error_if_not_editable_install():
def exit_with_error_if_not_editable_install() -> None:
if (
prefect.__module_path__.parent == "site-packages"
or not (prefect.__development_base_path__ / "setup.py").exists()
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/cli/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
get_events_subscriber,
)

events_app = PrefectTyper(name="events", help="Stream events.")
events_app: PrefectTyper = PrefectTyper(name="events", help="Stream events.")
app.add_typer(events_app, aliases=["event"])


Expand Down Expand Up @@ -60,7 +60,7 @@ async def stream(
handle_error(exc)


async def handle_event(event: Event, format: StreamFormat, output_file: str):
async def handle_event(event: Event, format: StreamFormat, output_file: str) -> None:
if format == StreamFormat.json:
event_data = orjson.dumps(event.model_dump(), default=str).decode()
elif format == StreamFormat.text:
Expand All @@ -74,7 +74,7 @@ async def handle_event(event: Event, format: StreamFormat, output_file: str):
print(event_data)


def handle_error(exc):
def handle_error(exc: Exception) -> None:
if isinstance(exc, websockets.exceptions.ConnectionClosedError):
exit_with_error(f"Connection closed, retrying... ({exc})")
elif isinstance(exc, (KeyboardInterrupt, asyncio.exceptions.CancelledError)):
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/cli/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from prefect.runner import Runner
from prefect.utilities import urls

flow_app = PrefectTyper(name="flow", help="View and serve flows.")
flow_app: PrefectTyper = PrefectTyper(name="flow", help="View and serve flows.")
app.add_typer(flow_app, aliases=["flows"])


Expand Down
6 changes: 4 additions & 2 deletions src/prefect/cli/flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
from prefect.runner import Runner
from prefect.states import State

flow_run_app = PrefectTyper(name="flow-run", help="Interact with flow runs.")
flow_run_app: PrefectTyper = PrefectTyper(
name="flow-run", help="Interact with flow runs."
)
app.add_typer(flow_run_app, aliases=["flow-runs"])

LOGS_DEFAULT_PAGE_SIZE = 200
LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS = 20

logger = get_logger(__name__)
logger: "logging.Logger" = get_logger(__name__)


@flow_run_app.command()
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/cli/global_concurrency_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
PrefectHTTPStatusError,
)

global_concurrency_limit_app = PrefectTyper(
global_concurrency_limit_app: PrefectTyper = PrefectTyper(
name="global-concurrency-limit",
help="Manage global concurrency limits.",
)
Expand Down
Loading

0 comments on commit c570973

Please sign in to comment.