Skip to content

Commit

Permalink
✨ adding header to consumer group as well
Browse files Browse the repository at this point in the history
Signed-off-by: Clement Montois <[email protected]>
  • Loading branch information
cmontois committed Apr 11, 2023
1 parent 92f5612 commit f7fd76a
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ func (consumer *Consumer) processOne(session sarama.ConsumerGroupSession, messag
Timestamp: message.Timestamp.String(),
Metadata: consumer.kafkaEventSource.Metadata,
}

headers := make(map[string]string)

for _, recordHeader := range message.Headers {
headers[string(recordHeader.Key)] = string(recordHeader.Value)
}

eventData.Headers = headers

if consumer.kafkaEventSource.JSONBody {
eventData.Body = (*json.RawMessage)(&message.Value)
} else {
Expand Down

0 comments on commit f7fd76a

Please sign in to comment.