Skip to content

Commit

Permalink
Updated the KafkaEvent header(s) value to use the correct array type …
Browse files Browse the repository at this point in the history
…of sbyte values, so that negative values could be handled.
  • Loading branch information
ashishdhingra committed Apr 6, 2023
1 parent 15aff3d commit b3fd72c
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<Description>Amazon Lambda .NET Core support - KafkaEvents package.</Description>
<AssemblyTitle>Amazon.Lambda.KafkaEvents</AssemblyTitle>
<VersionPrefix>1.0.1</VersionPrefix>
<VersionPrefix>2.0.0</VersionPrefix>
<AssemblyName>Amazon.Lambda.KafkaEvents</AssemblyName>
<PackageId>Amazon.Lambda.KafkaEvents</PackageId>
<PackageTags>AWS;Amazon;Lambda;Kafka</PackageTags>
Expand Down
2 changes: 1 addition & 1 deletion Libraries/src/Amazon.Lambda.KafkaEvents/KafkaEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class KafkaEventRecord
/// <summary>
/// The Kafka event record headers.
/// </summary>
public IList<IDictionary<string, byte[]>> Headers { get; set; }
public IList<IDictionary<string, sbyte[]>> Headers { get; set; }
}
}
}
11 changes: 8 additions & 3 deletions Libraries/test/EventsTests.Shared/EventTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2729,20 +2729,25 @@ public void KafkaEventTest(Type serializerType)
var eventRecord = record.Value.FirstOrDefault();
Assert.Equal(eventRecord.Topic, "mytopic");
Assert.Equal(eventRecord.Partition, 12);
Assert.Equal(eventRecord.Offset, 15);
Assert.Equal(eventRecord.Offset, 3043205);
Assert.Equal(eventRecord.Timestamp, 1545084650987);
Assert.Equal(eventRecord.TimestampType, "CREATE_TIME");

Assert.Equal(new StreamReader(eventRecord.Value).ReadToEnd(), "Hello, this is a test.");

Assert.Equal(eventRecord.Headers.Count, 1);
Assert.Equal(eventRecord.Headers.Count, 8);
var eventRecordHeader = eventRecord.Headers.FirstOrDefault();
Assert.NotNull(eventRecordHeader);
Assert.Equal(eventRecordHeader.Count, 1);
var eventRecordHeaderValue = eventRecordHeader.FirstOrDefault();
Assert.NotNull(eventRecordHeaderValue);
Assert.Equal(eventRecordHeaderValue.Key, "headerKey");
Assert.Equal(Encoding.UTF8.GetString(eventRecordHeaderValue.Value), "headerValue");

// Convert sbyte[] to byte[] array.
var tempHeaderValueByteArray = new byte[eventRecordHeaderValue.Value.Length];
Buffer.BlockCopy(eventRecordHeaderValue.Value, 0, tempHeaderValueByteArray, 0, tempHeaderValueByteArray.Length);

Assert.Equal(Encoding.UTF8.GetString(tempHeaderValueByteArray), "headerValue");

Handle(kafkaEvent);
}
Expand Down
170 changes: 169 additions & 1 deletion Libraries/test/EventsTests.Shared/kafka-event.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
{
"topic": "mytopic",
"partition": 12,
"offset": 15,
"offset": 3043205,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "MzM4NjQ2Njcy",
"value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"headers": [
{
Expand All @@ -26,6 +27,173 @@
117,
101
]
},
{
"Id": [
55,
48,
57,
101,
52,
57,
98,
54,
45,
57,
57,
56,
100,
45,
52,
99,
48,
99,
45,
56,
101,
102,
55,
45,
98,
57,
56,
99,
102,
97,
102,
101,
100,
48,
56,
98
]
},
{
"ModelType": [
82,
101,
115,
105,
100,
101,
110,
116,
105,
97,
108,
80,
114,
111,
100,
117,
99,
116,
95,
65,
103,
101,
110,
116,
68,
101,
116,
97,
105,
108
]
},
{
"Version": [
0,
0,
0,
0,
0,
0,
0,
3
]
},
{
"IsFullBuild": [
1
]
},
{
"IsLowPriority": [
1
]
},
{
"OriginationTimestamp": [
0,
0,
1,
-123,
-11,
-81,
-50,
-2
]
},
{
"traceparent": [
48,
48,
45,
54,
57,
49,
99,
99,
101,
55,
50,
102,
50,
55,
52,
97,
54,
100,
54,
56,
97,
102,
54,
54,
56,
102,
54,
51,
97,
98,
52,
97,
57,
99,
49,
45,
53,
97,
97,
50,
57,
98,
54,
99,
97,
51,
97,
49,
101,
57,
50,
102,
45,
48,
48
]
}
]
}
Expand Down

0 comments on commit b3fd72c

Please sign in to comment.