Skip to content

Commit

Permalink
[Kayrock] Refactor test helpers, add random ports & fix compile issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Oct 21, 2023
1 parent f51a337 commit 6204b60
Show file tree
Hide file tree
Showing 28 changed files with 379 additions and 326 deletions.
2 changes: 1 addition & 1 deletion test/integration/consumer_group_implementation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do

alias KafkaEx.ConsumerGroup
alias KafkaEx.GenConsumer
import TestHelper
import KafkaEx.TestHelpers

require Logger

Expand Down
23 changes: 13 additions & 10 deletions test/integration/consumer_group_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
alias KafkaEx.Protocol, as: Proto
alias KafkaEx.Config
use ExUnit.Case
import TestHelper
import KafkaEx.TestHelpers

@moduletag :consumer_group

Expand Down Expand Up @@ -237,7 +237,8 @@ defmodule KafkaEx.ConsumerGroup.Test do
consumer_group: consumer_group
)

offset_before = TestHelper.latest_offset_number(topic, 0, worker_name)
offset_before =
KafkaEx.TestHelpers.latest_offset_number(topic, 0, worker_name)

Enum.each(1..10, fn _ ->
msg = %Proto.Produce.Message{value: "hey #{inspect(:os.timestamp())}"}
Expand All @@ -253,7 +254,9 @@ defmodule KafkaEx.ConsumerGroup.Test do
)
end)

offset_after = TestHelper.latest_offset_number(topic, 0, worker_name)
offset_after =
KafkaEx.TestHelpers.latest_offset_number(topic, 0, worker_name)

assert offset_after == offset_before + 10

[logs] =
Expand Down Expand Up @@ -471,7 +474,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
offset: 0
)

log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end)
log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end)

refute Enum.empty?(log)

Expand Down Expand Up @@ -520,9 +523,9 @@ defmodule KafkaEx.ConsumerGroup.Test do
# make sure the offset commit is actually committed before we
# start streaming again
:ok =
TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
3 ==
TestHelper.latest_consumer_offset_number(
KafkaEx.TestHelpers.latest_consumer_offset_number(
random_string,
0,
consumer_group,
Expand All @@ -531,7 +534,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
end)

stream = KafkaEx.stream(random_string, 0, worker_name: worker_name)
log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end)
log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end)

refute Enum.empty?(log)
first_message = log |> hd
Expand Down Expand Up @@ -574,7 +577,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
assert "message 3" == m2.value

