Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate schema for pipeline.yaml and config.yaml #70

Merged
merged 45 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
9a8e3c0
Refactor to make internal fields private
disrupted Jan 18, 2023
a463ad7
Generate schema
disrupted Jan 18, 2023
1322ff3
WIP
disrupted Jan 18, 2023
b2c948b
Revert "WIP"
disrupted Jan 26, 2023
96b4a09
Revert "Refactor to make internal fields private"
disrupted Jan 26, 2023
2c30f48
Fix usage of private attr
disrupted Jan 26, 2023
448cc68
Fix static class variable
disrupted Jan 26, 2023
e5d5c51
Remove class var
disrupted Jan 26, 2023
dd6730a
Fix test
disrupted Jan 26, 2023
455fb8f
Revert "Remove class var"
disrupted Jan 26, 2023
89cfeb3
Workaround Pydantic discriminator
disrupted Jan 26, 2023
09550be
Exclude config & enrich from schema
disrupted Jan 26, 2023
7c9aeb7
Fix issues
disrupted Jan 26, 2023
370a28d
Add property
disrupted Jan 26, 2023
9c7b8eb
Update config & handlers attrs
disrupted Jan 26, 2023
54be760
Rename schema codegen
disrupted Jan 26, 2023
1d5b5ca
Work around Pydantic quirks
disrupted Jan 26, 2023
26281f2
Work around Pydantic quirks
disrupted Jan 26, 2023
cc6b4a9
Exclude schema_type from generate output
disrupted Jan 26, 2023
c86ac79
Fix producer tests
disrupted Jan 26, 2023
c5a2a8e
Fix test
disrupted Jan 26, 2023
1a8737f
Use field init instead of kwargs
disrupted Jan 26, 2023
5949b99
Fix pydantic workaround
disrupted Jan 26, 2023
5156cb1
Merge remote-tracking branch 'origin/main' into feat/schema
disrupted Jan 26, 2023
59ab0c7
Update schema
disrupted Jan 26, 2023
344eced
Cosmetic
disrupted Jan 26, 2023
00af95b
Fix test name
disrupted Jan 26, 2023
b261c33
Cleanup test
disrupted Jan 26, 2023
f258975
Cleanup test
disrupted Jan 26, 2023
d96035b
Cleanup test
disrupted Jan 26, 2023
d35259e
Cleanup test
disrupted Jan 26, 2023
5c27827
Cleanup test
disrupted Jan 26, 2023
d0bff0b
Cleanup test
disrupted Jan 26, 2023
382f27e
Cleanup test
disrupted Jan 26, 2023
b38edb9
Mark todo
disrupted Jan 26, 2023
9d9559f
Remove another todo
disrupted Jan 26, 2023
ea43c60
Remove base class in schema gen
disrupted Jan 26, 2023
ebe8d96
Rename variable
disrupted Jan 26, 2023
6ff157a
Exclude `schema_type` from camel case conversion
disrupted Jan 30, 2023
51c54c8
Fix Poetry lockfile
disrupted Jan 30, 2023
30d47c2
Fix isort bug
disrupted Jan 30, 2023
f0927db
Update Poetry
disrupted Jan 30, 2023
e4842dc
Fix Poetry publish to TestPyPI
disrupted Jan 30, 2023
4e78163
Add `KubernetesApp` & `KafkaApp` to schema
disrupted Jan 30, 2023
0c8d2f2
Generate schema for config.yaml
disrupted Jan 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions gen_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Annotated, Any, Sequence
disrupted marked this conversation as resolved.
Show resolved Hide resolved

from pydantic import Field, schema, schema_json_of
from pydantic.fields import ModelField
from pydantic.schema import SkipField

from kpops.components.base_components.kafka_connect import (
KafkaSinkConnector,
KafkaSourceConnector,
)
from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp
from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp

original_field_schema = schema.field_schema
raminqaf marked this conversation as resolved.
Show resolved Hide resolved


