Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/database/sql/bundles/repair.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
-- selectIndexedChildIds
-- Returns ids of children of @parent_id whose row in new_data_items or
-- stable_data_items has been fully populated by a prior parser-emit
-- (signalled by `data_offset IS NOT NULL`). Used by the Ans104Unbundler
-- before parsing to build a skip-set so the parser can drop redundant
-- child emits when a bundle is re-parsed.
--
-- The `data_offset IS NOT NULL` predicate is critical. The optimistic
-- insert path (insertOrIgnoreNewDataItem, used by /ar-io/admin/queue-data-item)
-- intentionally writes the eleven root-atom columns as NULL — including
-- data_offset — and a subsequent full parser-emit fills them via the
-- COALESCE/IFNULL clauses in upsertNewDataItem's ON CONFLICT UPDATE.
-- Skipping the emit for an optimistically-inserted row would strand
-- those tuple fields permanently NULL. We only skip rows whose tuple
-- fields are already populated.
--
-- The two branches are combined with UNION (not UNION ALL), so an id that
-- qualifies in both new_data_items and stable_data_items is returned once.
SELECT id FROM new_data_items
WHERE parent_id = @parent_id
AND data_offset IS NOT NULL
UNION
SELECT id FROM stable_data_items
WHERE parent_id = @parent_id
AND data_offset IS NOT NULL

-- selectFailedBundleIds
SELECT DISTINCT id
FROM (
Expand Down
14 changes: 14 additions & 0 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,13 @@ export class StandaloneSqliteDatabaseWorker {
return row?.n ?? 0;
}

/**
 * Runs the `selectIndexedChildIds` statement for `parentId` and returns the
 * id of every row it yields.
 *
 * @param parentId - Parent bundle id; converted with `fromB64Url` before
 *   being bound as `parent_id`.
 * @returns One entry per returned row, each produced by passing the row's
 *   `id` through `toB64Url`. Empty when the statement yields no rows.
 */
getIndexedChildIds(parentId: string): string[] {
  const childRows = this.stmts.bundles.selectIndexedChildIds.all({
    parent_id: fromB64Url(parentId),
  });
  const childIds: string[] = [];
  for (const childRow of childRows) {
    childIds.push(toB64Url(childRow.id));
  }
  return childIds;
}

/**
 * Executes the prepared `insertMissingBundles` statement. Nothing is
 * returned to the caller.
 */
backfillBundles() {
this.stmts.bundles.insertMissingBundles.run();
}
Expand Down Expand Up @@ -3319,6 +3326,10 @@ export class StandaloneSqliteDatabase
return this.queueRead('bundles', 'getRepairBacklogCount', undefined);
}

/**
 * Asynchronous front for the worker's `getIndexedChildIds`: forwards
 * `parentId` as the single argument through `queueRead` (target 'bundles')
 * and resolves with whatever the worker posts back.
 *
 * @param parentId - Id of the parent bundle whose indexed children are wanted.
 * @returns Promise of the child ids reported by the worker.
 */
getIndexedChildIds(parentId: string): Promise<string[]> {
return this.queueRead('bundles', 'getIndexedChildIds', [parentId]);
}

/**
 * Asks the 'bundles' worker to run its `backfillBundles` method and returns
 * the promise produced by `queueRead`.
 *
 * NOTE(review): this is dispatched through `queueRead` even though the worker
 * method runs the `insertMissingBundles` statement — confirm that routing an
 * insert through the read queue is intended.
 */
backfillBundles() {
return this.queueRead('bundles', 'backfillBundles', undefined);
}
Expand Down Expand Up @@ -3844,6 +3855,9 @@ if (!isMainThread) {
case 'getRepairBacklogCount':
parentPort?.postMessage(worker.getRepairBacklogCount());
break;
case 'getIndexedChildIds':
parentPort?.postMessage(worker.getIndexedChildIds(args[0]));
break;
case 'backfillBundles':
worker.backfillBundles();
parentPort?.postMessage(null);
Expand Down
43 changes: 41 additions & 2 deletions src/lib/ans-104.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,13 @@ export class Ans104Parser {
parentId,
parentIndex,
rootParentOffset,
skipChildIds,
}: {
rootTxId: string;
parentId: string;
parentIndex: number;
rootParentOffset: number;
skipChildIds?: string[];
}): Promise<void> {
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
Expand Down Expand Up @@ -332,6 +334,7 @@ export class Ans104Parser {
parentIndex,
bundlePath,
rootParentOffset,
skipChildIds,
},
});
this.drainQueue();
Expand Down Expand Up @@ -408,19 +411,42 @@ if (!isMainThread) {
process.exit(0);
}

