From 8ef823a40e67734ed62e3cf50b717635932ad23d Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 25 Jun 2026 17:21:04 +0300 Subject: [PATCH 01/18] Add Kafka middleware package --- cmd/fkafka/main.go | 45 ++++ go.mod | 16 +- go.sum | 78 +++++- kafka/instance.go | 42 ++++ kafka/logs.go | 23 ++ kafka/message.go | 20 ++ kafka/mock/function.go | 25 ++ kafka/service.go | 443 ++++++++++++++++++++++++++++++++ kafka/service_test.go | 556 +++++++++++++++++++++++++++++++++++++++++ 9 files changed, 1244 insertions(+), 4 deletions(-) create mode 100644 cmd/fkafka/main.go create mode 100644 kafka/instance.go create mode 100644 kafka/logs.go create mode 100644 kafka/message.go create mode 100644 kafka/mock/function.go create mode 100644 kafka/service.go create mode 100644 kafka/service_test.go diff --git a/cmd/fkafka/main.go b/cmd/fkafka/main.go new file mode 100644 index 0000000..0fc568b --- /dev/null +++ b/cmd/fkafka/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "context" + "fmt" + "os" + + "knative.dev/func-go/kafka" +) + +// Main illustrates how scaffolding works to wrap a user's function. +func main() { + // Instanced Example + // (in scaffolding 'New()' will be in module 'f') + if err := kafka.Start(New()); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + + // Static Example + // (in scaffolding 'Handle' will be in the module 'f') + // if err := kafka.Start(kafka.DefaultHandler{Handler: Handle}); err != nil { + // fmt.Fprintln(os.Stderr, err.Error()) + // os.Exit(1) + // } +} + +// Handle is an example static function implementation. +func Handle(ctx context.Context, msg kafka.Message) error { + fmt.Println("Static Kafka Handler invoked") + return nil +} + +// MyFunction is an example instanced Kafka function implementation. +type MyFunction struct{} + +func New() *MyFunction { + return &MyFunction{} +} + +func (f *MyFunction) Handle(ctx context.Context, msg kafka.Message) error { + fmt.Printf("Received message: topic=%s partition=%d offset=%d key=%s value=%s\n", + msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) + return nil +} diff --git a/go.mod b/go.mod index 343d999..e48e3fd 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,33 @@ module knative.dev/func-go go 1.25.0 require ( + github.com/IBM/sarama v1.50.3 github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/rs/zerolog v1.32.0 knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pierrec/lz4/v4 v4.1.27 // indirect + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/crypto v0.53.0 // indirect + golang.org/x/net v0.56.0 // indirect + golang.org/x/sys v0.46.0 // indirect ) diff --git a/go.sum b/go.sum index d18bdd9..f716086 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,40 @@ +github.com/IBM/sarama v1.50.3 h1:zpY2iZYmt+z+0Bo3aYF+cD48OBt2hIgiDPZUuZKTXcc= +github.com/IBM/sarama v1.50.3/go.mod h1:Jo4MSfdDT3ycmQj7/ab8eLZwnvwCKZm/8H7SCbtyo8U= github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= +github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -23,33 +46,82 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pierrec/lz4/v4 v4.1.27 h1:+PhzhWDrjRj89TH2sw43nE3+4+W8lSxIuQadEHZyjUk= +github.com/pierrec/lz4/v4 v4.1.27/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.53.0 h1:QZ4Muo8THX6CizN2vPPd5fBGHyogrdK9fG4wLPFUsto= +golang.org/x/crypto v0.53.0/go.mod h1:DNLU434OwVakk9PzuwV8w62mAJpRJL3vsgcfp4Qnsio= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.56.0 h1:Rw8j/hFzGvJUZwNBXnAtf5sVDVt+65SK2C7IxCxZt5o= +golang.org/x/net v0.56.0/go.mod h1:D3Ku6r+V6JROoZK144D2XfMHFcMq/0zSfLelVTCFKec= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.21.0 h1:HLII4xRRTtCRkxYp4HNFF0Js/Og6q2i++KXbg0gHCwM= +golang.org/x/sync v0.21.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= +golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b h1:MvbV2F2BdI8qKrYYUhDwbUZbX0BAYRSIpXM2TOtTvs0= diff --git a/kafka/instance.go b/kafka/instance.go new file mode 100644 index 0000000..b283b7a --- /dev/null +++ b/kafka/instance.go @@ -0,0 +1,42 @@ +package kafka + +import "context" + +// Handler is a function instance which can handle a Kafka message. +type Handler interface { + // Handle a Kafka message. + Handle(context.Context, Message) error +} + +// Starter is an instance which has defined the Start hook. +type Starter interface { + // Start instance event hook. + Start(context.Context, map[string]string) error +} + +// Stopper is an instance which has defined the Stop hook. +type Stopper interface { + // Stop instance event hook. + Stop(context.Context) error +} + +// ReadinessReporter is an instance which reports its readiness. +type ReadinessReporter interface { + // Ready to be invoked or not. + Ready(context.Context) (bool, error) +} + +// LivenessReporter is an instance which reports it is alive. +type LivenessReporter interface { + // Alive allows the instance to report its liveness status. + Alive(context.Context) (bool, error) +} + +// DefaultHandler wraps a static function for use with the Kafka middleware. +type DefaultHandler struct { + Handler func(context.Context, Message) error +} + +func (f DefaultHandler) Handle(ctx context.Context, msg Message) error { + return f.Handler(ctx, msg) +} \ No newline at end of file diff --git a/kafka/logs.go b/kafka/logs.go new file mode 100644 index 0000000..f2c45e3 --- /dev/null +++ b/kafka/logs.go @@ -0,0 +1,23 @@ +package kafka + +import "github.com/rs/zerolog" + +func init() { + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + SetLogLevel(DefaultLogLevel) +} + +type logLevel zerolog.Level + +const ( + LogDebug = logLevel(zerolog.DebugLevel) + LogInfo = logLevel(zerolog.InfoLevel) + LogWarn = logLevel(zerolog.WarnLevel) + LogDisabled = logLevel(zerolog.Disabled) +) + +// SetLogLevel to LogDebug, LogInfo, LogWarn, or LogDisabled +// Errors are always returned as values. +func SetLogLevel(l logLevel) { + zerolog.SetGlobalLevel(zerolog.Level(l)) +} diff --git a/kafka/message.go b/kafka/message.go new file mode 100644 index 0000000..fbf4f4b --- /dev/null +++ b/kafka/message.go @@ -0,0 +1,20 @@ +package kafka + +import "time" + +// Message represents a Kafka message delivered to the function's handler. +type Message struct { + Key []byte + Value []byte + Headers []Header + Topic string + Partition int32 + Offset int64 + Timestamp time.Time +} + +// Header is a key-value pair attached to a Kafka message. +type Header struct { + Key string + Value []byte +} \ No newline at end of file diff --git a/kafka/mock/function.go b/kafka/mock/function.go new file mode 100644 index 0000000..7b547f6 --- /dev/null +++ b/kafka/mock/function.go @@ -0,0 +1,25 @@ +package mock + +import "context" + +// Function is a mock for testing lifecycle hooks (Start/Stop). +// It does not implement kafka.Handler to avoid import cycles. +// Tests that need a full kafka.Handler should define one inline. +type Function struct { + OnStart func(context.Context, map[string]string) error + OnStop func(context.Context) error +} + +func (f *Function) Start(ctx context.Context, cfg map[string]string) error { + if f.OnStart != nil { + return f.OnStart(ctx, cfg) + } + return nil +} + +func (f *Function) Stop(ctx context.Context) error { + if f.OnStop != nil { + return f.OnStop(ctx) + } + return nil +} \ No newline at end of file diff --git a/kafka/service.go b/kafka/service.go new file mode 100644 index 0000000..5dbfd29 --- /dev/null +++ b/kafka/service.go @@ -0,0 +1,443 @@ +// Package kafka implements a Functions Kafka middleware for use by +// scaffolding which exposes a function as a service that consumes +// messages from Kafka topics. +package kafka + +import ( + "bufio" + "context" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "runtime" + "strings" + "sync/atomic" + "syscall" + "time" + + "github.com/IBM/sarama" + "github.com/rs/zerolog/log" +) + +const ( + DefaultLogLevel = LogDebug + DefaultListenAddress = "[::]:8080" +) + +const ( + ServerShutdownTimeout = 30 * time.Second + InstanceStopTimeout = 30 * time.Second +) + +// Start an instance using a new Service. +func Start(f Handler) error { + log.Debug().Msg("func runtime creating function instance") + return New(f).Start(context.Background()) +} + +// Service exposes a Function Instance as a Kafka consumer with HTTP health +// endpoints. +type Service struct { + http.Server + listener net.Listener + stop chan error + f Handler + ready atomic.Bool +} + +// New Service which serves the given instance. +func New(f Handler) *Service { + svc := &Service{ + f: f, + stop: make(chan error, 1), + Server: http.Server{ + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 30 * time.Second, + MaxHeaderBytes: 1 << 20, + ReadHeaderTimeout: 2 * time.Second, + }, + } + mux := http.NewServeMux() + mux.HandleFunc("/health/readiness", svc.Ready) + mux.HandleFunc("/health/liveness", svc.Alive) + svc.Handler = mux + + logImplements(f) + + return svc +} + +func logImplements(f any) { + if _, ok := f.(Starter); ok { + log.Info().Msg("Function implements Start") + } + if _, ok := f.(Stopper); ok { + log.Info().Msg("Function implements Stop") + } + if _, ok := f.(ReadinessReporter); ok { + log.Info().Msg("Function implements Ready") + } + if _, ok := f.(LivenessReporter); ok { + log.Info().Msg("Function implements Alive") + } +} + +// Start the service. Blocks until the context is canceled, a runtime error +// occurs, or an OS interrupt/kill signal is received. +func (s *Service) Start(ctx context.Context) (err error) { + addr := listenAddress() + log.Debug().Str("address", addr).Msg("function starting") + + if s.listener, err = net.Listen("tcp", addr); err != nil { + return + } + + if err = s.startInstance(ctx); err != nil { + return + } + + s.handleSignals() + + // Start HTTP health server + go func() { + if err := s.Serve(s.listener); err != http.ErrServerClosed { + log.Error().Err(err).Msg("http server exited with unexpected error") + s.stop <- err + } + }() + + // Start Kafka consumer + consumerCtx, consumerCancel := context.WithCancel(ctx) + defer consumerCancel() + go func() { + if err := s.consumeLoop(consumerCtx); err != nil { + log.Error().Err(err).Msg("kafka consumer exited with error") + s.stop <- err + } + }() + + log.Debug().Msg("waiting for stop signals or errors") + select { + case err = <-s.stop: + if err != nil { + log.Error().Err(err).Msg("function error") + } + case <-ctx.Done(): + log.Debug().Msg("function canceled") + } + consumerCancel() + return s.shutdown(err) +} + +// Addr returns the address upon which the service is listening if started; +// nil otherwise. +func (s *Service) Addr() net.Addr { + if s.listener == nil { + return nil + } + return s.listener.Addr() +} + +// Ready handles readiness checks. +func (s *Service) Ready(w http.ResponseWriter, r *http.Request) { + if i, ok := s.f.(ReadinessReporter); ok { + ready, err := i.Ready(r.Context()) + if err != nil { + log.Debug().Err(err).Msg("error checking readiness") + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, "error checking readiness: ", err.Error()) + return + } + if !ready { + log.Debug().Msg("function not yet ready") + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "function not yet ready") + return + } + } else if !s.ready.Load() { + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "kafka consumer not yet ready") + return + } + fmt.Fprintf(w, "READY") +} + +// Alive handles liveness checks. +func (s *Service) Alive(w http.ResponseWriter, r *http.Request) { + if i, ok := s.f.(LivenessReporter); ok { + alive, err := i.Alive(r.Context()) + if err != nil { + log.Err(err).Msg("error checking liveness") + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, "error checking liveness: ", err.Error()) + return + } + if !alive { + log.Debug().Msg("function not alive") + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte("function not alive")) + return + } + } + fmt.Fprintf(w, "ALIVE") +} + +func (s *Service) startInstance(ctx context.Context) error { + if i, ok := s.f.(Starter); ok { + cfg, err := newCfg() + if err != nil { + return err + } + go func() { + if err := i.Start(ctx, cfg); err != nil { + s.stop <- err + } + }() + } else { + log.Debug().Msg("function does not implement Start. Skipping") + } + return nil +} + +func (s *Service) handleSignals() { + sigs := make(chan os.Signal, 2) + signal.Notify(sigs) + go func() { + for { + sig := <-sigs + if sig == syscall.SIGINT || sig == syscall.SIGTERM { + log.Debug().Any("signal", sig).Msg("signal received") + s.stop <- nil + } else if runtime.GOOS == "linux" && sig == syscall.Signal(0x17) { + // Ignore SIGURG; signal 23 (0x17) + } + } + }() +} + +func (s *Service) shutdown(sourceErr error) (err error) { + log.Debug().Msg("function stopping") + var runtimeErr, instanceErr error + + ctx, cancel := context.WithTimeout(context.Background(), ServerShutdownTimeout) + defer cancel() + runtimeErr = s.Shutdown(ctx) + + if i, ok := s.f.(Stopper); ok { + ctx, cancel = context.WithTimeout(context.Background(), InstanceStopTimeout) + defer cancel() + instanceErr = i.Stop(ctx) + } + + return collapseErrors("shutdown error", sourceErr, instanceErr, runtimeErr) +} + +// consumeLoop connects to Kafka and consumes messages, calling the function +// handler for each message. +func (s *Service) consumeLoop(ctx context.Context) error { + brokers := kafkaBrokers() + topics := kafkaTopics() + group := kafkaConsumerGroup() + + if len(brokers) == 0 { + return fmt.Errorf("KAFKA_BROKERS environment variable is required") + } + if len(topics) == 0 { + return fmt.Errorf("KAFKA_TOPICS environment variable is required") + } + if group == "" { + return fmt.Errorf("KAFKA_CONSUMER_GROUP environment variable is required") + } + + log.Info(). + Strs("brokers", brokers). + Strs("topics", topics). + Str("group", group). + Msg("connecting to kafka") + + config := sarama.NewConfig() + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{ + sarama.NewBalanceStrategyRoundRobin(), + } + config.Consumer.Offsets.Initial = sarama.OffsetNewest + + client, err := sarama.NewConsumerGroup(brokers, group, config) + if err != nil { + return fmt.Errorf("creating consumer group: %w", err) + } + defer client.Close() + + handler := &consumerGroupHandler{ + f: s.f, + ready: &s.ready, + } + + for { + if err := client.Consume(ctx, topics, handler); err != nil { + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("consumer error: %w", err) + } + if ctx.Err() != nil { + return nil + } + // Rebalance happened; loop to rejoin. + s.ready.Store(false) + } +} + +// consumerGroupHandler implements sarama.ConsumerGroupHandler. +// +// TODO: support exactly-once semantics via transactional consumer/producer +// TODO: add optional deduplication (e.g. by message key or offset tracking) +type consumerGroupHandler struct { + f Handler + ready *atomic.Bool +} + +func (h *consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { + h.ready.Store(true) + log.Info().Msg("kafka consumer ready (partitions assigned)") + return nil +} + +func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + h.ready.Store(false) + log.Info().Msg("kafka consumer partitions revoked") + return nil +} + +func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + m := Message{ + Key: msg.Key, + Value: msg.Value, + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + Timestamp: msg.Timestamp, + } + for _, h := range msg.Headers { + if h != nil { + m.Headers = append(m.Headers, Header{ + Key: string(h.Key), + Value: h.Value, + }) + } + } + + // TODO: add retry support (backoff, max attempts, DLQ) + // TODO: a failed message is effectively lost if a later message in the + // same partition succeeds, because MarkMessage on a higher offset + // implicitly commits the earlier one. Consider stopping the partition + // on error or tracking failed offsets separately. + if err := h.f.Handle(session.Context(), m); err != nil { + log.Error().Err(err). + Str("topic", msg.Topic). + Int32("partition", msg.Partition). + Int64("offset", msg.Offset). + Msg("error handling kafka message") + continue + } + session.MarkMessage(msg, "") + } + return nil +} + +func kafkaBrokers() []string { + v := os.Getenv("KAFKA_BROKERS") + if v == "" { + return nil + } + return strings.Split(v, ",") +} + +func kafkaTopics() []string { + v := os.Getenv("KAFKA_TOPICS") + if v == "" { + return nil + } + return strings.Split(v, ",") +} + +func kafkaConsumerGroup() string { + return os.Getenv("KAFKA_CONSUMER_GROUP") +} + +func listenAddress() string { + listenAddress := os.Getenv("LISTEN_ADDRESS") + if listenAddress != "" { + return listenAddress + } + + address := os.Getenv("ADDRESS") + port := os.Getenv("PORT") + if address != "" || port != "" { + if address != "" { + log.Warn().Msg("Environment variable ADDRESS is deprecated and support will be removed in future versions. Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.") + } else { + address = "127.0.0.1" + } + if port != "" { + log.Warn().Msg("Environment variable PORT is deprecated and support will be removed in future version.s Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.") + } else { + port = "8080" + } + return address + ":" + port + } + + return DefaultListenAddress +} + +func readCfg() (map[string]string, error) { + cfg := map[string]string{} + + f, err := os.Open("cfg") + if err != nil { + log.Debug().Msg("no static config") + return cfg, nil + } + defer f.Close() + + scanner := bufio.NewScanner(f) + i := 0 + for scanner.Scan() { + i++ + line := scanner.Text() + parts := strings.SplitN(line, "=", 2) + if len(parts) != 2 { + return cfg, fmt.Errorf("config line %v invalid: %v", i, line) + } + cfg[strings.TrimSpace(parts[0])] = strings.Trim(strings.TrimSpace(parts[1]), "\"") + } + return cfg, scanner.Err() +} + +func newCfg() (cfg map[string]string, err error) { + if cfg, err = readCfg(); err != nil { + return + } + + for _, e := range os.Environ() { + pair := strings.SplitN(e, "=", 2) + cfg[pair[0]] = pair[1] + } + return +} + +func collapseErrors(msg string, ee ...error) (err error) { + for _, e := range ee { + if e != nil { + if err == nil { + err = e + } else { + log.Error().Err(e).Msg(msg) + } + } + } + return +} \ No newline at end of file diff --git a/kafka/service_test.go b/kafka/service_test.go new file mode 100644 index 0000000..db72db7 --- /dev/null +++ b/kafka/service_test.go @@ -0,0 +1,556 @@ +package kafka + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "os" + "testing" + "time" +) + +// testFunction is a mock function used in tests. Defined inline to avoid +// import cycles with the kafka/mock package. +type testFunction struct { + onStart func(context.Context, map[string]string) error + onStop func(context.Context) error + onHandle func(context.Context, Message) error +} + +func (f *testFunction) Start(ctx context.Context, cfg map[string]string) error { + if f.onStart != nil { + return f.onStart(ctx, cfg) + } + return nil +} + +func (f *testFunction) Stop(ctx context.Context) error { + if f.onStop != nil { + return f.onStop(ctx) + } + return nil +} + +func (f *testFunction) Handle(ctx context.Context, msg Message) error { + if f.onHandle != nil { + return f.onHandle(ctx, msg) + } + return nil +} + +// TestStart_Invoked ensures that the Start method of a function is invoked +// if it is implemented by the function instance. +func TestStart_Invoked(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + ) + defer cancel() + + f := &testFunction{onStart: onStart} + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + // Consumer loop will error because KAFKA_BROKERS is not set, + // but Start should still be invoked before that. + _ = err + case <-startCh: + t.Log("start signal received") + } + cancel() +} + +// TestStart_Static checks that static method Start(f) is a convenience method +// for New(f).Start(). +func TestStart_Static(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + ) + + f := &testFunction{onStart: onStart} + + go func() { + if err := Start(f); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + case <-startCh: + t.Log("start signal received") + } +} + +// TestStart_CfgEnvs ensures that the function's Start method receives a map +// containing all available environment variables as a parameter. +func TestStart_CfgEnvs(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, cfg map[string]string) error { + v := cfg["TEST_ENV"] + if v != "example_value" { + t.Fatalf("did not receive TEST_ENV. got %v", cfg["TEST_ENV"]) + } else { + t.Log("expected value received") + } + startCh <- true + return nil + } + ) + defer cancel() + + f := &testFunction{onStart: onStart} + + t.Setenv("TEST_ENV", "example_value") + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + case <-startCh: + t.Log("start signal received") + } +} + +// TestCfg_Static ensures that additional static "environment variables" +// built into the container as cfg are correctly read. +func TestCfg_Static(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + ) + defer cancel() + + dir := t.TempDir() + if err := os.Chdir(dir); err != nil { + t.Fatal(err) + } + + if err := os.WriteFile("cfg", []byte(`FUNC_VERSION="v1.2.3"`), os.ModePerm); err != nil { + t.Fatal(err) + } + + f := &testFunction{onStart: func(_ context.Context, cfg map[string]string) error { + v := cfg["FUNC_VERSION"] + if v != "v1.2.3" { + t.Fatalf("FUNC_VERSION not received. Expected 'v1.2.3', got '%v'", + cfg["FUNC_VERSION"]) + } else { + t.Log("expected value received") + } + startCh <- true + return nil + }} + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + case <-startCh: + t.Log("start signal received") + } +} + +// TestStop_Invoked ensures the Stop method of a function is invoked on context +// cancellation if it is implemented by the function instance. +func TestStop_Invoked(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + stopCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + onStop = func(_ context.Context) error { + stopCh <- true + return nil + } + ) + + f := &testFunction{onStart: onStart, onStop: onStop} + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + return + case <-startCh: + t.Log("start signal received") + } + + cancel() + + select { + case <-time.After(500 * time.Millisecond): + t.Fatal("function failed to notify of stop") + case err := <-errCh: + _ = err + case <-stopCh: + t.Log("stop signal received") + } +} + +// TestReady_Invoked ensures the default Ready endpoint returns OK when +// the consumer is marked as ready. +func TestReady_Invoked(t *testing.T) { + f := &testFunction{} + service := New(f) + service.ready.Store(true) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected http status code: %v", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if string(body) != "READY" { + t.Fatalf("unexpected body: %v", string(body)) + } +} + +// TestReady_NotReady ensures the readiness endpoint returns 503 when the +// consumer has not yet joined the group. +func TestReady_NotReady(t *testing.T) { + f := &testFunction{} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %v", resp.StatusCode) + } +} + +// TestAlive_Invoked ensures the default Alive endpoint returns OK. +func TestAlive_Invoked(t *testing.T) { + f := &testFunction{} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/liveness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected http status code: %v", resp.StatusCode) + } +} + +// TestHandle_Direct ensures the handler is invoked correctly when called +// directly (without a real Kafka broker). +func TestHandle_Direct(t *testing.T) { + handled := false + + f := &testFunction{ + onHandle: func(_ context.Context, msg Message) error { + if string(msg.Value) != "test-value" { + t.Fatalf("unexpected message value: %s", msg.Value) + } + if string(msg.Key) != "test-key" { + t.Fatalf("unexpected message key: %s", msg.Key) + } + if msg.Topic != "test-topic" { + t.Fatalf("unexpected topic: %s", msg.Topic) + } + if len(msg.Headers) != 1 || msg.Headers[0].Key != "h1" { + t.Fatalf("unexpected headers: %v", msg.Headers) + } + handled = true + return nil + }, + } + + msg := Message{ + Key: []byte("test-key"), + Value: []byte("test-value"), + Topic: "test-topic", + Partition: 0, + Offset: 42, + Headers: []Header{ + {Key: "h1", Value: []byte("v1")}, + }, + } + + err := f.Handle(context.Background(), msg) + if err != nil { + t.Fatal(err) + } + if !handled { + t.Fatal("handler was not invoked") + } +} + +// TestDefaultHandler ensures the DefaultHandler wrapper works correctly +// for static function implementations. +func TestDefaultHandler(t *testing.T) { + handled := false + + staticFn := func(_ context.Context, msg Message) error { + handled = true + if string(msg.Value) != "hello" { + t.Fatalf("unexpected value: %s", msg.Value) + } + return nil + } + + dh := DefaultHandler{Handler: staticFn} + err := dh.Handle(context.Background(), Message{Value: []byte("hello")}) + if err != nil { + t.Fatal(err) + } + if !handled { + t.Fatal("static handler was not invoked") + } +} + +// TestHandle_Error ensures that a handler returning an error does not +// cause a panic or unexpected behavior. +func TestHandle_Error(t *testing.T) { + expectedErr := "processing failed" + f := &testFunction{ + onHandle: func(_ context.Context, _ Message) error { + return fmt.Errorf("%s", expectedErr) + }, + } + + err := f.Handle(context.Background(), Message{Value: []byte("bad")}) + if err == nil { + t.Fatal("expected error, got nil") + } + if err.Error() != expectedErr { + t.Fatalf("expected %q, got %q", expectedErr, err.Error()) + } +} + +// TestReady_CustomReporter ensures the readiness endpoint delegates to +// the function's Ready method when it implements ReadinessReporter. +func TestReady_CustomReporter(t *testing.T) { + f := &readyFunction{ready: true} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %v", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if string(body) != "READY" { + t.Fatalf("expected READY, got %q", string(body)) + } +} + +// TestReady_CustomReporterNotReady ensures the readiness endpoint returns +// 503 when the function's Ready method returns false. +func TestReady_CustomReporterNotReady(t *testing.T) { + f := &readyFunction{ready: false} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %v", resp.StatusCode) + } +} + +// TestAlive_CustomReporter ensures the liveness endpoint delegates to +// the function's Alive method when it implements LivenessReporter. +func TestAlive_CustomReporter(t *testing.T) { + f := &aliveFunction{alive: true} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/liveness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %v", resp.StatusCode) + } +} + +// TestAlive_CustomReporterNotAlive ensures the liveness endpoint returns +// 503 when the function's Alive method returns false. +func TestAlive_CustomReporterNotAlive(t *testing.T) { + f := &aliveFunction{alive: false} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/liveness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %v", resp.StatusCode) + } +} + +// readyFunction implements Handler and ReadinessReporter for testing. +type readyFunction struct { + ready bool +} + +func (f *readyFunction) Handle(_ context.Context, _ Message) error { return nil } +func (f *readyFunction) Ready(_ context.Context) (bool, error) { return f.ready, nil } + +// aliveFunction implements Handler and LivenessReporter for testing. +type aliveFunction struct { + alive bool +} + +func (f *aliveFunction) Handle(_ context.Context, _ Message) error { return nil } +func (f *aliveFunction) Alive(_ context.Context) (bool, error) { return f.alive, nil } \ No newline at end of file From 16f56e8bd096809284dd70bcb7e7b7e2475838e4 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 00:19:09 +0300 Subject: [PATCH 02/18] Fix missing EOF newlines and goimports alignment --- kafka/instance.go | 2 +- kafka/message.go | 2 +- kafka/mock/function.go | 2 +- kafka/service.go | 2 +- kafka/service_test.go | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka/instance.go b/kafka/instance.go index b283b7a..92b359e 100644 --- a/kafka/instance.go +++ b/kafka/instance.go @@ -39,4 +39,4 @@ type DefaultHandler struct { func (f DefaultHandler) Handle(ctx context.Context, msg Message) error { return f.Handler(ctx, msg) -} \ No newline at end of file +} diff --git a/kafka/message.go b/kafka/message.go index fbf4f4b..39c0905 100644 --- a/kafka/message.go +++ b/kafka/message.go @@ -17,4 +17,4 @@ type Message struct { type Header struct { Key string Value []byte -} \ No newline at end of file +} diff --git a/kafka/mock/function.go b/kafka/mock/function.go index 7b547f6..dd5383e 100644 --- a/kafka/mock/function.go +++ b/kafka/mock/function.go @@ -22,4 +22,4 @@ func (f *Function) Stop(ctx context.Context) error { return f.OnStop(ctx) } return nil -} \ No newline at end of file +} diff --git a/kafka/service.go b/kafka/service.go index 5dbfd29..e978912 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -440,4 +440,4 @@ func collapseErrors(msg string, ee ...error) (err error) { } } return -} \ No newline at end of file +} diff --git a/kafka/service_test.go b/kafka/service_test.go index db72db7..81a2056 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -545,7 +545,7 @@ type readyFunction struct { } func (f *readyFunction) Handle(_ context.Context, _ Message) error { return nil } -func (f *readyFunction) Ready(_ context.Context) (bool, error) { return f.ready, nil } +func (f *readyFunction) Ready(_ context.Context) (bool, error) { return f.ready, nil } // aliveFunction implements Handler and LivenessReporter for testing. type aliveFunction struct { @@ -553,4 +553,4 @@ type aliveFunction struct { } func (f *aliveFunction) Handle(_ context.Context, _ Message) error { return nil } -func (f *aliveFunction) Alive(_ context.Context) (bool, error) { return f.alive, nil } \ No newline at end of file +func (f *aliveFunction) Alive(_ context.Context) (bool, error) { return f.alive, nil } From b3c3cded6df24620896daa52b95452227b75f2cc Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 00:31:22 +0300 Subject: [PATCH 03/18] Add HandleFunc type and fix context leak in test --- kafka/instance.go | 3 +++ kafka/service_test.go | 1 + 2 files changed, 4 insertions(+) diff --git a/kafka/instance.go b/kafka/instance.go index 92b359e..d7fb9ec 100644 --- a/kafka/instance.go +++ b/kafka/instance.go @@ -32,6 +32,9 @@ type LivenessReporter interface { Alive(context.Context) (bool, error) } +// HandleFunc is a function type that implements Handler. +type HandleFunc func(context.Context, Message) error + // DefaultHandler wraps a static function for use with the Kafka middleware. type DefaultHandler struct { Handler func(context.Context, Message) error diff --git a/kafka/service_test.go b/kafka/service_test.go index 81a2056..089ec58 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -224,6 +224,7 @@ func TestStop_Invoked(t *testing.T) { return nil } ) + defer cancel() f := &testFunction{onStart: onStart, onStop: onStop} From 974b9400dda70d25ab0a28bab9978bef92d05971 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 00:33:55 +0300 Subject: [PATCH 04/18] Add Handle method to kafka mock Function --- kafka/mock/function.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/kafka/mock/function.go b/kafka/mock/function.go index dd5383e..4afeb9c 100644 --- a/kafka/mock/function.go +++ b/kafka/mock/function.go @@ -1,13 +1,15 @@ package mock -import "context" +import ( + "context" + + "knative.dev/func-go/kafka" +) -// Function is a mock for testing lifecycle hooks (Start/Stop). -// It does not implement kafka.Handler to avoid import cycles. -// Tests that need a full kafka.Handler should define one inline. type Function struct { - OnStart func(context.Context, map[string]string) error - OnStop func(context.Context) error + OnStart func(context.Context, map[string]string) error + OnStop func(context.Context) error + OnHandle func(context.Context, kafka.Message) error } func (f *Function) Start(ctx context.Context, cfg map[string]string) error { @@ -23,3 +25,10 @@ func (f *Function) Stop(ctx context.Context) error { } return nil } + +func (f *Function) Handle(ctx context.Context, msg kafka.Message) error { + if f.OnHandle != nil { + return f.OnHandle(ctx, msg) + } + return nil +} From a0f3ba1b770a4886b00afae3cf888665ed8ed672 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 00:56:02 +0300 Subject: [PATCH 05/18] Add consumer loop tests for ConsumeClaim handler --- kafka/service_test.go | 156 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/kafka/service_test.go b/kafka/service_test.go index 089ec58..3d557cc 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -7,8 +7,11 @@ import ( "net" "net/http" "os" + "sync/atomic" "testing" "time" + + "github.com/IBM/sarama" ) // testFunction is a mock function used in tests. Defined inline to avoid @@ -555,3 +558,156 @@ type aliveFunction struct { func (f *aliveFunction) Handle(_ context.Context, _ Message) error { return nil } func (f *aliveFunction) Alive(_ context.Context) (bool, error) { return f.alive, nil } + +// mockSession implements sarama.ConsumerGroupSession for testing. +type mockSession struct { + ctx context.Context + marked []*sarama.ConsumerMessage +} + +func (s *mockSession) Claims() map[string][]int32 { return nil } +func (s *mockSession) MemberID() string { return "test" } +func (s *mockSession) GenerationID() int32 { return 1 } +func (s *mockSession) MarkOffset(string, int32, int64, string) {} +func (s *mockSession) Commit() {} +func (s *mockSession) ResetOffset(string, int32, int64, string) {} +func (s *mockSession) Context() context.Context { return s.ctx } +func (s *mockSession) MarkMessage(msg *sarama.ConsumerMessage, _ string) { + s.marked = append(s.marked, msg) +} + +// mockClaim implements sarama.ConsumerGroupClaim for testing. +type mockClaim struct { + ch chan *sarama.ConsumerMessage +} + +func (c *mockClaim) Topic() string { return "test-topic" } +func (c *mockClaim) Partition() int32 { return 0 } +func (c *mockClaim) InitialOffset() int64 { return 0 } +func (c *mockClaim) HighWaterMarkOffset() int64 { return 0 } +func (c *mockClaim) Messages() <-chan *sarama.ConsumerMessage { return c.ch } + +// TestConsumeClaim_Success ensures that ConsumeClaim converts sarama messages +// to kafka.Message, calls the handler, and marks the message on success. +func TestConsumeClaim_Success(t *testing.T) { + var received Message + f := &testFunction{ + onHandle: func(_ context.Context, msg Message) error { + received = msg + return nil + }, + } + + var ready atomic.Bool + h := &consumerGroupHandler{f: f, ready: &ready} + + ch := make(chan *sarama.ConsumerMessage, 1) + ch <- &sarama.ConsumerMessage{ + Key: []byte("k1"), + Value: []byte("v1"), + Topic: "my-topic", + Partition: 2, + Offset: 99, + Headers: []*sarama.RecordHeader{ + {Key: []byte("hk"), Value: []byte("hv")}, + }, + } + close(ch) + + session := &mockSession{ctx: context.Background()} + claim := &mockClaim{ch: ch} + + if err := h.ConsumeClaim(session, claim); err != nil { + t.Fatal(err) + } + + if string(received.Key) != "k1" { + t.Fatalf("expected key 'k1', got '%s'", received.Key) + } + if string(received.Value) != "v1" { + t.Fatalf("expected value 'v1', got '%s'", received.Value) + } + if received.Topic != "my-topic" { + t.Fatalf("expected topic 'my-topic', got '%s'", received.Topic) + } + if received.Partition != 2 { + t.Fatalf("expected partition 2, got %d", received.Partition) + } + if received.Offset != 99 { + t.Fatalf("expected offset 99, got %d", received.Offset) + } + if len(received.Headers) != 1 || received.Headers[0].Key != "hk" || string(received.Headers[0].Value) != "hv" { + t.Fatalf("unexpected headers: %v", received.Headers) + } + if len(session.marked) != 1 { + t.Fatalf("expected 1 marked message, got %d", len(session.marked)) + } +} + +// TestConsumeClaim_Error ensures that when the handler returns an error, +// the message is not marked and processing continues. +func TestConsumeClaim_Error(t *testing.T) { + callCount := 0 + f := &testFunction{ + onHandle: func(_ context.Context, msg Message) error { + callCount++ + if string(msg.Value) == "bad" { + return fmt.Errorf("handle error") + } + return nil + }, + } + + var ready atomic.Bool + h := &consumerGroupHandler{f: f, ready: &ready} + + ch := make(chan *sarama.ConsumerMessage, 2) + ch <- &sarama.ConsumerMessage{Value: []byte("bad"), Topic: "t", Partition: 0, Offset: 1} + ch <- &sarama.ConsumerMessage{Value: []byte("good"), Topic: "t", Partition: 0, Offset: 2} + close(ch) + + session := &mockSession{ctx: context.Background()} + claim := &mockClaim{ch: ch} + + if err := h.ConsumeClaim(session, claim); err != nil { + t.Fatal(err) + } + + if callCount != 2 { + t.Fatalf("expected handler called 2 times, got %d", callCount) + } + if len(session.marked) != 1 { + t.Fatalf("expected 1 marked message (only the good one), got %d", len(session.marked)) + } + if session.marked[0].Offset != 2 { + t.Fatalf("expected marked offset 2, got %d", session.marked[0].Offset) + } +} + +// TestConsumeClaim_SetupCleanup ensures Setup sets ready=true and +// Cleanup sets ready=false. +func TestConsumeClaim_SetupCleanup(t *testing.T) { + var ready atomic.Bool + h := &consumerGroupHandler{ + f: &testFunction{}, + ready: &ready, + } + + if ready.Load() { + t.Fatal("expected ready=false initially") + } + + if err := h.Setup(nil); err != nil { + t.Fatal(err) + } + if !ready.Load() { + t.Fatal("expected ready=true after Setup") + } + + if err := h.Cleanup(nil); err != nil { + t.Fatal(err) + } + if ready.Load() { + t.Fatal("expected ready=false after Cleanup") + } +} From bb09f2889091aa2b8db36d2a2c2c7da82af0a1b7 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 01:21:59 +0300 Subject: [PATCH 06/18] Fix goimports alignment in mock types --- kafka/service_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/kafka/service_test.go b/kafka/service_test.go index 3d557cc..0c40cf9 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -565,13 +565,13 @@ type mockSession struct { marked []*sarama.ConsumerMessage } -func (s *mockSession) Claims() map[string][]int32 { return nil } -func (s *mockSession) MemberID() string { return "test" } -func (s *mockSession) GenerationID() int32 { return 1 } -func (s *mockSession) MarkOffset(string, int32, int64, string) {} -func (s *mockSession) Commit() {} +func (s *mockSession) Claims() map[string][]int32 { return nil } +func (s *mockSession) MemberID() string { return "test" } +func (s *mockSession) GenerationID() int32 { return 1 } +func (s *mockSession) MarkOffset(string, int32, int64, string) {} +func (s *mockSession) Commit() {} func (s *mockSession) ResetOffset(string, int32, int64, string) {} -func (s *mockSession) Context() context.Context { return s.ctx } +func (s *mockSession) Context() context.Context { return s.ctx } func (s *mockSession) MarkMessage(msg *sarama.ConsumerMessage, _ string) { s.marked = append(s.marked, msg) } @@ -581,10 +581,10 @@ type mockClaim struct { ch chan *sarama.ConsumerMessage } -func (c *mockClaim) Topic() string { return "test-topic" } -func (c *mockClaim) Partition() int32 { return 0 } -func (c *mockClaim) InitialOffset() int64 { return 0 } -func (c *mockClaim) HighWaterMarkOffset() int64 { return 0 } +func (c *mockClaim) Topic() string { return "test-topic" } +func (c *mockClaim) Partition() int32 { return 0 } +func (c *mockClaim) InitialOffset() int64 { return 0 } +func (c *mockClaim) HighWaterMarkOffset() int64 { return 0 } func (c *mockClaim) Messages() <-chan *sarama.ConsumerMessage { return c.ch } // TestConsumeClaim_Success ensures that ConsumeClaim converts sarama messages From 1abbeb275ebb6c1550ed564072699d20b8359007 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 10:16:05 +0300 Subject: [PATCH 07/18] Trim whitespace in KAFKA_BROKERS and KAFKA_TOPICS parsing --- kafka/service.go | 16 ++++++++++++++-- kafka/service_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/kafka/service.go b/kafka/service.go index e978912..85ba96a 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -353,7 +353,7 @@ func kafkaBrokers() []string { if v == "" { return nil } - return strings.Split(v, ",") + return splitAndTrim(v) } func kafkaTopics() []string { @@ -361,7 +361,19 @@ func kafkaTopics() []string { if v == "" { return nil } - return strings.Split(v, ",") + return splitAndTrim(v) +} + +func splitAndTrim(s string) []string { + parts := strings.Split(s, ",") + var result []string + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + result = append(result, p) + } + } + return result } func kafkaConsumerGroup() string { diff --git a/kafka/service_test.go b/kafka/service_test.go index 0c40cf9..533da4c 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -711,3 +711,31 @@ func TestConsumeClaim_SetupCleanup(t *testing.T) { t.Fatal("expected ready=false after Cleanup") } } + +func TestSplitAndTrim(t *testing.T) { + tests := []struct { + name string + input string + expected []string + }{ + {"simple", "a,b,c", []string{"a", "b", "c"}}, + {"with spaces", "a, b, c", []string{"a", "b", "c"}}, + {"trailing comma", "a,b,", []string{"a", "b"}}, + {"empty entries", "a,,b", []string{"a", "b"}}, + {"spaces only", " , , ", nil}, + {"single value", "a", []string{"a"}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := splitAndTrim(tt.input) + if len(got) != len(tt.expected) { + t.Fatalf("expected %v, got %v", tt.expected, got) + } + for i := range got { + if got[i] != tt.expected[i] { + t.Fatalf("expected %v, got %v", tt.expected, got) + } + } + }) + } +} From 273bd3ffc29bfd70c5aa014000a2db464de9f03f Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 10:21:26 +0300 Subject: [PATCH 08/18] Fix typo --- kafka/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/service.go b/kafka/service.go index 85ba96a..7117ed2 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -395,7 +395,7 @@ func listenAddress() string { address = "127.0.0.1" } if port != "" { - log.Warn().Msg("Environment variable PORT is deprecated and support will be removed in future version.s Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.") + log.Warn().Msg("Environment variable PORT is deprecated and support will be removed in future versions. Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.") } else { port = "8080" } From 40f835618e21c5ccee201f19c5ff3302b1badce4 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 10:22:33 +0300 Subject: [PATCH 09/18] Restore working directory after TestCfg_Static --- kafka/service_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka/service_test.go b/kafka/service_test.go index 533da4c..b143af1 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -170,6 +170,12 @@ func TestCfg_Static(t *testing.T) { ) defer cancel() + origDir, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { os.Chdir(origDir) }) + dir := t.TempDir() if err := os.Chdir(dir); err != nil { t.Fatal(err) From 10ae231e860571845157c79ff374b1b1c75c2a82 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 10:54:36 +0300 Subject: [PATCH 10/18] Assert hooks are invoked even when consumer errors first --- kafka/service_test.go | 57 +++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/kafka/service_test.go b/kafka/service_test.go index b143af1..0b55293 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -71,10 +71,13 @@ func TestStart_Invoked(t *testing.T) { select { case <-timeoutCh: t.Fatal("function failed to notify of start") - case err := <-errCh: - // Consumer loop will error because KAFKA_BROKERS is not set, - // but Start should still be invoked before that. - _ = err + case <-errCh: + select { + case <-startCh: + t.Log("start signal received") + case <-time.After(100 * time.Millisecond): + t.Fatal("Start hook was not invoked") + } case <-startCh: t.Log("start signal received") } @@ -107,8 +110,13 @@ func TestStart_Static(t *testing.T) { select { case <-timeoutCh: t.Fatal("function failed to notify of start") - case err := <-errCh: - _ = err + case <-errCh: + select { + case <-startCh: + t.Log("start signal received") + case <-time.After(100 * time.Millisecond): + t.Fatal("Start hook was not invoked") + } case <-startCh: t.Log("start signal received") } @@ -150,8 +158,13 @@ func TestStart_CfgEnvs(t *testing.T) { select { case <-timeoutCh: t.Fatal("function failed to notify of start") - case err := <-errCh: - _ = err + case <-errCh: + select { + case <-startCh: + t.Log("start signal received") + case <-time.After(100 * time.Millisecond): + t.Fatal("Start hook was not invoked") + } case <-startCh: t.Log("start signal received") } @@ -206,8 +219,13 @@ func TestCfg_Static(t *testing.T) { select { case <-timeoutCh: t.Fatal("function failed to notify of start") - case err := <-errCh: - _ = err + case <-errCh: + select { + case <-startCh: + t.Log("start signal received") + case <-time.After(100 * time.Millisecond): + t.Fatal("Start hook was not invoked") + } case <-startCh: t.Log("start signal received") } @@ -246,9 +264,13 @@ func TestStop_Invoked(t *testing.T) { select { case <-timeoutCh: t.Fatal("function failed to notify of start") - case err := <-errCh: - _ = err - return + case <-errCh: + select { + case <-startCh: + t.Log("start signal received") + case <-time.After(100 * time.Millisecond): + t.Fatal("Start hook was not invoked") + } case <-startCh: t.Log("start signal received") } @@ -258,8 +280,13 @@ func TestStop_Invoked(t *testing.T) { select { case <-time.After(500 * time.Millisecond): t.Fatal("function failed to notify of stop") - case err := <-errCh: - _ = err + case <-errCh: + select { + case <-stopCh: + t.Log("stop signal received") + case <-time.After(100 * time.Millisecond): + t.Fatal("Stop hook was not invoked") + } case <-stopCh: t.Log("stop signal received") } From e32161f36ce3e3818ed4343bb56157acbd43a50f Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 10:59:46 +0300 Subject: [PATCH 11/18] Rename loop variable to avoid shadowing receiver in ConsumeClaim --- kafka/service.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/service.go b/kafka/service.go index 7117ed2..88a7b55 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -321,11 +321,11 @@ func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Offset: msg.Offset, Timestamp: msg.Timestamp, } - for _, h := range msg.Headers { - if h != nil { + for _, rh := range msg.Headers { + if rh != nil { m.Headers = append(m.Headers, Header{ - Key: string(h.Key), - Value: h.Value, + Key: string(rh.Key), + Value: rh.Value, }) } } From b824b68f29ffacb187e1b73247bd7d2c2fdc747f Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 11:38:46 +0300 Subject: [PATCH 12/18] Select on session context in ConsumeClaim for prompt shutdown --- kafka/service.go | 67 ++++++++++++++++++++++++------------------- kafka/service_test.go | 24 ++++++++++++++++ 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/kafka/service.go b/kafka/service.go index 88a7b55..f7e2172 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -312,40 +312,47 @@ func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { } func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - m := Message{ - Key: msg.Key, - Value: msg.Value, - Topic: msg.Topic, - Partition: msg.Partition, - Offset: msg.Offset, - Timestamp: msg.Timestamp, - } - for _, rh := range msg.Headers { - if rh != nil { - m.Headers = append(m.Headers, Header{ - Key: string(rh.Key), - Value: rh.Value, - }) + for { + select { + case msg, ok := <-claim.Messages(): + if !ok { + return nil + } + m := Message{ + Key: msg.Key, + Value: msg.Value, + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + Timestamp: msg.Timestamp, + } + for _, rh := range msg.Headers { + if rh != nil { + m.Headers = append(m.Headers, Header{ + Key: string(rh.Key), + Value: rh.Value, + }) + } } - } - // TODO: add retry support (backoff, max attempts, DLQ) - // TODO: a failed message is effectively lost if a later message in the - // same partition succeeds, because MarkMessage on a higher offset - // implicitly commits the earlier one. Consider stopping the partition - // on error or tracking failed offsets separately. - if err := h.f.Handle(session.Context(), m); err != nil { - log.Error().Err(err). - Str("topic", msg.Topic). - Int32("partition", msg.Partition). - Int64("offset", msg.Offset). - Msg("error handling kafka message") - continue + // TODO: add retry support (backoff, max attempts, DLQ) + // TODO: a failed message is effectively lost if a later message in the + // same partition succeeds, because MarkMessage on a higher offset + // implicitly commits the earlier one. Consider stopping the partition + // on error or tracking failed offsets separately. + if err := h.f.Handle(session.Context(), m); err != nil { + log.Error().Err(err). + Str("topic", msg.Topic). + Int32("partition", msg.Partition). + Int64("offset", msg.Offset). + Msg("error handling kafka message") + continue + } + session.MarkMessage(msg, "") + case <-session.Context().Done(): + return nil } - session.MarkMessage(msg, "") } - return nil } func kafkaBrokers() []string { diff --git a/kafka/service_test.go b/kafka/service_test.go index 0b55293..5e948f5 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -745,6 +745,30 @@ func TestConsumeClaim_SetupCleanup(t *testing.T) { } } +// TestConsumeClaim_SessionCancel ensures that ConsumeClaim returns promptly +// when the session context is canceled, even if messages are still available. +func TestConsumeClaim_SessionCancel(t *testing.T) { + f := &testFunction{ + onHandle: func(_ context.Context, _ Message) error { + return nil + }, + } + + var ready atomic.Bool + h := &consumerGroupHandler{f: f, ready: &ready} + + ch := make(chan *sarama.ConsumerMessage, 1) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + session := &mockSession{ctx: ctx} + claim := &mockClaim{ch: ch} + + if err := h.ConsumeClaim(session, claim); err != nil { + t.Fatal(err) + } +} + func TestSplitAndTrim(t *testing.T) { tests := []struct { name string From 4eec5a1bcd804f794a6cfbdba4eafa2a427fb42a Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 13:09:09 +0300 Subject: [PATCH 13/18] Close listener on startInstance failure and log consumer close errors --- kafka/service.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka/service.go b/kafka/service.go index f7e2172..757418f 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -96,6 +96,7 @@ func (s *Service) Start(ctx context.Context) (err error) { } if err = s.startInstance(ctx); err != nil { + s.listener.Close() return } @@ -268,7 +269,11 @@ func (s *Service) consumeLoop(ctx context.Context) error { if err != nil { return fmt.Errorf("creating consumer group: %w", err) } - defer client.Close() + defer func() { + if err := client.Close(); err != nil { + log.Error().Err(err).Msg("error closing kafka consumer group") + } + }() handler := &consumerGroupHandler{ f: s.f, From cd3c7e52141b861b974ae47ea4a5e29edb4dad75 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 13:29:14 +0300 Subject: [PATCH 14/18] Trim whitespace in KAFKA_CONSUMER_GROUP parsing --- kafka/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/service.go b/kafka/service.go index 757418f..40f08d0 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -389,7 +389,7 @@ func splitAndTrim(s string) []string { } func kafkaConsumerGroup() string { - return os.Getenv("KAFKA_CONSUMER_GROUP") + return strings.TrimSpace(os.Getenv("KAFKA_CONSUMER_GROUP")) } func listenAddress() string { From b82fdb2dbc7e7e4572a7e6547a10d756afaca72e Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 13:41:06 +0300 Subject: [PATCH 15/18] Add tests for health error paths and listenAddress --- kafka/service_test.go | 113 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 2 deletions(-) diff --git a/kafka/service_test.go b/kafka/service_test.go index 5e948f5..f42f482 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -576,21 +576,130 @@ func TestAlive_CustomReporterNotAlive(t *testing.T) { } } +// TestReady_CustomReporterError ensures the readiness endpoint returns 500 +// when the function's Ready method returns an error. +func TestReady_CustomReporterError(t *testing.T) { + f := &readyFunction{ready: false, err: fmt.Errorf("db connection failed")} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusInternalServerError { + t.Fatalf("expected 500, got %v", resp.StatusCode) + } +} + +// TestAlive_CustomReporterError ensures the liveness endpoint returns 500 +// when the function's Alive method returns an error. +func TestAlive_CustomReporterError(t *testing.T) { + f := &aliveFunction{alive: false, err: fmt.Errorf("health check failed")} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/liveness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusInternalServerError { + t.Fatalf("expected 500, got %v", resp.StatusCode) + } +} + +// TestListenAddress ensures the listen address is resolved from environment +// variables with the correct precedence. +func TestListenAddress(t *testing.T) { + tests := []struct { + name string + envs map[string]string + expected string + }{ + { + name: "default", + envs: nil, + expected: "[::]:8080", + }, + { + name: "LISTEN_ADDRESS", + envs: map[string]string{"LISTEN_ADDRESS": "0.0.0.0:9090"}, + expected: "0.0.0.0:9090", + }, + { + name: "deprecated ADDRESS and PORT", + envs: map[string]string{"ADDRESS": "10.0.0.1", "PORT": "3000"}, + expected: "10.0.0.1:3000", + }, + { + name: "deprecated ADDRESS only", + envs: map[string]string{"ADDRESS": "10.0.0.1"}, + expected: "10.0.0.1:8080", + }, + { + name: "deprecated PORT only", + envs: map[string]string{"PORT": "3000"}, + expected: "127.0.0.1:3000", + }, + { + name: "LISTEN_ADDRESS takes precedence", + envs: map[string]string{"LISTEN_ADDRESS": "0.0.0.0:9090", "ADDRESS": "10.0.0.1", "PORT": "3000"}, + expected: "0.0.0.0:9090", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "") + t.Setenv("ADDRESS", "") + t.Setenv("PORT", "") + for k, v := range tt.envs { + t.Setenv(k, v) + } + got := listenAddress() + if got != tt.expected { + t.Fatalf("expected %q, got %q", tt.expected, got) + } + }) + } +} + // readyFunction implements Handler and ReadinessReporter for testing. type readyFunction struct { ready bool + err error } func (f *readyFunction) Handle(_ context.Context, _ Message) error { return nil } -func (f *readyFunction) Ready(_ context.Context) (bool, error) { return f.ready, nil } +func (f *readyFunction) Ready(_ context.Context) (bool, error) { return f.ready, f.err } // aliveFunction implements Handler and LivenessReporter for testing. type aliveFunction struct { alive bool + err error } func (f *aliveFunction) Handle(_ context.Context, _ Message) error { return nil } -func (f *aliveFunction) Alive(_ context.Context) (bool, error) { return f.alive, nil } +func (f *aliveFunction) Alive(_ context.Context) (bool, error) { return f.alive, f.err } // mockSession implements sarama.ConsumerGroupSession for testing. type mockSession struct { From a579bf173944116def8f72b6e73b00fb04cfc66a Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 14:30:16 +0300 Subject: [PATCH 16/18] Improve tests and fix goroutine leaks --- kafka/service_test.go | 58 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/kafka/service_test.go b/kafka/service_test.go index f42f482..1a8071d 100644 --- a/kafka/service_test.go +++ b/kafka/service_test.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "os" + "strings" "sync/atomic" "testing" "time" @@ -47,11 +48,14 @@ func (f *testFunction) Handle(ctx context.Context, msg Message) error { // if it is implemented by the function instance. func TestStart_Invoked(t *testing.T) { t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + t.Setenv("KAFKA_BROKERS", "") + t.Setenv("KAFKA_TOPICS", "") + t.Setenv("KAFKA_CONSUMER_GROUP", "") var ( ctx, cancel = context.WithCancel(context.Background()) startCh = make(chan any) - errCh = make(chan error) + errCh = make(chan error, 1) timeoutCh = time.After(500 * time.Millisecond) onStart = func(_ context.Context, _ map[string]string) error { startCh <- true @@ -88,10 +92,13 @@ func TestStart_Invoked(t *testing.T) { // for New(f).Start(). func TestStart_Static(t *testing.T) { t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + t.Setenv("KAFKA_BROKERS", "") + t.Setenv("KAFKA_TOPICS", "") + t.Setenv("KAFKA_CONSUMER_GROUP", "") var ( startCh = make(chan any) - errCh = make(chan error) + errCh = make(chan error, 1) timeoutCh = time.After(500 * time.Millisecond) onStart = func(_ context.Context, _ map[string]string) error { startCh <- true @@ -126,11 +133,14 @@ func TestStart_Static(t *testing.T) { // containing all available environment variables as a parameter. func TestStart_CfgEnvs(t *testing.T) { t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + t.Setenv("KAFKA_BROKERS", "") + t.Setenv("KAFKA_TOPICS", "") + t.Setenv("KAFKA_CONSUMER_GROUP", "") var ( ctx, cancel = context.WithCancel(context.Background()) startCh = make(chan any) - errCh = make(chan error) + errCh = make(chan error, 1) timeoutCh = time.After(500 * time.Millisecond) onStart = func(_ context.Context, cfg map[string]string) error { v := cfg["TEST_ENV"] @@ -174,11 +184,14 @@ func TestStart_CfgEnvs(t *testing.T) { // built into the container as cfg are correctly read. func TestCfg_Static(t *testing.T) { t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + t.Setenv("KAFKA_BROKERS", "") + t.Setenv("KAFKA_TOPICS", "") + t.Setenv("KAFKA_CONSUMER_GROUP", "") var ( ctx, cancel = context.WithCancel(context.Background()) startCh = make(chan any) - errCh = make(chan error) + errCh = make(chan error, 1) timeoutCh = time.After(500 * time.Millisecond) ) defer cancel() @@ -235,12 +248,15 @@ func TestCfg_Static(t *testing.T) { // cancellation if it is implemented by the function instance. func TestStop_Invoked(t *testing.T) { t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + t.Setenv("KAFKA_BROKERS", "") + t.Setenv("KAFKA_TOPICS", "") + t.Setenv("KAFKA_CONSUMER_GROUP", "") var ( ctx, cancel = context.WithCancel(context.Background()) startCh = make(chan any) stopCh = make(chan any) - errCh = make(chan error) + errCh = make(chan error, 1) timeoutCh = time.After(500 * time.Millisecond) onStart = func(_ context.Context, _ map[string]string) error { startCh <- true @@ -878,6 +894,38 @@ func TestConsumeClaim_SessionCancel(t *testing.T) { } } +// TestConsumeLoop_MissingConfig ensures consumeLoop returns an error when +// required Kafka environment variables are not set. +func TestConsumeLoop_MissingConfig(t *testing.T) { + tests := []struct { + name string + brokers string + topics string + group string + errMsg string + }{ + {"no brokers", "", "t1", "g1", "KAFKA_BROKERS"}, + {"no topics", "b1:9092", "", "g1", "KAFKA_TOPICS"}, + {"no group", "b1:9092", "t1", "", "KAFKA_CONSUMER_GROUP"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("KAFKA_BROKERS", tt.brokers) + t.Setenv("KAFKA_TOPICS", tt.topics) + t.Setenv("KAFKA_CONSUMER_GROUP", tt.group) + + s := New(&testFunction{}) + err := s.consumeLoop(context.Background()) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), tt.errMsg) { + t.Fatalf("expected error containing %q, got %q", tt.errMsg, err.Error()) + } + }) + } +} + func TestSplitAndTrim(t *testing.T) { tests := []struct { name string From 19bac59c1466143af84d80857c0e2d0761053a16 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 14:40:10 +0300 Subject: [PATCH 17/18] Only ignore file-not-found in readCfg, propagate other errors --- kafka/service.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kafka/service.go b/kafka/service.go index 40f08d0..e94ec53 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -422,8 +422,11 @@ func readCfg() (map[string]string, error) { f, err := os.Open("cfg") if err != nil { - log.Debug().Msg("no static config") - return cfg, nil + if os.IsNotExist(err) { + log.Debug().Msg("no static config") + return cfg, nil + } + return cfg, fmt.Errorf("opening static config: %w", err) } defer f.Close() From 137e2fcabd32f7ddcee791697d6acde9ccda52c5 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 2 Jul 2026 15:21:53 +0300 Subject: [PATCH 18/18] Clear readiness when consumeLoop exits --- kafka/service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/service.go b/kafka/service.go index e94ec53..2d3d2ed 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -239,6 +239,8 @@ func (s *Service) shutdown(sourceErr error) (err error) { // consumeLoop connects to Kafka and consumes messages, calling the function // handler for each message. func (s *Service) consumeLoop(ctx context.Context) error { + defer s.ready.Store(false) + brokers := kafkaBrokers() topics := kafkaTopics() group := kafkaConsumerGroup()