# adapted from https://github.com/tiangolo/fastapi/issues/1378#issuecomment-764966955
def field_schema(field: ModelField, **kwargs: Any) -> Any:
if field.field_info.extra.get("hidden_from_schema"):
raise SkipField(f"{field.name} field is being hidden")
else:
return original_field_schema(field, **kwargs)


schema.field_schema = field_schema

ComponentType = (
StreamsApp
| ProducerApp
| KafkaSourceConnector
| KafkaSinkConnector
# | PipelineComponent
disrupted marked this conversation as resolved.
Show resolved Hide resolved
)


AnnotatedPipelineComponent = Annotated[
ComponentType, Field(discriminator="schema_type")
]

schema = schema_json_of(
Sequence[AnnotatedPipelineComponent],
title="kpops pipeline schema",
by_alias=True,
indent=4,
).replace("schema_type", "type")

print(schema)
2 changes: 1 addition & 1 deletion kpops/cli/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def find_components(self, module_name: str) -> None:
:param module_name: name of the python module
"""
for _class in _find_classes(module_name, PipelineComponent):
self._classes[_class._type] = _class
self._classes[_class.type] = _class

def __getitem__(self, component_type: str) -> type[PipelineComponent]:
try:
Expand Down
22 changes: 12 additions & 10 deletions kpops/components/base_components/base_defaults_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections import deque
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import TypeVar
from typing import Any, ClassVar, TypeVar

import typer
from pydantic import BaseConfig, BaseModel, Field
Expand All @@ -17,21 +17,23 @@


class BaseDefaultsComponent(BaseModel):
_type: str = Field(..., alias="type")
type: ClassVar[str] = Field(default=..., const=True)

enrich: bool = Field(default=False, exclude=True)
config: PipelineConfig = Field(default=..., exclude=True)
handlers: ComponentHandlers = Field(default=..., exclude=True)
enrich: bool = Field(default=False, exclude=True, hidden_from_schema=True)
config: PipelineConfig = Field(default=..., exclude=True, hidden_from_schema=True)
handlers: ComponentHandlers = Field(
default=..., exclude=True, hidden_from_schema=True
)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

class Config(BaseConfig):
arbitrary_types_allowed = True

def __init__(self, **kwargs):
def __init__(self, **kwargs) -> None:
if kwargs.get("enrich", True):
kwargs = self.extend_with_defaults(kwargs)
super().__init__(**kwargs)

def extend_with_defaults(self, kwargs) -> dict:
def extend_with_defaults(self, kwargs: dict[str, Any]) -> dict:
"""
Merges tmp_defaults with all tmp_defaults for parent classes
Expand Down Expand Up @@ -60,13 +62,13 @@ def extend_with_defaults(self, kwargs) -> dict:
if not environment_default_file_path.exists():
kwargs = update_nested(
kwargs,
defaults_from_yaml(main_default_file_path, base._type),
defaults_from_yaml(main_default_file_path, base.type),
)
else:
kwargs = update_nested(
kwargs,
defaults_from_yaml(environment_default_file_path, base._type),
defaults_from_yaml(main_default_file_path, base._type),
defaults_from_yaml(environment_default_file_path, base.type),
defaults_from_yaml(main_default_file_path, base.type),
)
return kwargs

Expand Down
3 changes: 2 additions & 1 deletion kpops/components/base_components/kafka_app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from typing import ClassVar

