Generate schema for pipeline.yaml and config.yaml (#70)
disrupted authored Jan 30, 2023
1 parent 1d571f6 commit 7bb1345
Showing 32 changed files with 1,812 additions and 744 deletions.
1 change: 1 addition & 0 deletions .github/actions/update-docs/action.yml
@@ -24,6 +24,7 @@ runs:
      - name: Install python and set up Poetry
        uses: bakdata/ci-templates/actions/[email protected]
        with:
+         poetry-version: "1.3.2"
          python-version: "3.10"

      - name: Update documentation branch with mike
13 changes: 7 additions & 6 deletions .github/workflows/ci.yaml
@@ -1,11 +1,11 @@
name: CI

on:
-  pull_request:
-    types: [opened, ready_for_review, synchronize]
-  push:
-    branches:
-      - main
+  pull_request:
+    types: [opened, ready_for_review, synchronize]
+  push:
+    branches:
+      - main

jobs:
  test:
@@ -20,8 +20,9 @@ jobs:
      - uses: actions/checkout@v3

      - name: Install python and set up Poetry
-       uses: bakdata/ci-templates/actions/[email protected]
+       uses: bakdata/ci-templates/actions/[email protected]
        with:
+         poetry-version: "1.3.2"
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
6 changes: 4 additions & 2 deletions .github/workflows/publish-dev.yaml
@@ -19,7 +19,7 @@ jobs:

      - name: Bump version
        id: bump_version
-       uses: bakdata/ci-templates/actions/[email protected]
+       uses: bakdata/ci-templates/actions/[email protected]
        with:
          release-type: "patch"

@@ -40,7 +40,9 @@ jobs:
        shell: bash

      - name: Release to TestPyPI
-       uses: bakdata/ci-templates/actions/[email protected]
+       uses: bakdata/ci-templates/actions/[email protected]
        with:
+         python-version: "3.10"
+         poetry-version: "1.3.2"
          publish-to-test: true
          pypi-token: ${{ secrets.test-pypi-token }}
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
@@ -20,7 +20,7 @@ jobs:
      release-type: ${{ github.event.inputs.release-type }}
      publish-to-test: false
      python-version: "3.10"
-     poetry-version: "1.2.2"
+     poetry-version: "1.3.2"
    secrets:
      github-email: ${{ secrets.GH_EMAIL }}
      github-username: ${{ secrets.GH_USERNAME }}
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
  - repo: https://github.com/pycqa/isort
-   rev: 5.10.1
+   rev: 5.12.0
    hooks:
      - id: isort
        args: ["--settings", "setup.cfg"]
61 changes: 61 additions & 0 deletions gen_schema.py
@@ -0,0 +1,61 @@
from pathlib import Path
from typing import Annotated, Any, Sequence

from pydantic import Field, schema, schema_json_of
from pydantic.fields import ModelField
from pydantic.schema import SkipField

from kpops.cli.pipeline_config import PipelineConfig
from kpops.components.base_components.kafka_app import KafkaApp
from kpops.components.base_components.kafka_connect import (
    KafkaSinkConnector,
    KafkaSourceConnector,
)
from kpops.components.base_components.kubernetes_app import KubernetesApp
from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp
from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp


def write(contents: str, path: Path) -> None:
    with open(path, "w") as f:
        print(contents, file=f)


original_field_schema = schema.field_schema


# adapted from https://github.com/tiangolo/fastapi/issues/1378#issuecomment-764966955
def field_schema(field: ModelField, **kwargs: Any) -> Any:
    if field.field_info.extra.get("hidden_from_schema"):
        raise SkipField(f"{field.name} field is being hidden")
    else:
        return original_field_schema(field, **kwargs)


schema.field_schema = field_schema

PipelineComponent = (
    KubernetesApp
    | KafkaApp
    | StreamsApp
    | ProducerApp
    | KafkaSourceConnector
    | KafkaSinkConnector
)


AnnotatedPipelineComponent = Annotated[
    PipelineComponent, Field(discriminator="schema_type")
]


schema = schema_json_of(
    Sequence[AnnotatedPipelineComponent],
    title="kpops pipeline schema",
    by_alias=True,
    indent=4,
).replace("schema_type", "type")
write(schema, Path("schema_pipeline.json"))

schema = schema_json_of(PipelineConfig, title="kpops config schema", indent=4)
write(schema, Path("schema_config.json"))
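The script writes schema_pipeline.json and schema_config.json to the working directory. A minimal sketch of how such a schema could be used to validate a pipeline.yaml, assuming PyYAML and the third-party jsonschema package are available (neither is added by this commit; the file paths are illustrative):

# Illustration only, not part of this commit: check a pipeline.yaml against
# the generated schema. Assumes `python gen_schema.py` has been run first and
# that PyYAML and jsonschema are installed.
import json
from pathlib import Path

import yaml
from jsonschema import validate

schema = json.loads(Path("schema_pipeline.json").read_text())
pipeline = yaml.safe_load(Path("pipeline.yaml").read_text())

# Raises jsonschema.exceptions.ValidationError if the pipeline does not match.
validate(instance=pipeline, schema=schema)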
2 changes: 1 addition & 1 deletion kpops/cli/registry.py
@@ -29,7 +29,7 @@ def find_components(self, module_name: str) -> None:
        :param module_name: name of the python module
        """
        for _class in _find_classes(module_name, PipelineComponent):
-           self._classes[_class._type] = _class
+           self._classes[_class.type] = _class

    def __getitem__(self, component_type: str) -> type[PipelineComponent]:
        try:
22 changes: 12 additions & 10 deletions kpops/components/base_components/base_defaults_component.py
@@ -4,7 +4,7 @@
from collections import deque
from collections.abc import Mapping, Sequence
from pathlib import Path
-from typing import TypeVar
+from typing import Any, ClassVar, TypeVar

import typer
from pydantic import BaseConfig, BaseModel, Field
@@ -17,21 +17,23 @@


class BaseDefaultsComponent(BaseModel):
-    _type: str = Field(..., alias="type")
+    type: ClassVar[str] = Field(default=..., const=True)

-    enrich: bool = Field(default=False, exclude=True)
-    config: PipelineConfig = Field(default=..., exclude=True)
-    handlers: ComponentHandlers = Field(default=..., exclude=True)
+    enrich: bool = Field(default=False, exclude=True, hidden_from_schema=True)
+    config: PipelineConfig = Field(default=..., exclude=True, hidden_from_schema=True)
+    handlers: ComponentHandlers = Field(
+        default=..., exclude=True, hidden_from_schema=True
+    )

    class Config(BaseConfig):
        arbitrary_types_allowed = True

-    def __init__(self, **kwargs):
+    def __init__(self, **kwargs) -> None:
        if kwargs.get("enrich", True):
            kwargs = self.extend_with_defaults(kwargs)
        super().__init__(**kwargs)

-    def extend_with_defaults(self, kwargs) -> dict:
+    def extend_with_defaults(self, kwargs: dict[str, Any]) -> dict:
        """
        Merges tmp_defaults with all tmp_defaults for parent classes
@@ -60,13 +62,13 @@ def extend_with_defaults(self, kwargs) -> dict:
        if not environment_default_file_path.exists():
            kwargs = update_nested(
                kwargs,
-               defaults_from_yaml(main_default_file_path, base._type),
+               defaults_from_yaml(main_default_file_path, base.type),
            )
        else:
            kwargs = update_nested(
                kwargs,
-               defaults_from_yaml(environment_default_file_path, base._type),
-               defaults_from_yaml(main_default_file_path, base._type),
+               defaults_from_yaml(environment_default_file_path, base.type),
+               defaults_from_yaml(main_default_file_path, base.type),
            )
        return kwargs
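The new hidden_from_schema=True extras are what the patched field_schema in gen_schema.py looks for: internal fields such as enrich, config, and handlers are left out of the generated JSON schema. A self-contained sketch of that mechanism, using a made-up Example model and the pydantic v1 API:

# Sketch only: how a field extra plus the field_schema patch hides a field.
from typing import Any

from pydantic import BaseModel, Field, schema, schema_json_of
from pydantic.fields import ModelField
from pydantic.schema import SkipField

original_field_schema = schema.field_schema


def field_schema(field: ModelField, **kwargs: Any) -> Any:
    # Same patch as in gen_schema.py: fields flagged as hidden are skipped.
    if field.field_info.extra.get("hidden_from_schema"):
        raise SkipField(f"{field.name} field is being hidden")
    return original_field_schema(field, **kwargs)


schema.field_schema = field_schema


class Example(BaseModel):  # hypothetical model, not part of kpops
    name: str
    internal: int = Field(default=0, hidden_from_schema=True)


# The printed schema lists `name` but not `internal`.
print(schema_json_of(Example, indent=2))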
10 changes: 7 additions & 3 deletions kpops/components/base_components/kafka_app.py
@@ -1,8 +1,9 @@
from __future__ import annotations

import logging
+from typing import ClassVar, Literal

-from pydantic import BaseModel, Extra
+from pydantic import BaseModel, Extra, Field
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.helm import Helm
@@ -36,11 +37,14 @@ class KafkaAppConfig(KubernetesAppConfig):

class KafkaApp(KubernetesApp):
    """
-   Base component for kafka-based components.
+   Base component for Kafka-based components.
    Producer or streaming apps should inherit from this class.
    """

-    _type = "kafka-app"
+    type: ClassVar[str] = "kafka-app"
+    schema_type: Literal["kafka-app"] = Field(  # type: ignore[assignment]
+        default="kafka-app", exclude=True
+    )
    app: KafkaAppConfig
    repo_config: HelmRepoConfig = HelmRepoConfig(
        repository_name="bakdata-streams-bootstrap",
14 changes: 10 additions & 4 deletions kpops/components/base_components/kafka_connect.py
@@ -4,7 +4,7 @@
import logging
from abc import ABC
from functools import cached_property
-from typing import NoReturn
+from typing import ClassVar, Literal, NoReturn

from pydantic import Field
from typing_extensions import override
@@ -32,7 +32,7 @@


class KafkaConnector(PipelineComponent, ABC):
-    _type = "kafka-connect"
+    type: ClassVar[str] = "kafka-connect"
    app: KafkaConnectConfig

    repo_config: HelmRepoConfig = HelmRepoConfig(
@@ -201,7 +201,10 @@ def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None


class KafkaSourceConnector(KafkaConnector):
-    _type = "kafka-source-connector"
+    type: ClassVar[str] = "kafka-source-connector"
+    schema_type: Literal["kafka-source-connector"] = Field(
+        default="kafka-source-connector", exclude=True
+    )
    offset_topic: str | None = None

    @override
@@ -228,7 +231,10 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:


class KafkaSinkConnector(KafkaConnector):
-    _type = "kafka-sink-connector"
+    type: ClassVar[str] = "kafka-sink-connector"
+    schema_type: Literal["kafka-sink-connector"] = Field(
+        default="kafka-sink-connector", exclude=True
+    )

    @override
    def add_input_topics(self, topics: list[str]) -> None:
10 changes: 7 additions & 3 deletions kpops/components/base_components/kubernetes_app.py
@@ -3,8 +3,9 @@
import logging
import re
from functools import cached_property
+from typing import ClassVar, Literal

-from pydantic import BaseModel, Extra
+from pydantic import BaseModel, Extra, Field
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.helm import Helm
@@ -31,9 +32,12 @@ class Config(CamelCaseConfig):

# TODO: label and annotations
class KubernetesApp(PipelineComponent):
-    """Base kubernetes app"""
+    """Base Kubernetes app"""

-    _type = "kubernetes-app"
+    type: ClassVar[str] = "kubernetes-app"
+    schema_type: Literal["kubernetes-app"] = Field(  # type: ignore[assignment]
+        default="kubernetes-app", exclude=True
+    )
    app: KubernetesAppConfig
    repo_config: HelmRepoConfig | None = None
    namespace: str
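Each concrete component now carries a schema_type Literal that gen_schema.py uses as the discriminator of the component union. A self-contained sketch of the same pydantic v1 pattern, with made-up Cat/Dog models standing in for the kpops components:

# Sketch of a discriminated (tagged) union, mirroring AnnotatedPipelineComponent.
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, parse_obj_as


class Cat(BaseModel):  # hypothetical, for illustration only
    schema_type: Literal["cat"] = "cat"
    lives: int = 9


class Dog(BaseModel):  # hypothetical, for illustration only
    schema_type: Literal["dog"] = "dog"
    good_boy: bool = True


# The Literal field named by `discriminator` selects the concrete model.
Pet = Annotated[Union[Cat, Dog], Field(discriminator="schema_type")]

pets = parse_obj_as(list[Pet], [{"schema_type": "cat"}, {"schema_type": "dog"}])
assert isinstance(pets[0], Cat) and isinstance(pets[1], Dog)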
3 changes: 1 addition & 2 deletions kpops/components/base_components/models/to_section.py
@@ -5,8 +5,7 @@


class OutputTopicTypes(str, Enum):
-    """
-    Types of output topic
+    """Types of output topic.
    error (error topic), output (output topic), and extra topics. Every extra topic must have a role.
    """

8 changes: 4 additions & 4 deletions kpops/components/base_components/pipeline_component.py
@@ -62,17 +62,17 @@ def substitute_component_variables(self, topic_name: str) -> str:
        """
        error_topic_name = self.substitute_component_names(
            self.config.topic_name_config.default_error_topic_name,
-           self._type,
+           self.type,
            **os.environ,
        )
        output_topic_name = self.substitute_component_names(
            self.config.topic_name_config.default_output_topic_name,
-           self._type,
+           self.type,
            **os.environ,
        )
        return self.substitute_component_names(
            topic_name,
-           self._type,
+           self.type,
            error_topic_name=error_topic_name,
            output_topic_name=output_topic_name,
        )
@@ -149,7 +149,7 @@ def weave_from_topics(self, prev_component_to: ToSection) -> None:

    def substitute_name(self):
        if self.name:
-           self.name = self.substitute_component_names(self.name, self._type)
+           self.name = self.substitute_component_names(self.name, self.type)
        else:
            raise ValueError("Every component must have a name in the end.")
9 changes: 7 additions & 2 deletions kpops/components/streams_bootstrap/producer/producer_app.py
@@ -1,6 +1,8 @@
from __future__ import annotations

-from pydantic import BaseConfig, Extra
+from typing import ClassVar, Literal
+
+from pydantic import BaseConfig, Extra, Field
from typing_extensions import override

from kpops.components.base_components.kafka_app import KafkaApp
@@ -19,7 +21,10 @@ class ProducerApp(KafkaApp):
    This producer holds configuration to use as values for the streams bootstrap produce helm chart.
    """

-    _type = "producer"
+    type: ClassVar[str] = "producer"
+    schema_type: Literal["producer"] = Field(  # type: ignore[assignment]
+        default="producer", exclude=True
+    )
    app: ProducerValues

    class Config(BaseConfig):
9 changes: 7 additions & 2 deletions kpops/components/streams_bootstrap/streams/streams_app.py
@@ -1,6 +1,8 @@
from __future__ import annotations

-from pydantic import BaseConfig, Extra
+from typing import ClassVar, Literal
+
+from pydantic import BaseConfig, Extra, Field
from typing_extensions import override

from kpops.components.base_components.kafka_app import KafkaApp
@@ -13,7 +15,10 @@ class StreamsApp(KafkaApp):
    StreamsApp component that configures a streams bootstrap app
    """

-    _type = "streams-app"
+    type: ClassVar[str] = "streams-app"
+    schema_type: Literal["streams-app"] = Field(  # type: ignore[assignment]
+        default="streams-app", exclude=True
+    )
    app: StreamsAppConfig

    class Config(BaseConfig):
2 changes: 1 addition & 1 deletion kpops/pipeline_generator/pipeline.py
@@ -230,7 +230,7 @@ def substitute_component_specific_variables(
            substitute(
                json.dumps(pair),
                {
-                   "component_type": component_object._type,
+                   "component_type": component_object.type,
                    "component_name": component_object.name,
                },
            )
2 changes: 2 additions & 0 deletions kpops/utils/pydantic.py
@@ -3,6 +3,8 @@


def to_camel(field: str) -> str:
+    if field == "schema_type":
+        return field
    return humps.camelize(field)  # type: ignore

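The early return presumably keeps the discriminator field out of camel-case alias generation, so schema_type keeps its exact name in serialized output and in the generated schema. A quick illustrative check, assuming the pyhumps package that kpops already uses:

import humps


# Without the special case, the alias generator would rename the discriminator:
assert humps.camelize("schema_type") == "schemaType"


def to_camel(field: str) -> str:
    if field == "schema_type":
        return field
    return humps.camelize(field)


assert to_camel("schema_type") == "schema_type"
assert to_camel("repo_config") == "repoConfig"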