Skip to content

Commit

Permalink
Fix static class variable
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted committed Jan 26, 2023
1 parent 2c30f48 commit 448cc68
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 84 deletions.
2 changes: 1 addition & 1 deletion kpops/cli/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def find_components(self, module_name: str) -> None:
:param module_name: name of the python module
"""
for _class in _find_classes(module_name, PipelineComponent):
self._classes[_class._type] = _class
self._classes[_class.type] = _class

def __getitem__(self, component_type: str) -> type[PipelineComponent]:
try:
Expand Down
10 changes: 5 additions & 5 deletions kpops/components/base_components/base_defaults_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections import deque
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import TypeVar
from typing import ClassVar, TypeVar

import typer
from pydantic import BaseConfig, BaseModel, Field, PrivateAttr
Expand All @@ -17,7 +17,7 @@


class BaseDefaultsComponent(BaseModel):
_type: str = Field(..., alias="type")
type: ClassVar[str] = Field(..., const=True) # component type discriminator

enrich: bool = Field(default=False, exclude=True)
config: PipelineConfig = Field(default=..., exclude=True)
Expand Down Expand Up @@ -65,13 +65,13 @@ def extend_with_defaults(self, kwargs) -> dict:
if not environment_default_file_path.exists():
kwargs = update_nested(
kwargs,
defaults_from_yaml(main_default_file_path, base._type),
defaults_from_yaml(main_default_file_path, base.type),
)
else:
kwargs = update_nested(
kwargs,
defaults_from_yaml(environment_default_file_path, base._type),
defaults_from_yaml(main_default_file_path, base._type),
defaults_from_yaml(environment_default_file_path, base.type),
defaults_from_yaml(main_default_file_path, base.type),
)
return kwargs

Expand Down
3 changes: 2 additions & 1 deletion kpops/components/base_components/kafka_app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from typing import ClassVar, Literal

from pydantic import BaseModel, Extra
from typing_extensions import override
Expand Down Expand Up @@ -37,7 +38,7 @@ class KafkaApp(KubernetesApp):
Producer or streaming apps should inherit from this class.
"""

_type = "kafka-app"
type: ClassVar[Literal["kafka-app"]] = "kafka-app"
app: KafkaAppConfig

def __init__(self, **kwargs) -> None:
Expand Down
8 changes: 4 additions & 4 deletions kpops/components/base_components/kafka_connect.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import os
from abc import ABC
from typing import NoReturn
from typing import ClassVar, Literal, NoReturn

from typing_extensions import override

Expand All @@ -14,7 +14,7 @@


class KafkaConnector(PipelineComponent, ABC):
_type = "kafka-connect"
type: ClassVar[Literal["kafka-connect"]] = "kafka-connect"
app: KafkaConnectConfig

def __init__(self, **kwargs) -> None:
Expand Down Expand Up @@ -64,7 +64,7 @@ def clean(self, dry_run: bool) -> None:


class KafkaSourceConnector(KafkaConnector):
_type = "kafka-source-connector"
type: ClassVar[Literal["kafka-source-connector"]] = "kafka-source-connector"

@override
def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:
Expand All @@ -90,7 +90,7 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None:


class KafkaSinkConnector(KafkaConnector):
_type = "kafka-sink-connector"
type: ClassVar[Literal["kafka-sink-connector"]] = "kafka-sink-connector"

@override
def add_input_topics(self, topics: list[str]) -> None:
Expand Down
3 changes: 2 additions & 1 deletion kpops/components/base_components/kubernetes_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import re
from functools import cached_property
from typing import ClassVar, Literal

from pydantic import BaseModel
from typing_extensions import override
Expand Down Expand Up @@ -35,7 +36,7 @@ class Config(CamelCaseConfig):
class KubernetesApp(PipelineComponent):
"""Base kubernetes app"""

_type = "kubernetes-app"
type: ClassVar[Literal["kubernetes-app"]] = "kubernetes-app"
app: KubernetesAppConfig

version: str | None = None
Expand Down
8 changes: 4 additions & 4 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ def substitute_component_variables(self, topic_name: str) -> str:
"""
error_topic_name = self.substitute_component_names(
self.config.topic_name_config.default_error_topic_name,
self._type,
self.type,
**os.environ,
)
output_topic_name = self.substitute_component_names(
self.config.topic_name_config.default_output_topic_name,
self._type,
self.type,
**os.environ,
)
return self.substitute_component_names(
topic_name,
self._type,
self.type,
error_topic_name=error_topic_name,
output_topic_name=output_topic_name,
)
Expand Down Expand Up @@ -147,7 +147,7 @@ def weave_from_topics(self, prev_component_to: ToSection) -> None:

def substitute_name(self):
if self.name:
self.name = self.substitute_component_names(self.name, self._type)
self.name = self.substitute_component_names(self.name, self.type)
else:
raise ValueError("Every component must have a name in the end.")

Expand Down
5 changes: 2 additions & 3 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Literal
from typing import ClassVar, Literal

from pydantic import BaseConfig, Extra
from typing_extensions import override
Expand All @@ -20,8 +20,7 @@ class ProducerApp(KafkaApp):
This producer holds configuration to use as values for the streams bootstrap produce helm chart.
"""

