Skip to content

Commit

Permalink
Keeping Headers definition as byte. Marshalling/Unmarshalling Headers…
Browse files Browse the repository at this point in the history
… as signed numbers.
  • Loading branch information
milovacb committed Apr 28, 2023
1 parent eede131 commit 622bc5e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 11 deletions.
45 changes: 37 additions & 8 deletions events/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

package events

import (
"encoding/json"
)

type KafkaEvent struct {
EventSource string `json:"eventSource"`
EventSourceARN string `json:"eventSourceArn"`
Expand All @@ -10,12 +14,37 @@ type KafkaEvent struct {
}

type KafkaRecord struct {
Topic string `json:"topic"`
Partition int64 `json:"partition"`
Offset int64 `json:"offset"`
Timestamp MilliSecondsEpochTime `json:"timestamp"`
TimestampType string `json:"timestampType"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
Headers []map[string][]int8 `json:"headers"`
Topic string `json:"topic"`
Partition int64 `json:"partition"`
Offset int64 `json:"offset"`
Timestamp MilliSecondsEpochTime `json:"timestamp"`
TimestampType string `json:"timestampType"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
Headers []map[string]JSONNumberBytes `json:"headers"`
}

// JSONNumberBytes represents array of bytes in Headers field.
type JSONNumberBytes []byte

// MarshalJSON converts byte array into array of signed integers.
func (data JSONNumberBytes) MarshalJSON() ([]byte, error) {
signedNumbers := make([]int8, len(data))
for i, value := range data {
signedNumbers[i] = int8(value)
}
return json.Marshal(signedNumbers)
}

// UnmarshalJSON converts a given json with potential negative values into byte array.
func (b *JSONNumberBytes) UnmarshalJSON(data []byte) error {
var signedNumbers []int8
if err := json.Unmarshal(data, &signedNumbers); err != nil {
return err
}
*b = make(JSONNumberBytes, len(signedNumbers))
for i, value := range signedNumbers {
(*b)[i] = byte(value)
}
return nil
}
6 changes: 3 additions & 3 deletions events/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ func TestKafkaEventMarshaling(t *testing.T) {
}

// expected values for header
var headerValues [5]int8
var headerValues [5]byte
headerValues[0] = 118
headerValues[1] = -36
headerValues[1] = 220 // -36 + 256
headerValues[2] = 0
headerValues[3] = 127
headerValues[4] = -128
headerValues[4] = 128 // -128 + 256

assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092")
assert.Equal(t, inputEvent.EventSource, "aws:kafka")
Expand Down

0 comments on commit 622bc5e

Please sign in to comment.