Skip to content

Commit

Permalink
[Kayrock] Add response parser & send describe groups request
Browse files Browse the repository at this point in the history
With single group now only
  • Loading branch information
Argonus committed Oct 8, 2023
1 parent e4244a3 commit a719558
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 5 deletions.
79 changes: 79 additions & 0 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defmodule KafkaEx.New.Client do
alias KafkaEx.Config
alias KafkaEx.NetworkClient

alias KafkaEx.New.Client.RequestBuilder
alias KafkaEx.New.Client.ResponseParser
alias KafkaEx.New.Structs.Broker
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.NodeSelector
Expand Down Expand Up @@ -165,6 +167,17 @@ defmodule KafkaEx.New.Client do
{:reply, {:ok, topic_metadata}, updated_state}
end

def handle_call({:describe_groups, [consumer_group_name]}, _from, state) do
if KafkaEx.valid_consumer_group?(consumer_group_name) do
{response, updated_state} =
describe_group_request(consumer_group_name, state)

{:reply, response, updated_state}
else
{:reply, {:error, :invalid_consumer_group}, state}
end
end

def handle_call({:kayrock_request, request, node_selector}, _from, state) do
{response, updated_state} =
kayrock_network_request(request, node_selector, state)
Expand Down Expand Up @@ -249,6 +262,72 @@ defmodule KafkaEx.New.Client do
end
end

defp describe_group_request(consumer_group_name, state) do
node_selector = NodeSelector.consumer_group(consumer_group_name)

[consumer_group_name]
|> RequestBuilder.describe_groups_request(state)
|> handle_describe_group_request(node_selector, state)
end

defp handle_describe_group_request(
_,
_,
_,
retry_count \\ @retry_count,
_last_error \\ nil
)

defp handle_describe_group_request(_, _, state, 0, last_error) do
{{:error, last_error}, state}
end

defp handle_describe_group_request(
request,
node_selector,
state,
retry_count,
_last_error
) do
case kayrock_network_request(request, node_selector, state) do
{{:ok, response}, state_out} ->
case ResponseParser.describe_groups_response(response) do
{:ok, [consumer_group]} ->
{{:ok, consumer_group}, state_out}

{:error, [error | _]} ->
consumer_group = request.groups[0]

Logger.warn(
"Unable to fetch consumer group metadata for #{consumer_group.group_id}"
)

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
error
)
end

{_, _state_out} ->
consumer_group = request.groups[0]

Logger.warn(
"Unable to fetch consumer group metadata for #{consumer_group.group_id}"
)

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
:unknown
)
end
end

defp maybe_connect_broker(broker, state) do
case Broker.connected?(broker) do
true ->
Expand Down
22 changes: 22 additions & 0 deletions lib/kafka_ex/new/client/response_parser.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule KafkaEx.New.Client.ResponseParser do
@moduledoc """
This module is used to parse response from KafkaEx.New.Client.
It's main decision point which protocol to use for parsing response
"""
alias KafkaEx.New.Structs.ConsumerGroup

@protocol Application.get_env(
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)

