Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-42 Add producer and consumer interceptors #1

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,8 @@ jobs:
platform: [ubuntu-latest]

env:
KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095
TOXIPROXY_ADDR: http://localhost:8474
KAFKA_INSTALL_ROOT: /home/runner/kafka
KAFKA_HOSTNAME: localhost
DEBUG: true
KAFKA_VERSION: ${{ matrix.kafka-version }}
KAFKA_SCALA_VERSION: 2.12

steps:
- uses: actions/checkout@v1
Expand Down Expand Up @@ -48,16 +43,9 @@ jobs:
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
export REPOSITORY_ROOT=${GITHUB_WORKSPACE}
vagrant/install_cluster.sh
vagrant/boot_cluster.sh
vagrant/create_topics.sh
vagrant/run_java_producer.sh

- name: Run test suite
run: make test
run: make test_functional

- name: Run linter
run: make lint

- name: Teardown
run: vagrant/halt_cluster.sh
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ _testmain.go

coverage.txt
profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ fmt:
gofmt -s -l -w $(FILES) $(TESTS)

lint:
golangci-lint run
GOFLAGS="-tags=functional" golangci-lint run

test:
$(GOTEST) ./...

.PHONY: test_functional
test_functional:
$(GOTEST) -tags=functional ./...
4 changes: 4 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ func (p *asyncProducer) dispatcher() {
p.inFlight.Add(1)
}

for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}

version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
Expand Down
121 changes: 121 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1230,6 +1231,126 @@ func TestBrokerProducerShutdown(t *testing.T) {
mockBroker.Close()
}

type appendInterceptor struct {
i int
}

func (b *appendInterceptor) OnSend(msg *ProducerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
v, _ := msg.Value.Encode()
msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
b.i++
}

func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
b.i++
}

func testProducerInterceptor(
t *testing.T,
interceptors []ProducerInterceptor,
expectationFn func(*testing.T, int, *ProducerMessage),
) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Interceptors = interceptors
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case msg := <-producer.Successes():
expectationFn(t, i, msg)
}
}

closeProducer(t, producer)
leader.Close()
seedBroker.Close()
}

func TestAsyncProducerInterceptors(t *testing.T) {
tests := []struct {
name string
interceptors []ProducerInterceptor
expectationFn func(*testing.T, int, *ProducerMessage)
}{
{
name: "intercept messages",
interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain",
interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+1000)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with one interceptor failing",
interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i+1000)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with all interceptors failing",
interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage
if string(v) != expected {
t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { testProducerInterceptor(t, tt.interceptors, tt.expectationFn) })
}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down
16 changes: 16 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ type Config struct {
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}

// Interceptors to be called when the producer dispatcher reads the
// message for the first time. Interceptors allows to intercept and
// possible mutate the message before they are published to Kafka
// cluster. *ProducerMessage modified by the first interceptor's
// OnSend() is passed to the second interceptor OnSend(), and so on in
// the interceptor chain.
d1egoaz marked this conversation as resolved.
Show resolved Hide resolved
Interceptors []ProducerInterceptor
}

// Consumer is the namespace for configuration related to consuming messages,
Expand Down Expand Up @@ -391,6 +399,14 @@ type Config struct {
// - use `ReadUncommitted` (default) to consume and return all messages in message channel
// - use `ReadCommitted` to hide messages that are part of an aborted transaction
IsolationLevel IsolationLevel

// Interceptors to be called just before the record is sent to the
// messages channel. Interceptors allows to intercept and possible
// mutate the message before they are returned to the client.
// *ConsumerMessage modified by the first interceptor's OnConsume() is
// passed to the second interceptor OnConsume(), and so on in the
// interceptor chain.
Interceptors []ConsumerInterceptor
}

// A user-provided string sent with every request to the brokers for logging,
Expand Down
3 changes: 3 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ feederLoop:
}

for i, msg := range msgs {
for _, interceptor := range child.conf.Consumer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
messageSelect:
select {
case <-child.dying:
Expand Down
9 changes: 9 additions & 0 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ type ConsumerGroupSession interface {
// message twice, and your processing should ideally be idempotent.
MarkOffset(topic string, partition int32, offset int64, metadata string)

// Commit the offset to the backend
//
// Note: calling Commit performs a blocking synchronous operation.
Commit()

// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
Expand Down Expand Up @@ -624,6 +629,10 @@ func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset
}
}

func (s *consumerGroupSession) Commit() {
s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
if pom := s.offsets.findPOM(topic, partition); pom != nil {
pom.ResetOffset(offset, metadata)
Expand Down
110 changes: 110 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1342,3 +1343,112 @@ func Test_partitionConsumer_parseResponse(t *testing.T) {
})
}
}

func testConsumerInterceptor(
t *testing.T,
interceptors []ConsumerInterceptor,
expectationFn func(*testing.T, int, *ConsumerMessage),
) {
// Given
broker0 := NewMockBroker(t, 0)

mockFetchResponse := NewMockFetchResponse(t, 1)
for i := 0; i < 10; i++ {
mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
}

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, 0),
"FetchRequest": mockFetchResponse,
})
config := NewConfig()
config.Consumer.Interceptors = interceptors
// When
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
select {
case msg := <-consumer.Messages():
expectationFn(t, i, msg)
case err := <-consumer.Errors():
t.Error(err)
}
}

d1egoaz marked this conversation as resolved.
Show resolved Hide resolved
safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func TestConsumerInterceptors(t *testing.T) {
tests := []struct {
name string
interceptors []ConsumerInterceptor
expectationFn func(*testing.T, int, *ConsumerMessage)
}{
{
name: "intercept messages",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with one interceptor failing",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i+1000)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with all interceptors failing",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) })
}
}
Loading