[WIP] POC: Add Kafka middleware package #169

Open
aliok wants to merge 1 commit into knative-extensions:main from aliok:add-kafka-middleware

@aliok aliok commented Jun 25, 2026

NOTE: THIS IS A POC. Do not review the whole thing, but review the overall structure and the integration points.

Add a new kafka/ package that implements a Functions Kafka middleware,
following the same patterns as the existing http/ and cloudevents/ packages.

The middleware runs a sarama consumer group alongside an HTTP health server,
delivering raw messages (key, value, headers) to the user's Handle method.
The offset is committed on success; on error, the error is logged and the message is skipped (a simplification for the initial implementation).

Example usage is in cmd/fkafka/

This work is part of the ongoing effort to support Kafka in Functions. Related: knative/func#3923

Delivery semantics

  • At-least-once, in-order per partition.
  • Offset is committed after the handler returns nil.
  • On error: the error is logged, the message is skipped (not retried). If a
    later message in the same partition succeeds, the failed message's offset is
    implicitly committed (Kafka commits are high-water marks).
  • No retry, no deduplication, no exactly-once. Implement idempotency in the
    handler if needed.

vs. Knative Eventing + KafkaSource

|                | Kafka invoke type          | CloudEvents + KafkaSource          |
| Message format | Raw kafka.Message ([]byte) | CloudEvent                         |
| Dependencies   | Kafka brokers only         | Knative Eventing + KafkaSource CRD |
| Scaling        | Must set min-scale: 1      | Scales to zero                     |
| Protocol       | Kafka (TCP)                | HTTP                               |
| Languages      | Go only                    | All                                |

These two approaches are not compatible. The handler signatures are
different and functions cannot be switched between them without code changes:

// CloudEvents + KafkaSource
func Handle(ctx context.Context, event cloudevents.Event) error

// Kafka invoke type
func Handle(ctx context.Context, msg kafka.Message) error

Current limitations

  • Go only
  • No retry or dead-letter queue
  • No exactly-once semantics
  • No SASL/TLS authentication in the middleware (needs sarama config extension)
  • No func.yaml schema for Kafka settings — configuration via env vars only
  • Initial offset is always newest (not configurable)

Testing Kafka Functions End-to-End

This guide walks through testing the Kafka middleware on a local kind cluster.

Limitations

  • Go only: Kafka invoke type is currently only supported for Go functions.
    Other languages (Python, Node.js, etc.) do not have Kafka templates. Setting
    invoke: kafka on a non-Go function has no effect.

Kafka configuration

The Kafka middleware reads its configuration from environment variables:

| Variable             | Required | Description                                                                |
| KAFKA_BROKERS        | Yes      | Comma-separated list of broker addresses (e.g. broker1:9092,broker2:9092)  |
| KAFKA_TOPICS         | Yes      | Comma-separated list of topics to consume from                             |
| KAFKA_CONSUMER_GROUP | Yes      | Consumer group ID                                                          |

These can be set in func.yaml as plain values, from local environment
variables, or from Kubernetes Secrets and ConfigMaps:

envs:
# Plain value
- name: KAFKA_CONSUMER_GROUP
  value: "my-group"

# From a local environment variable (resolved at deploy time)
- name: KAFKA_BROKERS
  value: "{{ env:MY_LOCAL_KAFKA_BROKERS }}"

# From a specific key in a Secret
- name: KAFKA_BROKERS
  value: "{{ secret:kafka-credentials:brokers }}"

# From a specific key in a ConfigMap
- name: KAFKA_TOPICS
  value: "{{ configMap:kafka-config:topics }}"

# All key-value pairs from a Secret as env vars
- value: "{{ secret:kafka-credentials }}"

# All key-value pairs from a ConfigMap as env vars
- value: "{{ configMap:kafka-config }}"

Prerequisites

  • kind
  • kubectl
  • func CLI (built from this branch)
  • Go 1.25+
  • Docker
  • A container registry (e.g. docker.io/youruser or ttl.sh for ephemeral images)

1. Create a kind cluster with Knative

# Create the cluster
kind create cluster --name kafka-test

# Install Knative Serving
kubectl apply -f https://github.com/knative/serving/releases/latest/download/serving-crds.yaml
kubectl apply -f https://github.com/knative/serving/releases/latest/download/serving-core.yaml

# Install Kourier (networking layer)
kubectl apply -f https://github.com/knative/net-kourier/releases/latest/download/kourier.yaml
kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress-class":"kourier.ingress.sigs.k8s.io"}}'

