diff --git a/build.gradle b/build.gradle
index 69c4fefd47..90d1e4c543 100644
--- a/build.gradle
+++ b/build.gradle
@@ -310,6 +310,7 @@ if (repo != null) {
'docs/inkless/**',
'dump-schema-compose.yml',
'inkless-benchmarks/**',
+ 'tests/kafkatest/services/inkless/**',
'tests/kafkatest/tests/inkless/**',
'inkless-sync/**',
'core/src/main/scala/io/aiven/inkless/**',
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index e60ce645c5..af495a6462 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -60,7 +60,9 @@ ARG ducker_creator=default
LABEL ducker.creator=$ducker_creator
# Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute iproute2 iputils-ping && apt-get -y clean
+# postgresql-client (psql) is used by the inkless consolidation pipeline test to
+# query the Postgres control plane (WAL batches / log_start_offset) and assert WAL pruning.
+RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute iproute2 iputils-ping postgresql-client && apt-get -y clean
RUN python3 -m pip install -U pip==21.1.1;
# NOTE: ducktape 0.12.0 supports py 3.9, 3.10, 3.11 and 3.12
COPY requirements.txt requirements.txt
@@ -132,6 +134,19 @@ RUN mkdir -p /opt/tiered-storage-plugin/core /opt/tiered-storage-plugin/s3 && \
tar xzf /tmp/s3.tgz -C /opt/tiered-storage-plugin/s3 --strip-components=1 && \
rm /tmp/*.tgz
+# MinIO client (mc) for consolidation system tests that assert object movement
+# (diskless WAL -> tiered-storage prefix) in MinIO.
+ARG mc_version=mc.RELEASE.2025-08-13T08-35-41Z
+RUN arch="$(dpkg --print-architecture)"; \
+ case "$arch" in \
+ amd64) mc_arch=amd64; mc_sha=01f866e9c5f9b87c2b09116fa5d7c06695b106242d829a8bb32990c00312e891 ;; \
+ arm64) mc_arch=arm64; mc_sha=14c8c9616cfce4636add161304353244e8de383b2e2752c0e9dad01d4c27c12c ;; \
+ *) echo "unsupported arch for mc: $arch" >&2; exit 1 ;; \
+ esac; \
+ curl -fsSL --retry 5 --retry-delay 5 "https://dl.min.io/client/mc/release/linux-${mc_arch}/archive/${mc_version}" -o /usr/local/bin/mc && \
+ echo "${mc_sha} /usr/local/bin/mc" | sha256sum -c - && \
+ chmod +x /usr/local/bin/mc
+
# To ensure the Kafka cluster starts successfully under JDK 17, we need to update the Zookeeper
# client from version 3.4.x to 3.5.7 in Kafka versions 2.1.1, 2.2.2, and 2.3.1, as the older Zookeeper
# client is incompatible with JDK 17. See KAFKA-17888 for more details.
diff --git a/tests/kafkatest/services/inkless/__init__.py b/tests/kafkatest/services/inkless/__init__.py
new file mode 100644
index 0000000000..98b8951775
--- /dev/null
+++ b/tests/kafkatest/services/inkless/__init__.py
@@ -0,0 +1,15 @@
+# Inkless
+# Copyright (C) 2024 - 2026 Aiven OY
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
diff --git a/tests/kafkatest/services/inkless/consolidation_verifier.py b/tests/kafkatest/services/inkless/consolidation_verifier.py
new file mode 100644
index 0000000000..97256c12d0
--- /dev/null
+++ b/tests/kafkatest/services/inkless/consolidation_verifier.py
@@ -0,0 +1,272 @@
+# Inkless
+# Copyright (C) 2024 - 2026 Aiven OY
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""Helper to assert the inkless consolidation pipeline moved and pruned data.
+
+Exposes three signals about a consolidating topic: the JMX consolidation gauges,
+MinIO object counts (via ``mc``), and the Postgres control plane (via ``psql``).
+Construct it with a started ``KafkaService`` and call the helpers from a test;
+the Postgres/MinIO deps and the mc/psql clients are provided by the system-test
+harness (see ``.github/workflows/inkless-system-tests.yml`` and the Dockerfile).
+"""
+
+import csv
+import json
+import shlex
+import time
+
+
+class ConsolidationVerifier(object):
+ # Broker-aggregate consolidation gauges; Yammer gauges expose "Value".
+ JMX_PACKAGE = "io.aiven.inkless.consolidation"
+ JMX_TYPE = "ConsolidationMetrics"
+ TOTAL_LAG = "ConsolidationTotalLag"
+ LOCAL_LAG = "ConsolidationLocalLag"
+ DELETABLE_MESSAGES = "ConsolidationDeletableMessages"
+ JMX_ATTRIBUTE = "Value"
+
+ # MinIO (on ducknet alias "storage").
+ MINIO_ALIAS = "systest"
+ MINIO_ENDPOINT = "http://storage:9000"
+ MINIO_BUCKET = "inkless"
+ MINIO_ACCESS_KEY = "minioadmin"
+ MINIO_SECRET_KEY = "minioadmin"
+ # rsm.config.key.prefix; WAL objects live at the bucket root instead.
+ TIERED_PREFIX = "tiered-storage/"
+
+ # Postgres control plane (on ducknet alias "postgres").
+ PG_HOST = "postgres"
+ PG_PORT = 5432
+ PG_DB = "inkless"
+ PG_USER = "admin"
+ PG_PASSWORD = "admin"
+
+ def __init__(self, kafka):
+ self.kafka = kafka
+ self.logger = kafka.logger
+ self._mc_alias_ready = False
+
+ # --------------------- JMX ---------------------
+
+ @classmethod
+ def aggregate_object_name(cls, name):
+ # Object name for the broker-aggregate gauge (property order is irrelevant).
+ return "%s:type=%s,name=%s" % (cls.JMX_PACKAGE, cls.JMX_TYPE, name)
+
+ @classmethod
+ def aggregate_object_names(cls):
+ # Object names for the three broker-aggregate gauges.
+ return [cls.aggregate_object_name(n)
+ for n in (cls.TOTAL_LAG, cls.LOCAL_LAG, cls.DELETABLE_MESSAGES)]
+
+ @classmethod
+ def find_aggregate_key(cls, keys, name):
+ # Locate the aggregate gauge's CSV key, ignoring key-property ordering and
+ # skipping per-partition keys. Returns the key, or None if absent.
+ attr_suffix = ":" + cls.JMX_ATTRIBUTE
+ for k in keys:
+ if not (k.startswith(cls.JMX_PACKAGE + ":") and k.endswith(attr_suffix)):
+ continue
+ if ("type=%s" % cls.JMX_TYPE) not in k:
+ continue
+ if "topic=" in k or "partition=" in k:
+ continue
+ if ("name=%s," % name) in k or ("name=%s:" % name) in k:
+ return k
+ return None
+
+ def start_jmx(self):
+ # Start JmxTool on the broker nodes only (the controller never exposes
+ # these MBeans, so wiring it into KafkaService would hang --wait).
+ self.kafka.jmx_object_names = self.aggregate_object_names()
+ self.kafka.jmx_attributes = [self.JMX_ATTRIBUTE]
+ for node in self.kafka.nodes:
+ self.kafka.start_jmx_tool(self.kafka.idx(node), node)
+
+ def latest_jmx_values(self):
+ # Most recent sample of each monitored attribute, summed across brokers
+ # (only the partition leader reports non-zero, so summing is safe).
+ totals = {}
+ for node in self.kafka.nodes:
+ sample = self._latest_jmx_sample_on(node)
+ for key, value in sample.items():
+ totals[key] = totals.get(key, 0.0) + value
+ return totals
+
+ def _latest_jmx_sample_on(self, node):
+ log = self.kafka.jmx_tool_log
+ try:
+ header = list(node.account.ssh_capture("head -n 1 %s" % log, allow_fail=True))
+ last = list(node.account.ssh_capture("tail -n 1 %s" % log, allow_fail=True))
+ except Exception as e: # noqa: BLE001 - best effort; missing log -> no sample
+ self.logger.debug("Could not read JMX log on %s: %s" % (node.account, e))
+ return {}
+ if not header or not last:
+ return {}
+ # Parse with csv: JMX ObjectNames contain commas (property separators) and
+ # are quoted in the header, so a naive comma split would misalign column
+ # names with their values. Header: "time","<object name>:<attribute>",...
+ names = next(csv.reader([header[0].strip()]), [])
+ data_line = last[0].strip()
+ if not data_line or data_line.startswith('"'):
+ # Only the header has been written so far; no sample yet.
+ return {}
+ fields = next(csv.reader([data_line]), [])
+ sample = {}
+ for i in range(1, len(names)):
+ if i >= len(fields):
+ break
+ try:
+ sample[names[i]] = float(fields[i])
+ except ValueError:
+ continue
+ return sample
+
+ def _aggregate_value(self, name):
+ values = self.latest_jmx_values()
+ key = self.find_aggregate_key(values.keys(), name)
+ return values.get(key, 0.0) if key is not None else 0.0
+
+ def total_lag(self):
+ return self._aggregate_value(self.TOTAL_LAG)
+
+ def local_lag(self):
+ return self._aggregate_value(self.LOCAL_LAG)
+
+ def deletable_messages(self):
+ return self._aggregate_value(self.DELETABLE_MESSAGES)
+
+ # --------------------- Object store ---------------------
+
+ def _storage_node(self):
+ # Any broker node resolves the postgres/storage aliases on ducknet.
+ return self.kafka.nodes[0]
+
+ def _ensure_mc_alias(self, node):
+ if self._mc_alias_ready:
+ return
+ cmd = "mc alias set %s %s %s %s" % (
+ self.MINIO_ALIAS, self.MINIO_ENDPOINT,
+ self.MINIO_ACCESS_KEY, self.MINIO_SECRET_KEY)
+ rc = None
+ for _ in range(5):
+ rc = node.account.ssh(cmd, allow_fail=True)
+ if rc == 0:
+ self._mc_alias_ready = True
+ return
+ time.sleep(2)
+ raise AssertionError(
+ "Failed to configure mc alias '%s' against %s (rc=%s). Check MinIO "
+ "connectivity/credentials for the inkless system-test dependencies."
+ % (self.MINIO_ALIAS, self.MINIO_ENDPOINT, rc))
+
+ def object_keys(self, prefix=""):
+ # Object keys under the given bucket prefix, via mc ls --recursive --json.
+ node = self._storage_node()
+ self._ensure_mc_alias(node)
+ target = "%s/%s" % (self.MINIO_ALIAS, self.MINIO_BUCKET)
+ if prefix:
+ target += "/" + prefix
+ cmd = "mc ls --recursive --json %s" % target
+ keys = []
+ for line in node.account.ssh_capture(cmd, allow_fail=False):
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ obj = json.loads(line)
+ except ValueError:
+ raise AssertionError(
+ "Unexpected non-JSON output from '%s': %s" % (cmd, line))
+ status = obj.get("status")
+ if status == "error":
+ err = obj.get("error") or {}
+ detail = err.get("message") or err.get("cause") or obj
+ raise AssertionError("mc failed listing %s: %s" % (target, detail))
+ if status != "success":
+ continue
+ key = obj.get("key")
+ if key:
+ keys.append(key)
+ return keys
+
+ def tiered_object_count(self):
+ # Count under the tiered-storage prefix only (scanning the whole bucket
+ # in wait_until would be slow and flaky). Not topic-scoped, and the prefix
+ # accumulates across tests in one `ducker-ak test` run (the whole inkless
+ # suite is one invocation). Use baseline + delta (`> baseline`);
+ # an absolute `== N` check is flaky because it depends on test order.
+ return len(self.object_keys(self.TIERED_PREFIX))
+
+ def wal_object_count(self):
+ # Bucket-root count (outside tiered-storage/), i.e. diskless WAL files.
+ # Same accumulation caveats as tiered_object_count: baseline + delta
+ # only, never absolute.
+ return len([k for k in self.object_keys() if not k.startswith(self.TIERED_PREFIX)])
+
+ # --------------------- Control plane ---------------------
+
+ def _psql(self, sql):
+ node = self._storage_node()
+ cmd = "PGPASSWORD=%s psql -h %s -p %d -U %s -d %s -tAc %s" % (
+ shlex.quote(self.PG_PASSWORD), shlex.quote(self.PG_HOST), self.PG_PORT,
+ shlex.quote(self.PG_USER), shlex.quote(self.PG_DB), shlex.quote(sql))
+ return "".join(node.account.ssh_capture(cmd, allow_fail=False)).strip()
+
+ @staticmethod
+ def _sql_literal(value):
+ # SQL string literal with single quotes doubled (standard escaping), so a
+ # topic with a quote can't break or inject into the query.
+ return "'%s'" % str(value).replace("'", "''")
+
+ def _psql_int(self, sql, default=0):
+ out = self._psql(sql)
+ if out == "" or out is None:
+ return default
+ try:
+ return int(out.splitlines()[0].strip())
+ except (ValueError, IndexError):
+ return default
+
+ def wal_batch_count(self, topic):
+ # WAL batch rows for the topic. Returns -1 when the topic has no `logs` rows
+ # yet, so "not registered" stays distinct from "registered and pruned to 0".
+ literal = self._sql_literal(topic)
+ return self._psql_int(
+ "SELECT CASE WHEN EXISTS (SELECT 1 FROM logs WHERE topic_name = %s) "
+ "THEN (SELECT count(*) FROM batches b JOIN logs l ON b.topic_id = l.topic_id "
+ "WHERE l.topic_name = %s) ELSE -1 END" % (literal, literal),
+ default=-1)
+
+ def min_log_start_offset(self, topic):
+ # Min log_start_offset across partitions; advances past 0 once WAL is pruned.
+ return self._psql_int(
+ "SELECT coalesce(min(log_start_offset), 0) FROM logs WHERE topic_name = %s"
+ % self._sql_literal(topic))
+
+ # --------------------- Tooling ---------------------
+
+ def verify_tooling(self):
+ # Fail fast if the ducker image lacks the mc/psql clients.
+ node = self._storage_node()
+ missing = []
+ for tool in ("mc", "psql"):
+ if node.account.ssh("command -v %s" % tool, allow_fail=True) != 0:
+ missing.append(tool)
+ assert not missing, (
+ "Missing client tool(s) %s on the broker node. Rebuild the ducker image "
+ "(tests/docker/Dockerfile installs `mc` and `postgresql-client`) with "
+ "`tests/docker/ducker-ak up` before running consolidation pipeline tests." % missing)
diff --git a/tests/kafkatest/tests/inkless/consolidation_pipeline_test.py b/tests/kafkatest/tests/inkless/consolidation_pipeline_test.py
new file mode 100644
index 0000000000..9de9ddb54f
--- /dev/null
+++ b/tests/kafkatest/tests/inkless/consolidation_pipeline_test.py
@@ -0,0 +1,185 @@
+# Inkless
+# Copyright (C) 2024 - 2026 Aiven OY
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import uuid
+
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.inkless.consolidation_verifier import ConsolidationVerifier
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+
+class ConsolidationPipelineTest(Test):
+ """Test for a born-consolidated topic: WAL -> local log -> remote, then prune.
+
+ Asserts the consolidation gauges are exported via JMX, the tiered-storage
+ prefix grows in MinIO, the WAL is pruned in the Postgres control plane, and
+ every acked record is still consumable afterwards.
+ """
+
+ # Unique per run: the Postgres/MinIO containers persist across runs, so a
+ # stale topic name would let old rows skew the control-plane queries.
+ TOPIC_PREFIX = "consolidation-pipeline"
+ NUM_PARTITIONS = 1
+ REPLICATION_FACTOR = 3
+ # Sized to roll several segments by size (1 MiB floor); only closed segments tier.
+ NUM_RECORDS = 300000
+
+ def __init__(self, test_context):
+ super(ConsolidationPipelineTest, self).__init__(test_context=test_context)
+ self.num_brokers = 3
+ self.TOPIC = "%s-%s" % (self.TOPIC_PREFIX, uuid.uuid4().hex[:8])
+
+ @cluster(num_nodes=6)
+ @matrix(metadata_quorum=[quorum.isolated_kraft])
+ def test_consolidation_pipeline(self, metadata_quorum):
+ self.kafka = KafkaService(
+ self.test_context,
+ num_nodes=self.num_brokers,
+ zk=None,
+ controller_num_nodes_override=1,
+ # Enable consolidation per-test; the constructor also flips the controller.
+ consolidation=True,
+ # Run the WAL pruner / file cleaner fast; cache TTL must stay <= retention/2.
+ server_prop_overrides=[
+ ["inkless.consolidation.cleanup.interval.ms", "5000"],
+ ["inkless.file.cleaner.interval.ms", "5000"],
+ ["inkless.file.cleaner.retention.period.ms", "6000"],
+ ["inkless.consume.batch.coordinate.cache.ttl.ms", "2000"],
+ ],
+ # JMX is started later on brokers only (see start_jmx); the controller
+ # never exposes these MBeans.
+ topics={
+ self.TOPIC: {
+ "partitions": self.NUM_PARTITIONS,
+ "replication-factor": self.REPLICATION_FACTOR,
+ "configs": {
+ "diskless.enable": "true",
+ "remote.storage.enable": "true",
+ "min.insync.replicas": 2,
+ # Roll segments by size/time so they close and get tiered.
+ "segment.bytes": 1048576,
+ "segment.ms": 5000,
+ },
+ },
+ },
+ )
+ self.kafka.start()
+
+ verifier = ConsolidationVerifier(self.kafka)
+ verifier.verify_tooling()
+
+ baseline_tiered = verifier.tiered_object_count()
+ self.logger.info("Baseline tiered-storage object count: %d" % baseline_tiered)
+ baseline_wal = verifier.wal_object_count()
+ self.logger.info("Baseline WAL object count: %d" % baseline_wal)
+
+ # Start JmxTool on the brokers before producing so we sample the whole run.
+ verifier.start_jmx()
+
+ # Produce the dataset, unthrottled.
+ producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka,
+ topic=self.TOPIC, max_messages=self.NUM_RECORDS,
+ throughput=-1)
+ producer.start()
+
+ wait_until(lambda: producer.num_acked >= self.NUM_RECORDS or producer.worker_errors,
+ timeout_sec=180, backoff_sec=1,
+ err_msg="Producer failed to produce %d records." % self.NUM_RECORDS)
+ assert not producer.worker_errors, "Producer errors: %s" % producer.worker_errors
+ producer.stop()
+ acked = producer.num_acked
+ producer.free()
+ self.logger.info("Produced and acked %d records" % acked)
+
+ # Wait for the WAL -> local log fetcher to catch up.
+ wait_until(lambda: verifier.local_lag() == 0,
+ timeout_sec=180, backoff_sec=2,
+ err_msg="Consolidation local lag did not drain to 0.")
+ self.logger.info("Consolidation local lag drained to 0")
+
+ # Wait for data to reach remote: the tiered-storage prefix grows.
+ wait_until(lambda: verifier.tiered_object_count() > baseline_tiered,
+ timeout_sec=180, backoff_sec=2,
+ err_msg="Tiered-storage object count did not grow above baseline.")
+ self.logger.info("Tiered-storage object count grew to %d" % verifier.tiered_object_count())
+
+ # Wait for the control plane to prune the WAL.
+ wait_until(lambda: verifier.wal_batch_count(self.TOPIC) == 0
+ or verifier.min_log_start_offset(self.TOPIC) > 0,
+ timeout_sec=180, backoff_sec=2,
+ err_msg="WAL was not pruned (log_start_offset did not advance) in the Postgres control plane.")
+ self.logger.info("Control plane pruned WAL: batch_count=%d, min_log_start_offset=%d" %
+ (verifier.wal_batch_count(self.TOPIC), verifier.min_log_start_offset(self.TOPIC)))
+
+ wal_after_prune = verifier.wal_object_count()
+ self.logger.info("WAL object count after prune: %d (baseline %d, delta %+d)" %
+ (wal_after_prune, baseline_wal, wal_after_prune - baseline_wal))
+
+ # Assert the consolidation gauges were exported and scraped (peaks logged
+ # for diagnostics; we don't assert lag rose, as it is not reliably observable).
+ self.kafka.read_jmx_output_all_nodes()
+ jmx_keys = self.kafka.maximum_jmx_value
+ local_lag_key = ConsolidationVerifier.find_aggregate_key(jmx_keys, ConsolidationVerifier.LOCAL_LAG)
+ total_lag_key = ConsolidationVerifier.find_aggregate_key(jmx_keys, ConsolidationVerifier.TOTAL_LAG)
+ deletable_key = ConsolidationVerifier.find_aggregate_key(jmx_keys, ConsolidationVerifier.DELETABLE_MESSAGES)
+ local_lag_peak = jmx_keys.get(local_lag_key, 0) if local_lag_key else 0
+ total_lag_peak = jmx_keys.get(total_lag_key, 0) if total_lag_key else 0
+ deletable_peak = jmx_keys.get(deletable_key, 0) if deletable_key else 0
+ self.logger.info("Consolidation JMX peaks: local_lag=%s total_lag=%s deletable=%s" %
+ (local_lag_peak, total_lag_peak, deletable_peak))
+ assert local_lag_key is not None, \
+ ("ConsolidationLocalLag was not exported/scraped via JMX. Expected a "
+ "%s 'name=ConsolidationLocalLag' Value column; got keys: %s"
+ % (ConsolidationVerifier.JMX_PACKAGE, sorted(jmx_keys)))
+
+ # Read every acked record back (from remote). Track unique offsets to avoid a false pass if
+ # the consumer skips ranges (e.g. due to data loss) or re-reads duplicates during retries/rebalances.
+ seen_offsets = set()
+ max_offset = {"value": -1}
+
+ def on_record_consumed(event, _node):
+ try:
+ offset = int(event.get("offset", -1))
+ except (TypeError, ValueError):
+ return
+ if offset >= 0:
+ seen_offsets.add(offset)
+ max_offset["value"] = max(max_offset["value"], offset)
+
+ consumer = VerifiableConsumer(
+ self.test_context, num_nodes=1, kafka=self.kafka,
+ topic=self.TOPIC, group_id="consolidation-pipeline-group",
+ on_record_consumed=on_record_consumed
+ )
+ consumer.start()
+ wait_until(lambda: (max_offset["value"] >= (acked - 1) and len(seen_offsets) >= acked) or consumer.worker_errors,
+ timeout_sec=180, backoff_sec=1,
+ err_msg="Consumer failed to read back all %d acked records." % acked)
+ assert not consumer.worker_errors, "Consumer errors: %s" % consumer.worker_errors
+ consumer.stop()
+ total_consumed = consumer.total_consumed()
+ consumer.free()
+ assert len(seen_offsets) == acked, (
+ "Expected to read %d unique offsets [0..%d], but saw %d (max_offset=%d)" %
+ (acked, acked - 1, len(seen_offsets), max_offset["value"])
+ )
+ self.logger.info("Read back %d records after consolidation + prune" % total_consumed)