_type = "producer"
discriminator: Literal["producer"] = "producer"
type: ClassVar[Literal["producer"]] = "producer"
app: ProducerValues

class Config(BaseConfig):
Expand Down
5 changes: 2 additions & 3 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Literal
from typing import ClassVar, Literal

from pydantic import BaseConfig, Extra
from typing_extensions import override
Expand All @@ -16,8 +16,7 @@ class StreamsApp(KafkaApp):
StreamsApp component that configures a streams bootstrap app
"""

_type = "streams-app"
discriminator: Literal["streams-app"] = "streams-app"
type: ClassVar[Literal["streams-app"]] = "streams-app"
app: StreamsAppConfig

class Config(BaseConfig):
Expand Down
2 changes: 1 addition & 1 deletion kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def substitute_component_specific_variables(
substitute(
json.dumps(pair),
{
"component_type": component_object._type,
"component_type": component_object.type,
"component_name": component_object.name,
},
)
Expand Down
113 changes: 107 additions & 6 deletions schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
"type": "array",
"items": {
"discriminator": {
"propertyName": "discriminator",
"propertyName": "type",
"mapping": {
"streams-app": "#/definitions/StreamsApp",
"producer": "#/definitions/ProducerApp"
"producer": "#/definitions/ProducerApp",
"kafka-source-connector": "#/definitions/KafkaSourceConnector",
"kafka-sink-connector": "#/definitions/KafkaSinkConnector"
}
},
"oneOf": [
Expand All @@ -15,6 +17,12 @@
},
{
"$ref": "#/definitions/ProducerApp"
},
{
"$ref": "#/definitions/KafkaSourceConnector"
},
{
"$ref": "#/definitions/KafkaSinkConnector"
}
]
},
Expand Down Expand Up @@ -668,6 +676,14 @@
"description": "StreamsApp component that configures a streams bootstrap app",
"type": "object",
"properties": {
"type": {
"title": "Type",
"default": "streams-app",
"enum": [
"streams-app"
],
"type": "string"
},
"enrich": {
"title": "Enrich",
"default": false,
Expand Down Expand Up @@ -763,6 +779,14 @@
"description": "Producer component\n\nThis producer holds configuration to use as values for the streams bootstrap produce helm chart.",
"type": "object",
"properties": {
"type": {
"title": "Type",
"default": "producer",
"enum": [
"producer"
],
"type": "string"
},
"enrich": {
"title": "Enrich",
"default": false,
Expand All @@ -787,14 +811,91 @@
"version": {
"title": "Version",
"type": "string"
}
},
"required": [
"config",
"name",
"app"
]
},
"KafkaConnectConfig": {
"title": "KafkaConnectConfig",
"type": "object",
"properties": {}
},
"KafkaSourceConnector": {
"title": "KafkaSourceConnector",
"type": "object",
"properties": {
"type": {
"title": "Type",
"default": "kafka-source-connector",
"enum": [
"kafka-source-connector"
],
"type": "string"
},
"discriminator": {
"title": "Discriminator",
"default": "producer",
"enrich": {
"title": "Enrich",
"default": false,
"type": "boolean"
},
"config": {
"$ref": "#/definitions/PipelineConfig"
},
"name": {
"title": "Name",
"type": "string"
},
"from": {
"$ref": "#/definitions/FromSection"
},
"app": {
"$ref": "#/definitions/KafkaConnectConfig"
},
"to": {
"$ref": "#/definitions/ToSection"
}
},
"required": [
"config",
"name",
"app"
]
},
"KafkaSinkConnector": {
"title": "KafkaSinkConnector",
"type": "object",
"properties": {
"type": {
"title": "Type",
"default": "kafka-sink-connector",
"enum": [
"producer"
"kafka-sink-connector"
],
"type": "string"
},
"enrich": {
"title": "Enrich",
"default": false,
"type": "boolean"
},
"config": {
"$ref": "#/definitions/PipelineConfig"
},
"name": {
"title": "Name",
"type": "string"
},
"from": {
"$ref": "#/definitions/FromSection"
},
"app": {
"$ref": "#/definitions/KafkaConnectConfig"
},
"to": {
"$ref": "#/definitions/ToSection"
}
},
"required": [
Expand Down
12 changes: 7 additions & 5 deletions scratch_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

from pydantic import Field, schema_json_of

from kpops.components.base_components.kafka_connect import (
KafkaSinkConnector,
KafkaSourceConnector,
)
from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp
from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp

ComponentType = (
StreamsApp
| ProducerApp
# | KafkaSourceConnector
# | KafkaSinkConnector
| KafkaSourceConnector
| KafkaSinkConnector
# | PipelineComponent
)


AnnotatedPipelineComponent = Annotated[
ComponentType, Field(discriminator="discriminator")
]
AnnotatedPipelineComponent = Annotated[ComponentType, Field(discriminator="type")]
schema = schema_json_of(
Sequence[AnnotatedPipelineComponent],
title="kpops pipeline schema",
Expand Down
Loading

0 comments on commit 448cc68

Please sign in to comment.