From 33affdc1adb27898fd1bd2ca891bb08f333554b8 Mon Sep 17 00:00:00 2001 From: Ariel Melendez Date: Thu, 14 May 2026 21:10:27 -0700 Subject: [PATCH 1/2] perf(unbundle): skip re-emit for children already indexed (PE-9098) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the bundle repair worker re-queues a bundle (because some of its children were dropped at the data-item indexer's queue cap on a prior attempt), the parser previously re-emitted EVERY child — including the 90%+ that were already indexed. Each redundant emit consumed a slot in the dataItemIndexer / ans104DataIndexer queues, contending with the genuinely-new children for the cap. Under sustained backfill load this was the dominant contributor to the 2M+ items dropped per hour observed on the Turbo gw2 indexer. The fix: before parsing, the Ans104Unbundler queries the BundleIndex for the set of ids already present in new_data_items or stable_data_items under this parent. The set is passed to the parser worker. For each parsed item whose id is in the set, the parser increments matchedItemCount (preserving the bundle's matched_data_item_count invariant — see below) and skips the filter eval, hash, and DATA_ITEM_MATCHED emit. Net effect: redundant queue traffic disappears and the downstream queues breathe. matched_data_item_count invariant: upsertBundle uses `IFNULL(@matched, matched)` to update the column, and the fully-indexed predicate is strict equality between bundle_data_items row count and matched_data_item_count. If we lowered the emitted matched count by the skip amount, the new lower value would replace the existing one and the equality would never hold (rows > matched_data_item_count from then on, so fully_indexed_at never sets). Counting skips toward matchedItemCount avoids this. Assumption: the unbundle/index filter has not changed between parses — when filters do change, the filter-reprocess path clears the bundle separately and skipChildIds is empty for those. Failure handling: if getIndexedChildIds throws, we log and proceed without a skip-set rather than failing the unbundle. Worst case is the pre-fix behavior of redundant emits, not a regression. Five-edit DB plumbing for `selectIndexedChildIds` + `getIndexedChildIds`: SQL, worker impl, queue wrapper, message handler, BundleIndex interface. Tests cover three cases: - no bundleIndex provided -> skipChildIds undefined (unchanged behavior) - bundleIndex returns ids -> skipChildIds passed to parser - bundleIndex throws -> skipChildIds undefined, unbundle still proceeds Companion to PE-9098-bundle-repair-bdi-routing (the previous PR); together they address the BDI repair pipeline end-to-end. --- src/database/sql/bundles/repair.sql | 13 +++++++ src/database/standalone-sqlite.ts | 14 +++++++ src/lib/ans-104.ts | 43 ++++++++++++++++++++- src/system.ts | 1 + src/types.d.ts | 8 ++++ src/workers/ans104-unbundler.test.ts | 58 ++++++++++++++++++++++++++++ src/workers/ans104-unbundler.ts | 43 +++++++++++++++++++++ 7 files changed, 178 insertions(+), 2 deletions(-) diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql index b80dc4518..99f4125ce 100644 --- a/src/database/sql/bundles/repair.sql +++ b/src/database/sql/bundles/repair.sql @@ -1,3 +1,16 @@ +-- selectIndexedChildIds +-- Returns ids of all data items already indexed as children of @parent_id, +-- across both new_data_items (pre-flush) and stable_data_items +-- (post-flush). Used by the Ans104Unbundler before parsing to build a +-- skip-set: any child the parser would otherwise emit but whose id is +-- already in this set can be dropped pre-queue, avoiding redundant +-- queue pressure and downstream upsert no-ops when a bundle is +-- re-parsed because some prior children were dropped at the data-item +-- indexer's queue cap. +SELECT id FROM new_data_items WHERE parent_id = @parent_id +UNION +SELECT id FROM stable_data_items WHERE parent_id = @parent_id + -- selectFailedBundleIds SELECT DISTINCT id FROM ( diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index b4de98a46..df1aad1a7 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -910,6 +910,13 @@ export class StandaloneSqliteDatabaseWorker { return row?.n ?? 0; } + getIndexedChildIds(parentId: string): string[] { + const rows = this.stmts.bundles.selectIndexedChildIds.all({ + parent_id: fromB64Url(parentId), + }); + return rows.map((row): string => toB64Url(row.id)); + } + backfillBundles() { this.stmts.bundles.insertMissingBundles.run(); } @@ -3319,6 +3326,10 @@ export class StandaloneSqliteDatabase return this.queueRead('bundles', 'getRepairBacklogCount', undefined); } + getIndexedChildIds(parentId: string): Promise { + return this.queueRead('bundles', 'getIndexedChildIds', [parentId]); + } + backfillBundles() { return this.queueRead('bundles', 'backfillBundles', undefined); } @@ -3844,6 +3855,9 @@ if (!isMainThread) { case 'getRepairBacklogCount': parentPort?.postMessage(worker.getRepairBacklogCount()); break; + case 'getIndexedChildIds': + parentPort?.postMessage(worker.getIndexedChildIds(args[0])); + break; case 'backfillBundles': worker.backfillBundles(); parentPort?.postMessage(null); diff --git a/src/lib/ans-104.ts b/src/lib/ans-104.ts index 2f24d3ffb..0883ced6a 100644 --- a/src/lib/ans-104.ts +++ b/src/lib/ans-104.ts @@ -271,11 +271,13 @@ export class Ans104Parser { parentId, parentIndex, rootParentOffset, + skipChildIds, }: { rootTxId: string; parentId: string; parentIndex: number; rootParentOffset: number; + skipChildIds?: string[]; }): Promise { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { @@ -332,6 +334,7 @@ export class Ans104Parser { parentIndex, bundlePath, rootParentOffset, + skipChildIds, }, }); this.drainQueue(); @@ -408,8 +411,14 @@ if (!isMainThread) { process.exit(0); } - const { rootTxId, parentId, parentIndex, bundlePath, rootParentOffset } = - message; + const { + rootTxId, + parentId, + parentIndex, + bundlePath, + rootParentOffset, + skipChildIds, + } = message; let stream: fs.ReadStream | undefined = undefined; try { stream = fs.createReadStream(bundlePath); @@ -417,10 +426,27 @@ if (!isMainThread) { const bundleLength = iterable.length; let matchedItemCount = 0; let duplicatedItemCount = 0; + let alreadyIndexedSkippedCount = 0; const fnLog = log.child({ rootTxId, parentId, bundleLength }); fnLog.info('Unbundling ANS-104 bundle stream data items...'); + // Set of data item ids known to already be in `new_data_items` or + // `stable_data_items` for this parent (supplied by the caller). + // Used to avoid re-emitting children that previous parses of this + // same bundle already produced — which happens whenever a bundle + // is re-parsed via the repair worker because some of its children + // were dropped at the data-item indexer's queue cap. Skipped items + // still count toward `matchedItemCount` so the bundle's + // `matched_data_item_count` invariant (= bundle_data_items rows) + // continues to hold. We assume the filter is stable between parses + // — when filters change, the filter-reprocess path resets bundles + // separately and `skipChildIds` is empty for those. + const skipSet = + Array.isArray(skipChildIds) && skipChildIds.length > 0 + ? new Set(skipChildIds) + : null; + const processedDataItemIds = new Set(); for await (const [index, dataItem] of iterable.entries()) { const diLog = fnLog.child({ @@ -447,6 +473,18 @@ if (!isMainThread) { processedDataItemIds.add(dataItem.id); + // Skip re-emit for children already indexed in a prior parse of + // this same bundle. We count toward matchedItemCount so the + // bundle's matched_data_item_count invariant continues to hold + // (the existing bundle_data_items rows already represent the + // "matched" outcome from when this item was first emitted). + if (skipSet?.has(dataItem.id)) { + alreadyIndexedSkippedCount++; + matchedItemCount++; + diLog.debug('Skipping data item: already indexed.'); + continue; + } + // compute the hash of the data item data const dataItemHash = await hashDataItemData( bundlePath, @@ -481,6 +519,7 @@ if (!isMainThread) { itemCount: bundleLength, matchedItemCount, duplicatedItemCount, + alreadyIndexedSkippedCount, }); } catch (error: any) { log.error('Error unbundling ANS-104 bundle stream', { diff --git a/src/system.ts b/src/system.ts index e7c32fe70..15e348d03 100644 --- a/src/system.ts +++ b/src/system.ts @@ -1197,6 +1197,7 @@ const ans104Unbundler = new Ans104Unbundler({ dataItemIndexFilterString: config.ANS104_INDEX_FILTER_STRING, workerCount: config.ANS104_UNBUNDLE_WORKERS, shouldUnbundle: shouldUnbundleDataItems, + bundleIndex, }); metrics.registerQueueLengthGauge('ans104Unbundler', { length: () => ans104Unbundler.queueDepth(), diff --git a/src/types.d.ts b/src/types.d.ts index ce2158d70..9ab90408e 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -287,6 +287,14 @@ export interface BundleIndex { * `bundle_repair_pending_bundles` gauge. */ getRepairBacklogCount(): Promise; + /** + * Ids of data items already indexed as children of `parentId`, across + * both `new_data_items` and `stable_data_items`. Used by the + * Ans104Unbundler to skip re-emit of children that previous parses + * already produced, when the same bundle is being re-parsed because + * some children were dropped at the data-item indexer queue cap. + */ + getIndexedChildIds(parentId: string): Promise; updateBundlesFullyIndexedAt(): Promise; updateBundlesForFilterChange( unbundleFilter: string, diff --git a/src/workers/ans104-unbundler.test.ts b/src/workers/ans104-unbundler.test.ts index a19ef0392..c76230a22 100644 --- a/src/workers/ans104-unbundler.test.ts +++ b/src/workers/ans104-unbundler.test.ts @@ -131,8 +131,66 @@ describe('Ans104Unbundler', () => { parentIndex: undefined, rootParentOffset: 0, rootTxId: 'root_tx_id', + // No bundleIndex was provided to the Ans104Unbundler in this + // test's setup, so the pre-load step is skipped entirely and + // skipChildIds is left undefined. + skipChildIds: undefined, }, ); }); + + it('passes already-indexed child ids to the parser when bundleIndex is provided', async () => { + mock.method(mockAns104Parser, 'parseBundle'); + const knownChildren = ['child-a', 'child-b', 'child-c']; + (ans104Unbundler as any).bundleIndex = { + getIndexedChildIds: async (parentId: string) => { + assert.equal(parentId, 'test-id'); + return knownChildren; + }, + }; + + const mockItem = { + id: 'test-id', + root_tx_id: 'root_tx_id', + } as UnbundleableItem; + + await ans104Unbundler.queueItem(mockItem, false, true); + + assert.deepEqual( + (mockAns104Parser.parseBundle as any).mock.calls[0].arguments[0], + { + parentId: 'test-id', + parentIndex: undefined, + rootParentOffset: 0, + rootTxId: 'root_tx_id', + skipChildIds: knownChildren, + }, + ); + }); + + it('falls back to no skip-set when getIndexedChildIds throws', async () => { + mock.method(mockAns104Parser, 'parseBundle'); + (ans104Unbundler as any).bundleIndex = { + getIndexedChildIds: async () => { + throw new Error('boom'); + }, + }; + + const mockItem = { + id: 'test-id', + root_tx_id: 'root_tx_id', + } as UnbundleableItem; + + await ans104Unbundler.queueItem(mockItem, false, true); + + // The unbundle still proceeds — failure to pre-load the skip-set + // degrades to the pre-fix behavior, not a hard failure. + assert.equal((mockAns104Parser.parseBundle as any).mock.callCount(), 1); + assert.equal( + (mockAns104Parser.parseBundle as any).mock.calls[0].arguments[0] + .skipChildIds, + undefined, + ); + }); }); }); diff --git a/src/workers/ans104-unbundler.ts b/src/workers/ans104-unbundler.ts index d304d99cd..b0b2caaaa 100644 --- a/src/workers/ans104-unbundler.ts +++ b/src/workers/ans104-unbundler.ts @@ -12,6 +12,7 @@ import * as winston from 'winston'; import { Ans104Parser } from '../lib/ans-104.js'; import * as metrics from '../metrics.js'; import { + BundleIndex, ContiguousDataSource, ItemFilter, NormalizedBundleDataItem, @@ -38,6 +39,7 @@ export class Ans104Unbundler { // Dependencies private log: winston.Logger; private filter: ItemFilter; + private bundleIndex: BundleIndex | undefined; // Unbundling queue private workerCount: number; @@ -61,6 +63,7 @@ export class Ans104Unbundler { maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, shouldUnbundle = () => true, ans104Parser, + bundleIndex, }: { log: winston.Logger; eventEmitter: EventEmitter; @@ -71,9 +74,20 @@ export class Ans104Unbundler { maxQueueSize?: number; shouldUnbundle?: () => boolean; ans104Parser?: Ans104Parser; + /** + * Optional index used to look up already-indexed children of a + * bundle being parsed. When supplied, the parser receives the + * id-set and skips re-emit for those children. Reduces redundant + * pressure on the data-item indexer queues when a bundle is + * re-parsed by the repair worker after some children were dropped + * at the indexer's queue cap on a previous attempt. Tests can omit + * this; production wires it from system.ts. + */ + bundleIndex?: BundleIndex; }) { this.log = log.child({ class: 'Ans104Unbundler' }); this.filter = filter; + this.bundleIndex = bundleIndex; this.ans104Parser = ans104Parser || new Ans104Parser({ @@ -161,11 +175,40 @@ export class Ans104Unbundler { rootParentOffset = item.root_parent_offset + item.data_offset; } + // Pre-fetch the set of children already indexed under this + // parent so the parser can skip re-emit for them. This is the + // primary efficiency lever when the repair worker re-queues a + // bundle whose previous parse had some children dropped at the + // data-item indexer's queue cap: without this set, the re-parse + // re-queues every child (90%+ of which are idempotent no-ops at + // the DB layer but still consume queue slots and contend with + // the genuinely-new children for the cap). Lookup is a single + // indexed SQLite read; if it fails for any reason we fall back + // to the un-skipped behavior rather than failing the unbundle. + let skipChildIds: string[] | undefined; + if (this.bundleIndex !== undefined) { + try { + skipChildIds = await this.bundleIndex.getIndexedChildIds(item.id); + if (skipChildIds.length > 0) { + log.debug('Pre-loaded already-indexed children to skip', { + count: skipChildIds.length, + }); + } + } catch (err: any) { + log.warn( + 'Failed to pre-load already-indexed children; proceeding without skip-set', + { error: err?.message ?? String(err) }, + ); + skipChildIds = undefined; + } + } + await this.ans104Parser.parseBundle({ rootTxId, parentId: item.id, parentIndex: item.index, rootParentOffset, + skipChildIds, }); log.info('Bundle unbundled.'); } From f7497668095a26c41a039fc6a5ae83011dffe678 Mon Sep 17 00:00:00 2001 From: Ariel Melendez Date: Thu, 14 May 2026 22:43:38 -0700 Subject: [PATCH 2/2] refine(unbundle): tighten skip predicate + add observability (PE-9098) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two refinements to the parser-skip-already-indexed work, addressing feedback from review: 1. **Tighten the predicate to `data_offset IS NOT NULL`** on both `new_data_items` and `stable_data_items`. The optimistic insert path (insertOrIgnoreNewDataItem, used by the admin `/queue-data-item` route) intentionally leaves the eleven root-atom columns NULL — including `data_offset` — expecting a subsequent full parser-emit to fill them via the COALESCE/IFNULL clauses in upsertNewDataItem's `ON CONFLICT UPDATE`. Without this predicate, our skip-set would include optimistically-inserted rows and strand their tuple fields permanently NULL across re-parses. With it, we only skip rows whose tuple fields are already populated — the case where re-emit is a true no-op. Cost: the existing covering indexes `(parent_id, id)` no longer cover the query (data_offset isn't in the index). Each row pays one extra page read. For a 10K-child bundle, ~10K extra heap reads. Still fast (sub-100ms worst case). If this becomes hot we can add a partial index `(parent_id, id) WHERE data_offset IS NOT NULL`. 2. **Add `bundles_data_items_skipped_already_indexed_total`** counter. Wired from the `ANS104_UNBUNDLE_COMPLETE` event handler off the `alreadyIndexedSkippedCount` value the parser already includes in its completion message. No label — the only realistic value would be `bundle_format` and ANS-104 is the only format in production use; an unused dimension on a Prometheus metric is just clutter. Tests unchanged (12 pass). Build + lint clean. --- src/database/sql/bundles/repair.sql | 30 +++++++++++++++++++---------- src/metrics.ts | 13 +++++++++++++ src/system.ts | 5 +++++ 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql index 99f4125ce..50f6ff54a 100644 --- a/src/database/sql/bundles/repair.sql +++ b/src/database/sql/bundles/repair.sql @@ -1,15 +1,25 @@ -- selectIndexedChildIds --- Returns ids of all data items already indexed as children of @parent_id, --- across both new_data_items (pre-flush) and stable_data_items --- (post-flush). Used by the Ans104Unbundler before parsing to build a --- skip-set: any child the parser would otherwise emit but whose id is --- already in this set can be dropped pre-queue, avoiding redundant --- queue pressure and downstream upsert no-ops when a bundle is --- re-parsed because some prior children were dropped at the data-item --- indexer's queue cap. -SELECT id FROM new_data_items WHERE parent_id = @parent_id +-- Returns ids of children of @parent_id whose row in new_data_items or +-- stable_data_items has been fully populated by a prior parser-emit +-- (signalled by `data_offset IS NOT NULL`). Used by the Ans104Unbundler +-- before parsing to build a skip-set so the parser can drop redundant +-- child emits when a bundle is re-parsed. +-- +-- The `data_offset IS NOT NULL` predicate is critical. The optimistic +-- insert path (insertOrIgnoreNewDataItem, used by /ar-io/admin/queue-data-item) +-- intentionally writes the eleven root-atom columns as NULL — including +-- data_offset — and a subsequent full parser-emit fills them via the +-- COALESCE/IFNULL clauses in upsertNewDataItem's ON CONFLICT UPDATE. +-- Skipping the emit for an optimistically-inserted row would strand +-- those tuple fields permanently NULL. We only skip rows whose tuple +-- fields are already populated. +SELECT id FROM new_data_items +WHERE parent_id = @parent_id + AND data_offset IS NOT NULL UNION -SELECT id FROM stable_data_items WHERE parent_id = @parent_id +SELECT id FROM stable_data_items +WHERE parent_id = @parent_id + AND data_offset IS NOT NULL -- selectFailedBundleIds SELECT DISTINCT id diff --git a/src/metrics.ts b/src/metrics.ts index 5f9f0a967..69acaeab8 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -100,6 +100,19 @@ export const dataItemsUnbundledCounter = new promClient.Counter({ labelNames: ['bundle_format'], }); +// Data items the parser would have emitted but skipped because their +// id was already present in new_data_items or stable_data_items with +// data_offset populated (i.e. fully indexed by a prior parser-emit). +// Skip-set is provided to the parser by Ans104Unbundler via +// BundleIndex.getIndexedChildIds. Increases on this counter mean +// redundant queue work that we successfully avoided; persistent zero +// after a bundle re-parse cycle would indicate the skip-set isn't +// being populated or the path isn't being exercised. +export const dataItemsSkippedAlreadyIndexedCounter = new promClient.Counter({ + name: 'bundles_data_items_skipped_already_indexed_total', + help: 'Count of data items the parser skipped re-emit for because their row was already fully populated under the same parent', +}); + // Distribution of data-item counts per unbundled bundle. Use for correlating // queue/heap spikes with the *trigger* bundle's size — without this, we only // see data_items_unbundled_total rising and can't tell if it's a single diff --git a/src/system.ts b/src/system.ts index 15e348d03..becc5ab31 100644 --- a/src/system.ts +++ b/src/system.ts @@ -1351,6 +1351,11 @@ eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => { { bundle_format: 'ans-104' }, bundleEvent.itemCount, ); + if (typeof bundleEvent.alreadyIndexedSkippedCount === 'number') { + metrics.dataItemsSkippedAlreadyIndexedCounter.inc( + bundleEvent.alreadyIndexedSkippedCount, + ); + } db.saveBundle({ id: bundleEvent.parentId, format: 'ans-104',