Skip to content

Commit

Permalink
refactor: Cache create_schema_registry_client and rename to get_...
Browse files Browse the repository at this point in the history
This is just foundation for the next commit.
  • Loading branch information
timmc-edx committed Aug 26, 2022
1 parent 10aa36a commit 7ce9fff
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
14 changes: 13 additions & 1 deletion edx_event_bus_kafka/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
"""

import warnings
from functools import lru_cache
from typing import Optional

from django.conf import settings
from django.dispatch import receiver
from django.test.signals import setting_changed

# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
Expand All @@ -16,10 +19,13 @@


# return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import
def create_schema_registry_client():
@lru_cache
def get_schema_registry_client():
"""
Create a schema registry client from common settings.
This is cached for convenience.
Returns
None if confluent_kafka library is not available or the settings are invalid.
SchemaRegistryClient if it is.
Expand Down Expand Up @@ -69,3 +75,9 @@ def load_common_settings() -> Optional[dict]:
})

return base_settings


@receiver(setting_changed)
def _reset_state(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches during testing when settings change."""
get_schema_registry_client.cache_clear()
4 changes: 2 additions & 2 deletions edx_event_bus_kafka/consumer/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
from openedx_events.tooling import OpenEdxPublicSignal

from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,7 +68,7 @@ def _create_consumer(self):
DeserializingConsumer if it is.
"""

schema_registry_client = create_schema_registry_client()
schema_registry_client = get_schema_registry_client()

# TODO (EventBus):
# 1. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset)
Expand Down
4 changes: 2 additions & 2 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -161,7 +161,7 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
logger.warning('Library confluent-kafka not available. Cannot create event producer.')
return None

schema_registry_client = create_schema_registry_client()
schema_registry_client = get_schema_registry_client()
if schema_registry_client is None:
return None

Expand Down
4 changes: 2 additions & 2 deletions edx_event_bus_kafka/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

class TestSchemaRegistryClient(TestCase):
def test_unconfigured(self):
assert config.create_schema_registry_client() is None
assert config.get_schema_registry_client() is None

def test_configured(self):
with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345'):
assert isinstance(config.create_schema_registry_client(), SchemaRegistryClient)
assert isinstance(config.get_schema_registry_client(), SchemaRegistryClient)


class TestCommonSettings(TestCase):
Expand Down

0 comments on commit 7ce9fff

Please sign in to comment.