From a234f20e67ff15558d18bb37d72671ccf8139245 Mon Sep 17 00:00:00 2001 From: Martin Wimpress Date: Fri, 26 Jun 2026 07:13:58 +0000 Subject: [PATCH 1/5] fix(agentsview): skip schema DDL for compatible pg push - Use read-only schema checks before DDL. - Skip schema and index maintenance for compatible PostgreSQL schemas. - Fall back to EnsureSchema() when the schema is missing or incompatible. - Add unit coverage for the new path. Closes #887 Signed-off-by: Martin Wimpress --- internal/postgres/push.go | 9 ++---- internal/postgres/schema.go | 25 ++++++++++++---- internal/postgres/schema_test.go | 49 ++++++++++++++++++++++++++++++++ internal/postgres/sync.go | 11 +++++++ 4 files changed, 82 insertions(+), 12 deletions(-) diff --git a/internal/postgres/push.go b/internal/postgres/push.go index 60f7b617b..ec260ec93 100644 --- a/internal/postgres/push.go +++ b/internal/postgres/push.go @@ -2682,13 +2682,8 @@ func (s *Sync) normalizeSyncTimestamps( ) error { s.schemaMu.Lock() defer s.schemaMu.Unlock() - if !s.schemaDone { - if err := EnsureSchema( - ctx, s.pg, s.schema, - ); err != nil { - return err - } - s.schemaDone = true + if err := s.ensureSchemaLocked(ctx); err != nil { + return err } return NormalizeLocalSyncStateTimestamps(s.effectiveSyncState()) } diff --git a/internal/postgres/schema.go b/internal/postgres/schema.go index a4e82973c..1fba9e18e 100644 --- a/internal/postgres/schema.go +++ b/internal/postgres/schema.go @@ -1448,9 +1448,17 @@ func CheckSchemaCompat( ctx context.Context, db *sql.DB, ) error { rows, err := db.QueryContext(ctx, - `SELECT id, created_at, deleted_at, updated_at, - termination_status, secret_leak_count, secrets_rules_version, - session_name + `SELECT key, value FROM sync_metadata LIMIT 0`) + if err != nil { + return fmt.Errorf( + "sync_metadata table missing required columns: %w", + err, + ) + } + rows.Close() + + rows, err = db.QueryContext(ctx, + `SELECT owner_marker, updated_at, `+pgSessionCols+` FROM sessions LIMIT 0`) if err != nil { return fmt.Errorf( @@ -1471,8 +1479,15 @@ func CheckSchemaCompat( rows.Close() rows, err = db.QueryContext(ctx, - `SELECT is_system, model, token_usage, context_tokens, - output_tokens, has_context_tokens, has_output_tokens + `SELECT session_id, ordinal, role, content, thinking_text, + timestamp, has_thinking, has_tool_use, + content_length, is_system, model, token_usage, + context_tokens, output_tokens, + has_context_tokens, has_output_tokens, + claude_message_id, claude_request_id, + source_type, source_subtype, source_uuid, + source_parent_uuid, is_sidechain, + is_compact_boundary FROM messages LIMIT 0`) if err != nil { return fmt.Errorf( diff --git a/internal/postgres/schema_test.go b/internal/postgres/schema_test.go index 470f33c98..cb94a7903 100644 --- a/internal/postgres/schema_test.go +++ b/internal/postgres/schema_test.go @@ -27,6 +27,11 @@ type schemaProbeRows struct { next int } +type schemaProbeQueryError struct { + contains string + err error +} + type schemaProbeState struct { mu sync.Mutex informationQueries int @@ -36,6 +41,7 @@ type schemaProbeState struct { existingColumnNames map[string][]string maxDataVersion int maxDataVersionErr error + queryErrors []schemaProbeQueryError } var ( @@ -115,6 +121,14 @@ func (c *schemaProbeConn) QueryContext( _ context.Context, query string, args []driver.NamedValue, ) (driver.Rows, error) { normalized := strings.ToLower(query) + for _, queryErr := range c.state.queryErrors { + if strings.Contains( + normalized, + strings.ToLower(queryErr.contains), + ) { + return nil, queryErr.err + } + } switch { case strings.Contains(normalized, "information_schema.columns"): c.state.mu.Lock() @@ -294,6 +308,41 @@ func TestEnsureSchemaChecksDataVersionBeforeDDL(t *testing.T) { "EnsureSchema must not mutate PG before data-version refusal") } +func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) { + pg, state := newSchemaProbeDB(t, nil) + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Equal(t, 0, state.execCount(), + "compatible PG schema should use read-only probes only") +} + +func TestSyncEnsureSchemaRunsDDLWhenSchemaIncompatible(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + state.queryErrors = []schemaProbeQueryError{{ + contains: "data_version", + err: errors.New( + `ERROR: column "data_version" does not exist (SQLSTATE 42703)`, + ), + }} + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "incompatible PG schema should fall back to migration DDL") +} + func TestEnsureSchemaCreatesAnalyticsCoveringIndexes(t *testing.T) { db, state := newSchemaProbeDB(t, map[string][]string{ "sessions": { diff --git a/internal/postgres/sync.go b/internal/postgres/sync.go index 86304d11d..104c3ac4a 100644 --- a/internal/postgres/sync.go +++ b/internal/postgres/sync.go @@ -270,9 +270,20 @@ func (s *Sync) Close() error { func (s *Sync) EnsureSchema(ctx context.Context) error { s.schemaMu.Lock() defer s.schemaMu.Unlock() + return s.ensureSchemaLocked(ctx) +} + +func (s *Sync) ensureSchemaLocked(ctx context.Context) error { if s.schemaDone { return nil } + if err := CheckDataVersionCompat(ctx, s.pg); err != nil { + return err + } + if err := CheckSchemaCompat(ctx, s.pg); err == nil { + s.schemaDone = true + return nil + } if err := EnsureSchema(ctx, s.pg, s.schema); err != nil { return err } From d84b26b35abdc4fd199f43942b22fd512e778ef3 Mon Sep 17 00:00:00 2001 From: Martin Wimpress Date: Fri, 26 Jun 2026 21:30:25 +0000 Subject: [PATCH 2/5] fix(agentsview): run data repairs on compatible pg push path The compatible-schema fast path returned before EnsureSchema, so it skipped the non-DDL data repairs that keep is_automated and token coverage flags correct on existing rows. Run those repairs on the fast path while still skipping the index and column DDL that blocks pg serve reads (#887). The repairs issue only row-level writes and stay gated by schemaDone, so they run at most once per push process. Signed-off-by: Martin Wimpress --- internal/postgres/schema.go | 22 ++++++++++++++++++++++ internal/postgres/schema_test.go | 15 +++++++++++++-- internal/postgres/sync.go | 8 ++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/internal/postgres/schema.go b/internal/postgres/schema.go index 1fba9e18e..260f4f702 100644 --- a/internal/postgres/schema.go +++ b/internal/postgres/schema.go @@ -952,6 +952,28 @@ func backfillIsAutomatedPG( return nil } +// runSchemaDataRepairsPG runs the non-DDL correctness repairs that +// EnsureSchema performs: it recomputes is_automated and backfills +// token-coverage flags. These issue only row-level writes, so the +// compatible-schema fast path can run them without the index and +// column DDL that can block concurrent pg serve reads (issue #887). +func runSchemaDataRepairsPG(ctx context.Context, db *sql.DB) error { + if err := backfillIsAutomatedPG(ctx, db); err != nil { + return err + } + runRepair, err := shouldRunTokenCoverageRepair(ctx, db, false) + if err != nil { + return err + } + if !runRepair { + return nil + } + if err := backfillTokenCoverageFlags(ctx, db); err != nil { + return err + } + return markTokenCoverageRepairDone(ctx, db) +} + func batchUpdateAutomatedPG( ctx context.Context, pg *sql.DB, ids []string, val bool, diff --git a/internal/postgres/schema_test.go b/internal/postgres/schema_test.go index cb94a7903..933bbc4c4 100644 --- a/internal/postgres/schema_test.go +++ b/internal/postgres/schema_test.go @@ -314,8 +314,19 @@ func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) { require.NoError(t, syncer.EnsureSchema(context.Background())) - assert.Equal(t, 0, state.execCount(), - "compatible PG schema should use read-only probes only") + executed := strings.ToLower(state.executedSQL()) + assert.NotContains(t, executed, "create index", + "compatible PG schema must skip index DDL") + assert.NotContains(t, executed, "alter index", + "compatible PG schema must skip index DDL") + assert.NotContains(t, executed, "create table", + "compatible PG schema must skip table DDL") + assert.NotContains(t, executed, "create schema", + "compatible PG schema must skip schema DDL") + assert.Equal(t, 0, state.alterTableExecCount(), + "compatible PG schema must not run column migrations") + assert.Contains(t, executed, "insert into sync_metadata", + "compatible PG schema must still run row-level data repairs") } func TestSyncEnsureSchemaRunsDDLWhenSchemaIncompatible(t *testing.T) { diff --git a/internal/postgres/sync.go b/internal/postgres/sync.go index 104c3ac4a..8de1029cb 100644 --- a/internal/postgres/sync.go +++ b/internal/postgres/sync.go @@ -281,6 +281,14 @@ func (s *Sync) ensureSchemaLocked(ctx context.Context) error { return err } if err := CheckSchemaCompat(ctx, s.pg); err == nil { + // Schema DDL is current, so skip the index and column + // maintenance that can lock against concurrent pg serve + // reads (issue #887). Still run the row-level data repairs + // so is_automated and token-coverage flags stay correct on + // existing rows. + if err := runSchemaDataRepairsPG(ctx, s.pg); err != nil { + return err + } s.schemaDone = true return nil } From 4724e71704d0c67e1443c3165d1c2a4ff2e3ce01 Mon Sep 17 00:00:00 2001 From: Martin Wimpress Date: Fri, 26 Jun 2026 22:14:51 +0000 Subject: [PATCH 3/5] fix(agentsview): require push tables before compatible pg fast path The compatible-schema fast path skipped EnsureSchema based on CheckSchemaCompat, which never probes model_pricing and treats cursor_usage_events as optional. Push queries model_pricing and writes cursor_usage_events, so an older schema missing those tables passed the probe and then failed the push instead of migrating. Gate the fast path on pushSchemaCurrent, which also requires both push-written tables, and fall back to EnsureSchema when either is absent. Signed-off-by: Martin Wimpress --- internal/postgres/schema.go | 14 +++++++++ internal/postgres/schema_test.go | 49 ++++++++++++++++++++++++++++++-- internal/postgres/sync.go | 2 +- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/internal/postgres/schema.go b/internal/postgres/schema.go index 260f4f702..dc600c747 100644 --- a/internal/postgres/schema.go +++ b/internal/postgres/schema.go @@ -1615,6 +1615,20 @@ func CheckSchemaCompat( return nil } +// pushSchemaCurrent reports whether the PG schema has everything a push +// needs. CheckSchemaCompat covers the read paths but does not require +// model_pricing (always queried by syncModelPricing) or +// cursor_usage_events (written by syncCursorUsageEvents), so probe those +// tables explicitly. When either is missing the caller must run +// EnsureSchema so push migrates the schema instead of failing. +func pushSchemaCurrent(ctx context.Context, db *sql.DB) bool { + if err := CheckSchemaCompat(ctx, db); err != nil { + return false + } + return pgHasTable(ctx, db, "model_pricing") && + pgHasTable(ctx, db, "cursor_usage_events") +} + // CheckDataVersionCompat rejects PG datasets containing rows written by a // newer agentsview parser. PG does not have SQLite's global user_version, so // the highest session data_version is the compatibility marker. diff --git a/internal/postgres/schema_test.go b/internal/postgres/schema_test.go index 933bbc4c4..37b53113c 100644 --- a/internal/postgres/schema_test.go +++ b/internal/postgres/schema_test.go @@ -39,6 +39,7 @@ type schemaProbeState struct { alterTableExecs []string currentSchema string existingColumnNames map[string][]string + existingTables map[string]bool maxDataVersion int maxDataVersionErr error queryErrors []schemaProbeQueryError @@ -130,6 +131,20 @@ func (c *schemaProbeConn) QueryContext( } } switch { + case strings.Contains(normalized, "information_schema.tables"): + name := "" + if len(args) > 0 { + if v, ok := args[0].Value.(string); ok { + name = v + } + } + if c.state.existingTables[name] { + return &schemaProbeRows{ + columns: []string{"exists"}, + values: [][]driver.Value{{int64(1)}}, + }, nil + } + return &schemaProbeRows{columns: []string{"exists"}}, nil case strings.Contains(normalized, "information_schema.columns"): c.state.mu.Lock() c.state.informationQueries++ @@ -310,6 +325,10 @@ func TestEnsureSchemaChecksDataVersionBeforeDDL(t *testing.T) { func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) { pg, state := newSchemaProbeDB(t, nil) + state.existingTables = map[string]bool{ + "model_pricing": true, + "cursor_usage_events": true, + } syncer := &Sync{pg: pg, schema: "agentsview"} require.NoError(t, syncer.EnsureSchema(context.Background())) @@ -321,14 +340,40 @@ func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) { "compatible PG schema must skip index DDL") assert.NotContains(t, executed, "create table", "compatible PG schema must skip table DDL") - assert.NotContains(t, executed, "create schema", - "compatible PG schema must skip schema DDL") assert.Equal(t, 0, state.alterTableExecCount(), "compatible PG schema must not run column migrations") assert.Contains(t, executed, "insert into sync_metadata", "compatible PG schema must still run row-level data repairs") } +func TestSyncEnsureSchemaRunsDDLWhenPushTableMissing(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + // Read-compatible, and cursor_usage_events present, but model_pricing + // absent: the read probe passes yet a push would fail on model_pricing, + // so the fast path must fall back to EnsureSchema. + state.existingTables = map[string]bool{ + "cursor_usage_events": true, + } + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "missing push-written table must fall back to migration DDL") + assert.Contains(t, strings.ToLower(state.executedSQL()), + "create table", + "fallback must create missing push tables") +} + func TestSyncEnsureSchemaRunsDDLWhenSchemaIncompatible(t *testing.T) { pg, state := newSchemaProbeDB(t, map[string][]string{ "sessions": { diff --git a/internal/postgres/sync.go b/internal/postgres/sync.go index 8de1029cb..74e05bb27 100644 --- a/internal/postgres/sync.go +++ b/internal/postgres/sync.go @@ -280,7 +280,7 @@ func (s *Sync) ensureSchemaLocked(ctx context.Context) error { if err := CheckDataVersionCompat(ctx, s.pg); err != nil { return err } - if err := CheckSchemaCompat(ctx, s.pg); err == nil { + if pushSchemaCurrent(ctx, s.pg) { // Schema DDL is current, so skip the index and column // maintenance that can lock against concurrent pg serve // reads (issue #887). Still run the row-level data repairs From ef5ebe321ead0a94618c44099d5d59a66e4f13a4 Mon Sep 17 00:00:00 2001 From: Martin Wimpress Date: Fri, 26 Jun 2026 22:49:34 +0000 Subject: [PATCH 4/5] fix(agentsview): require cursor dedup index for compatible pg fast path bulkInsertCursorUsageEvents dedups via a targetless ON CONFLICT DO NOTHING, which only suppresses duplicates when the partial unique index idx_cursor_usage_events_dedup exists. The compatible-schema fast path checked tables but not that index, so a schema missing it would silently duplicate cursor usage rows on repeated pushes. Gate pushSchemaCurrent on the index too and fall back to EnsureSchema when it is absent. Signed-off-by: Martin Wimpress --- internal/postgres/schema.go | 29 +++++++++++++++++--- internal/postgres/schema_test.go | 47 ++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/internal/postgres/schema.go b/internal/postgres/schema.go index dc600c747..e4fcca29d 100644 --- a/internal/postgres/schema.go +++ b/internal/postgres/schema.go @@ -1463,6 +1463,18 @@ func pgHasTable(ctx context.Context, db *sql.DB, name string) bool { return err == nil && n == 1 } +// pgHasIndex reports whether an index of the given name exists in the +// current schema. +func pgHasIndex(ctx context.Context, db *sql.DB, name string) bool { + var n int + err := db.QueryRowContext(ctx, + `SELECT 1 FROM pg_indexes + WHERE schemaname = current_schema() AND indexname = $1`, + name, + ).Scan(&n) + return err == nil && n == 1 +} + // required by query paths. This is a read-only probe that works // against any PG role. Returns nil if compatible, or an error // describing what is missing. @@ -1619,14 +1631,23 @@ func CheckSchemaCompat( // needs. CheckSchemaCompat covers the read paths but does not require // model_pricing (always queried by syncModelPricing) or // cursor_usage_events (written by syncCursorUsageEvents), so probe those -// tables explicitly. When either is missing the caller must run -// EnsureSchema so push migrates the schema instead of failing. +// tables explicitly. It also requires the cursor dedup index, which the +// cursor usage insert relies on for ON CONFLICT dedup. When any of these +// is missing the caller must run EnsureSchema so push migrates the schema +// instead of failing or duplicating rows. func pushSchemaCurrent(ctx context.Context, db *sql.DB) bool { if err := CheckSchemaCompat(ctx, db); err != nil { return false } - return pgHasTable(ctx, db, "model_pricing") && - pgHasTable(ctx, db, "cursor_usage_events") + if !pgHasTable(ctx, db, "model_pricing") || + !pgHasTable(ctx, db, "cursor_usage_events") { + return false + } + // bulkInsertCursorUsageEvents dedups via a targetless ON CONFLICT + // DO NOTHING, which only suppresses duplicates when this partial + // unique index exists. Fall back to EnsureSchema when it is missing + // so repeated pushes cannot duplicate cursor usage rows. + return pgHasIndex(ctx, db, "idx_cursor_usage_events_dedup") } // CheckDataVersionCompat rejects PG datasets containing rows written by a diff --git a/internal/postgres/schema_test.go b/internal/postgres/schema_test.go index 37b53113c..2945e4ab3 100644 --- a/internal/postgres/schema_test.go +++ b/internal/postgres/schema_test.go @@ -40,6 +40,7 @@ type schemaProbeState struct { currentSchema string existingColumnNames map[string][]string existingTables map[string]bool + existingIndexes map[string]bool maxDataVersion int maxDataVersionErr error queryErrors []schemaProbeQueryError @@ -145,6 +146,20 @@ func (c *schemaProbeConn) QueryContext( }, nil } return &schemaProbeRows{columns: []string{"exists"}}, nil + case strings.Contains(normalized, "pg_indexes"): + name := "" + if len(args) > 0 { + if v, ok := args[0].Value.(string); ok { + name = v + } + } + if c.state.existingIndexes[name] { + return &schemaProbeRows{ + columns: []string{"exists"}, + values: [][]driver.Value{{int64(1)}}, + }, nil + } + return &schemaProbeRows{columns: []string{"exists"}}, nil case strings.Contains(normalized, "information_schema.columns"): c.state.mu.Lock() c.state.informationQueries++ @@ -329,6 +344,9 @@ func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) { "model_pricing": true, "cursor_usage_events": true, } + state.existingIndexes = map[string]bool{ + "idx_cursor_usage_events_dedup": true, + } syncer := &Sync{pg: pg, schema: "agentsview"} require.NoError(t, syncer.EnsureSchema(context.Background())) @@ -374,6 +392,35 @@ func TestSyncEnsureSchemaRunsDDLWhenPushTableMissing(t *testing.T) { "fallback must create missing push tables") } +func TestSyncEnsureSchemaRunsDDLWhenDedupIndexMissing(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + // Tables present but the cursor dedup unique index is absent, so the + // read probe passes yet ON CONFLICT DO NOTHING would not dedup cursor + // usage rows. The fast path must fall back to EnsureSchema. + state.existingTables = map[string]bool{ + "model_pricing": true, + "cursor_usage_events": true, + } + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "missing dedup index must fall back to migration DDL") + assert.Contains(t, strings.ToLower(state.executedSQL()), + "idx_cursor_usage_events_dedup", + "fallback must recreate the cursor dedup index") +} + func TestSyncEnsureSchemaRunsDDLWhenSchemaIncompatible(t *testing.T) { pg, state := newSchemaProbeDB(t, map[string][]string{ "sessions": { From f89cc6697b3e777a2f03b62f7e54c33458a66866 Mon Sep 17 00:00:00 2001 From: Martin Wimpress Date: Sat, 27 Jun 2026 05:54:54 +0000 Subject: [PATCH 5/5] fix(agentsview): keep push-only schema probes out of read compat check CheckSchemaCompat gates both pg serve read-only startup and the push fast path, but it probed sync_metadata and sessions.owner_marker, which only push uses. A read-only serve role or legacy schema that was fine for reads then failed compatibility at startup. Move those two probes into a new checkPushSchemaCompat that pushSchemaCurrent runs, leaving CheckSchemaCompat limited to read-path schema Signed-off-by: Martin Wimpress --- internal/postgres/schema.go | 53 +++++++++++++++++++++----------- internal/postgres/schema_test.go | 47 ++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 18 deletions(-) diff --git a/internal/postgres/schema.go b/internal/postgres/schema.go index e4fcca29d..f23db04b4 100644 --- a/internal/postgres/schema.go +++ b/internal/postgres/schema.go @@ -1482,17 +1482,7 @@ func CheckSchemaCompat( ctx context.Context, db *sql.DB, ) error { rows, err := db.QueryContext(ctx, - `SELECT key, value FROM sync_metadata LIMIT 0`) - if err != nil { - return fmt.Errorf( - "sync_metadata table missing required columns: %w", - err, - ) - } - rows.Close() - - rows, err = db.QueryContext(ctx, - `SELECT owner_marker, updated_at, `+pgSessionCols+` + `SELECT updated_at, `+pgSessionCols+` FROM sessions LIMIT 0`) if err != nil { return fmt.Errorf( @@ -1627,18 +1617,45 @@ func CheckSchemaCompat( return nil } +// checkPushSchemaCompat verifies schema elements that only push needs: +// the sync_metadata table and sessions.owner_marker. pg serve never reads +// these, so they live outside CheckSchemaCompat (which gates read-only +// serve startup) and are checked only on the push fast path. +func checkPushSchemaCompat(ctx context.Context, db *sql.DB) error { + rows, err := db.QueryContext(ctx, + `SELECT key, value FROM sync_metadata LIMIT 0`) + if err != nil { + return fmt.Errorf( + "sync_metadata table missing required columns: %w", err) + } + rows.Close() + + rows, err = db.QueryContext(ctx, + `SELECT owner_marker FROM sessions LIMIT 0`) + if err != nil { + return fmt.Errorf( + "sessions table missing owner_marker: %w", err) + } + rows.Close() + return nil +} + // pushSchemaCurrent reports whether the PG schema has everything a push -// needs. CheckSchemaCompat covers the read paths but does not require -// model_pricing (always queried by syncModelPricing) or -// cursor_usage_events (written by syncCursorUsageEvents), so probe those -// tables explicitly. It also requires the cursor dedup index, which the -// cursor usage insert relies on for ON CONFLICT dedup. When any of these -// is missing the caller must run EnsureSchema so push migrates the schema -// instead of failing or duplicating rows. +// needs. CheckSchemaCompat covers the read paths but does not require the +// push-only sync_metadata table or sessions.owner_marker (verified by +// checkPushSchemaCompat), model_pricing (always queried by syncModelPricing) +// or cursor_usage_events (written by syncCursorUsageEvents), so probe those +// explicitly. It also requires the cursor dedup index, which the cursor usage +// insert relies on for ON CONFLICT dedup. When any of these is missing the +// caller must run EnsureSchema so push migrates the schema instead of failing +// or duplicating rows. func pushSchemaCurrent(ctx context.Context, db *sql.DB) bool { if err := CheckSchemaCompat(ctx, db); err != nil { return false } + if err := checkPushSchemaCompat(ctx, db); err != nil { + return false + } if !pgHasTable(ctx, db, "model_pricing") || !pgHasTable(ctx, db, "cursor_usage_events") { return false diff --git a/internal/postgres/schema_test.go b/internal/postgres/schema_test.go index 2945e4ab3..3e4cc6132 100644 --- a/internal/postgres/schema_test.go +++ b/internal/postgres/schema_test.go @@ -364,6 +364,53 @@ func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) { "compatible PG schema must still run row-level data repairs") } +func TestCheckSchemaCompatIgnoresPushOnlySchema(t *testing.T) { + pg, state := newSchemaProbeDB(t, nil) + state.queryErrors = []schemaProbeQueryError{ + {contains: "owner_marker", err: errors.New( + `ERROR: column "owner_marker" does not exist (SQLSTATE 42703)`)}, + {contains: "from sync_metadata", err: errors.New( + `ERROR: relation "sync_metadata" does not exist (SQLSTATE 42P01)`)}, + } + + require.NoError(t, CheckSchemaCompat(context.Background(), pg), + "read compatibility must not require push-only schema") +} + +func TestSyncEnsureSchemaRunsDDLWhenPushMetadataMissing(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + state.existingTables = map[string]bool{ + "model_pricing": true, + "cursor_usage_events": true, + } + state.existingIndexes = map[string]bool{ + "idx_cursor_usage_events_dedup": true, + } + // Read-compatible with tables and index present, but the push-only + // owner_marker column is absent, so the push fast path must fall back + // to EnsureSchema. + state.queryErrors = []schemaProbeQueryError{{ + contains: "owner_marker", + err: errors.New( + `ERROR: column "owner_marker" does not exist (SQLSTATE 42703)`), + }} + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "missing push-only column must fall back to migration DDL") +} + func TestSyncEnsureSchemaRunsDDLWhenPushTableMissing(t *testing.T) { pg, state := newSchemaProbeDB(t, map[string][]string{ "sessions": {