Skip to content

chore: Feature/kafka consumer #905

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1e64f81
feat(kafka): add Avro serialization support and implement Kafka event…
hjgraca Jun 6, 2025
7ff8953
feat(tests): enhance Kafka event handling tests and implement IEnumer…
hjgraca Jun 6, 2025
ea82e0a
feat(kafka): rename to ConsumerRecord and ConsumerRecords classes
hjgraca Jun 6, 2025
802f5a4
feat(kafka): enhance ConsumerRecord and ConsumerRecords with internal…
hjgraca Jun 6, 2025
a2169da
feat(kafka): improve ConsumerRecord properties with nullability check…
hjgraca Jun 6, 2025
a1bd206
feat(kafka): refactor ConsumerRecord and ConsumerRecords to support k…
hjgraca Jun 6, 2025
a981c95
feat(kafka): add Protobuf support with Key.proto and enhance deserial…
hjgraca Jun 6, 2025
14be14b
feat(kafka): add unit tests for PowertoolsKafkaJsonSerializer to vali…
hjgraca Jun 7, 2025
c6e155e
feat(kafka): add constructors to serializers for custom JSON serializ…
hjgraca Jun 7, 2025
4dd610b
feat(kafka): enhance serializers with AOT-compatible JSON context and…
hjgraca Jun 7, 2025
868acff
add examples
hjgraca Jun 9, 2025
733fbbd
refactor to multiple projects, add examples. add headers decoded and …
hjgraca Jun 17, 2025
ac8f29a
enhance primitive deserialization and add tests
hjgraca Jun 17, 2025
9912ec8
add versions
hjgraca Jun 17, 2025
57f5319
refactor examples
hjgraca Jun 18, 2025
40d3282
examples update
hjgraca Jun 18, 2025
5a59f86
new example with class library
hjgraca Jun 18, 2025
e9fcbf2
improve tests and examples
hjgraca Jun 18, 2025
75f4eb7
first sonar fix
hjgraca Jun 18, 2025
0ecf254
sonar fix 2
hjgraca Jun 18, 2025
5e12bca
fix examples build: add avro tools
hjgraca Jun 18, 2025
5eaffd3
add base tests
hjgraca Jun 18, 2025
0c169d1
enhance tests: add deserialization tests with serializer context and …
hjgraca Jun 18, 2025
146837f
enhance tests: add header decoding and complex key/value deserializat…
hjgraca Jun 18, 2025
0b26a1a
enhance tests: add deserialization and serialization tests for non-Co…
hjgraca Jun 18, 2025
b0387c1
add license headers to all source and test files; add KafkaHandlerFun…
hjgraca Jun 18, 2025
06cd719
adding avrogen to codeql build
hjgraca Jun 18, 2025
407964e
feat: add schema metadata support for key and value in ConsumerRecord…
hjgraca Jun 18, 2025
5976ed6
refactor: update deserialization methods to use format-specific logic…
hjgraca Jun 19, 2025
37e5ba3
feat: enhance Protobuf deserialization to support Confluent Schema Re…
hjgraca Jun 19, 2025
adda37b
refactor: reorganize test namespaces and improve deserialization erro…
hjgraca Jun 19, 2025
a64dc31
add coverage to json
hjgraca Jun 19, 2025
6f10a09
refactor: streamline deserialization logic by consolidating type-spec…
hjgraca Jun 19, 2025
1ec2439
refactor: enhance JSON deserialization logic; improve error handling …
hjgraca Jun 19, 2025
397a476
docs for nuget
hjgraca Jun 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ jobs:
with:
languages: ${{ matrix.language }}

- name: Install global tools
run: dotnet tool install --global Apache.Avro.Tools

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/examples-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ jobs:
- name: Install dependencies
run: dotnet restore

- name: Install global tools
run: dotnet tool install --global Apache.Avro.Tools

- name: Build
run: dotnet build --configuration Release --no-restore /tl

Expand Down
35 changes: 35 additions & 0 deletions examples/Kafka/Avro/src/Avro.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<AWSProjectType>Lambda</AWSProjectType>
<!-- This property makes the build directory similar to a publish directory and helps the AWS .NET Lambda Mock Test Tool find project dependencies. -->
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
<!-- Generate ready to run images during publishing to improve cold start time. -->
<PublishReadyToRun>true</PublishReadyToRun>

<AssemblyName>Avro.Example</AssemblyName>

