Skip to content

Commit

Permalink
[Kayrock] Add API & ARM support
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Oct 21, 2023
1 parent fbaa3bf commit 5576af0
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 15 deletions.
94 changes: 94 additions & 0 deletions docker-compose-arm.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
version: '3.9'

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.4.arm64
restart: unless-stopped
ports:
- '32181:32181'
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
healthcheck:
test: "zookeeper-shell 127.0.01:32181 ls /"
interval: 10s
timeout: 10s
retries: 5

kafka-1:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9092:9092'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
healthcheck:
test: kafka-topics --bootstrap-server kafka-1:29092 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka-2:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9093:9093'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
healthcheck:
test: kafka-topics --bootstrap-server kafka-2:29093 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka-3:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9094:9094'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
healthcheck:
test: kafka-topics --bootstrap-server kafka-2:29093 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka_setup:
image: confluentinc/cp-kafka:7.0.4.arm64
depends_on:
zookeeper:
condition: service_healthy
kafka-1:
condition: service_healthy
kafka-2:
condition: service_healthy
kafka-3:
condition: service_healthy
command: "bash -c 'kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic consumer_group_implementation_test && \
kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic test0p8p0 && \
kafka-topics --zookeeper zookeeper:32181 --list'"
16 changes: 16 additions & 0 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ defmodule KafkaEx do
Server.call(worker, :consumer_group)
end

@doc """
Sends a request to describe a group identified by its name.
We support only one consumer group per request for now, as we don't
group requests by group coordinator.
This is a new client implementation, and is not compatible with the old clients
"""
@spec describe_group(binary, Keyword.t()) :: {:ok, any} | {:error, any}
def describe_group(consumer_group_name, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())

case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
end

@doc """
Sends a request to join a consumer group.
"""
Expand Down
12 changes: 4 additions & 8 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,12 @@ defmodule KafkaEx.New.Client do
case kayrock_network_request(request, node_selector, state) do
{{:ok, response}, state_out} ->
case ResponseParser.describe_groups_response(response) do
{:ok, [consumer_group]} ->
{{:ok, consumer_group}, state_out}
{:ok, consumer_groups} ->
{{:ok, consumer_groups}, state_out}

{:error, [error | _]} ->
consumer_group = request.groups[0]

Logger.warn(
"Unable to fetch consumer group metadata for #{consumer_group.group_id}"
"Unable to fetch consumer group metadata for #{inspect(request.group_ids)}"
)

handle_describe_group_request(
Expand All @@ -312,10 +310,8 @@ defmodule KafkaEx.New.Client do
end

{_, _state_out} ->
consumer_group = request.groups[0]

Logger.warn(
"Unable to fetch consumer group metadata for #{consumer_group.group_id}"
"Unable to fetch consumer group metadata for #{inspect(request.group_ids)}"
)

handle_describe_group_request(
Expand Down
17 changes: 16 additions & 1 deletion lib/kafka_ex/new/kafka_ex_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule KafkaEx.New.KafkaExAPI do

alias KafkaEx.New.Client
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.ConsumerGroup
alias KafkaEx.New.Structs.Topic
alias KafkaEx.New.Structs.NodeSelector

Expand Down Expand Up @@ -56,6 +57,20 @@ defmodule KafkaEx.New.KafkaExAPI do
end
end

@doc """
Sends a request to describe a group identified by its name.
We support only one consumer group per request for now, as we don't
group requests by group coordinator.
"""
@spec describe_group(client, Keyword.t()) ::
{:ok, ConsumerGroup.t()} | {:error, any}
def describe_group(client, consumer_group_name) do
case GenServer.call(client, {:describe_groups, [consumer_group_name]}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
end

@doc """
Get topic metadata for the given topics
Expand All @@ -73,7 +88,7 @@ defmodule KafkaEx.New.KafkaExAPI do
Returns the cluster metadata from the given client
"""
@spec cluster_metadata(client) :: {:ok, ClusterMetadata.t()}
def(cluster_metadata(client)) do
def cluster_metadata(client) do
GenServer.call(client, :cluster_metadata)
end

Expand Down
8 changes: 5 additions & 3 deletions test/integration/kayrock/compatibility_streaming_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do
partition = 0
consumer_group = "streamers"

{:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic)
{:ok, topic} =
KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic)

KafkaEx.produce(topic, partition, "foo 1", api_version: 3)
KafkaEx.produce(topic, partition, "foo 2", api_version: 3)
Expand All @@ -41,7 +42,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do
}
)

TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
length(Enum.take(stream, 3)) == 3
end)

Expand Down Expand Up @@ -81,7 +82,8 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do
topic_name = "kayrock_stream_with_empty_log"
consumer_group = "streamers_with_empty_log"

{:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic_name)
{:ok, topic} =
KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic_name)

{:ok, agent} = Agent.start(fn -> [] end)

Expand Down
54 changes: 53 additions & 1 deletion test/integration/kayrock/compatibility_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ defmodule KafkaEx.KayrockCompatibilityTest do
These mostly come from the original integration_test.exs file
"""

use ExUnit.Case
import KafkaEx.TestHelpers

@moduletag :new_client

Expand Down Expand Up @@ -35,6 +35,46 @@ defmodule KafkaEx.KayrockCompatibilityTest do
assert Process.alive?(pid)
end

describe "describe_groups/1" do
setup do
consumer_group = generate_random_string()
topic = "new_client_implementation"

{:ok, %{consumer_group: consumer_group, topic: topic}}
end

test "with new client - returns group metadata", %{
client: client,
consumer_group: consumer_group,
topic: topic
} do
join_to_group(client, topic, consumer_group)

{:ok, group_metadata} = KafkaExAPI.describe_group(client, consumer_group)

assert group_metadata.group_id == consumer_group
assert group_metadata.protocol_type == "consumer"
assert group_metadata.protocol == ""
assert length(group_metadata.members) == 1
end

test "with old client - returns group metadata", %{
client: client,
consumer_group: consumer_group,
topic: topic
} do
join_to_group(client, topic, consumer_group)

{:ok, group_metadata} =
KafkaEx.describe_group(consumer_group, worker_name: client)

assert group_metadata.group_id == consumer_group
assert group_metadata.protocol_type == "consumer"
assert group_metadata.protocol == ""
assert length(group_metadata.members) == 1
end
end

test "worker updates metadata after specified interval" do
{:ok, args} = KafkaEx.build_worker_options(metadata_update_interval: 100)
{:ok, pid} = Client.start_link(args, :no_name)
Expand Down Expand Up @@ -622,4 +662,16 @@ defmodule KafkaEx.KayrockCompatibilityTest do
topic = KafkaEx.TestHelpers.generate_random_string()
:ok = KafkaEx.produce(topic, nil, "hello", worker_name: client)
end

# -----------------------------------------------------------------------------
defp join_to_group(client, topic, consumer_group) do
request = %KafkaEx.Protocol.JoinGroup.Request{
group_name: consumer_group,
member_id: "",
topics: [topic],
session_timeout: 6000
}

KafkaEx.join_group(request, worker_name: client, timeout: 10000)
end
end
4 changes: 2 additions & 2 deletions test/integration/new_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule KafkaEx.New.Client.Test do
} do
join_to_group(client, topic, consumer_group)

{:ok, group_metadata} =
{:ok, [group_metadata]} =
GenServer.call(client, {:describe_groups, [consumer_group]})

assert group_metadata.group_id == consumer_group
Expand All @@ -48,7 +48,7 @@ defmodule KafkaEx.New.Client.Test do
end

test "returns dead when consumer group does not exist", %{client: client} do
{:ok, group_metadata} =
{:ok, [group_metadata]} =
GenServer.call(client, {:describe_groups, ["non-existing-group"]})

assert group_metadata.group_id == "non-existing-group"
Expand Down

0 comments on commit 5576af0

Please sign in to comment.