A `KafkaEx.GenConsumer` alternative that uses GenStage producers for backpressure-regulated event consumption.
If available in Hex, the package can be installed by adding `kafka_ex_gen_stage_consumer` to your list of dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:kafka_ex_gen_stage_consumer, git: "https://github.com/gerbal/kafka_ex_gen_stage_consumer"},
    # currently depends on an alternative kafka_ex branch
    {:kafka_ex, git: "https://github.com/gerbal/kafka_ex", branch: "custom-genconsumer"}
  ]
end
```
Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/kafka_ex_gen_stage_consumer.
`KafkaExGenStageConsumer` is a GenStage producer implementation of a `KafkaEx.GenConsumer`. Unlike a GenConsumer, a `KafkaExGenStageConsumer` consumes events from Kafka in response to demand from subscribing services, which allows consumption to be regulated according to the capacity those services have available.
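The demand-driven model can be illustrated with plain GenStage. The sketch below is not this library's producer: `FakePartitionProducer` and `RecordingConsumer` are hypothetical stand-ins, integers play the role of Kafka messages, and the `max_demand`/`min_demand` values are arbitrary. The producer emits events only when its subscriber asks for them, so the consumer's demand caps how much work is in flight. It is written as a standalone `.exs` script (`Mix.install/1` requires Elixir 1.12 or later).

```elixir
# Standalone script only; inside a Mix project, use the gen_stage dependency instead.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule FakePartitionProducer do
  # Hypothetical stand-in for a partition producer: integer "offsets"
  # play the role of Kafka messages.
  use GenStage

  def start_link(start_offset), do: GenStage.start_link(__MODULE__, start_offset)

  @impl true
  def init(start_offset), do: {:producer, start_offset}

  @impl true
  def handle_demand(demand, next_offset) when demand > 0 do
    # Produce exactly as many events as were demanded; a Kafka-backed
    # producer could fetch messages here instead.
    events = Enum.to_list(next_offset..(next_offset + demand - 1))
    {:noreply, events, next_offset + demand}
  end
end

defmodule RecordingConsumer do
  # Hypothetical consumer that forwards each batch it receives to a process.
  use GenStage

  def start_link({producer, report_to}),
    do: GenStage.start_link(__MODULE__, {producer, report_to})

  @impl true
  def init({producer, report_to}) do
    # max_demand bounds how many events may be outstanding at once.
    {:consumer, report_to, subscribe_to: [{producer, max_demand: 10, min_demand: 5}]}
  end

  @impl true
  def handle_events(events, _from, report_to) do
    send(report_to, {:batch, events})
    {:noreply, [], report_to}
  end
end
```

Running `{:ok, producer} = FakePartitionProducer.start_link(0)` followed by `RecordingConsumer.start_link({producer, self()})` delivers `{:batch, events}` messages to the caller, with consecutive offsets starting at 0 and never more than `max_demand` events per batch.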
`KafkaExGenStageConsumer` should be started the same way a `KafkaEx.GenConsumer` would be, except with one additional argument: the subscribing module, passed right after `KafkaExGenStageConsumer`.

N.B. This may not be the ideal pattern; suggestions for alternative supervision approaches are welcome.
```elixir
defmodule MyApp do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    consumer_group_opts = [
      # setting for the ConsumerGroup
      heartbeat_interval: 1_000,
      # this setting will be forwarded to the GenConsumer
      commit_interval: 1_000,
      extra_consumer_args: [],
      commit_strategy: :async_commit
    ]

    subscriber_impl = ExampleSubscriber
    consumer_group_name = "example_group"
    topic_names = ["example_topic"]

    children = [
      # ... other children
      supervisor(
        KafkaEx.ConsumerGroup,
        [KafkaExGenStageConsumer, subscriber_impl, consumer_group_name, topic_names, consumer_group_opts]
      )
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```
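`Supervisor.Spec` has been deprecated since Elixir 1.5. A minimal sketch of the same child written as a plain child spec map, assuming only the argument list used in `MyApp.start/2`; `MyApp.ConsumerGroupChild` is a hypothetical helper name, not part of this library.

```elixir
defmodule MyApp.ConsumerGroupChild do
  # Hypothetical helper: builds the same child as the
  # supervisor(KafkaEx.ConsumerGroup, [...]) call, without Supervisor.Spec.
  def spec(subscriber_impl, consumer_group_name, topic_names, consumer_group_opts) do
    %{
      id: KafkaEx.ConsumerGroup,
      # Calls KafkaEx.ConsumerGroup.start_link with the same argument list.
      start:
        {KafkaEx.ConsumerGroup, :start_link,
         [KafkaExGenStageConsumer, subscriber_impl, consumer_group_name, topic_names,
          consumer_group_opts]},
      # supervisor/2 marked the child as a supervisor; keep that here.
      type: :supervisor
    }
  end
end
```

In `MyApp.start/2`, the `supervisor(...)` entry would then become `MyApp.ConsumerGroupChild.spec(subscriber_impl, consumer_group_name, topic_names, consumer_group_opts)`, and `import Supervisor.Spec` could be dropped. Restart and shutdown defaults for a `:supervisor` child match what `supervisor/2` produced (`:permanent`, `:infinity`).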
The subscribing module is expected to implement a single function, `start_link/1`, which receives a tuple of `{pid, topic, partition, extra_consumer_args}`, where `pid` is the producer stage to subscribe to.
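```elixir
defmodule ExampleSubscriber do
  use GenStage

  def start_link({_producer, _topic, _partition, extra_consumer_args} = opts) do
    # Pass GenServer options (e.g. :debug) through from extra_consumer_args,
    # assumed here to be a keyword list (it is [] in the config above).
    {gen_server_options, _rest} = Keyword.split(extra_consumer_args, [:name, :debug])
    GenStage.start_link(__MODULE__, opts, gen_server_options)
  end

  @impl true
  def init({producer, _topic, _partition, _extra_consumer_args}) do
    # Subscribe to the KafkaExGenStageConsumer producer for this partition.
    {:consumer, [], subscribe_to: [producer]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Process each event for its side effects; a consumer emits no events.
    Enum.each(events, &do_work/1)
    {:noreply, [], state}
  end

  # Placeholder for application-specific processing.
  defp do_work(event), do: event
end
```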
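A Flow pipeline can also act as the subscribing module:

```elixir
defmodule ExampleFlowConsumer do
  def start_link({producer, _topic, _partition, _extra_consumer_args}) do
    [producer]
    |> Flow.from_stages()
    |> Flow.map(&decode_event/1)
    |> Flow.map(&do_work/1)
    # Flow may process events on several stages at once, so commits can
    # arrive out of offset order.
    |> Flow.map(&KafkaExGenStageConsumer.trigger_commit(producer, {:async_commit, &1.offset}))
    |> Flow.start_link()
  end

  # Placeholders for application-specific decoding and processing; both must
  # preserve the event's :offset for the commit step.
  defp decode_event(event), do: event
  defp do_work(event), do: event
end
```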
Because the subscriber stage is started by the consumer, it is possible to lose events when a crash or a consumer group rebalance occurs.
There are two strategies to handle this case:

- Use your subscriber stage as a relay to consumers outside of the `ConsumerGroup` supervision tree.
- Use `commit_strategy: :no_commit` and add a commit-offset stage to your GenStage pipeline. Call `KafkaExGenStageConsumer.trigger_commit/2` to trigger commits.
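For the second strategy, the commit stage can be the subscriber itself. A minimal sketch, assuming each event is a message struct with an `:offset` field (for example `KafkaEx.Protocol.Fetch.Message`) and that GenStage is available in the application; it passes the offset in the same `{:async_commit, offset}` form the Flow example uses. `CommittingSubscriber` and `do_work/1` are hypothetical names. It commits once per batch, only after every event in that batch has been processed.

```elixir
defmodule CommittingSubscriber do
  # Hypothetical subscriber intended for use with `commit_strategy: :no_commit`.
  use GenStage

  def start_link({_producer, _topic, _partition, _extra_consumer_args} = opts) do
    GenStage.start_link(__MODULE__, opts)
  end

  @impl true
  def init({producer, _topic, _partition, _extra_consumer_args}) do
    # Keep the producer pid in the state so offsets can be committed through it.
    {:consumer, producer, subscribe_to: [producer]}
  end

  @impl true
  def handle_events(events, _from, producer) do
    # Finish the whole batch before committing anything.
    Enum.each(events, &do_work/1)

    # Commit the highest offset seen in this batch.
    last_offset = events |> Enum.map(& &1.offset) |> Enum.max()
    KafkaExGenStageConsumer.trigger_commit(producer, {:async_commit, last_offset})

    {:noreply, [], producer}
  end

  # Hypothetical placeholder for application-specific processing.
  defp do_work(event), do: event
end
```

Committing per batch rather than per event keeps commit traffic low; if the subscriber crashes mid-batch, that batch is not committed and can be consumed again.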