Skip to content

Add integration tests for Kafka messaging#85

Open
lubomir wants to merge 1 commit into
mainfrom
overseer/84
Open

Add integration tests for Kafka messaging#85
lubomir wants to merge 1 commit into
mainfrom
overseer/84

Conversation

@lubomir

@lubomir lubomir commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

🤖 This was posted automatically by an AI agent.

Add integration tests for Kafka messaging

Adds end-to-end verification that CTS publishes Kafka messages when images are
created, tagged, and untagged. This is tracked by
release-engineering/cts#84.

What changed

.tekton/integration-test-eaas.yaml

Added a deploy-kafka task that provisions a single-node Apache Kafka 3.9.2
broker (KRaft mode, no ZooKeeper) in the EaaS ephemeral namespace before
deploy-cts runs. The broker is exposed as kafka:9092 (plain PLAINTEXT, no
TLS or SASL). Three emptyDir volumes (kafka-config, kafka-logs,
kafka-gc-logs) plus an init container are used to satisfy OpenShift's
restricted-v2 SCC (arbitrary non-root UID cannot write to directories owned
by root in the base image).

tests/test_integration_api.py

  • kafka_url fixture — reads KAFKA_URL from the environment; the test is
    skipped when the variable is absent, so the three new tests are no-ops in
    environments that don't have Kafka.
  • _get_kafka_end_offset(kafka_url, topic) — snapshots the end offset of
    partition 0 before each action, so messages from earlier tests are excluded.
    Handles UnknownTopicOrPartitionError (topic not yet created) and
    KafkaTimeoutError (broker briefly unreachable) by returning 0.
  • _consume_kafka_message(kafka_url, topic, start_offset, ...) — polls
    for the first message at or after start_offset that satisfies an optional
    predicate. Uses manual partition assignment (consumer.assign + seek)
    with group_id=None to avoid the group-coordinator protocol. Raises a clear
    AssertionError on timeout or broker disconnect.
  • test_kafka_compose_created — imports a compose and verifies a message
    arrives on cts.compose-created referencing that compose ID.
  • test_kafka_compose_tagged — tags a compose and verifies a message
    arrives on cts.compose-tagged.
  • test_kafka_compose_untagged — untags a compose and verifies a message
    arrives on cts.compose-untagged.

cts/messaging.py

Fixed a bug where compression_type="none" (a string) was passed to
KafkaProducer. kafka-python requires Python None for "no compression";
the string caused a "Not supported codec: none" error at runtime. The value
is now converted: None if v == "none" else v.

test-requirements.txt

Added kafka-python so the consumer helpers are available in the test
environment.

.github/workflows/gating.yaml / .tekton/cts-pull-request.yaml

Added feature/integration-tests to the CI branch trigger lists so the
pipeline runs on push and pull-request events targeting that branch.

@lubomir lubomir changed the base branch from main to feature/integration-tests June 26, 2026 13:21
@lubomir

lubomir commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

With the PR being opened against a feature branch, no CI is actually triggering. Update the configuration for both Github actions and konflux pull request pipeline.

@lubomir

lubomir commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

All three MUST FIX issues and the SHOULD FIX addressed:

  1. Offset-anchored consumption — added _get_kafka_end_offset() which snapshots the partition-0 end offset before each action. _consume_kafka_message() now takes a mandatory start_offset parameter and seeks to it, so only messages produced by the test's own action are visible.

  2. Meaningful assertions — replaced the always-true compose_id or id or "compose" in str(msg) guards with checks that msg["compose"] is present and that msg["compose"]["compose_info"] references the specific compose ID created in that test.

  3. Configurable timeout — default raised to 30 s; overridable via KAFKA_CONSUMER_TIMEOUT_MS.

@lubomir

lubomir commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

CI triggers now include feature/integration-tests:

  • GitHub Actions (gating.yaml): added feature/integration-tests to both push and pull_request branch lists.
  • Konflux PR pipeline (cts-pull-request.yaml): updated the CEL expression to target_branch in ["main", "feature/integration-tests"].

@codecov-commenter

codecov-commenter commented Jun 29, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 66.66667% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 83.65%. Comparing base (37bad18) to head (fdbaf8f).