# Wait for Knative to be ready
kubectl wait --for=condition=Ready pods --all -n knative-serving --timeout=120s

2. Install Kafka using Strimzi

# Install Strimzi operator
kubectl create namespace kafka
kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl wait --for=condition=Ready pods --all -n kafka --timeout=120s

# Create a single-node Kafka cluster
kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.strimzi.io/v1
kind: KafkaNodePool
metadata:
  name: dual-role
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 1
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 1Gi
        deleteClaim: true
---
apiVersion: kafka.strimzi.io/v1
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 4.2.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
  entityOperator:
    topicOperator: {}
EOF

# Wait for Kafka to be ready (this can take a few minutes)
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka

3. Create a Kafka topic

kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.strimzi.io/v1
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
EOF

4. Create the function

Build the func CLI from this branch first:

go build -o /tmp/func-local ./cmd/func

Then create the function:

mkdir /tmp/my-kafka-func && cd /tmp/my-kafka-func
/tmp/func-local create -l go -t kafka

This creates a static Kafka function. The generated handle.go looks like:

package function

import (
	"context"
	"fmt"

	"knative.dev/func-go/kafka"
)

// Handle a Kafka message.
//
// Returning nil signals successful processing and the message offset is
// committed.  Returning an error logs the error and the message is skipped
// (not retried).
func Handle(ctx context.Context, msg kafka.Message) error {
	fmt.Printf("Received message: topic=%s partition=%d offset=%d key=%s value=%s\n",
		msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
	return nil
}

The generated func.yaml should contain invoke: kafka.

5. Configure Kafka connection

Add the Kafka environment variables to func.yaml:

/tmp/func-local config envs add --name KAFKA_BROKERS --value "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
/tmp/func-local config envs add --name KAFKA_TOPICS --value "test-topic"
/tmp/func-local config envs add --name KAFKA_CONSUMER_GROUP --value "my-kafka-func-group"

Verify the config:

cat func.yaml

The resulting func.yaml should look like:

specVersion: 0.36.0
name: my-kafka-func
runtime: go
invoke: kafka
run:
  envs:
  - name: KAFKA_BROKERS
    value: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
  - name: KAFKA_TOPICS
    value: test-topic
  - name: KAFKA_CONSUMER_GROUP
    value: my-kafka-func-group

Using Secrets (alternative)

Instead of plain values, you can reference Kubernetes Secrets:

# Create the secret
kubectl create secret generic kafka-config \
  --from-literal=brokers="my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092" \
  --from-literal=topics="test-topic" \
  --from-literal=group="my-kafka-func-group"

# Reference it in func.yaml
/tmp/func-local config envs add --name KAFKA_BROKERS --value '{{ secret:kafka-config:brokers }}'
/tmp/func-local config envs add --name KAFKA_TOPICS --value '{{ secret:kafka-config:topics }}'
/tmp/func-local config envs add --name KAFKA_CONSUMER_GROUP --value '{{ secret:kafka-config:group }}'

The resulting func.yaml should look like:

specVersion: 0.36.0
name: my-kafka-func
runtime: go
invoke: kafka
run:
  envs:
  - name: KAFKA_BROKERS
    value: '{{ secret:kafka-config:brokers }}'
  - name: KAFKA_TOPICS
    value: '{{ secret:kafka-config:topics }}'
  - name: KAFKA_CONSUMER_GROUP
    value: '{{ secret:kafka-config:group }}'

6. Build and deploy

Note: Once knative.dev/func-go is released with the kafka package,
skip this entire section and just run:

FUNC_REGISTRY=ttl.sh/my-kafka-func-test /tmp/func-local deploy --build --verbose

Until the module is published, func deploy cannot resolve
knative.dev/func-go/kafka. Build manually with a local replace directive instead:

# 1. Run func build to generate the scaffolding (it will fail at compile, that's OK)
FUNC_ENABLE_HOST_BUILDER=true FUNC_REGISTRY=ttl.sh /tmp/func-local build --builder=host --verbose 2>&1 || true

# 2. Find the build directory
BUILDDIR=$(find /tmp/my-kafka-func/.func/builds/by-hash -mindepth 1 -maxdepth 1 -type d | head -1)
cd "$BUILDDIR"

# 3. Patch go.mod to use the local func-go source (adjust the path to your checkout)
go mod edit -replace knative.dev/func-go="$(go env GOPATH)/src/knative.dev/func-go"
go mod tidy

# 4. Cross-compile (match your cluster's architecture)
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o result/f .

# 5. Build container image
mkdir -p /tmp/kafka-func-build
cp "$BUILDDIR/result/f" /tmp/kafka-func-build/f
cp "$BUILDDIR/ca-certificates.crt" /tmp/kafka-func-build/
cat > /tmp/kafka-func-build/Dockerfile <<'DOCKERFILE'
FROM gcr.io/distroless/static:nonroot
COPY f /usr/local/bin/f
COPY ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
ENTRYPOINT ["/usr/local/bin/f"]
DOCKERFILE

TTL_TAG="ttl.sh/my-kafka-func-$(date +%s):2h"
docker build --platform linux/amd64 -t "$TTL_TAG" /tmp/kafka-func-build
docker push "$TTL_TAG"
echo "Image: $TTL_TAG"

7. Deploy the Knative Service

Create the service manually (or use func deploy once func-go is published):

kubectl apply -f - <<EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: my-kafka-func
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/min-scale: "1"
    spec:
      containers:
        - image: $TTL_TAG
          env:
            - name: KAFKA_BROKERS
              value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
            - name: KAFKA_TOPICS
              value: "test-topic"
            - name: KAFKA_CONSUMER_GROUP
              value: "my-kafka-func-group"
EOF

The min-scale: "1" annotation is important — Kafka functions must stay
running to consume messages. Without it, Knative will scale the function to
zero and no messages will be consumed.

Wait for the pod to be running:

kubectl wait pods -l serving.knative.dev/service=my-kafka-func --for=condition=Ready --timeout=120s

8. Check function logs

In a separate terminal, tail the function logs:

kubectl logs -l serving.knative.dev/service=my-kafka-func -c user-container -f

You should see the function starting up and connecting to Kafka:

{"level":"info","brokers":["my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"],"topics":["test-topic"],"group":"my-kafka-func-group","message":"connecting to kafka"}
{"level":"info","message":"kafka consumer ready (partitions assigned)"}

9. Send a test message

Use a Kafka producer pod to send a message:

kubectl run kafka-producer -n kafka \
  --image=quay.io/strimzi/kafka:latest-kafka-4.2.0 \
  --restart=Never \
  --command -- sh -c \
  'echo "Hello from Kafka!" | bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic test-topic'

Wait for it to complete and clean up:

kubectl wait pod/kafka-producer -n kafka --for=jsonpath='{.status.phase}'=Succeeded --timeout=60s
kubectl delete pod kafka-producer -n kafka

10. Verify the function received the message

Check the function logs from step 8. You should see:

Received message: topic=test-topic partition=0 offset=0 key= value=Hello from Kafka!

11. Send a message with a key

kubectl run kafka-producer-keyed -n kafka \
  --image=quay.io/strimzi/kafka:latest-kafka-4.2.0 \
  --restart=Never \
  --command -- sh -c \
  'echo "mykey:Hello with a key!" | bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic test-topic --property parse.key=true --property key.separator=:'

kubectl wait pod/kafka-producer-keyed -n kafka --for=jsonpath='{.status.phase}'=Succeeded --timeout=60s
kubectl delete pod kafka-producer-keyed -n kafka

Expected log output:

Received message: topic=test-topic partition=0 offset=1 key=mykey value=Hello with a key!

Cleanup

# Delete the Knative Service
kubectl delete ksvc my-kafka-func

# Delete Kafka (topic first, while the Topic Operator is still running to finalize it)
kubectl delete kafkatopic test-topic -n kafka
kubectl delete kafka my-cluster -n kafka
kubectl delete -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl delete namespace kafka

# Delete the kind cluster
kind delete cluster --name kafka-test

# Remove the function directory, the image build context, and the locally built func binary
rm -rf /tmp/my-kafka-func /tmp/kafka-func-build /tmp/func-local

@knative-prow knative-prow Bot requested review from matejvasek and matzew June 25, 2026 14:43
@knative-prow

knative-prow Bot commented Jun 25, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: aliok
Once this PR has been reviewed and has the lgtm label, please assign lkingland for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow knative-prow Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jun 25, 2026
@aliok aliok changed the title from "Add Kafka middleware package" to "[WIP] POC: Add Kafka middleware package" Jun 25, 2026
@knative-prow knative-prow Bot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jun 25, 2026