@doc """
Parses response for Describe Groups API
"""
@spec describe_groups_response(term) ::
{:ok, [ConsumerGroup.t()]} | {:error, term}
def describe_groups_response(response) do
@protocol.parse_response(:describe_groups, response)
end
end
12 changes: 12 additions & 0 deletions lib/kafka_ex/new/protocols/kayrock_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,29 @@ defmodule KafkaEx.New.Protocols.KayrockProtocol do
Once Kafka Ex v1.0 is released, this module will be renamed to KayrockProtocol
and will become a separated package.
"""
@behaviour KafkaEx.New.Client.Protocol

alias KafkaEx.New.Protocols.Kayrock, as: KayrockProtocol

# -----------------------------------------------------------------------------
@doc """
Builds request for Describe Groups API
"""
@impl KafkaEx.New.Client.Protocol
def build_request(:describe_groups, api_version, opts) do
group_names = Keyword.fetch!(opts, :group_names)

api_version
|> Kayrock.DescribeGroups.get_request_struct()
|> KayrockProtocol.DescribeGroups.Request.build_request(group_names)
end

# -----------------------------------------------------------------------------
@doc """
Parses response for Describe Groups API
"""
@impl KafkaEx.New.Client.Protocol
def parse_response(:describe_groups, response) do
KayrockProtocol.DescribeGroups.Response.parse_response(response)
end
end
17 changes: 17 additions & 0 deletions lib/kafka_ex/new/protocols/protocol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule KafkaEx.New.Client.Protocol do
@moduledoc """
This module is responsible for defining the behaviour of a protocol.
"""
# ------------------------------------------------------------------------------
@type api_version :: non_neg_integer
@type params :: Keyword.t()

# ------------------------------------------------------------------------------
@callback build_request(:describe_groups, integer, params) :: term

# ------------------------------------------------------------------------------
@type consumer_group :: KafkaEx.New.Structs.ConsumerGroup

@callback parse_response(:describe_groups, term) ::
{:ok, [consumer_group]} | {:error, term}
end
46 changes: 46 additions & 0 deletions test/integration/new_client_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule KafkaEx.New.Client.Test do
use ExUnit.Case
import TestHelper

alias KafkaEx.New.Client

Expand All @@ -22,6 +23,39 @@ defmodule KafkaEx.New.Client.Test do
{:ok, %{client: pid}}
end

describe "describe_groups/1" do
setup do
consumer_group = generate_random_string()
topic = "new_client_implementation"

{:ok, %{consumer_group: consumer_group, topic: topic}}
end

test "returns group metadata for single consumer group", %{
consumer_group: consumer_group,
topic: topic,
client: client
} do
join_to_group(client, topic, consumer_group)

{:ok, group_metadata} =
GenServer.call(client, {:describe_groups, [consumer_group]})

assert group_metadata.group_id == consumer_group
assert group_metadata.protocol_type == "consumer"
assert group_metadata.protocol == ""
assert length(group_metadata.members) == 1
end

test "returns dead when consumer group does not exist", %{client: client} do
{:ok, group_metadata} =
GenServer.call(client, {:describe_groups, ["non-existing-group"]})

assert group_metadata.group_id == "non-existing-group"
assert group_metadata.state == "Dead"
end
end

test "update metadata", %{client: client} do
{:ok, updated_metadata} = GenServer.call(client, :update_metadata)
%ClusterMetadata{topics: topics} = updated_metadata
Expand Down Expand Up @@ -181,4 +215,16 @@ defmodule KafkaEx.New.Client.Test do

assert Process.alive?(client)
end

# ------------------------------------------------------------------------------------------------
defp join_to_group(client, topic, consumer_group) do
request = %KafkaEx.Protocol.JoinGroup.Request{
group_name: consumer_group,
member_id: "",
topics: [topic],
session_timeout: 6000
}

KafkaEx.join_group(request, worker_name: client, timeout: 10000)
end
end
14 changes: 9 additions & 5 deletions test/kafka_ex/new/client/request_builder_test.exs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
defmodule KafkaEx.New.Client.RequestBuilderTest do
use ExUnit.Case, async: true

alias KafkaEx.New.Client.RequestBuilder

describe "describe_groups_request/2" do
test "returns request for DescribeGroups API" do
state = %KafkaEx.New.Client.State{api_versions: %{describe_groups: 1}}
group_names = ["group1", "group2"]

assert RequestBuilder.describe_groups_request(group_names, state) ==
{:ok,
%KafkaEx.New.Protocols.DescribeGroups.Request{
group_names: group_names
}}
expected_request = %Kayrock.DescribeGroups.V1.Request{
group_ids: group_ids
}

{:ok, request} =
RequestBuilder.describe_groups_request(group_names, state)

assert expected_request == request
end
end
end

0 comments on commit a719558

Please sign in to comment.