Files with missing lines Patch % Lines
cts/messaging.py 66.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #85      +/-   ##
==========================================
- Coverage   83.69%   83.65%   -0.04%     
==========================================
  Files          13       13              
  Lines        1325     1328       +3     
==========================================
+ Hits         1109     1111       +2     
- Misses        216      217       +1     
Flag Coverage Δ
unit-tests 83.65% <66.66%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@lubomir

lubomir commented Jun 30, 2026

Copy link
Copy Markdown
Contributor Author

/retest

@lubomir lubomir force-pushed the overseer/84 branch 2 times, most recently from a94a676 to 2f0c5bc Compare June 30, 2026 12:33
@lubomir

lubomir commented Jun 30, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

The deploy-kafka step failed with ImagePullBackOff because docker.io/bitnami/kafka:3.7.0 no longer exists on Docker Hub (Bitnami removed their images from the public registry).

Switched to the official docker.io/apache/kafka:3.9.2 image. This required updating all environment variable names — the Apache image uses KAFKA_* instead of the Bitnami-specific KAFKA_CFG_* prefix — and adding the required single-node replication factor settings (KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1, KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1, KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1, KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0) that are mandatory when overriding any configuration in the Apache image.

@lubomir

lubomir commented Jun 30, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

The deploy-kafka failure was caused by OpenShift's restricted-v2 SCC running the container as an arbitrary non-root UID (1007820000), which cannot write to /opt/kafka/config/ — a directory owned by the kafka user in the Apache image.

Fixed by adding:

  • An initContainer (copy-kafka-config) that copies /opt/kafka/config/ into an emptyDir volume before the broker starts.
  • Two emptyDir volumes (kafka-config mounted at /opt/kafka/config, kafka-logs mounted at /tmp/kafka-logs) so the main container has writable directories without needing elevated privileges.

No changes to the Kafka image, environment variables, or any other part of the pipeline.

@lubomir

lubomir commented Jun 30, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

The deploy-kafka crash was caused by the JVM trying to write GC logs to /opt/kafka/logs — a directory owned by root in the Apache image — which fails under OpenShift's restricted-v2 SCC (arbitrary non-root UID).

Fixed by mounting a third emptyDir volume at /opt/kafka/logs alongside the existing ones for /opt/kafka/config and /tmp/kafka-logs.

@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

Fix the PR description and address the nits in the last review.

@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Removed the unused import uuid (the nit from the last review round).

Updated the PR description — the previous one described this as a rebase-only PR with "no feature changes", which no longer matches the branch content.

@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

The implementation satisfies all six acceptance criteria from the spec:

  1. ✅ Tests skip cleanly when KAFKA_URL is unset.
    2–4. ✅ All three Kafka tests exist and are correctly structured.
  2. deploy-kafka appears in deploy-cts's runAfter field.
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest.

The cts/messaging.py bug fix ("none"None for compression_type) is a justified addition not in the original spec: without it, the integration tests would fail at the broker side, so it is a legitimate prerequisite to the feature working at all.

All issues raised in prior review rounds have been addressed, including the unused import time, unused import uuid, DeprecationWarning in _JsonDeserializer, group_id=None for group-coordinator protocol avoidance, and the KafkaTimeoutError guard in both helper functions.


NIT

Dead assert msg is not None guards (introduced by this PR)

In all three tests (lines 785, 823, 863), the pattern is:

msg = _consume_kafka_message(...)
assert msg is not None, "Expected a message on …, got None"

_consume_kafka_message never returns None — on failure it raises AssertionError. The guard therefore can never fail and only adds noise. It can be removed.


Observation (not a blocker)

feature/integration-tests added to CI branch lists

.github/workflows/gating.yaml and .tekton/cts-pull-request.yaml now trigger on feature/integration-tests. This was explicitly requested by the reviewer and makes sense for this development workflow. Once the feature/integration-tests scaffolding branch is retired these entries should be cleaned up, but that is a follow-up concern, not a reason to block merging.

@lubomir lubomir left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not happy with the implementation. I don't think new tests are needed for Kafka validation. Instead, the assertions should be happening in the existing test scenarios. The interface is also very fragile.

Instead, the existing helpers should be expanded to verify correct messages were published. The import_compose/tag_compose helpers already encapsulate the perform-action-and-assert-it-work combo. Adding Kafka validation there feels natural.

