diff --git a/internal/postgres/push.go b/internal/postgres/push.go index 60f7b617b..ec260ec93 100644 --- a/internal/postgres/push.go +++ b/internal/postgres/push.go @@ -2682,13 +2682,8 @@ func (s *Sync) normalizeSyncTimestamps( ) error { s.schemaMu.Lock() defer s.schemaMu.Unlock() - if !s.schemaDone { - if err := EnsureSchema( - ctx, s.pg, s.schema, - ); err != nil { - return err - } - s.schemaDone = true + if err := s.ensureSchemaLocked(ctx); err != nil { + return err } return NormalizeLocalSyncStateTimestamps(s.effectiveSyncState()) } diff --git a/internal/postgres/schema.go b/internal/postgres/schema.go index a4e82973c..f23db04b4 100644 --- a/internal/postgres/schema.go +++ b/internal/postgres/schema.go @@ -952,6 +952,28 @@ func backfillIsAutomatedPG( return nil } +// runSchemaDataRepairsPG runs the non-DDL correctness repairs that +// EnsureSchema performs: it recomputes is_automated and backfills +// token-coverage flags. These issue only row-level writes, so the +// compatible-schema fast path can run them without the index and +// column DDL that can block concurrent pg serve reads (issue #887). +func runSchemaDataRepairsPG(ctx context.Context, db *sql.DB) error { + if err := backfillIsAutomatedPG(ctx, db); err != nil { + return err + } + runRepair, err := shouldRunTokenCoverageRepair(ctx, db, false) + if err != nil { + return err + } + if !runRepair { + return nil + } + if err := backfillTokenCoverageFlags(ctx, db); err != nil { + return err + } + return markTokenCoverageRepairDone(ctx, db) +} + func batchUpdateAutomatedPG( ctx context.Context, pg *sql.DB, ids []string, val bool, @@ -1441,6 +1463,18 @@ func pgHasTable(ctx context.Context, db *sql.DB, name string) bool { return err == nil && n == 1 } +// pgHasIndex reports whether an index of the given name exists in the +// current schema. +func pgHasIndex(ctx context.Context, db *sql.DB, name string) bool { + var n int + err := db.QueryRowContext(ctx, + `SELECT 1 FROM pg_indexes + WHERE schemaname = current_schema() AND indexname = $1`, + name, + ).Scan(&n) + return err == nil && n == 1 +} + // required by query paths. This is a read-only probe that works // against any PG role. Returns nil if compatible, or an error // describing what is missing. @@ -1448,9 +1482,7 @@ func CheckSchemaCompat( ctx context.Context, db *sql.DB, ) error { rows, err := db.QueryContext(ctx, - `SELECT id, created_at, deleted_at, updated_at, - termination_status, secret_leak_count, secrets_rules_version, - session_name + `SELECT updated_at, `+pgSessionCols+` FROM sessions LIMIT 0`) if err != nil { return fmt.Errorf( @@ -1471,8 +1503,15 @@ func CheckSchemaCompat( rows.Close() rows, err = db.QueryContext(ctx, - `SELECT is_system, model, token_usage, context_tokens, - output_tokens, has_context_tokens, has_output_tokens + `SELECT session_id, ordinal, role, content, thinking_text, + timestamp, has_thinking, has_tool_use, + content_length, is_system, model, token_usage, + context_tokens, output_tokens, + has_context_tokens, has_output_tokens, + claude_message_id, claude_request_id, + source_type, source_subtype, source_uuid, + source_parent_uuid, is_sidechain, + is_compact_boundary FROM messages LIMIT 0`) if err != nil { return fmt.Errorf( @@ -1578,6 +1617,56 @@ func CheckSchemaCompat( return nil } +// checkPushSchemaCompat verifies schema elements that only push needs: +// the sync_metadata table and sessions.owner_marker. pg serve never reads +// these, so they live outside CheckSchemaCompat (which gates read-only +// serve startup) and are checked only on the push fast path. +func checkPushSchemaCompat(ctx context.Context, db *sql.DB) error { + rows, err := db.QueryContext(ctx, + `SELECT key, value FROM sync_metadata LIMIT 0`) + if err != nil { + return fmt.Errorf( + "sync_metadata table missing required columns: %w", err) + } + rows.Close() + + rows, err = db.QueryContext(ctx, + `SELECT owner_marker FROM sessions LIMIT 0`) + if err != nil { + return fmt.Errorf( + "sessions table missing owner_marker: %w", err) + } + rows.Close() + return nil +} + +// pushSchemaCurrent reports whether the PG schema has everything a push +// needs. CheckSchemaCompat covers the read paths but does not require the +// push-only sync_metadata table or sessions.owner_marker (verified by +// checkPushSchemaCompat), model_pricing (always queried by syncModelPricing) +// or cursor_usage_events (written by syncCursorUsageEvents), so probe those +// explicitly. It also requires the cursor dedup index, which the cursor usage +// insert relies on for ON CONFLICT dedup. When any of these is missing the +// caller must run EnsureSchema so push migrates the schema instead of failing +// or duplicating rows. +func pushSchemaCurrent(ctx context.Context, db *sql.DB) bool { + if err := CheckSchemaCompat(ctx, db); err != nil { + return false + } + if err := checkPushSchemaCompat(ctx, db); err != nil { + return false + } + if !pgHasTable(ctx, db, "model_pricing") || + !pgHasTable(ctx, db, "cursor_usage_events") { + return false + } + // bulkInsertCursorUsageEvents dedups via a targetless ON CONFLICT + // DO NOTHING, which only suppresses duplicates when this partial + // unique index exists. Fall back to EnsureSchema when it is missing + // so repeated pushes cannot duplicate cursor usage rows. + return pgHasIndex(ctx, db, "idx_cursor_usage_events_dedup") +} + // CheckDataVersionCompat rejects PG datasets containing rows written by a // newer agentsview parser. PG does not have SQLite's global user_version, so // the highest session data_version is the compatibility marker. diff --git a/internal/postgres/schema_test.go b/internal/postgres/schema_test.go index 470f33c98..3e4cc6132 100644 --- a/internal/postgres/schema_test.go +++ b/internal/postgres/schema_test.go @@ -27,6 +27,11 @@ type schemaProbeRows struct { next int } +type schemaProbeQueryError struct { + contains string + err error +} + type schemaProbeState struct { mu sync.Mutex informationQueries int @@ -34,8 +39,11 @@ type schemaProbeState struct { alterTableExecs []string currentSchema string existingColumnNames map[string][]string + existingTables map[string]bool + existingIndexes map[string]bool maxDataVersion int maxDataVersionErr error + queryErrors []schemaProbeQueryError } var ( @@ -115,7 +123,43 @@ func (c *schemaProbeConn) QueryContext( _ context.Context, query string, args []driver.NamedValue, ) (driver.Rows, error) { normalized := strings.ToLower(query) + for _, queryErr := range c.state.queryErrors { + if strings.Contains( + normalized, + strings.ToLower(queryErr.contains), + ) { + return nil, queryErr.err + } + } switch { + case strings.Contains(normalized, "information_schema.tables"): + name := "" + if len(args) > 0 { + if v, ok := args[0].Value.(string); ok { + name = v + } + } + if c.state.existingTables[name] { + return &schemaProbeRows{ + columns: []string{"exists"}, + values: [][]driver.Value{{int64(1)}}, + }, nil + } + return &schemaProbeRows{columns: []string{"exists"}}, nil + case strings.Contains(normalized, "pg_indexes"): + name := "" + if len(args) > 0 { + if v, ok := args[0].Value.(string); ok { + name = v + } + } + if c.state.existingIndexes[name] { + return &schemaProbeRows{ + columns: []string{"exists"}, + values: [][]driver.Value{{int64(1)}}, + }, nil + } + return &schemaProbeRows{columns: []string{"exists"}}, nil case strings.Contains(normalized, "information_schema.columns"): c.state.mu.Lock() c.state.informationQueries++ @@ -294,6 +338,161 @@ func TestEnsureSchemaChecksDataVersionBeforeDDL(t *testing.T) { "EnsureSchema must not mutate PG before data-version refusal") } +func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) { + pg, state := newSchemaProbeDB(t, nil) + state.existingTables = map[string]bool{ + "model_pricing": true, + "cursor_usage_events": true, + } + state.existingIndexes = map[string]bool{ + "idx_cursor_usage_events_dedup": true, + } + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + executed := strings.ToLower(state.executedSQL()) + assert.NotContains(t, executed, "create index", + "compatible PG schema must skip index DDL") + assert.NotContains(t, executed, "alter index", + "compatible PG schema must skip index DDL") + assert.NotContains(t, executed, "create table", + "compatible PG schema must skip table DDL") + assert.Equal(t, 0, state.alterTableExecCount(), + "compatible PG schema must not run column migrations") + assert.Contains(t, executed, "insert into sync_metadata", + "compatible PG schema must still run row-level data repairs") +} + +func TestCheckSchemaCompatIgnoresPushOnlySchema(t *testing.T) { + pg, state := newSchemaProbeDB(t, nil) + state.queryErrors = []schemaProbeQueryError{ + {contains: "owner_marker", err: errors.New( + `ERROR: column "owner_marker" does not exist (SQLSTATE 42703)`)}, + {contains: "from sync_metadata", err: errors.New( + `ERROR: relation "sync_metadata" does not exist (SQLSTATE 42P01)`)}, + } + + require.NoError(t, CheckSchemaCompat(context.Background(), pg), + "read compatibility must not require push-only schema") +} + +func TestSyncEnsureSchemaRunsDDLWhenPushMetadataMissing(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + state.existingTables = map[string]bool{ + "model_pricing": true, + "cursor_usage_events": true, + } + state.existingIndexes = map[string]bool{ + "idx_cursor_usage_events_dedup": true, + } + // Read-compatible with tables and index present, but the push-only + // owner_marker column is absent, so the push fast path must fall back + // to EnsureSchema. + state.queryErrors = []schemaProbeQueryError{{ + contains: "owner_marker", + err: errors.New( + `ERROR: column "owner_marker" does not exist (SQLSTATE 42703)`), + }} + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "missing push-only column must fall back to migration DDL") +} + +func TestSyncEnsureSchemaRunsDDLWhenPushTableMissing(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + // Read-compatible, and cursor_usage_events present, but model_pricing + // absent: the read probe passes yet a push would fail on model_pricing, + // so the fast path must fall back to EnsureSchema. + state.existingTables = map[string]bool{ + "cursor_usage_events": true, + } + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "missing push-written table must fall back to migration DDL") + assert.Contains(t, strings.ToLower(state.executedSQL()), + "create table", + "fallback must create missing push tables") +} + +func TestSyncEnsureSchemaRunsDDLWhenDedupIndexMissing(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + // Tables present but the cursor dedup unique index is absent, so the + // read probe passes yet ON CONFLICT DO NOTHING would not dedup cursor + // usage rows. The fast path must fall back to EnsureSchema. + state.existingTables = map[string]bool{ + "model_pricing": true, + "cursor_usage_events": true, + } + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "missing dedup index must fall back to migration DDL") + assert.Contains(t, strings.ToLower(state.executedSQL()), + "idx_cursor_usage_events_dedup", + "fallback must recreate the cursor dedup index") +} + +func TestSyncEnsureSchemaRunsDDLWhenSchemaIncompatible(t *testing.T) { + pg, state := newSchemaProbeDB(t, map[string][]string{ + "sessions": { + "has_total_output_tokens", + "has_peak_context_tokens", + }, + "messages": { + "has_context_tokens", + "has_output_tokens", + }, + }) + state.queryErrors = []schemaProbeQueryError{{ + contains: "data_version", + err: errors.New( + `ERROR: column "data_version" does not exist (SQLSTATE 42703)`, + ), + }} + syncer := &Sync{pg: pg, schema: "agentsview"} + + require.NoError(t, syncer.EnsureSchema(context.Background())) + + assert.Greater(t, state.execCount(), 0, + "incompatible PG schema should fall back to migration DDL") +} + func TestEnsureSchemaCreatesAnalyticsCoveringIndexes(t *testing.T) { db, state := newSchemaProbeDB(t, map[string][]string{ "sessions": { diff --git a/internal/postgres/sync.go b/internal/postgres/sync.go index 86304d11d..74e05bb27 100644 --- a/internal/postgres/sync.go +++ b/internal/postgres/sync.go @@ -270,9 +270,28 @@ func (s *Sync) Close() error { func (s *Sync) EnsureSchema(ctx context.Context) error { s.schemaMu.Lock() defer s.schemaMu.Unlock() + return s.ensureSchemaLocked(ctx) +} + +func (s *Sync) ensureSchemaLocked(ctx context.Context) error { if s.schemaDone { return nil } + if err := CheckDataVersionCompat(ctx, s.pg); err != nil { + return err + } + if pushSchemaCurrent(ctx, s.pg) { + // Schema DDL is current, so skip the index and column + // maintenance that can lock against concurrent pg serve + // reads (issue #887). Still run the row-level data repairs + // so is_automated and token-coverage flags stay correct on + // existing rows. + if err := runSchemaDataRepairsPG(ctx, s.pg); err != nil { + return err + } + s.schemaDone = true + return nil + } if err := EnsureSchema(ctx, s.pg, s.schema); err != nil { return err }