Skip to content

Commit

Permalink
[Kayrock] Add additional layer to parse protocols & add request builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Oct 8, 2023
1 parent 01af8f4 commit e4244a3
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 4 deletions.
36 changes: 36 additions & 0 deletions lib/kafka_ex/new/client/request_builder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule KafkaEx.New.Client.RequestBuilder do
@moduledoc """
This module is used to build request for KafkaEx.New.Client.
It's main decision point which protocol to use for building request and what
is required version.
"""
@protocol Application.get_env(
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)

@default_api_version %{
describe_groups: 1
}

alias KafkaEx.New.Client.State

@doc """
Builds request for Describe Groups API
"""
@spec describe_groups_request([binary], State.t()) :: term
def describe_groups_request(group_names, state) do
api_version = get_api_version(state, :describe_groups)

@protocol.build_request(:describe_groups, api_version,
group_names: group_names
)
end

# -----------------------------------------------------------------------------
defp get_api_version(state, request_type) do
default = Map.fetch!(@default_api_version, request_type)
State.max_supported_api_version(state, request_type, default)
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defprotocol KafkaEx.New.Protocols.DescribeGroups do
defprotocol KafkaEx.New.Protocols.Kayrock.DescribeGroups do
@moduledoc """
This module handles Describe Groups request & response handling & parsing
This module handles Describe Groups request & response parsing.
Request is built using Kayrock protocol, response is parsed to
native KafkaEx structs.
"""

defprotocol Request do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defimpl KafkaEx.New.Protocols.DescribeGroups.Request,
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request,
for: [Kayrock.DescribeGroups.V0.Request, Kayrock.DescribeGroups.V1.Request] do
def build_request(request_template, consumer_group_names) do
Map.put(request_template, :group_ids, consumer_group_names)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defimpl KafkaEx.New.Protocols.DescribeGroups.Response,
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Response,
for: [Kayrock.DescribeGroups.V0.Response, Kayrock.DescribeGroups.V1.Response] do
def parse_response(%{groups: groups}) do
case Enum.filter(groups, &(&1.error_code != 0)) do
Expand Down
20 changes: 20 additions & 0 deletions lib/kafka_ex/new/protocols/kayrock_protocol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule KafkaEx.New.Protocols.KayrockProtocol do
@moduledoc """
This module handles Kayrock request & response handling & parsing.
Once Kafka Ex v1.0 is released, this module will be renamed to KayrockProtocol
and will become a separated package.
"""

alias KafkaEx.New.Protocols.Kayrock, as: KayrockProtocol

@doc """
Builds request for Describe Groups API
"""
def build_request(:describe_groups, api_version, opts) do
group_names = Keyword.fetch!(opts, :group_names)

api_version
|> Kayrock.DescribeGroups.get_request_struct()
|> KayrockProtocol.DescribeGroups.Request.build_request(group_names)
end
end
17 changes: 17 additions & 0 deletions test/kafka_ex/new/client/request_builder_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule KafkaEx.New.Client.RequestBuilderTest do
use ExUnit.Case, async: true
alias KafkaEx.New.Client.RequestBuilder

describe "describe_groups_request/2" do
test "returns request for DescribeGroups API" do
state = %KafkaEx.New.Client.State{api_versions: %{describe_groups: 1}}
group_names = ["group1", "group2"]

assert RequestBuilder.describe_groups_request(group_names, state) ==
{:ok,
%KafkaEx.New.Protocols.DescribeGroups.Request{
group_names: group_names
}}
end
end
end
81 changes: 81 additions & 0 deletions test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroupsTest do
use ExUnit.Case, async: true

alias KafkaEx.New.Protocols.Kayrock.DescribeGroups, as: KayrockDescribeGroups

alias Kayrock.DescribeGroups.V0

describe "build_request/2" do
test "builds request for Describe Groups API" do
api_version = 0
consumer_group_names = ["test-group"]
expected_request = %V0.Request{group_ids: groups}

assert KayrockDescribeGroups.build_request(
api_version,
consumer_group_names
) == expected_request
end
end

describe "build_response/1" do
test "for api version 0 - returns response if all groups succeeded" do
response = %V0.Response{
groups: [
%{
group_id: "succeeded",
error_code: 0,
state: "stable",
protocol_type: "protocol_type",
protocol: "protocol",
members: [
%{
member_id: "member_id",
client_id: "client_id",
client_host: "client_host",
member_metadata: "member_metadata",
member_assignment: %{
version: 0,
user_data: "user_data",
partition_assignments: [
%{topic: "test-topic", partitions: [1, 2, 3]}
]
}
}
]
}
]
}

assert {:ok,
[
%KafkaEx.New.ConsumerGroup{
group_id: "succeeded",
state: "stable",
protocol_type: "protocol_type",
protocol: "protocol",
members: [
%KafkaEx.New.ConsumerGroup.Member{
member_id: "member_id",
client_id: "client_id",
client_host: "client_host",
member_metadata: "member_metadata",
member_assignment:
%KafkaEx.New.ConsumerGroup.Member.MemberAssignment{
version: 0,
user_data: "user_data",
partition_assignments: [
%KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{
topic: "test-topic",
partitions: [1, 2, 3]
}
]
}
}
]
}
]} ==
DescribeGroups.Response.parse_response(response)
end
end
end

0 comments on commit e4244a3

Please sign in to comment.