The logic to snapshot offset, perform action, consume message should be abstracted into a context manager to make incorrect use impossible.

Comment thread .github/workflows/gating.yaml Outdated
on:
push:
branches: [ "main" ]
branches: [ "main", "feature/integration-tests" ]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this.

Comment thread .github/workflows/gating.yaml Outdated
branches: [ "main", "feature/integration-tests" ]
pull_request:
branches: [ "main" ]
branches: [ "main", "feature/integration-tests" ]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this.

Comment thread .tekton/cts-pull-request.yaml Outdated
build.appstudio.redhat.com/target_branch: '{{target_branch}}'
pipelinesascode.tekton.dev/max-keep-runs: "3"
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch == "main"
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch in ["main", "feature/integration-tests"]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this.

@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Three changes applied:

  1. feature/integration-tests removed from CI configsgating.yaml and cts-pull-request.yaml now trigger only on main (restored to their state on main).

  2. kafka_message_on() context manager — snapshots the partition-0 end offset before its body runs, then consumes the first matching message after the body completes. The offset snapshot and consumption are managed by the context manager, so incorrect ordering is structurally impossible.

  3. Kafka assertions moved into helpersimport_compose, tag_compose, and untag_compose now accept an optional kafka_url parameter. When set, each helper wraps the HTTP call in kafka_message_on() and asserts the expected Kafka message. The three standalone Kafka tests (test_kafka_compose_created/tagged/untagged) are replaced by a single test_kafka_messaging test that calls all three helpers with kafka_url.

@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Fixed the test_kafka_messaging failure.

Root cause: import_compose called kafka_message_on without a predicate. Because CTS publishes asynchronously, a stale compose-created message from test_workflow_full_lifecycle arrived on the topic after the offset snapshot, so the consumer picked it up instead of the message for KafkaTest-1.0-20260601.t.1.

Fix: import_compose now computes the expected compose_id from its inputs before the POST and passes a predicate to kafka_message_on that filters for messages whose compose_info references that specific ID. Stale in-flight messages from earlier tests are drained and skipped.

@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the plan are satisfied:

  1. kafka_url fixture calls pytest.skip when KAFKA_URL is unset — all Kafka-dependent tests are skipped transparently.
    2–4. ✅ The three Kafka operations (compose-created, compose-tagged, compose-untagged) are all exercised in test_kafka_messaging. The plan's original three separate test names (test_kafka_compose_created/tagged/untagged) were intentionally replaced by a single test at the reviewer's explicit request (comment 2026-07-01 11:44) — this is a legitimate deviation from the spec.
  2. deploy-kafka is listed in deploy-cts's runAfter field.
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. No claimed file is absent.

Issues Raised in Prior Review Rounds

All prior issues are resolved:

  • Unused import time and import uuid — removed.
  • DeprecationWarning from value_deserializer — fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer.
  • group_id=None to avoid group-coordinator protocol — applied.
  • KafkaTimeoutError guards in both _get_kafka_end_offset and _consume_kafka_message — applied.
  • Dead assert msg is not None guards — removed (no longer possible since _consume_kafka_message always raises on failure, never returns None).
  • feature/integration-tests entries in gating.yaml / cts-pull-request.yaml — reverted; CI config files are not in the diff.

NIT

tag_compose and untag_compose assertions run after Kafka consume, which can obscure HTTP failures (introduced by this PR)

In both helpers the HTTP call happens inside the with kafka_message_on(...) block, but the assert status == 200 check is outside it (i.e., after the context manager exits and after _consume_kafka_message has returned). If the Kafka consumer times out, the consumer's AssertionError surfaces first and the HTTP status check never runs, making it harder to diagnose whether the root cause was an HTTP failure or a messaging failure.

This is a minor diagnostic concern, not a correctness bug, and does not affect the passing pipeline. The same pattern is used by import_compose, so it is at least consistent.


The implementation is correct, well-structured, and satisfies the issue's requirements. The cts/messaging.py bug fix ("none"None for compression_type) is a justified prerequisite with no spec violation. The single-commit history is clean and the message is accurate and descriptive.

@lubomir lubomir left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kafka_url fixture skips the test when Kafka is not configured. The existing tests should run in either case, and Kafka existence should only trigger the assert.