from pydantic import BaseModel, Extra
from typing_extensions import override
Expand Down Expand Up @@ -37,7 +38,7 @@ class KafkaApp(KubernetesApp):
Producer or streaming apps should inherit from this class.
"""

_type = "kafka-app"
type: ClassVar[str] = "kafka-app"
app: KafkaAppConfig

def __init__(self, **kwargs) -> None:
Expand Down
15 changes: 11 additions & 4 deletions kpops/components/base_components/kafka_connect.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import os
from abc import ABC
from typing import NoReturn
from typing import ClassVar, Literal, NoReturn

from pydantic import Field
from typing_extensions import override

from kpops.cli.pipeline_config import ENV_PREFIX
Expand All @@ -14,7 +15,7 @@


class KafkaConnector(PipelineComponent, ABC):
_type = "kafka-connect"
type: ClassVar[str] = "kafka-connect"
app: KafkaConnectConfig

def __init__(self, **kwargs) -> None:
Expand Down Expand Up @@ -64,7 +65,10 @@ def clean(self, dry_run: bool) -> None:


class KafkaSourceConnector(KafkaConnector):
_type = "kafka-source-connector"
type: ClassVar[str] = "kafka-source-connector"
schema_type: Literal["kafka-source-connector"] = Field(
default="kafka-source-connector", exclude=True
)

@override
def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:
Expand All @@ -90,7 +94,10 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:


class KafkaSinkConnector(KafkaConnector):
_type = "kafka-sink-connector"
type: ClassVar[str] = "kafka-sink-connector"
schema_type: Literal["kafka-sink-connector"] = Field(
default="kafka-sink-connector", exclude=True
)

@override
def add_input_topics(self, topics: list[str]) -> None:
Expand Down
3 changes: 2 additions & 1 deletion kpops/components/base_components/kubernetes_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import re
from functools import cached_property
from typing import ClassVar

from pydantic import BaseModel
from typing_extensions import override
Expand Down Expand Up @@ -35,7 +36,7 @@ class Config(CamelCaseConfig):
class KubernetesApp(PipelineComponent):
"""Base kubernetes app"""

_type = "kubernetes-app"
type: ClassVar[str] = "kubernetes-app"
app: KubernetesAppConfig

version: str | None = None
Expand Down
8 changes: 4 additions & 4 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ def substitute_component_variables(self, topic_name: str) -> str:
"""
error_topic_name = self.substitute_component_names(
self.config.topic_name_config.default_error_topic_name,
self._type,
self.type,
**os.environ,
)
output_topic_name = self.substitute_component_names(
self.config.topic_name_config.default_output_topic_name,
self._type,
self.type,
**os.environ,
)
return self.substitute_component_names(
topic_name,
self._type,
self.type,
error_topic_name=error_topic_name,
output_topic_name=output_topic_name,
)
Expand Down Expand Up @@ -147,7 +147,7 @@ def weave_from_topics(self, prev_component_to: ToSection) -> None:

def substitute_name(self):
if self.name:
self.name = self.substitute_component_names(self.name, self._type)
self.name = self.substitute_component_names(self.name, self.type)
else:
raise ValueError("Every component must have a name in the end.")

Expand Down
7 changes: 5 additions & 2 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from pydantic import BaseConfig, Extra
from typing import ClassVar, Literal

from pydantic import BaseConfig, Extra, Field
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig
Expand All @@ -18,7 +20,8 @@ class ProducerApp(KafkaApp):
This producer holds configuration to use as values for the streams bootstrap produce helm chart.
"""

_type = "producer"
type: ClassVar[str] = "producer"
schema_type: Literal["producer"] = Field(default="producer", exclude=True)
app: ProducerValues

class Config(BaseConfig):
Expand Down
7 changes: 5 additions & 2 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

from pydantic import BaseConfig, Extra
from typing import ClassVar, Literal

from pydantic import BaseConfig, Extra, Field
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig
Expand All @@ -14,7 +16,8 @@ class StreamsApp(KafkaApp):
StreamsApp component that configures a streams bootstrap app
"""

_type = "streams-app"
type: ClassVar[str] = "streams-app"
schema_type: Literal["streams-app"] = Field(default="streams-app", exclude=True)
app: StreamsAppConfig

class Config(BaseConfig):
Expand Down
2 changes: 1 addition & 1 deletion kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def substitute_component_specific_variables(
substitute(
json.dumps(pair),
{
"component_type": component_object._type,
"component_type": component_object.type,
"component_name": component_object.name,
},
)
Expand Down
Loading