diff --git a/.tekton/integration-test-eaas.yaml b/.tekton/integration-test-eaas.yaml index 5da2d2b..0b6164f 100644 --- a/.tekton/integration-test-eaas.yaml +++ b/.tekton/integration-test-eaas.yaml @@ -457,6 +457,145 @@ spec: - name: kubeconfig-secret value: $(tasks.provision-environment.results.secretRef) + - name: deploy-kafka + runAfter: + - provision-environment + taskSpec: + params: + - name: kubeconfig-secret + type: string + steps: + - name: create-kafka + image: quay.io/konflux-ci/appstudio-utils:latest + script: | + #!/usr/bin/env bash + set -euo pipefail + + KUBECONFIG=/tmp/kubeconfig + kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG + export KUBECONFIG + + echo "==========================================" + echo "Deploying Kafka (Apache KRaft single-node)" + echo "==========================================" + + kubectl apply -f - <<'EOFYAML' + apiVersion: apps/v1 + kind: Deployment + metadata: + name: kafka + labels: + app: kafka + spec: + replicas: 1 + selector: + matchLabels: + app: kafka + template: + metadata: + labels: + app: kafka + spec: + initContainers: + - name: copy-kafka-config + image: docker.io/apache/kafka:3.9.2 + command: ["/bin/sh", "-c", "cp -r /opt/kafka/config/. /mnt/kafka-config/"] + volumeMounts: + - name: kafka-config + mountPath: /mnt/kafka-config + containers: + - name: kafka + image: docker.io/apache/kafka:3.9.2 + ports: + - containerPort: 9092 + name: client + - containerPort: 9093 + name: controller + env: + - name: KAFKA_NODE_ID + value: "1" + - name: KAFKA_PROCESS_ROLES + value: "broker,controller" + - name: KAFKA_CONTROLLER_QUORUM_VOTERS + value: "1@localhost:9093" + - name: KAFKA_LISTENERS + value: "PLAINTEXT://:9092,CONTROLLER://:9093" + - name: KAFKA_ADVERTISED_LISTENERS + value: "PLAINTEXT://kafka:9092" + - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP + value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + - name: KAFKA_CONTROLLER_LISTENER_NAMES + value: "CONTROLLER" + - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE + value: "true" + - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR + value: "1" + - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR + value: "1" + - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR + value: "1" + - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS + value: "0" + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + readinessProbe: + tcpSocket: + port: 9092 + initialDelaySeconds: 20 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 24 + volumeMounts: + - name: kafka-config + mountPath: /opt/kafka/config + - name: kafka-logs + mountPath: /tmp/kafka-logs + - name: kafka-gc-logs + mountPath: /opt/kafka/logs + volumes: + - name: kafka-config + emptyDir: {} + - name: kafka-logs + emptyDir: {} + - name: kafka-gc-logs + emptyDir: {} + --- + apiVersion: v1 + kind: Service + metadata: + name: kafka + labels: + app: kafka + spec: + ports: + - port: 9092 + targetPort: 9092 + name: client + - port: 9093 + targetPort: 9093 + name: controller + selector: + app: kafka + EOFYAML + + echo "Waiting for Kafka to be ready..." + if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then + echo "Kafka deployment failed! Debug info:" + kubectl describe deployment kafka + kubectl describe pod -l app=kafka + kubectl logs -l app=kafka --tail=50 || echo "No logs available" + exit 1 + fi + echo "✓ Kafka is ready" + params: + - name: kubeconfig-secret + value: $(tasks.provision-environment.results.secretRef) + - name: deploy-database runAfter: - provision-environment @@ -597,6 +736,7 @@ spec: - deploy-database - deploy-openldap - deploy-dex + - deploy-kafka taskSpec: params: - name: kubeconfig-secret @@ -643,6 +783,14 @@ spec: ] ADMINS = {"groups": [], "users": ["builder@example.com"]} ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]} + MESSAGING_BACKEND = "kafka" + MESSAGING_BROKER_URLS = ["kafka:9092"] + MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT" + MESSAGING_KAFKA_SASL_MECHANISM = "" + MESSAGING_KAFKA_USERNAME = "" + MESSAGING_KAFKA_PASSWORD = "" + MESSAGING_KAFKA_COMPRESSION_TYPE = "none" + MESSAGING_TOPIC_PREFIX = "cts." httpd.conf: | ServerRoot "/etc/httpd" PidFile /tmp/httpd.pid @@ -916,9 +1064,9 @@ spec: echo 'Installing Dex CA certificate...' echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt - echo 'Installing pytest and requests...' + echo 'Installing pytest, requests, and kafka-python...' python3 -m ensurepip - python3 -m pip install --target /tmp/test-deps --quiet pytest requests + python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python echo '' echo 'Cloning repository...' @@ -930,7 +1078,7 @@ spec: echo '' echo 'Running pytest...' - PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= + PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= " TEST_RESULT=$? set -e diff --git a/cts/messaging.py b/cts/messaging.py index e2c3538..f93fe63 100644 --- a/cts/messaging.py +++ b/cts/messaging.py @@ -102,9 +102,15 @@ def _kafka_send_msg(msgs): def _send(): """Inner function to send messages (will be retried on failure)""" + compression = conf.messaging_kafka_compression_type + # kafka-python uses Python None to mean "no compression"; the string + # "none" (which may come from a config file) is not accepted. + if compression and compression.lower() == "none": + compression = None + config = { "bootstrap_servers": conf.messaging_broker_urls, - "compression_type": conf.messaging_kafka_compression_type, + "compression_type": compression, "security_protocol": conf.messaging_kafka_security_protocol, "sasl_mechanism": conf.messaging_kafka_sasl_mechanism, "sasl_plain_username": conf.messaging_kafka_username, diff --git a/test-requirements.txt b/test-requirements.txt index 5bae9f9..4b9f120 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -9,3 +9,5 @@ tox # Let's update this package to avoid this problem. itsdangerous>=1.1.0 freezegun +# Required for Kafka integration tests (test_integration_api.py) +kafka-python diff --git a/tests/test_integration_api.py b/tests/test_integration_api.py index 17277de..d9dfd24 100755 --- a/tests/test_integration_api.py +++ b/tests/test_integration_api.py @@ -243,9 +243,22 @@ def create_tag(http_client, name, description, documentation): def import_compose( - http_client, release_short, release_version, date, compose_type="test", respin=1 + http_client, + release_short, + release_version, + date, + compose_type="test", + respin=1, + *, + kafka_consumer, ): - """Import a compose and return the response data""" + """Import a compose and return the response data. + + *kafka_consumer* is the ``kafka_consumer`` fixture value. It is ``None`` + when ``KAFKA_URL`` is not set, in which case the Kafka assertion is skipped. + Callers must always pass it explicitly so that the compose-created message + is never silently left unconsumed. + """ compose_info = _create_compose_info( release_short, release_version, date, compose_type, respin ) @@ -254,26 +267,48 @@ def import_compose( assert isinstance(data, dict) assert "payload" in data assert "compose" in data["payload"] + compose_id = data["payload"]["compose"]["id"] + _assert_compose_message( + kafka_consumer, "cts.compose-created", "compose-created", compose_id + ) return data -def tag_compose(http_client, compose_id, tag_name): - """Tag a compose and return the response data""" +def tag_compose(http_client, compose_id, tag_name, *, kafka_consumer): + """Tag a compose and return the response data. + + *kafka_consumer* is the ``kafka_consumer`` fixture value. It is ``None`` + when ``KAFKA_URL`` is not set, in which case the Kafka assertion is skipped. + Callers must always pass it explicitly so that the compose-tagged message + is never silently left unconsumed. + """ status, data = http_client.patch( f"/api/1/composes/{compose_id}", {"action": "tag", "tag": tag_name} ) assert status == 200, f"Failed to tag compose: {data}" assert tag_name in data.get("tags", []) + _assert_compose_message( + kafka_consumer, "cts.compose-tagged", "compose-tagged", compose_id + ) return data -def untag_compose(http_client, compose_id, tag_name): - """Untag a compose and return the response data""" +def untag_compose(http_client, compose_id, tag_name, *, kafka_consumer): + """Untag a compose and return the response data. + + *kafka_consumer* is the ``kafka_consumer`` fixture value. It is ``None`` + when ``KAFKA_URL`` is not set, in which case the Kafka assertion is skipped. + Callers must always pass it explicitly so that the compose-untagged message + is never silently left unconsumed. + """ status, data = http_client.patch( f"/api/1/composes/{compose_id}", {"action": "untag", "tag": tag_name} ) assert status == 200, f"Failed to untag compose: {data}" assert tag_name not in data.get("tags", []) + _assert_compose_message( + kafka_consumer, "cts.compose-untagged", "compose-untagged", compose_id + ) return data @@ -348,13 +383,18 @@ def test_composes_list(http_client): print(f" Found {len(data['items'])} composes") -def test_composes_pagination(write_http_client): +def test_composes_pagination(write_http_client, kafka_consumer): """Test that pagination parameters work correctly""" - # Import 3 test composes + # Import 3 test composes. Pass kafka_consumer so that each compose-created + # message is asserted as part of this test. compose_ids = [] for i in range(1, 4): response = import_compose( - write_http_client, "PaginationTest", "1.0", f"2025010{i}" + write_http_client, + "PaginationTest", + "1.0", + f"2025010{i}", + kafka_consumer=kafka_consumer, ) compose_ids.append(response["payload"]["compose"]["id"]) @@ -462,29 +502,53 @@ def test_workflow_tag_creation(write_http_client): print(" ✓ Tag creation and tagger/untagger management completed successfully") -def test_workflow_compose_import(write_http_client): +def test_workflow_compose_import(write_http_client, kafka_consumer): """Test importing a compose""" - data = import_compose(write_http_client, "IntegrationTest", "1.0", "20250101") + data = import_compose( + write_http_client, + "IntegrationTest", + "1.0", + "20250101", + kafka_consumer=kafka_consumer, + ) compose_id = data["payload"]["compose"]["id"] print(f" Imported compose: {compose_id}") -def test_workflow_respin_increment(write_http_client): +def test_workflow_respin_increment(write_http_client, kafka_consumer): """Test that respin numbers are automatically incremented for duplicate composes""" # Import first compose - response1 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + response1 = import_compose( + write_http_client, + "RespinTest", + "1.0", + "20250102", + kafka_consumer=kafka_consumer, + ) compose_id1 = response1["payload"]["compose"]["id"] respin1 = response1["payload"]["compose"]["respin"] print(f" 1. First compose: {compose_id1} (respin: {respin1})") # Import second compose with same release/date - respin should auto-increment - response2 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + response2 = import_compose( + write_http_client, + "RespinTest", + "1.0", + "20250102", + kafka_consumer=kafka_consumer, + ) compose_id2 = response2["payload"]["compose"]["id"] respin2 = response2["payload"]["compose"]["respin"] print(f" 2. Second compose: {compose_id2} (respin: {respin2})") # Import third compose - respin should increment again - response3 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + response3 = import_compose( + write_http_client, + "RespinTest", + "1.0", + "20250102", + kafka_consumer=kafka_consumer, + ) compose_id3 = response3["payload"]["compose"]["id"] respin3 = response3["payload"]["compose"]["respin"] print(f" 3. Third compose: {compose_id3} (respin: {respin3})") @@ -505,7 +569,7 @@ def test_workflow_respin_increment(write_http_client): print(f" ✓ Respin auto-increment verified: {respin1} → {respin2} → {respin3}") -def test_workflow_full_lifecycle(write_http_client): +def test_workflow_full_lifecycle(write_http_client, kafka_consumer): """Test complete workflow: create tag, import compose, tag it, untag it""" # Step 1: Create a tag tag_response = create_tag( @@ -520,7 +584,11 @@ def test_workflow_full_lifecycle(write_http_client): # Step 2: Import a compose compose_response = import_compose( - write_http_client, "WorkflowTest", "1.0", "20250101" + write_http_client, + "WorkflowTest", + "1.0", + "20250101", + kafka_consumer=kafka_consumer, ) compose_id = compose_response["payload"]["compose"]["id"] print(f" 2. Imported compose: {compose_id}") @@ -533,7 +601,9 @@ def test_workflow_full_lifecycle(write_http_client): print(f" 3. Initial tags: {initial_tags}") # Step 3: Tag the compose - tag_result = tag_compose(write_http_client, compose_id, tag_name) + tag_result = tag_compose( + write_http_client, compose_id, tag_name, kafka_consumer=kafka_consumer + ) print(f" 4. Tagged compose with '{tag_name}': {tag_result.get('tags', [])}") # Step 4: Verify tag was applied @@ -543,7 +613,9 @@ def test_workflow_full_lifecycle(write_http_client): print(f" 5. Verified tags: {compose_data.get('tags', [])}") # Step 5: Untag the compose - untag_result = untag_compose(write_http_client, compose_id, tag_name) + untag_result = untag_compose( + write_http_client, compose_id, tag_name, kafka_consumer=kafka_consumer + ) print(f" 6. Untagged compose: {untag_result.get('tags', [])}") # Step 6: Verify tag was removed @@ -572,7 +644,7 @@ def test_auth_unauthenticated_write_returns_401(http_client): assert status != 200, "Unauthenticated write must not succeed" -def test_auth_builder_can_post_compose(auth_http_client_builder): +def test_auth_builder_can_post_compose(auth_http_client_builder, kafka_consumer): """Authenticated 'builder' user (in ALLOWED_BUILDERS) can POST a compose.""" compose_info = _create_compose_info("AuthBuilderTest", "1.0", "20260101") status, data = auth_http_client_builder.post( @@ -586,6 +658,9 @@ def test_auth_builder_can_post_compose(auth_http_client_builder): assert "compose" in data["payload"] compose_id = data["payload"]["compose"]["id"] assert compose_id, "Compose ID must be non-empty" + _assert_compose_message( + kafka_consumer, "cts.compose-created", "compose-created", compose_id + ) def test_auth_unauthorized_user_returns_403(auth_http_client_readonly): @@ -616,3 +691,181 @@ def test_auth_get_endpoints_accessible_without_token(http_client): status == 200 ), f"Expected 200 for unauthenticated GET /api/1/tags/, got {status}" assert isinstance(data, dict), "GET /api/1/tags/ must return a dict" + + +# Kafka messaging helpers +# These helpers integrate Kafka assertions into existing workflow tests. +# Assertions are active only when KAFKA_URL is set; the tests run in either case. + +_KAFKA_CONSUMER_TIMEOUT_MS = int(os.environ.get("KAFKA_CONSUMER_TIMEOUT_MS", 30000)) + +# Topics that CTS publishes to. +_CTS_KAFKA_TOPICS = [ + "cts.compose-created", + "cts.compose-tagged", + "cts.compose-untagged", +] + + +def _make_json_deserializer(): + """Return a ``kafka.serializer.Deserializer`` subclass for JSON messages. + + We import ``kafka.serializer.Deserializer`` lazily (inside the function) so + that the rest of the test module can be imported even when ``kafka-python`` + is not installed. Subclassing the ABC causes ``isinstance`` to return True + in ``KafkaConsumer.__init__``, which prevents the consumer from wrapping our + class in ``DeserializeWrapper`` — the wrapper treats the deserializer as a + plain callable and breaks when it is not one. + """ + from kafka.serializer import Deserializer + + class _JsonDeserializer(Deserializer): + def deserialize(self, topic, headers, data): + return json.loads(data.decode("utf-8")) + + def close(self): + pass + + return _JsonDeserializer() + + +@pytest.fixture(scope="module") +def kafka_consumer(): + """Return a long-lived KafkaConsumer subscribed to all CTS topics. + + The consumer is positioned at the *current* end of each topic when the + module starts, so it only sees messages produced during this test run. It + acts as a cursor: each call to ``_consume_one`` advances the position + forward, making offset tracking unnecessary. + + Returns ``None`` when ``KAFKA_URL`` is not set. All Kafka-aware helpers + and tests check for ``None`` and skip their assertions accordingly, so the + full test suite runs in environments without a Kafka broker. + """ + kafka_url = os.environ.get("KAFKA_URL") + if not kafka_url: + yield None + return + + from kafka import KafkaConsumer, TopicPartition + from kafka.errors import KafkaConnectionError, KafkaTimeoutError + + try: + consumer = KafkaConsumer( + bootstrap_servers=kafka_url, + # No group_id: we use manual partition assignment, so the + # group-coordinator protocol is not needed. + group_id=None, + value_deserializer=_make_json_deserializer(), + consumer_timeout_ms=_KAFKA_CONSUMER_TIMEOUT_MS, + request_timeout_ms=10000, + ) + except (KafkaTimeoutError, KafkaConnectionError) as exc: + pytest.fail(f"Cannot connect to Kafka broker at {kafka_url}: {exc}") + return + + from kafka.errors import UnknownTopicOrPartitionError + + # Assign all topic partitions and seek to the current end so we only + # see messages produced during this test run. + partitions = [TopicPartition(t, 0) for t in _CTS_KAFKA_TOPICS] + consumer.assign(partitions) + for tp in partitions: + try: + consumer.seek_to_end(tp) + except (UnknownTopicOrPartitionError, KafkaTimeoutError): + # Topic may not exist yet (no messages published); seek to 0 so + # that the first message on the topic is visible. + consumer.seek(tp, 0) + + yield consumer + consumer.close() + + +@pytest.fixture(autouse=True) +def _kafka_drain_check(kafka_consumer, request): + """After each test, assert that no Kafka messages were left unconsumed. + + Any message on a CTS topic that was not explicitly consumed by the test is + a sign of a bug (e.g. the application sent a duplicate or unexpected + message, or the test forgot to consume a message it produced). The fixture + fails the test in that case so problems are caught immediately. + """ + yield + if kafka_consumer is None: + return + from kafka.errors import KafkaConnectionError + + stale = [] + try: + records = kafka_consumer.poll(timeout_ms=500, max_records=10) + except KafkaConnectionError: + records = {} + for recs in records.values(): + for rec in recs: + stale.append((rec.topic, rec.offset, rec.value)) + if stale: + details = "\n".join( + f" topic={t!r} offset={o} value={v!r}" for t, o, v in stale + ) + pytest.fail( + f"Unconsumed Kafka messages found after test {request.node.name!r}:\n" + + details + ) + + +def _assert_compose_message(kafka_consumer, topic, event_name, compose_id): + """Consume one message and assert it matches the expected event and compose. + + When *kafka_consumer* is ``None`` (no Kafka broker configured), this + function returns immediately without making any assertions. + """ + if kafka_consumer is None: + return + msg = _consume_one(kafka_consumer, topic) + assert ( + msg.get("event") == event_name + ), f"Expected event={event_name!r}, got event={msg.get('event')!r}" + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" + compose_info_data = msg["compose"].get("compose_info", {}) + assert compose_id in str( + compose_info_data + ), f"Message compose_info does not reference compose {compose_id}: {msg}" + + +def _consume_one(consumer, topic, timeout_ms=None): + """Consume and return the next message on *topic* from *consumer*. + + The consumer is long-lived and acts as a cursor, so successive calls to + this function return successive messages in order without any offset + bookkeeping. + + Raises ``AssertionError`` if a message arrives on any topic other than + *topic* (unexpected message), or if no message arrives within *timeout_ms* + (default: ``_KAFKA_CONSUMER_TIMEOUT_MS``). + """ + from kafka.errors import KafkaConnectionError + + if timeout_ms is None: + timeout_ms = _KAFKA_CONSUMER_TIMEOUT_MS + + deadline_ms = timeout_ms + while deadline_ms > 0: + poll_ms = min(deadline_ms, 500) + try: + records = consumer.poll(timeout_ms=poll_ms, max_records=1) + except KafkaConnectionError as exc: + raise AssertionError( + f"Kafka broker disconnected while consuming topic '{topic}': {exc}" + ) from exc + for tp_key, recs in records.items(): + for rec in recs: + if tp_key.topic != topic: + raise AssertionError( + f"Expected message on topic '{topic}' but received one on '{tp_key.topic}' (offset={rec.offset}, value={rec.value!r})" + ) + return rec.value + deadline_ms -= poll_ms + raise AssertionError( + f"No message received on Kafka topic '{topic}' within {timeout_ms} ms" + )