const { rootTxId, parentId, parentIndex, bundlePath, rootParentOffset } =
message;
const {
rootTxId,
parentId,
parentIndex,
bundlePath,
rootParentOffset,
skipChildIds,
} = message;
let stream: fs.ReadStream | undefined = undefined;
try {
stream = fs.createReadStream(bundlePath);
const iterable = await processBundleStream(stream);
const bundleLength = iterable.length;
let matchedItemCount = 0;
let duplicatedItemCount = 0;
let alreadyIndexedSkippedCount = 0;

const fnLog = log.child({ rootTxId, parentId, bundleLength });
fnLog.info('Unbundling ANS-104 bundle stream data items...');

// Set of data item ids known to already be in `new_data_items` or
// `stable_data_items` for this parent (supplied by the caller).
// Used to avoid re-emitting children that previous parses of this
// same bundle already produced — which happens whenever a bundle
// is re-parsed via the repair worker because some of its children
// were dropped at the data-item indexer's queue cap. Skipped items
// still count toward `matchedItemCount` so the bundle's
// `matched_data_item_count` invariant (= bundle_data_items rows)
// continues to hold. We assume the filter is stable between parses
// — when filters change, the filter-reprocess path resets bundles
// separately and `skipChildIds` is empty for those.
const skipSet =
Array.isArray(skipChildIds) && skipChildIds.length > 0
? new Set<string>(skipChildIds)
: null;

const processedDataItemIds = new Set<string>();
for await (const [index, dataItem] of iterable.entries()) {
const diLog = fnLog.child({
Expand All @@ -447,6 +473,18 @@ if (!isMainThread) {

processedDataItemIds.add(dataItem.id);

// Skip re-emit for children already indexed in a prior parse of
// this same bundle. We count toward matchedItemCount so the
// bundle's matched_data_item_count invariant continues to hold
// (the existing bundle_data_items rows already represent the
// "matched" outcome from when this item was first emitted).
if (skipSet?.has(dataItem.id)) {
alreadyIndexedSkippedCount++;
matchedItemCount++;
diLog.debug('Skipping data item: already indexed.');
continue;
}

// compute the hash of the data item data
const dataItemHash = await hashDataItemData(
bundlePath,
Expand Down Expand Up @@ -481,6 +519,7 @@ if (!isMainThread) {
itemCount: bundleLength,
matchedItemCount,
duplicatedItemCount,
alreadyIndexedSkippedCount,
});
} catch (error: any) {
log.error('Error unbundling ANS-104 bundle stream', {
Expand Down
13 changes: 13 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,19 @@ export const dataItemsUnbundledCounter = new promClient.Counter({
labelNames: ['bundle_format'],
});

// Data items the parser would have emitted but skipped because their
// id was already present in new_data_items or stable_data_items with
// data_offset populated (i.e. fully indexed by a prior parser-emit).
// Skip-set is provided to the parser by Ans104Unbundler via
// BundleIndex.getIndexedChildIds. Increases on this counter mean
// redundant queue work that we successfully avoided; persistent zero
// after a bundle re-parse cycle would indicate the skip-set isn't
// being populated or the path isn't being exercised.
//
// The counter is declared without labels. It is incremented from the
// ANS104_UNBUNDLE_COMPLETE handler by the event's
// `alreadyIndexedSkippedCount`, and only when that field is a number.
export const dataItemsSkippedAlreadyIndexedCounter = new promClient.Counter({
name: 'bundles_data_items_skipped_already_indexed_total',
help: 'Count of data items the parser skipped re-emit for because their row was already fully populated under the same parent',
});

// Distribution of data-item counts per unbundled bundle. Use for correlating
// queue/heap spikes with the *trigger* bundle's size — without this, we only
// see data_items_unbundled_total rising and can't tell if it's a single
Expand Down
6 changes: 6 additions & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ const ans104Unbundler = new Ans104Unbundler({
dataItemIndexFilterString: config.ANS104_INDEX_FILTER_STRING,
workerCount: config.ANS104_UNBUNDLE_WORKERS,
shouldUnbundle: shouldUnbundleDataItems,
bundleIndex,
});
metrics.registerQueueLengthGauge('ans104Unbundler', {
length: () => ans104Unbundler.queueDepth(),
Expand Down Expand Up @@ -1350,6 +1351,11 @@ eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => {
{ bundle_format: 'ans-104' },
bundleEvent.itemCount,
);
if (typeof bundleEvent.alreadyIndexedSkippedCount === 'number') {
metrics.dataItemsSkippedAlreadyIndexedCounter.inc(
bundleEvent.alreadyIndexedSkippedCount,
);
}
db.saveBundle({
id: bundleEvent.parentId,
format: 'ans-104',
Expand Down
8 changes: 8 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ export interface BundleIndex {
* `bundle_repair_pending_bundles` gauge.
*/
getRepairBacklogCount(): Promise<number>;
/**
 * Ids of data items already fully indexed as children of `parentId`,
 * across both `new_data_items` and `stable_data_items`. Used by the
 * Ans104Unbundler to skip re-emit of children that previous parses
 * already produced, when the same bundle is being re-parsed because
 * some children were dropped at the data-item indexer queue cap.
 *
 * "Fully indexed" matters: the SQLite implementation only returns rows
 * whose `data_offset` is non-NULL, so rows created by the optimistic
 * insert path (which leaves `data_offset` NULL) are not reported and
 * still receive a full parser-emit.
 */
getIndexedChildIds(parentId: string): Promise<string[]>;
updateBundlesFullyIndexedAt(): Promise<void>;
updateBundlesForFilterChange(
unbundleFilter: string,
Expand Down
58 changes: 58 additions & 0 deletions src/workers/ans104-unbundler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,66 @@ describe('Ans104Unbundler', () => {
parentIndex: undefined,
rootParentOffset: 0,
rootTxId: 'root_tx_id',
// No bundleIndex was provided to the Ans104Unbundler in this
// test's setup, so the pre-load step is skipped entirely and
// skipChildIds is left undefined.
skipChildIds: undefined,
},
);
});

it('passes already-indexed child ids to the parser when bundleIndex is provided', async () => {
  mock.method(mockAns104Parser, 'parseBundle');
  const knownChildren = ['child-a', 'child-b', 'child-c'];

  // Record the ids the unbundler asks about instead of asserting inside the
  // stub. The unbundler wraps the lookup in try/catch and falls back to "no
  // skip-set" on any error, so an AssertionError thrown from within the stub
  // would be swallowed and only surface later as a confusing `skipChildIds`
  // mismatch.
  const requestedParentIds: string[] = [];
  (ans104Unbundler as any).bundleIndex = {
    getIndexedChildIds: async (parentId: string) => {
      requestedParentIds.push(parentId);
      return knownChildren;
    },
  };

  const mockItem = {
    id: 'test-id',
    root_tx_id: 'root_tx_id',
  } as UnbundleableItem;

  await ans104Unbundler.queueItem(mockItem, false, true);

  // The lookup must be keyed by the id of the bundle being unbundled.
  assert.deepEqual(requestedParentIds, ['test-id']);

  // The ids returned by the index are handed to the parser unchanged.
  assert.deepEqual(
    (mockAns104Parser.parseBundle as any).mock.calls[0].arguments[0],
    {
      parentId: 'test-id',
      parentIndex: undefined,
      rootParentOffset: 0,
      rootTxId: 'root_tx_id',
      skipChildIds: knownChildren,
    },
  );
});

it('falls back to no skip-set when getIndexedChildIds throws', async () => {
  mock.method(mockAns104Parser, 'parseBundle');

  // Index stub whose lookup always rejects.
  const failingIndex = {
    getIndexedChildIds: async () => {
      throw new Error('boom');
    },
  };
  (ans104Unbundler as any).bundleIndex = failingIndex;

  const bundleItem = {
    id: 'test-id',
    root_tx_id: 'root_tx_id',
  } as UnbundleableItem;
  await ans104Unbundler.queueItem(bundleItem, false, true);

  // A failed skip-set pre-load must not abort the unbundle: the parser is
  // still invoked exactly once, just without any ids to skip.
  const parseBundleMock = (mockAns104Parser.parseBundle as any).mock;
  assert.equal(parseBundleMock.callCount(), 1);
  const parserArgs = parseBundleMock.calls[0].arguments[0];
  assert.equal(parserArgs.skipChildIds, undefined);
});
});
});
43 changes: 43 additions & 0 deletions src/workers/ans104-unbundler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import * as winston from 'winston';
import { Ans104Parser } from '../lib/ans-104.js';
import * as metrics from '../metrics.js';
import {
BundleIndex,
ContiguousDataSource,
ItemFilter,
NormalizedBundleDataItem,
Expand All @@ -38,6 +39,7 @@ export class Ans104Unbundler {
// Dependencies
private log: winston.Logger;
private filter: ItemFilter;
private bundleIndex: BundleIndex | undefined;

// Unbundling queue
private workerCount: number;
Expand All @@ -61,6 +63,7 @@ export class Ans104Unbundler {
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
shouldUnbundle = () => true,
ans104Parser,
bundleIndex,
}: {
log: winston.Logger;
eventEmitter: EventEmitter;
Expand All @@ -71,9 +74,20 @@ export class Ans104Unbundler {
maxQueueSize?: number;
shouldUnbundle?: () => boolean;
ans104Parser?: Ans104Parser;
/**
* Optional index used to look up already-indexed children of a
* bundle being parsed. When supplied, the parser receives the
* id-set and skips re-emit for those children. Reduces redundant
* pressure on the data-item indexer queues when a bundle is
* re-parsed by the repair worker after some children were dropped
* at the indexer's queue cap on a previous attempt. Tests can omit
* this; production wires it from system.ts.
*/
bundleIndex?: BundleIndex;
}) {
this.log = log.child({ class: 'Ans104Unbundler' });
this.filter = filter;
this.bundleIndex = bundleIndex;
this.ans104Parser =
ans104Parser ||
new Ans104Parser({
Expand Down Expand Up @@ -161,11 +175,40 @@ export class Ans104Unbundler {
rootParentOffset = item.root_parent_offset + item.data_offset;
}

// Pre-fetch the set of children already indexed under this
// parent so the parser can skip re-emit for them. This is the
// primary efficiency lever when the repair worker re-queues a
// bundle whose previous parse had some children dropped at the
// data-item indexer's queue cap: without this set, the re-parse
// re-queues every child (90%+ of which are idempotent no-ops at
// the DB layer but still consume queue slots and contend with
// the genuinely-new children for the cap). Lookup is a single
// indexed SQLite read; if it fails for any reason we fall back
// to the un-skipped behavior rather than failing the unbundle.
let skipChildIds: string[] | undefined;
if (this.bundleIndex !== undefined) {
try {
skipChildIds = await this.bundleIndex.getIndexedChildIds(item.id);
if (skipChildIds.length > 0) {
log.debug('Pre-loaded already-indexed children to skip', {
count: skipChildIds.length,
});
}
} catch (err: any) {
log.warn(
'Failed to pre-load already-indexed children; proceeding without skip-set',
{ error: err?.message ?? String(err) },
);
skipChildIds = undefined;
}
}

await this.ans104Parser.parseBundle({
rootTxId,
parentId: item.id,
parentIndex: item.index,
rootParentOffset,
skipChildIds,
});
log.info('Bundle unbundled.');
}
Expand Down
Loading