offset =
TestHelper.latest_consumer_offset_number(
KafkaEx.TestHelpers.latest_consumer_offset_number(
topic_name,
0,
consumer_group,
Expand Down Expand Up @@ -623,7 +626,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
assert "message 5" == m4.value

offset =
TestHelper.latest_consumer_offset_number(
KafkaEx.TestHelpers.latest_consumer_offset_number(
topic_name,
0,
consumer_group,
Expand All @@ -649,7 +652,7 @@ defmodule KafkaEx.ConsumerGroup.Test do
Enum.map(map_stream, fn m -> m.value end)

offset =
TestHelper.latest_consumer_offset_number(
KafkaEx.TestHelpers.latest_consumer_offset_number(
topic_name,
0,
other_consumer_group,
Expand Down
8 changes: 5 additions & 3 deletions test/integration/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule KafkaEx.Integration.Test do
alias KafkaEx.Protocol, as: Proto
alias KafkaEx.Config
use ExUnit.Case
import TestHelper
import KafkaEx.TestHelpers

@moduletag :integration

Expand Down Expand Up @@ -203,7 +203,7 @@ defmodule KafkaEx.Integration.Test do
random_string = generate_random_string()

metadata =
TestHelper.wait_for_value(
KafkaEx.TestHelpers.wait_for_value(
fn -> KafkaEx.metadata(topic: random_string) end,
fn metadata ->
metadata != nil && length(metadata.topic_metadatas) > 0
Expand Down Expand Up @@ -365,7 +365,9 @@ defmodule KafkaEx.Integration.Test do
})

[offset_response] =
TestHelper.wait_for_any(fn -> KafkaEx.latest_offset(random_string, 0) end)
KafkaEx.TestHelpers.wait_for_any(fn ->
KafkaEx.latest_offset(random_string, 0)
end)

offset = offset_response.partition_offsets |> hd |> Map.get(:offset) |> hd

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do
resp = create_topic(name, config, client)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
{:ok, metadatas} = KafkaExAPI.topics_metadata(client, [name])
length(metadatas) > 0
end)
Expand All @@ -56,7 +56,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do
resp = KafkaEx.delete_topics([name], timeout: 5_000, worker_name: client)
assert {:no_error, name} = parse_delete_topic_resp(resp)

TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
{:ok, []} == KafkaExAPI.topics_metadata(client, [name])
end)
end
Expand Down
2 changes: 1 addition & 1 deletion test/integration/kayrock/compatibility_0_p_8_p_0_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule KafkaEx.KayrockCompatibility0p8p0Test do
partition = 0
:ok = KafkaEx.produce(@topic, partition, msg, worker_name: client)

TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
[got] =
KafkaEx.fetch(
@topic,
Expand Down
8 changes: 4 additions & 4 deletions test/integration/kayrock/compatibility_0_p_9_p_0_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do
end

test "can join a consumer group", %{client: client} do
random_group = TestHelper.generate_random_string()
random_group = KafkaEx.TestHelpers.generate_random_string()

request = %JoinGroupRequest{
group_name: random_group,
Expand All @@ -45,7 +45,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do
test "can send a simple leader sync for a consumer group", %{client: client} do
# A lot of repetition with the previous test. Leaving it in now, waiting for
# how this pans out eventually as we add more and more 0.9 consumer group code
random_group = TestHelper.generate_random_string()
random_group = KafkaEx.TestHelpers.generate_random_string()

request = %JoinGroupRequest{
group_name: random_group,
Expand Down Expand Up @@ -81,7 +81,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do
test "can leave a consumer group", %{client: client} do
# A lot of repetition with the previous tests. Leaving it in now, waiting for
# how this pans out eventually as we add more and more 0.9 consumer group code
random_group = TestHelper.generate_random_string()
random_group = KafkaEx.TestHelpers.generate_random_string()

request = %JoinGroupRequest{
group_name: random_group,
Expand All @@ -107,7 +107,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do

test "can heartbeat", %{client: client} do
# See sync test. Removing repetition in the next iteration
random_group = TestHelper.generate_random_string()
random_group = KafkaEx.TestHelpers.generate_random_string()

request = %JoinGroupRequest{
group_name: random_group,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
end

def sync_stop(pid) when is_pid(pid) do
TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
if Process.alive?(pid) do
Process.exit(pid, :normal)
end
Expand Down Expand Up @@ -172,7 +172,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
topic_name = "#{@topic_name_prefix}#{:rand.uniform(2_000_000)}"

{:ok, topic_name} =
TestHelper.ensure_append_timestamp_topic(client_pid, topic_name)
KafkaEx.TestHelpers.ensure_append_timestamp_topic(client_pid, topic_name)

{:ok, consumer_group_pid1} =
ConsumerGroup.start_link(
Expand All @@ -197,7 +197,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
)

# wait for both consumer groups to join
TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
ConsumerGroup.active?(consumer_group_pid1, 30000) &&
ConsumerGroup.active?(consumer_group_pid2, 30000)
end)
Expand Down Expand Up @@ -291,7 +291,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
starting_offsets =
partition_range
|> Enum.map(fn px ->
{px, TestHelper.latest_offset_number(topic_name, px)}
{px, KafkaEx.TestHelpers.latest_offset_number(topic_name, px)}
end)
|> Enum.into(%{})

Expand All @@ -315,7 +315,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
|> Enum.map(fn px ->
consumer_pid = Map.get(consumers, {topic_name, px})

TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
message_set = TestConsumer.last_message_set(consumer_pid)
correct_last_message?(message_set, messages[px], starting_offsets[px])
end)
Expand All @@ -335,9 +335,9 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do

# offsets should be committed on exit
for px <- partition_range do
TestHelper.wait_for(fn ->
KafkaEx.TestHelpers.wait_for(fn ->
ending_offset =
TestHelper.latest_consumer_offset_number(
KafkaEx.TestHelpers.latest_consumer_offset_number(
topic_name,
px,
@consumer_group_name,
Expand Down
16 changes: 8 additions & 8 deletions test/integration/kayrock/compatibility_consumer_group_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
test "fetch with auto_commit doesn't blow up on no messages", %{
client: client
} do
topic = TestHelper.generate_random_string()
topic = KafkaEx.TestHelpers.generate_random_string()
consumer_group = "auto_commit_consumer_group"

KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group)
Expand Down Expand Up @@ -167,7 +167,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
end

test "fetch starts consuming from last committed offset", %{client: client} do
random_string = TestHelper.generate_random_string()
random_string = KafkaEx.TestHelpers.generate_random_string()
consumer_group = "auto_commit_consumer_group"
KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group)

Expand Down Expand Up @@ -204,7 +204,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
test "fetch does not commit offset with auto_commit is set to false", %{
client: client
} do
topic = TestHelper.generate_random_string()
topic = KafkaEx.TestHelpers.generate_random_string()

Enum.each(1..10, fn _ ->
KafkaEx.produce(
Expand Down Expand Up @@ -248,7 +248,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
end

test "offset_fetch does not override consumer_group", %{client: client} do
topic = TestHelper.generate_random_string()
topic = KafkaEx.TestHelpers.generate_random_string()
consumer_group = "bar#{topic}"

KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group)
Expand Down Expand Up @@ -276,7 +276,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do

test "offset_commit commits an offset and offset_fetch retrieves the committed offset",
%{client: client} do
random_string = TestHelper.generate_random_string()
random_string = KafkaEx.TestHelpers.generate_random_string()

Enum.each(1..10, fn _ ->
KafkaEx.produce(
Expand Down Expand Up @@ -325,7 +325,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
end

test "stream auto_commits offset by default", %{client: client} do
random_string = TestHelper.generate_random_string()
random_string = KafkaEx.TestHelpers.generate_random_string()

KafkaEx.produce(
%Proto.Produce.Request{
Expand All @@ -348,7 +348,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
offset: 0
)

log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end)
log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end)

refute Enum.empty?(log)

Expand All @@ -368,7 +368,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
test "streams with a consumer group begin at the last committed offset", %{
client: client
} do
topic_name = TestHelper.generate_random_string()
topic_name = KafkaEx.TestHelpers.generate_random_string()
consumer_group = "stream_test"

KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group)
Expand Down
Loading

0 comments on commit 6204b60

Please sign in to comment.