</PropertyGroup>
<ItemGroup>
<PackageReference Include="Amazon.Lambda.RuntimeSupport" Version="1.12.0"/>
<PackageReference Include="Amazon.Lambda.Core" Version="2.5.0"/>
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4"/>
<PackageReference Include="AWS.Lambda.Powertools.Logging" Version="2.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\libraries\src\AWS.Lambda.Powertools.Kafka.Avro\AWS.Lambda.Powertools.Kafka.Avro.csproj" />
</ItemGroup>
<Target Name="GenerateAvroClasses" BeforeTargets="CoreCompile">
<Exec Command="avrogen -s $(ProjectDir)CustomerProfile.avsc $(ProjectDir)Generated"/>
</Target>
<ItemGroup>
<None Remove="kafka-avro-event.json" />
<Content Include="kafka-avro-event.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
</Project>
46 changes: 46 additions & 0 deletions examples/Kafka/Avro/src/CustomerProfile.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"type": "record",
"name": "CustomerProfile",
"namespace": "com.example",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "full_name", "type": "string"},
{"name": "email", "type": {
"type": "record",
"name": "EmailAddress",
"fields": [
{"name": "address", "type": "string"},
{"name": "verified", "type": "boolean"},
{"name": "primary", "type": "boolean"}
]
}},
{"name": "age", "type": "int"},
{"name": "address", "type": {
"type": "record",
"name": "Address",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "state", "type": "string"},
{"name": "country", "type": "string"},
{"name": "zip_code", "type": "string"}
]
}},
{"name": "phone_numbers", "type": {
"type": "array",
"items": {
"type": "record",
"name": "PhoneNumber",
"fields": [
{"name": "number", "type": "string"},
{"name": "type", "type": {"type": "enum", "name": "PhoneType", "symbols": ["HOME", "WORK", "MOBILE"]}}
]
}
}},
{"name": "preferences", "type": {
"type": "map",
"values": "string"
}},
{"name": "account_status", "type": {"type": "enum", "name": "AccountStatus", "symbols": ["ACTIVE", "INACTIVE", "SUSPENDED"]}}
]
}
21 changes: 21 additions & 0 deletions examples/Kafka/Avro/src/Function.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Amazon.Lambda.Core;
using Amazon.Lambda.RuntimeSupport;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Avro;
using AWS.Lambda.Powertools.Logging;
using com.example;

string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
{
foreach (var record in records)
{
Logger.LogInformation("Record Value: {@record}", record.Value);
}

return "Processed " + records.Count() + " records";
}

await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
new PowertoolsKafkaAvroSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
.Build()
.RunAsync();
23 changes: 23 additions & 0 deletions examples/Kafka/Avro/src/Generated/com/example/AccountStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace com.example
{
using System;
using System.Collections.Generic;
using System.Text;
using global::Avro;
using global::Avro.Specific;

[global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
public enum AccountStatus
{
ACTIVE,
INACTIVE,
SUSPENDED,
}
}
115 changes: 115 additions & 0 deletions examples/Kafka/Avro/src/Generated/com/example/Address.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// ------------------------------------------------------------------------------
// <auto-generated>
// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
// Changes to this file may cause incorrect behavior and will be lost if code
// is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace com.example
{
using System;
using System.Collections.Generic;
using System.Text;
using global::Avro;
using global::Avro.Specific;

[global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
public partial class Address : global::Avro.Specific.ISpecificRecord
{
public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"st" +
"reet\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"s" +
"tring\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"zip_code\",\"type\":\"string\"}]}" +
"");
private string _street;
private string _city;
private string _state;
private string _country;
private string _zip_code;
public virtual global::Avro.Schema Schema
{
get
{
return Address._SCHEMA;
}
}
public string street
{
get
{
return this._street;
}
set
{
this._street = value;
}
}
public string city
{
get
{
return this._city;
}
set
{
this._city = value;
}
}
public string state
{
get
{
return this._state;
}
set
{
this._state = value;
}
}
public string country
{
get
{
return this._country;
}
set
{
this._country = value;
}
}
public string zip_code
{
get
{
return this._zip_code;
}
set
{
this._zip_code = value;
}
}
public virtual object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.street;
case 1: return this.city;
case 2: return this.state;
case 3: return this.country;
case 4: return this.zip_code;
default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public virtual void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.street = (System.String)fieldValue; break;
case 1: this.city = (System.String)fieldValue; break;
case 2: this.state = (System.String)fieldValue; break;
case 3: this.country = (System.String)fieldValue; break;
case 4: this.zip_code = (System.String)fieldValue; break;
default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
}
Loading
Loading