Skip to content

Commit

Permalink
See #444. Renamed module.
Browse files Browse the repository at this point in the history
  • Loading branch information
j3-signalroom committed Nov 10, 2024
1 parent 8ee578e commit 72c6e1b
Showing 1 changed file with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__status__ = "dev"


class SchemaRegistry:
class KafkaSchemaRegistry:
"""Kafka Schema Registry class."""

def __init__(self, service_account_user: str):
Expand All @@ -26,20 +26,19 @@ def __init__(self, service_account_user: str):
the prefix to the path of the Schema Registry Cluster secrets in the AWS Secrets Manager.
"""

# --- Set the service account user
self._service_account_user = service_account_user

#
self.aws_service = AwsConfluentProperties()

#
# --- Get the Schema Registry properties
schema_registry_path = f"/confluent_cloud_resource/{self._service_account_user}/schema_registry_cluster/python_client"
self._schema_registry_properties = self.get_schema_registry_properties(schema_registry_path)
confluent_properties = AwsConfluentProperties()
self._schema_registry_properties = confluent_properties.get_schema_registry_properties(schema_registry_path)
if self._schema_registry_properties is None:
raise RuntimeError(f"Failed to retrieve the Schema Registry properties from '{schema_registry_path}' secrets because {self._schema_registry_properties.get_error_message_code()}:{self._schema_registry_properties.get_error_message()}")

def create_delete_if_exist_schema(self, subject_name: str, schema_type: str, schema_string: str) -> (any):
"""
This method deletes schema (and all its versions), if exist. Then Takes the `schema_string`,
This method deletes schema (and all its versions), if exist. Then takes the `schema_string`,
saves it as the `schema_type`, and puts it in in the Schema Registry as a new entry.
Arg(s):
Expand All @@ -53,23 +52,23 @@ def create_delete_if_exist_schema(self, subject_name: str, schema_type: str, sch
"""

# Connect to the Schema Registry
self.client = SchemaRegistryClient(self._schema_registry_properties)
self._client = SchemaRegistryClient(self._schema_registry_properties)

# Delete schema and all versions of it
with suppress(Exception):
self.client.delete_subject(subject_name, True)
self._client.delete_subject(subject_name, True)

# Converts schema into a byte array for transport over the wire
schema = Schema(schema_string, schema_type)

# Registers the schema with the Schema Registry
schema_id = self.client.register_schema(subject_name, schema)
schema_id = self._client.register_schema(subject_name, schema)
logging.info('%s Avro Schema ID: %d', subject_name, schema_id)

return schema_id

def get_schema_registry_client(self):
"""Get the Schema Registry Client."""

return self.client
return self._client

0 comments on commit 72c6e1b

Please sign in to comment.