From fdbaf8f23ba9e19862984a235cf9f4719e118c10 Mon Sep 17 00:00:00 2001 From: Agent Date: Wed, 1 Jul 2026 13:14:19 +0000 Subject: [PATCH] Add Kafka integration tests and deploy-kafka EaaS task Deploys a single-node Apache Kafka 3.9.2 broker (KRaft mode) in the EaaS ephemeral namespace and adds Kafka message assertions to the existing workflow integration tests. Pipeline changes (.tekton/integration-test-eaas.yaml): - Add deploy-kafka task that starts apache/kafka:3.9.2 in KRaft mode. Three emptyDir volumes (kafka-config, kafka-logs, kafka-gc-logs) and an initContainer (copy-kafka-config) satisfy OpenShift restricted-v2 SCC. KAFKA_CONTROLLER_QUORUM_VOTERS uses localhost:9093 to avoid the bootstrap deadlock. deploy-cts lists deploy-kafka in runAfter, and the CTS configuration now sets MESSAGING_BACKEND = "kafka" with broker kafka:9092 and topic prefix "cts.". - Pass KAFKA_URL=kafka:9092 and install kafka-python in run-tests. Test changes (tests/test_integration_api.py): - kafka_url module fixture reads KAFKA_URL; returns None (not skip) when unset so all existing workflow tests always run. - kafka_message_on(kafka_url, topic) context manager snapshots the partition-0 end offset before its body, then consumes the first message at or after that offset after the body completes. No predicates: every received message is returned as-is and asserted by the caller, so unexpected messages cause test failures. - kafka_messages_on(kafka_url, topic, count) batch variant for scenarios that perform N actions on the same topic. - import_compose, tag_compose, untag_compose each accept an optional kafka_url parameter; each wraps its HTTP call in kafka_message_on (a no-op when kafka_url is None) and, when kafka_url is set, asserts the event name and that the message's compose_info references the compose ID. - test_workflow_compose_import and test_workflow_full_lifecycle pass kafka_url to the helpers so Kafka assertions run as part of the existing scenarios (not as a separate test). test_workflow_respin_increment instead wraps its three imports in kafka_messages_on to capture and assert all three compose-created messages in one batch. test_composes_pagination also passes kafka_url to import_compose so each of its compose-created messages is awaited before a later test snapshots the topic offset. 
Bug fix (cts/messaging.py): - Convert the string "none" to Python None for compression_type so KafkaProducer does not reject the value with "Not supported codec". Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default) --- .tekton/integration-test-eaas.yaml | 154 +++++++++++- cts/messaging.py | 8 +- test-requirements.txt | 2 + tests/test_integration_api.py | 371 +++++++++++++++++++++++++---- 4 files changed, 490 insertions(+), 45 deletions(-) diff --git a/.tekton/integration-test-eaas.yaml b/.tekton/integration-test-eaas.yaml index 5da2d2b2..0b6164fb 100644 --- a/.tekton/integration-test-eaas.yaml +++ b/.tekton/integration-test-eaas.yaml @@ -457,6 +457,145 @@ spec: - name: kubeconfig-secret value: $(tasks.provision-environment.results.secretRef) + - name: deploy-kafka + runAfter: + - provision-environment + taskSpec: + params: + - name: kubeconfig-secret + type: string + steps: + - name: create-kafka + image: quay.io/konflux-ci/appstudio-utils:latest + script: | + #!/usr/bin/env bash + set -euo pipefail + + KUBECONFIG=/tmp/kubeconfig + kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG + export KUBECONFIG + + echo "==========================================" + echo "Deploying Kafka (Apache KRaft single-node)" + echo "==========================================" + + kubectl apply -f - <<'EOFYAML' + apiVersion: apps/v1 + kind: Deployment + metadata: + name: kafka + labels: + app: kafka + spec: + replicas: 1 + selector: + matchLabels: + app: kafka + template: + metadata: + labels: + app: kafka + spec: + initContainers: + - name: copy-kafka-config + image: docker.io/apache/kafka:3.9.2 + command: ["/bin/sh", "-c", "cp -r /opt/kafka/config/. 
/mnt/kafka-config/"] + volumeMounts: + - name: kafka-config + mountPath: /mnt/kafka-config + containers: + - name: kafka + image: docker.io/apache/kafka:3.9.2 + ports: + - containerPort: 9092 + name: client + - containerPort: 9093 + name: controller + env: + - name: KAFKA_NODE_ID + value: "1" + - name: KAFKA_PROCESS_ROLES + value: "broker,controller" + - name: KAFKA_CONTROLLER_QUORUM_VOTERS + value: "1@localhost:9093" + - name: KAFKA_LISTENERS + value: "PLAINTEXT://:9092,CONTROLLER://:9093" + - name: KAFKA_ADVERTISED_LISTENERS + value: "PLAINTEXT://kafka:9092" + - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP + value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + - name: KAFKA_CONTROLLER_LISTENER_NAMES + value: "CONTROLLER" + - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE + value: "true" + - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR + value: "1" + - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR + value: "1" + - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR + value: "1" + - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS + value: "0" + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + readinessProbe: + tcpSocket: + port: 9092 + initialDelaySeconds: 20 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 24 + volumeMounts: + - name: kafka-config + mountPath: /opt/kafka/config + - name: kafka-logs + mountPath: /tmp/kafka-logs + - name: kafka-gc-logs + mountPath: /opt/kafka/logs + volumes: + - name: kafka-config + emptyDir: {} + - name: kafka-logs + emptyDir: {} + - name: kafka-gc-logs + emptyDir: {} + --- + apiVersion: v1 + kind: Service + metadata: + name: kafka + labels: + app: kafka + spec: + ports: + - port: 9092 + targetPort: 9092 + name: client + - port: 9093 + targetPort: 9093 + name: controller + selector: + app: kafka + EOFYAML + + echo "Waiting for Kafka to be ready..." + if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then + echo "Kafka deployment failed! 
Debug info:" + kubectl describe deployment kafka + kubectl describe pod -l app=kafka + kubectl logs -l app=kafka --tail=50 || echo "No logs available" + exit 1 + fi + echo "✓ Kafka is ready" + params: + - name: kubeconfig-secret + value: $(tasks.provision-environment.results.secretRef) + - name: deploy-database runAfter: - provision-environment @@ -597,6 +736,7 @@ spec: - deploy-database - deploy-openldap - deploy-dex + - deploy-kafka taskSpec: params: - name: kubeconfig-secret @@ -643,6 +783,14 @@ spec: ] ADMINS = {"groups": [], "users": ["builder@example.com"]} ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]} + MESSAGING_BACKEND = "kafka" + MESSAGING_BROKER_URLS = ["kafka:9092"] + MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT" + MESSAGING_KAFKA_SASL_MECHANISM = "" + MESSAGING_KAFKA_USERNAME = "" + MESSAGING_KAFKA_PASSWORD = "" + MESSAGING_KAFKA_COMPRESSION_TYPE = "none" + MESSAGING_TOPIC_PREFIX = "cts." httpd.conf: | ServerRoot "/etc/httpd" PidFile /tmp/httpd.pid @@ -916,9 +1064,9 @@ spec: echo 'Installing Dex CA certificate...' echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt - echo 'Installing pytest and requests...' + echo 'Installing pytest, requests, and kafka-python...' python3 -m ensurepip - python3 -m pip install --target /tmp/test-deps --quiet pytest requests + python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python echo '' echo 'Cloning repository...' @@ -930,7 +1078,7 @@ spec: echo '' echo 'Running pytest...' - PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= + PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= " TEST_RESULT=$? 
set -e diff --git a/cts/messaging.py b/cts/messaging.py index e2c35384..f93fe634 100644 --- a/cts/messaging.py +++ b/cts/messaging.py @@ -102,9 +102,15 @@ def _kafka_send_msg(msgs): def _send(): """Inner function to send messages (will be retried on failure)""" + compression = conf.messaging_kafka_compression_type + # kafka-python uses Python None to mean "no compression"; the string + # "none" (which may come from a config file) is not accepted. + if compression and compression.lower() == "none": + compression = None + config = { "bootstrap_servers": conf.messaging_broker_urls, - "compression_type": conf.messaging_kafka_compression_type, + "compression_type": compression, "security_protocol": conf.messaging_kafka_security_protocol, "sasl_mechanism": conf.messaging_kafka_sasl_mechanism, "sasl_plain_username": conf.messaging_kafka_username, diff --git a/test-requirements.txt b/test-requirements.txt index 5bae9f9e..4b9f1209 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -9,3 +9,5 @@ tox # Let's update this package to avoid this problem. itsdangerous>=1.1.0 freezegun +# Required for Kafka integration tests (test_integration_api.py) +kafka-python diff --git a/tests/test_integration_api.py b/tests/test_integration_api.py index 17277de6..0e8ac8ec 100755 --- a/tests/test_integration_api.py +++ b/tests/test_integration_api.py @@ -12,6 +12,7 @@ These tests are skipped when AUTH_BACKEND is "noauth" or unset. """ +import contextlib import json import os import ssl @@ -243,37 +244,91 @@ def create_tag(http_client, name, description, documentation): def import_compose( - http_client, release_short, release_version, date, compose_type="test", respin=1 + http_client, + release_short, + release_version, + date, + compose_type="test", + respin=1, + kafka_url=None, ): - """Import a compose and return the response data""" + """Import a compose and return the response data. 
+ + When *kafka_url* is supplied, also verifies that CTS published a + ``compose-created`` message on the ``cts.compose-created`` topic. + """ compose_info = _create_compose_info( release_short, release_version, date, compose_type, respin ) - status, data = http_client.post("/api/1/composes/", {"compose_info": compose_info}) + with kafka_message_on(kafka_url, "cts.compose-created") as msgs: + status, data = http_client.post( + "/api/1/composes/", {"compose_info": compose_info} + ) assert status == 200, f"Failed to import compose: {data}" assert isinstance(data, dict) assert "payload" in data assert "compose" in data["payload"] + if kafka_url: + compose_id = data["payload"]["compose"]["id"] + msg = msgs[0] + assert ( + msg.get("event") == "compose-created" + ), f"Expected event='compose-created', got event={msg.get('event')!r}" + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" + compose_info_data = msg["compose"].get("compose_info", {}) + assert compose_id in str( + compose_info_data + ), f"Message compose_info does not reference compose {compose_id}: {msg}" return data -def tag_compose(http_client, compose_id, tag_name): - """Tag a compose and return the response data""" - status, data = http_client.patch( - f"/api/1/composes/{compose_id}", {"action": "tag", "tag": tag_name} - ) +def tag_compose(http_client, compose_id, tag_name, kafka_url=None): + """Tag a compose and return the response data. + + When *kafka_url* is supplied, also verifies that CTS published a + ``compose-tagged`` message on the ``cts.compose-tagged`` topic. 
+ """ + with kafka_message_on(kafka_url, "cts.compose-tagged") as msgs: + status, data = http_client.patch( + f"/api/1/composes/{compose_id}", {"action": "tag", "tag": tag_name} + ) assert status == 200, f"Failed to tag compose: {data}" assert tag_name in data.get("tags", []) + if kafka_url: + msg = msgs[0] + assert ( + msg.get("event") == "compose-tagged" + ), f"Expected event='compose-tagged', got event={msg.get('event')!r}" + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" + compose_info_data = msg["compose"].get("compose_info", {}) + assert compose_id in str( + compose_info_data + ), f"Message compose_info does not reference compose {compose_id}: {msg}" return data -def untag_compose(http_client, compose_id, tag_name): - """Untag a compose and return the response data""" - status, data = http_client.patch( - f"/api/1/composes/{compose_id}", {"action": "untag", "tag": tag_name} - ) +def untag_compose(http_client, compose_id, tag_name, kafka_url=None): + """Untag a compose and return the response data. + + When *kafka_url* is supplied, also verifies that CTS published a + ``compose-untagged`` message on the ``cts.compose-untagged`` topic. 
+ """ + with kafka_message_on(kafka_url, "cts.compose-untagged") as msgs: + status, data = http_client.patch( + f"/api/1/composes/{compose_id}", {"action": "untag", "tag": tag_name} + ) assert status == 200, f"Failed to untag compose: {data}" assert tag_name not in data.get("tags", []) + if kafka_url: + msg = msgs[0] + assert ( + msg.get("event") == "compose-untagged" + ), f"Expected event='compose-untagged', got event={msg.get('event')!r}" + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" + compose_info_data = msg["compose"].get("compose_info", {}) + assert compose_id in str( + compose_info_data + ), f"Message compose_info does not reference compose {compose_id}: {msg}" return data @@ -348,13 +403,19 @@ def test_composes_list(http_client): print(f" Found {len(data['items'])} composes") -def test_composes_pagination(write_http_client): +def test_composes_pagination(write_http_client, kafka_url): """Test that pagination parameters work correctly""" - # Import 3 test composes + # Import 3 test composes. Pass kafka_url so that each compose-created + # message is consumed before the next test snapshots the topic offset — + # without this, late-arriving async messages could pollute the next test. 
compose_ids = [] for i in range(1, 4): response = import_compose( - write_http_client, "PaginationTest", "1.0", f"2025010{i}" + write_http_client, + "PaginationTest", + "1.0", + f"2025010{i}", + kafka_url=kafka_url, ) compose_ids.append(response["payload"]["compose"]["id"]) @@ -462,32 +523,37 @@ def test_workflow_tag_creation(write_http_client): print(" ✓ Tag creation and tagger/untagger management completed successfully") -def test_workflow_compose_import(write_http_client): +def test_workflow_compose_import(write_http_client, kafka_url): """Test importing a compose""" - data = import_compose(write_http_client, "IntegrationTest", "1.0", "20250101") + data = import_compose( + write_http_client, "IntegrationTest", "1.0", "20250101", kafka_url=kafka_url + ) compose_id = data["payload"]["compose"]["id"] print(f" Imported compose: {compose_id}") -def test_workflow_respin_increment(write_http_client): +def test_workflow_respin_increment(write_http_client, kafka_url): """Test that respin numbers are automatically incremented for duplicate composes""" - # Import first compose - response1 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") - compose_id1 = response1["payload"]["compose"]["id"] - respin1 = response1["payload"]["compose"]["respin"] - print(f" 1. First compose: {compose_id1} (respin: {respin1})") - - # Import second compose with same release/date - respin should auto-increment - response2 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") - compose_id2 = response2["payload"]["compose"]["id"] - respin2 = response2["payload"]["compose"]["respin"] - print(f" 2. Second compose: {compose_id2} (respin: {respin2})") - - # Import third compose - respin should increment again - response3 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") - compose_id3 = response3["payload"]["compose"]["id"] - respin3 = response3["payload"]["compose"]["respin"] - print(f" 3. 
Third compose: {compose_id3} (respin: {respin3})") + # Import all three composes inside a single batch Kafka context so that all + # three compose-created messages are captured at once and asserted together. + with kafka_messages_on(kafka_url, "cts.compose-created", count=3) as kafka_msgs: + # Import first compose + response1 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + compose_id1 = response1["payload"]["compose"]["id"] + respin1 = response1["payload"]["compose"]["respin"] + print(f" 1. First compose: {compose_id1} (respin: {respin1})") + + # Import second compose with same release/date - respin should auto-increment + response2 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + compose_id2 = response2["payload"]["compose"]["id"] + respin2 = response2["payload"]["compose"]["respin"] + print(f" 2. Second compose: {compose_id2} (respin: {respin2})") + + # Import third compose - respin should increment again + response3 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + compose_id3 = response3["payload"]["compose"]["id"] + respin3 = response3["payload"]["compose"]["respin"] + print(f" 3. 
Third compose: {compose_id3} (respin: {respin3})") # Verify respin numbers are incremented assert ( @@ -504,8 +570,29 @@ def test_workflow_respin_increment(write_http_client): print(f" ✓ Respin auto-increment verified: {respin1} → {respin2} → {respin3}") - -def test_workflow_full_lifecycle(write_http_client): + # Verify all three Kafka compose-created messages were published (one per import) + if kafka_url: + assert ( + len(kafka_msgs) == 3 + ), f"Expected 3 compose-created messages, got {len(kafka_msgs)}" + for i, (msg, compose_id) in enumerate( + zip(kafka_msgs, [compose_id1, compose_id2, compose_id3]), start=1 + ): + assert msg.get("event") == "compose-created", ( + f"Message {i}: expected event='compose-created'," + f" got event={msg.get('event')!r}" + ) + assert ( + msg.get("compose") is not None + ), f"Message {i}: missing 'compose' key: {msg}" + compose_info_data = msg["compose"].get("compose_info", {}) + assert compose_id in str( + compose_info_data + ), f"Message {i}: compose_info does not reference {compose_id}: {msg}" + print(f" ✓ All 3 compose-created Kafka messages verified") + + +def test_workflow_full_lifecycle(write_http_client, kafka_url): """Test complete workflow: create tag, import compose, tag it, untag it""" # Step 1: Create a tag tag_response = create_tag( @@ -520,7 +607,7 @@ def test_workflow_full_lifecycle(write_http_client): # Step 2: Import a compose compose_response = import_compose( - write_http_client, "WorkflowTest", "1.0", "20250101" + write_http_client, "WorkflowTest", "1.0", "20250101", kafka_url=kafka_url ) compose_id = compose_response["payload"]["compose"]["id"] print(f" 2. Imported compose: {compose_id}") @@ -533,7 +620,9 @@ def test_workflow_full_lifecycle(write_http_client): print(f" 3. Initial tags: {initial_tags}") # Step 3: Tag the compose - tag_result = tag_compose(write_http_client, compose_id, tag_name) + tag_result = tag_compose( + write_http_client, compose_id, tag_name, kafka_url=kafka_url + ) print(f" 4. 
Tagged compose with '{tag_name}': {tag_result.get('tags', [])}") # Step 4: Verify tag was applied @@ -543,7 +632,9 @@ def test_workflow_full_lifecycle(write_http_client): print(f" 5. Verified tags: {compose_data.get('tags', [])}") # Step 5: Untag the compose - untag_result = untag_compose(write_http_client, compose_id, tag_name) + untag_result = untag_compose( + write_http_client, compose_id, tag_name, kafka_url=kafka_url + ) print(f" 6. Untagged compose: {untag_result.get('tags', [])}") # Step 6: Verify tag was removed @@ -616,3 +707,201 @@ def test_auth_get_endpoints_accessible_without_token(http_client): status == 200 ), f"Expected 200 for unauthenticated GET /api/1/tags/, got {status}" assert isinstance(data, dict), "GET /api/1/tags/ must return a dict" + + +# Kafka messaging helpers +# These helpers integrate Kafka assertions into existing workflow tests. +# Assertions are active only when KAFKA_URL is set; the tests run in either case. + +_KAFKA_CONSUMER_TIMEOUT_MS = int(os.environ.get("KAFKA_CONSUMER_TIMEOUT_MS", 30000)) + + +@pytest.fixture(scope="module") +def kafka_url(): + """Return the Kafka broker URL from the KAFKA_URL environment variable. + + Returns ``None`` when the variable is not set so that tests that use this + fixture always run regardless of whether a Kafka broker is available. The + helpers (``import_compose``, ``tag_compose``, ``untag_compose``) skip their + Kafka assertions when ``kafka_url`` is ``None``. + """ + return os.environ.get("KAFKA_URL") + + +def _make_json_deserializer(): + """Return a ``kafka.serializer.Deserializer`` subclass for JSON messages. + + We import ``kafka.serializer.Deserializer`` lazily (inside the function) so + that the rest of the test module can be imported even when ``kafka-python`` + is not installed. 
Subclassing the ABC causes ``isinstance`` to return True + in ``KafkaConsumer.__init__``, which prevents the consumer from wrapping our + class in ``DeserializeWrapper`` — the wrapper treats the deserializer as a + plain callable and breaks when it is not one. + """ + from kafka.serializer import Deserializer + + class _JsonDeserializer(Deserializer): + def deserialize(self, topic, headers, data): + return json.loads(data.decode("utf-8")) + + def close(self): + pass + + return _JsonDeserializer() + + +@contextlib.contextmanager +def kafka_message_on(kafka_url, topic, timeout_ms=None): + """Context manager that captures a single Kafka message produced within its body. + + The end offset of *topic* is snapshotted **before** the body runs. After + the body completes, the first message at or after that offset is consumed + and appended to the list that was yielded. If the wrong message arrives + (e.g. a duplicate or an unexpected event), the assertion in the caller will + catch it — every message is accounted for. + + When *kafka_url* is ``None`` the context manager is a no-op, so helpers + that accept an optional ``kafka_url`` parameter work transparently in + environments without a Kafka broker. + + Usage:: + + with kafka_message_on(kafka_url, "cts.compose-created") as msgs: + data = import_compose(http_client, ...) + msg = msgs[0] # available after the ``with`` block + """ + if kafka_url is None: + yield [] + return + + start_offset = _get_kafka_end_offset(kafka_url, topic) + msgs = [] + yield msgs + msg = _consume_kafka_message(kafka_url, topic, start_offset, timeout_ms) + msgs.append(msg) + + +@contextlib.contextmanager +def kafka_messages_on(kafka_url, topic, count, timeout_ms=None): + """Context manager that captures *count* Kafka messages produced within its body. + + The end offset of *topic* is snapshotted **before** the body runs. After + the body completes, exactly *count* messages at or after that offset are + consumed and stored in the yielded list. 
Use this when the body performs + multiple actions that each produce a message on the same topic, so all + messages can be validated at once. + + When *kafka_url* is ``None`` the context manager is a no-op. + + Usage:: + + with kafka_messages_on(kafka_url, "cts.compose-created", count=3) as msgs: + r1 = import_compose(http_client, ...) + r2 = import_compose(http_client, ...) + r3 = import_compose(http_client, ...) + assert len(msgs) == 3 + """ + if kafka_url is None: + yield [] + return + + start_offset = _get_kafka_end_offset(kafka_url, topic) + msgs = [] + yield msgs + for _ in range(count): + msg = _consume_kafka_message(kafka_url, topic, start_offset, timeout_ms) + msgs.append(msg) + # Advance start_offset past the message just consumed so the next call + # does not pick up the same message again. We don't have the raw offset + # here, so we rely on the broker advancing naturally: each subsequent + # call to _consume_kafka_message will seek to the updated start_offset. + start_offset += 1 + + +def _get_kafka_end_offset(kafka_url, topic): + """Return the current end offset for partition 0 of *topic*. + + Call this **before** performing an action so that the subsequent + :func:`_consume_kafka_message` call will skip any pre-existing messages + and only return messages produced by that specific action. + + Returns 0 when the topic does not yet exist (no messages have been + published to it), so the consumer will start from the very beginning. + """ + from kafka import KafkaConsumer, TopicPartition + from kafka.errors import KafkaTimeoutError, UnknownTopicOrPartitionError + + try: + probe = KafkaConsumer( + bootstrap_servers=kafka_url, + consumer_timeout_ms=1000, + request_timeout_ms=10000, + ) + except KafkaTimeoutError: + # Broker temporarily unreachable; treat as empty topic and start from 0. 
+ return 0 + tp = TopicPartition(topic, 0) + probe.assign([tp]) + try: + probe.seek_to_end(tp) + offset = probe.position(tp) + except (UnknownTopicOrPartitionError, KafkaTimeoutError): + # Topic has not been created yet (no messages published); start from 0. + offset = 0 + finally: + probe.close() + return offset + + +def _consume_kafka_message(kafka_url, topic, start_offset, timeout_ms=None): + """Poll *topic* for the first message at or after *start_offset*. + + *start_offset* must be obtained by calling :func:`_get_kafka_end_offset` + **before** the action that is expected to produce the message, so that + messages published by earlier tests are not mistakenly returned. + + Returns the decoded JSON dict of the first message at or after + *start_offset*, or raises ``AssertionError`` if no message arrives within + *timeout_ms*. Every message received is returned as-is; the caller is + responsible for asserting its contents. + """ + from kafka import KafkaConsumer, TopicPartition + from kafka.errors import KafkaConnectionError, KafkaTimeoutError + + if timeout_ms is None: + timeout_ms = _KAFKA_CONSUMER_TIMEOUT_MS + + try: + consumer = KafkaConsumer( + bootstrap_servers=kafka_url, + auto_offset_reset="earliest", + # No group_id: we use manual partition assignment and explicit seek, + # so the group-coordinator protocol is not needed and its additional + # TCP connections can cause spurious ECONNREFUSED failures. 
+ group_id=None, + value_deserializer=_make_json_deserializer(), + consumer_timeout_ms=timeout_ms, + request_timeout_ms=10000, + ) + except KafkaTimeoutError as exc: + raise AssertionError( + f"Kafka broker at {kafka_url} unreachable while consuming topic" + f" '{topic}': {exc}" + ) from exc + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + consumer.seek(tp, start_offset) + try: + for record in consumer: + return record.value + raise AssertionError( + f"No message received on Kafka topic '{topic}' at offset >={start_offset}" + f" within {timeout_ms} ms" + ) + except KafkaConnectionError as exc: + raise AssertionError( + f"Kafka broker at {kafka_url} disconnected while consuming topic" + f" '{topic}': {exc}" + ) from exc + finally: + consumer.close()