From 7171763852d5e1282d20aeb12182acca27ac3276 Mon Sep 17 00:00:00 2001 From: matheus1lva Date: Tue, 30 Jun 2026 20:50:30 -0300 Subject: [PATCH] perf(ingest): atomic upsert for thing.defaults, drop FOR UPDATE lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace SELECT ... FOR UPDATE + read-modify-write in upsertThing with a single INSERT ... ON CONFLICT DO UPDATE that merges defaults in-DB via jsonb || (COALESCE(thing.defaults,'{}') || EXCLUDED.defaults). Same shallow right-wins merge as the old {...current,...new}, but atomic under the ON CONFLICT row lock instead of an explicit transaction + row lock — removes the lock-wait contention on hot thing rows that drove a 138ms mean over 3.87M calls, and closes a latent new-row clobber race in the old path. Moved the upsert into db.ts as upsertThingDefaults so it's testable without going through ThingSchema.parse; db.spec.ts pins the merge semantics so this can't silently regress. No change to probe.fetchEventCounts (TTL bump explored, reverted: not needed, the query is already optimally planned). --- packages/ingest/db.spec.ts | 90 +++++++++++++++++++++++++++++++++++ packages/ingest/db.ts | 17 ++++++- packages/ingest/load/index.ts | 23 +-------- 3 files changed, 108 insertions(+), 22 deletions(-) create mode 100644 packages/ingest/db.spec.ts diff --git a/packages/ingest/db.spec.ts b/packages/ingest/db.spec.ts new file mode 100644 index 00000000..58848524 --- /dev/null +++ b/packages/ingest/db.spec.ts @@ -0,0 +1,90 @@ +import { expect } from 'chai' +import { types } from 'lib' +import db, { upsertThingDefaults } from './db' + +// upsertThingDefaults replaced a SELECT ... FOR UPDATE + read-modify-write +// transaction with a single INSERT ... ON CONFLICT DO UPDATE that merges +// `defaults` in-DB via jsonb `||`. This pins the old `{ ...current, ...new }` +// shallow right-wins merge semantics so the perf rewrite can't silently change +// upsert behavior. Cases below mirror real callers: registry/event/hook.ts sets +// a vault's initial defaults (yearn, origin, registry, apiVersion, ...), and +// StrategyChanged/hook.ts later upserts the same row with only a subset of keys +// (v3, erc4626, apiVersion, asset, decimals, inceptBlock, inceptTime) — the +// omitted keys (yearn, origin, registry) must survive the merge, since +// idx_thing_chain_id_address_defaults filters on defaults->>'yearn'. +describe('upsertThingDefaults', () => { + const thing = { chainId: 1, address: '0x1', label: 'vault' } + + afterEach(async () => { + await db.query('DELETE FROM thing WHERE chain_id = $1 AND address = $2 AND label = $3', + [thing.chainId, thing.address, thing.label]) + }) + + async function getDefaults() { + const result = await db.query('SELECT defaults FROM thing WHERE chain_id = $1 AND address = $2 AND label = $3', + [thing.chainId, thing.address, thing.label]) + return result.rows[0]?.defaults + } + + it('inserts defaults on a new row', async () => { + await upsertThingDefaults({ ...thing, defaults: { apiVersion: '1.0.0' } } as types.Thing) + expect(await getDefaults()).to.deep.equal({ apiVersion: '1.0.0' }) + }) + + it('shallow merges new keys into existing defaults, new value wins on overlap', async () => { + await upsertThingDefaults({ ...thing, defaults: { apiVersion: '1.0.0', origin: 'yearn' } } as types.Thing) + await upsertThingDefaults({ ...thing, defaults: { apiVersion: '2.0.0', inceptBlock: 100 } } as types.Thing) + + expect(await getDefaults()).to.deep.equal({ apiVersion: '2.0.0', origin: 'yearn', inceptBlock: 100 }) + }) + + it('preserves keys a later upsert omits (registry hook, then StrategyChanged hook on the same vault)', async () => { + await upsertThingDefaults({ + ...thing, + defaults: { erc4626: true, v3: true, yearn: true, origin: 'yearn', registry: '0xregistry', apiVersion: '3.0.0' }, + } as types.Thing) + + // StrategyChanged/hook.ts never sends yearn/origin/registry. + await upsertThingDefaults({ + ...thing, + defaults: { v3: true, erc4626: true, apiVersion: '3.0.4', asset: '0xasset', decimals: 18, inceptBlock: 100, inceptTime: 1000 }, + } as types.Thing) + + expect(await getDefaults()).to.deep.equal({ + erc4626: true, v3: true, yearn: true, origin: 'yearn', registry: '0xregistry', + apiVersion: '3.0.4', asset: '0xasset', decimals: 18, inceptBlock: 100, inceptTime: 1000, + }) + }) + + it('does not drop defaults.yearn when a later upsert omits it', async () => { + await upsertThingDefaults({ ...thing, defaults: { yearn: true } } as types.Thing) + await upsertThingDefaults({ ...thing, defaults: { inceptBlock: 100 } } as types.Thing) + + expect((await getDefaults()).yearn).to.equal(true) + }) + + it('upserting {} defaults is a no-op on existing keys', async () => { + await upsertThingDefaults({ ...thing, defaults: { apiVersion: '1.0.0', yearn: true } } as types.Thing) + await upsertThingDefaults({ ...thing, defaults: {} } as types.Thing) + + expect(await getDefaults()).to.deep.equal({ apiVersion: '1.0.0', yearn: true }) + }) + + it('replaces a nested object wholesale instead of deep-merging it (roleManager project)', async () => { + await upsertThingDefaults({ + ...thing, + defaults: { roleManagerFactory: '0xfactory', project: { id: '0xproject-a' }, inceptBlock: 100 }, + } as types.Thing) + + await upsertThingDefaults({ + ...thing, + defaults: { project: { id: '0xproject-b' } }, + } as types.Thing) + + const defaults = await getDefaults() + // project is fully replaced, not deep-merged: no leftover keys from the old nested object. + expect(defaults.project).to.deep.equal({ id: '0xproject-b' }) + expect(defaults.roleManagerFactory).to.equal('0xfactory') + expect(defaults.inceptBlock).to.equal(100) + }) +}) diff --git a/packages/ingest/db.ts b/packages/ingest/db.ts index ec21543c..ed1f1f8d 100644 --- a/packages/ingest/db.ts +++ b/packages/ingest/db.ts @@ -2,7 +2,7 @@ import { z } from 'zod' import { strings } from 'lib' -import { StrideSchema } from 'lib/types' +import { StrideSchema, Thing } from 'lib/types' import { Pool, PoolClient, types as pgTypes } from 'pg' import { snakeToCamelCols } from 'lib/strings' @@ -110,6 +110,21 @@ export async function getSparkline(chainId: number, address: string, label: stri }).array().parse(result.rows) } +export async function upsertThingDefaults(thing: Thing, client?: PoolClient) { + // Single-statement upsert with an in-DB jsonb merge, replacing a prior + // SELECT ... FOR UPDATE + read-modify-write that serialized concurrent ingest + // on hot `thing` rows (the lock wait dominated the cost). `||` is a shallow + // right-wins merge, matching the former { ...currentDefaults, ...thing.defaults }, + // and runs atomically under the row lock taken by ON CONFLICT, so there is no + // lost-update race left to guard against. + await (client ?? db).query(` + INSERT INTO thing (chain_id, address, label, defaults) + VALUES ($1, $2, $3, $4) + ON CONFLICT (chain_id, address, label) + DO UPDATE SET defaults = COALESCE(thing.defaults, '{}'::jsonb) || EXCLUDED.defaults + `, [thing.chainId, thing.address, thing.label, thing.defaults]) +} + export function toUpsertSql(table: string, pk: string, data: object, where?: string) { const timestampConversionExceptions = [ 'profit_max_unlock_time' ] diff --git a/packages/ingest/load/index.ts b/packages/ingest/load/index.ts index 59897be0..22df23ab 100644 --- a/packages/ingest/load/index.ts +++ b/packages/ingest/load/index.ts @@ -1,6 +1,6 @@ import { z } from 'zod' import { mq, strider, types } from 'lib' -import db, { firstRow, getTravelledStrides, toUpsertSql } from '../db' +import db, { firstRow, getTravelledStrides, toUpsertSql, upsertThingDefaults } from '../db' import { Processor } from 'lib/processor' import { PoolClient } from 'pg' import { OutputSchema, SnapshotSchema, ThingSchema, zhexstring } from 'lib/types' @@ -121,26 +121,7 @@ export async function upsertSnapshot(data: object) { export async function upsertThing(data: object) { const thing = ThingSchema.parse(data) - const client = await db.connect() - - try { - await client.query('BEGIN') - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const currentDefaults: any = (await client.query( - 'SELECT defaults FROM thing WHERE chain_id = $1 AND address = $2 AND label = $3 FOR UPDATE', - [thing.chainId, thing.address, thing.label])) - .rows[0]?.defaults - if (currentDefaults) thing.defaults = { ...currentDefaults, ...thing.defaults } - await upsert(thing, 'thing', 'chain_id, address, label', undefined, client) - await client.query('COMMIT') - - } catch(error) { - await client.query('ROLLBACK') - throw error - - } finally { - client.release() - } + await upsertThingDefaults(thing) } // eslint-disable-next-line @typescript-eslint/no-explicit-any