diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql index b80dc4518..50f6ff54a 100644 --- a/src/database/sql/bundles/repair.sql +++ b/src/database/sql/bundles/repair.sql @@ -1,3 +1,26 @@ +-- selectIndexedChildIds +-- Returns ids of children of @parent_id whose row in new_data_items or +-- stable_data_items has been fully populated by a prior parser-emit +-- (signalled by `data_offset IS NOT NULL`). Used by the Ans104Unbundler +-- before parsing to build a skip-set so the parser can drop redundant +-- child emits when a bundle is re-parsed. +-- +-- The `data_offset IS NOT NULL` predicate is critical. The optimistic +-- insert path (insertOrIgnoreNewDataItem, used by /ar-io/admin/queue-data-item) +-- intentionally writes the eleven root-atom columns as NULL — including +-- data_offset — and a subsequent full parser-emit fills them via the +-- COALESCE/IFNULL clauses in upsertNewDataItem's ON CONFLICT UPDATE. +-- Skipping the emit for an optimistically-inserted row would strand +-- those tuple fields permanently NULL. We only skip rows whose tuple +-- fields are already populated. +SELECT id FROM new_data_items +WHERE parent_id = @parent_id + AND data_offset IS NOT NULL +UNION +SELECT id FROM stable_data_items +WHERE parent_id = @parent_id + AND data_offset IS NOT NULL + -- selectFailedBundleIds SELECT DISTINCT id FROM ( diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index b4de98a46..df1aad1a7 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -910,6 +910,13 @@ export class StandaloneSqliteDatabaseWorker { return row?.n ?? 0; } + getIndexedChildIds(parentId: string): string[] { + const rows = this.stmts.bundles.selectIndexedChildIds.all({ + parent_id: fromB64Url(parentId), + }); + return rows.map((row): string => toB64Url(row.id)); + } + backfillBundles() { this.stmts.bundles.insertMissingBundles.run(); } @@ -3319,6 +3326,10 @@ export class StandaloneSqliteDatabase return this.queueRead('bundles', 'getRepairBacklogCount', undefined); } + getIndexedChildIds(parentId: string): Promise { + return this.queueRead('bundles', 'getIndexedChildIds', [parentId]); + } + backfillBundles() { return this.queueRead('bundles', 'backfillBundles', undefined); } @@ -3844,6 +3855,9 @@ if (!isMainThread) { case 'getRepairBacklogCount': parentPort?.postMessage(worker.getRepairBacklogCount()); break; + case 'getIndexedChildIds': + parentPort?.postMessage(worker.getIndexedChildIds(args[0])); + break; case 'backfillBundles': worker.backfillBundles(); parentPort?.postMessage(null); diff --git a/src/lib/ans-104.ts b/src/lib/ans-104.ts index 2f24d3ffb..0883ced6a 100644 --- a/src/lib/ans-104.ts +++ b/src/lib/ans-104.ts @@ -271,11 +271,13 @@ export class Ans104Parser { parentId, parentIndex, rootParentOffset, + skipChildIds, }: { rootTxId: string; parentId: string; parentIndex: number; rootParentOffset: number; + skipChildIds?: string[]; }): Promise { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { @@ -332,6 +334,7 @@ export class Ans104Parser { parentIndex, bundlePath, rootParentOffset, + skipChildIds, }, }); this.drainQueue(); @@ -408,8 +411,14 @@ if (!isMainThread) { process.exit(0); } - const { rootTxId, parentId, parentIndex, bundlePath, rootParentOffset } = - message; + const { + rootTxId, + parentId, + parentIndex, + bundlePath, + rootParentOffset, + skipChildIds, + } = message; let stream: fs.ReadStream | undefined = undefined; try { stream = fs.createReadStream(bundlePath); @@ -417,10 +426,27 @@ if (!isMainThread) { const bundleLength = iterable.length; let matchedItemCount = 0; let duplicatedItemCount = 0; + let alreadyIndexedSkippedCount = 0; const fnLog = log.child({ rootTxId, parentId, bundleLength }); fnLog.info('Unbundling ANS-104 bundle stream data items...'); + // Set of data item ids known to already be in `new_data_items` or + // `stable_data_items` for this parent (supplied by the caller). + // Used to avoid re-emitting children that previous parses of this + // same bundle already produced — which happens whenever a bundle + // is re-parsed via the repair worker because some of its children + // were dropped at the data-item indexer's queue cap. Skipped items + // still count toward `matchedItemCount` so the bundle's + // `matched_data_item_count` invariant (= bundle_data_items rows) + // continues to hold. We assume the filter is stable between parses + // — when filters change, the filter-reprocess path resets bundles + // separately and `skipChildIds` is empty for those. + const skipSet = + Array.isArray(skipChildIds) && skipChildIds.length > 0 + ? new Set(skipChildIds) + : null; + const processedDataItemIds = new Set(); for await (const [index, dataItem] of iterable.entries()) { const diLog = fnLog.child({ @@ -447,6 +473,18 @@ if (!isMainThread) { processedDataItemIds.add(dataItem.id); + // Skip re-emit for children already indexed in a prior parse of + // this same bundle. We count toward matchedItemCount so the + // bundle's matched_data_item_count invariant continues to hold + // (the existing bundle_data_items rows already represent the + // "matched" outcome from when this item was first emitted). + if (skipSet?.has(dataItem.id)) { + alreadyIndexedSkippedCount++; + matchedItemCount++; + diLog.debug('Skipping data item: already indexed.'); + continue; + } + // compute the hash of the data item data const dataItemHash = await hashDataItemData( bundlePath, @@ -481,6 +519,7 @@ if (!isMainThread) { itemCount: bundleLength, matchedItemCount, duplicatedItemCount, + alreadyIndexedSkippedCount, }); } catch (error: any) { log.error('Error unbundling ANS-104 bundle stream', { diff --git a/src/metrics.ts b/src/metrics.ts index 5f9f0a967..69acaeab8 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -100,6 +100,19 @@ export const dataItemsUnbundledCounter = new promClient.Counter({ labelNames: ['bundle_format'], }); +// Data items the parser would have emitted but skipped because their +// id was already present in new_data_items or stable_data_items with +// data_offset populated (i.e. fully indexed by a prior parser-emit). +// Skip-set is provided to the parser by Ans104Unbundler via +// BundleIndex.getIndexedChildIds. Increases on this counter mean +// redundant queue work that we successfully avoided; persistent zero +// after a bundle re-parse cycle would indicate the skip-set isn't +// being populated or the path isn't being exercised. +export const dataItemsSkippedAlreadyIndexedCounter = new promClient.Counter({ + name: 'bundles_data_items_skipped_already_indexed_total', + help: 'Count of data items the parser skipped re-emit for because their row was already fully populated under the same parent', +}); + // Distribution of data-item counts per unbundled bundle. Use for correlating // queue/heap spikes with the *trigger* bundle's size — without this, we only // see data_items_unbundled_total rising and can't tell if it's a single diff --git a/src/system.ts b/src/system.ts index e7c32fe70..becc5ab31 100644 --- a/src/system.ts +++ b/src/system.ts @@ -1197,6 +1197,7 @@ const ans104Unbundler = new Ans104Unbundler({ dataItemIndexFilterString: config.ANS104_INDEX_FILTER_STRING, workerCount: config.ANS104_UNBUNDLE_WORKERS, shouldUnbundle: shouldUnbundleDataItems, + bundleIndex, }); metrics.registerQueueLengthGauge('ans104Unbundler', { length: () => ans104Unbundler.queueDepth(), @@ -1350,6 +1351,11 @@ eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => { { bundle_format: 'ans-104' }, bundleEvent.itemCount, ); + if (typeof bundleEvent.alreadyIndexedSkippedCount === 'number') { + metrics.dataItemsSkippedAlreadyIndexedCounter.inc( + bundleEvent.alreadyIndexedSkippedCount, + ); + } db.saveBundle({ id: bundleEvent.parentId, format: 'ans-104', diff --git a/src/types.d.ts b/src/types.d.ts index ce2158d70..9ab90408e 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -287,6 +287,14 @@ export interface BundleIndex { * `bundle_repair_pending_bundles` gauge. */ getRepairBacklogCount(): Promise; + /** + * Ids of data items already indexed as children of `parentId`, across + * both `new_data_items` and `stable_data_items`. Used by the + * Ans104Unbundler to skip re-emit of children that previous parses + * already produced, when the same bundle is being re-parsed because + * some children were dropped at the data-item indexer queue cap. + */ + getIndexedChildIds(parentId: string): Promise; updateBundlesFullyIndexedAt(): Promise; updateBundlesForFilterChange( unbundleFilter: string, diff --git a/src/workers/ans104-unbundler.test.ts b/src/workers/ans104-unbundler.test.ts index a19ef0392..c76230a22 100644 --- a/src/workers/ans104-unbundler.test.ts +++ b/src/workers/ans104-unbundler.test.ts @@ -131,8 +131,66 @@ describe('Ans104Unbundler', () => { parentIndex: undefined, rootParentOffset: 0, rootTxId: 'root_tx_id', + // No bundleIndex was provided to the Ans104Unbundler in this + // test's setup, so the pre-load step is skipped entirely and + // skipChildIds is left undefined. + skipChildIds: undefined, }, ); }); + + it('passes already-indexed child ids to the parser when bundleIndex is provided', async () => { + mock.method(mockAns104Parser, 'parseBundle'); + const knownChildren = ['child-a', 'child-b', 'child-c']; + (ans104Unbundler as any).bundleIndex = { + getIndexedChildIds: async (parentId: string) => { + assert.equal(parentId, 'test-id'); + return knownChildren; + }, + }; + + const mockItem = { + id: 'test-id', + root_tx_id: 'root_tx_id', + } as UnbundleableItem; + + await ans104Unbundler.queueItem(mockItem, false, true); + + assert.deepEqual( + (mockAns104Parser.parseBundle as any).mock.calls[0].arguments[0], + { + parentId: 'test-id', + parentIndex: undefined, + rootParentOffset: 0, + rootTxId: 'root_tx_id', + skipChildIds: knownChildren, + }, + ); + }); + + it('falls back to no skip-set when getIndexedChildIds throws', async () => { + mock.method(mockAns104Parser, 'parseBundle'); + (ans104Unbundler as any).bundleIndex = { + getIndexedChildIds: async () => { + throw new Error('boom'); + }, + }; + + const mockItem = { + id: 'test-id', + root_tx_id: 'root_tx_id', + } as UnbundleableItem; + + await ans104Unbundler.queueItem(mockItem, false, true); + + // The unbundle still proceeds — failure to pre-load the skip-set + // degrades to the pre-fix behavior, not a hard failure. + assert.equal((mockAns104Parser.parseBundle as any).mock.callCount(), 1); + assert.equal( + (mockAns104Parser.parseBundle as any).mock.calls[0].arguments[0] + .skipChildIds, + undefined, + ); + }); }); }); diff --git a/src/workers/ans104-unbundler.ts b/src/workers/ans104-unbundler.ts index d304d99cd..b0b2caaaa 100644 --- a/src/workers/ans104-unbundler.ts +++ b/src/workers/ans104-unbundler.ts @@ -12,6 +12,7 @@ import * as winston from 'winston'; import { Ans104Parser } from '../lib/ans-104.js'; import * as metrics from '../metrics.js'; import { + BundleIndex, ContiguousDataSource, ItemFilter, NormalizedBundleDataItem, @@ -38,6 +39,7 @@ export class Ans104Unbundler { // Dependencies private log: winston.Logger; private filter: ItemFilter; + private bundleIndex: BundleIndex | undefined; // Unbundling queue private workerCount: number; @@ -61,6 +63,7 @@ export class Ans104Unbundler { maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, shouldUnbundle = () => true, ans104Parser, + bundleIndex, }: { log: winston.Logger; eventEmitter: EventEmitter; @@ -71,9 +74,20 @@ export class Ans104Unbundler { maxQueueSize?: number; shouldUnbundle?: () => boolean; ans104Parser?: Ans104Parser; + /** + * Optional index used to look up already-indexed children of a + * bundle being parsed. When supplied, the parser receives the + * id-set and skips re-emit for those children. Reduces redundant + * pressure on the data-item indexer queues when a bundle is + * re-parsed by the repair worker after some children were dropped + * at the indexer's queue cap on a previous attempt. Tests can omit + * this; production wires it from system.ts. + */ + bundleIndex?: BundleIndex; }) { this.log = log.child({ class: 'Ans104Unbundler' }); this.filter = filter; + this.bundleIndex = bundleIndex; this.ans104Parser = ans104Parser || new Ans104Parser({ @@ -161,11 +175,40 @@ export class Ans104Unbundler { rootParentOffset = item.root_parent_offset + item.data_offset; } + // Pre-fetch the set of children already indexed under this + // parent so the parser can skip re-emit for them. This is the + // primary efficiency lever when the repair worker re-queues a + // bundle whose previous parse had some children dropped at the + // data-item indexer's queue cap: without this set, the re-parse + // re-queues every child (90%+ of which are idempotent no-ops at + // the DB layer but still consume queue slots and contend with + // the genuinely-new children for the cap). Lookup is a single + // indexed SQLite read; if it fails for any reason we fall back + // to the un-skipped behavior rather than failing the unbundle. + let skipChildIds: string[] | undefined; + if (this.bundleIndex !== undefined) { + try { + skipChildIds = await this.bundleIndex.getIndexedChildIds(item.id); + if (skipChildIds.length > 0) { + log.debug('Pre-loaded already-indexed children to skip', { + count: skipChildIds.length, + }); + } + } catch (err: any) { + log.warn( + 'Failed to pre-load already-indexed children; proceeding without skip-set', + { error: err?.message ?? String(err) }, + ); + skipChildIds = undefined; + } + } + await this.ans104Parser.parseBundle({ rootTxId, parentId: item.id, parentIndex: item.index, rootParentOffset, + skipChildIds, }); log.info('Bundle unbundled.'); }