From 16ea76611d121700a3f3119d18919063d12c81c1 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Thu, 30 Jan 2025 02:53:27 +0500 Subject: [PATCH] fix: get channel target for a gRPC request (#1339) --- .../publisher/test_publisher_client.py | 24 ++++++++++++++++--- .../subscriber/test_subscriber_client.py | 24 ++++++++++++++++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index a311edf23..1e1cc61b3 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -57,6 +57,14 @@ typed_flaky = cast(Callable[[C], C], flaky(max_runs=5, min_passes=1)) +# NOTE: This interceptor is required to create an intercept channel. +class _PublisherClientGrpcInterceptor( + grpc.UnaryUnaryClientInterceptor, +): + def intercept_unary_unary(self, continuation, client_call_details, request): + pass + + def _assert_retries_equal(retry, retry2): # Retry instances cannot be directly compared, because their predicates are # different instances of the same function. We thus manually compare their other @@ -416,17 +424,27 @@ def init(self, *args, **kwargs): assert client.transport._ssl_channel_credentials == mock_ssl_creds -def test_init_emulator(monkeypatch): +def test_init_emulator(monkeypatch, creds): monkeypatch.setenv("PUBSUB_EMULATOR_HOST", "/foo/bar:123") # NOTE: When the emulator host is set, a custom channel will be used, so # no credentials (mock ot otherwise) can be passed in. - client = publisher.Client() + + # TODO(https://github.com/grpc/grpc/issues/38519): Workaround to create an intercept + # channel (for forwards compatibility) with a channel created by the publisher client + # where target is set to the emulator host. + channel = publisher.Client().transport.grpc_channel + interceptor = _PublisherClientGrpcInterceptor() + intercept_channel = grpc.intercept_channel(channel, interceptor) + transport = publisher.Client.get_transport_class("grpc")( + credentials=creds, channel=intercept_channel + ) + client = publisher.Client(transport=transport) # Establish that a gRPC request would attempt to hit the emulator host. # # Sadly, there seems to be no good way to do this without poking at # the private API of gRPC. - channel = client._transport.publish._channel + channel = client._transport.publish._thunk("")._channel # Behavior to include dns prefix changed in gRPCv1.63 grpc_major, grpc_minor = [int(part) for part in grpc.__version__.split(".")[0:2]] if grpc_major > 1 or (grpc_major == 1 and grpc_minor >= 63): diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index 7c0ebfd83..4b381245d 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -36,6 +36,14 @@ from google.pubsub_v1.types import PubsubMessage +# NOTE: This interceptor is required to create an intercept channel. +class _SubscriberClientGrpcInterceptor( + grpc.UnaryUnaryClientInterceptor, +): + def intercept_unary_unary(self, continuation, client_call_details, request): + pass + + def test_init_default_client_info(creds): client = subscriber.Client(credentials=creds) @@ -119,17 +127,27 @@ def init(self, *args, **kwargs): assert client.transport._ssl_channel_credentials == mock_ssl_creds -def test_init_emulator(monkeypatch): +def test_init_emulator(monkeypatch, creds): monkeypatch.setenv("PUBSUB_EMULATOR_HOST", "/baz/bacon:123") # NOTE: When the emulator host is set, a custom channel will be used, so # no credentials (mock ot otherwise) can be passed in. - client = subscriber.Client() + + # TODO(https://github.com/grpc/grpc/issues/38519): Workaround to create an intercept + # channel (for forwards compatibility) with a channel created by the publisher client + # where target is set to the emulator host. + channel = subscriber.Client().transport.grpc_channel + interceptor = _SubscriberClientGrpcInterceptor() + intercept_channel = grpc.intercept_channel(channel, interceptor) + transport = subscriber.Client.get_transport_class("grpc")( + credentials=creds, channel=intercept_channel + ) + client = subscriber.Client(transport=transport) # Establish that a gRPC request would attempt to hit the emulator host. # # Sadly, there seems to be no good way to do this without poking at # the private API of gRPC. - channel = client._transport.pull._channel + channel = client._transport.pull._thunk("")._channel # Behavior to include dns prefix changed in gRPCv1.63 grpc_major, grpc_minor = [int(part) for part in grpc.__version__.split(".")[0:2]] if grpc_major > 1 or (grpc_major == 1 and grpc_minor >= 63):