For the respin increment test it would be helpful to capture the messages in batch and assert them at once.

Naming variable _kafka suggests the variable is unused, which is not the case here.

The predicate to find correct message seems like a wrong solution. Every published message should be asserted. There should be no message that is not accounted for. Imagine a bug where the application starts sending duplicated or plain wrong messages. The tests need to fail in that case.

Comment thread tests/test_integration_api.py Outdated
consumer.close()


def test_kafka_messaging(write_http_client, kafka_url):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should not exist as a standalone entity. The pre-existing tests cover this scenario already, and validate that messages are published.

compose_info_data = msg["compose"].get("compose_info", {})
assert compose_id in str(
compose_info_data
), f"Message compose_info does not reference compose {compose_id}: {msg}"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these tests have near-identical block of asserts. Create a helper to deduplicate this.

@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Four issues from the last review addressed:

  1. kafka_url fixture no longer skips — returns None when KAFKA_URL is unset; all workflow tests always run, and Kafka assertions activate only when the broker is present.

  2. Kafka assertions moved into existing scenariostest_workflow_compose_import, test_workflow_respin_increment, and test_workflow_full_lifecycle now pass kafka_url to the helpers. The standalone test_kafka_messaging test is removed.

  3. Respin test uses a batch context managerkafka_messages_on(kafka_url, "cts.compose-created", count=3) wraps all three imports; all three messages are captured and asserted together after the body completes.

  4. Predicates removedkafka_message_on and _consume_kafka_message no longer accept a predicate. Every received message is returned as-is; unexpected messages (duplicates, wrong events) will cause the caller's assertions to fail rather than being silently drained.

Deploys a single-node Apache Kafka 3.9.2 broker (KRaft mode) in the
EaaS ephemeral namespace and adds Kafka message assertions to the
existing workflow integration tests.

Pipeline changes (.tekton/integration-test-eaas.yaml):
- Add deploy-kafka task that starts apache/kafka:3.9.2 in KRaft mode.
  Three emptyDir volumes (kafka-config, kafka-logs, kafka-gc-logs) and
  an initContainer (copy-kafka-config) satisfy OpenShift restricted-v2
  SCC. KAFKA_CONTROLLER_QUORUM_VOTERS uses localhost:9093 to avoid the
  bootstrap deadlock. deploy-cts lists deploy-kafka in runAfter.
- Pass KAFKA_URL=kafka:9092 and install kafka-python in run-tests.

Test changes (tests/test_integration_api.py):
- kafka_url module fixture reads KAFKA_URL; returns None (not skip)
  when unset so all existing workflow tests always run.
- kafka_message_on(kafka_url, topic) context manager snapshots the
  partition-0 end offset before its body, then consumes the first
  message at or after that offset after the body completes. No
  predicates: every received message is returned as-is and asserted
  by the caller, so unexpected messages cause test failures.
- kafka_messages_on(kafka_url, topic, count) batch variant for
  scenarios that perform N actions on the same topic.
- import_compose, tag_compose, untag_compose each accept an optional
  kafka_url parameter; when set, wraps the HTTP call in the context
  manager and asserts the event name and compose_info reference.
- test_workflow_compose_import, test_workflow_respin_increment,
  test_workflow_full_lifecycle all pass kafka_url to the helpers so
  Kafka assertions run as part of the existing scenarios (not as a
  separate test). The respin test uses kafka_messages_on to capture
  all three compose-created messages in one batch.

Bug fix (cts/messaging.py):
- Convert the string "none" to Python None for compression_type so
  KafkaProducer does not reject the value with "Not supported codec".

Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
@lubomir

lubomir commented Jul 1, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

test_workflow_compose_import was receiving a stale compose-created message from test_composes_pagination. Because CTS publishes asynchronously, the three messages produced by the pagination test could arrive after test_workflow_compose_import snapshotted the topic offset, causing the consumer to return a PaginationTest message instead of the IntegrationTest one.

Fixed by adding kafka_url to test_composes_pagination and passing it to each import_compose call there. This ensures every compose-created message is consumed immediately after it is produced, so the topic is clean before the next test snapshots the offset — consistent with the design principle that every published message is accounted for.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants