Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions internal/postgres/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2682,13 +2682,8 @@ func (s *Sync) normalizeSyncTimestamps(
) error {
s.schemaMu.Lock()
defer s.schemaMu.Unlock()
if !s.schemaDone {
if err := EnsureSchema(
ctx, s.pg, s.schema,
); err != nil {
return err
}
s.schemaDone = true
if err := s.ensureSchemaLocked(ctx); err != nil {
return err
}
return NormalizeLocalSyncStateTimestamps(s.effectiveSyncState())
}
Expand Down
99 changes: 94 additions & 5 deletions internal/postgres/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,28 @@ func backfillIsAutomatedPG(
return nil
}

// runSchemaDataRepairsPG applies the data-only correctness repairs that
// EnsureSchema also performs: recomputing is_automated and backfilling
// the token-coverage flags. Because these are row-level writes only, the
// compatible-schema fast path can call this without triggering the index
// and column DDL that can block concurrent pg serve reads (issue #887).
//
// The token-coverage backfill runs only when
// shouldRunTokenCoverageRepair reports it is still needed; after a
// successful backfill the repair is marked done. The first error from
// any step is returned unchanged.
func runSchemaDataRepairsPG(ctx context.Context, db *sql.DB) error {
	if err := backfillIsAutomatedPG(ctx, db); err != nil {
		return err
	}

	needed, err := shouldRunTokenCoverageRepair(ctx, db, false)
	switch {
	case err != nil:
		return err
	case !needed:
		// Nothing left to repair for token coverage.
		return nil
	}

	if err := backfillTokenCoverageFlags(ctx, db); err != nil {
		return err
	}
	return markTokenCoverageRepairDone(ctx, db)
}

func batchUpdateAutomatedPG(
ctx context.Context, pg *sql.DB,
ids []string, val bool,
Expand Down Expand Up @@ -1441,16 +1463,26 @@ func pgHasTable(ctx context.Context, db *sql.DB, name string) bool {
return err == nil && n == 1
}

// pgHasIndex reports whether pg_indexes lists an index called name in
// the connection's current schema. Any failure of the probe (a query
// error, or no matching row) is reported as false.
func pgHasIndex(ctx context.Context, db *sql.DB, name string) bool {
	const probe = `SELECT 1 FROM pg_indexes
WHERE schemaname = current_schema() AND indexname = $1`

	var found int
	if err := db.QueryRowContext(ctx, probe, name).Scan(&found); err != nil {
		return false
	}
	return found == 1
}

// required by query paths. This is a read-only probe that works
// against any PG role. Returns nil if compatible, or an error
// describing what is missing.
func CheckSchemaCompat(
ctx context.Context, db *sql.DB,
) error {
rows, err := db.QueryContext(ctx,
`SELECT id, created_at, deleted_at, updated_at,
termination_status, secret_leak_count, secrets_rules_version,
session_name
`SELECT updated_at, `+pgSessionCols+`
FROM sessions LIMIT 0`)
if err != nil {
return fmt.Errorf(
Expand All @@ -1471,8 +1503,15 @@ func CheckSchemaCompat(
rows.Close()

rows, err = db.QueryContext(ctx,
`SELECT is_system, model, token_usage, context_tokens,
output_tokens, has_context_tokens, has_output_tokens
`SELECT session_id, ordinal, role, content, thinking_text,
timestamp, has_thinking, has_tool_use,
content_length, is_system, model, token_usage,
context_tokens, output_tokens,
has_context_tokens, has_output_tokens,
claude_message_id, claude_request_id,
source_type, source_subtype, source_uuid,
source_parent_uuid, is_sidechain,
is_compact_boundary
FROM messages LIMIT 0`)
if err != nil {
return fmt.Errorf(
Expand Down Expand Up @@ -1578,6 +1617,56 @@ func CheckSchemaCompat(
return nil
}

// checkPushSchemaCompat probes the schema elements that only push
// depends on: the sync_metadata table (key, value) and the
// sessions.owner_marker column. pg serve never reads these, so they are
// kept out of CheckSchemaCompat (which gates read-only serve startup)
// and are verified only on the push fast path.
//
// Each probe is a zero-row SELECT. It returns nil when both succeed, or
// an error wrapping the first probe failure.
func checkPushSchemaCompat(ctx context.Context, db *sql.DB) error {
	// probe runs a LIMIT 0 query and releases the result set right away.
	probe := func(query string) error {
		rows, err := db.QueryContext(ctx, query)
		if err != nil {
			return err
		}
		rows.Close()
		return nil
	}

	if err := probe(`SELECT key, value FROM sync_metadata LIMIT 0`); err != nil {
		return fmt.Errorf(
			"sync_metadata table missing required columns: %w", err)
	}
	if err := probe(`SELECT owner_marker FROM sessions LIMIT 0`); err != nil {
		return fmt.Errorf(
			"sessions table missing owner_marker: %w", err)
	}
	return nil
}

// pushSchemaCurrent reports whether the PG schema already contains
// everything a push touches, so the caller may skip EnsureSchema.
//
// CheckSchemaCompat only covers the read paths. On top of it, push also
// needs:
//   - sync_metadata and sessions.owner_marker (checkPushSchemaCompat),
//   - the model_pricing table (always queried by syncModelPricing),
//   - the cursor_usage_events table (written by syncCursorUsageEvents),
//   - the idx_cursor_usage_events_dedup index, which the cursor usage
//     insert relies on for ON CONFLICT dedup.
//
// When any probe fails this returns false and the caller must run
// EnsureSchema so push migrates the schema instead of failing or
// duplicating rows.
func pushSchemaCurrent(ctx context.Context, db *sql.DB) bool {
	if CheckSchemaCompat(ctx, db) != nil {
		return false
	}
	if checkPushSchemaCompat(ctx, db) != nil {
		return false
	}

	for _, table := range [...]string{"model_pricing", "cursor_usage_events"} {
		if !pgHasTable(ctx, db, table) {
			return false
		}
	}

	// bulkInsertCursorUsageEvents dedups through a targetless ON CONFLICT
	// DO NOTHING, which suppresses duplicates only while this partial
	// unique index exists. Without it, repeated pushes could duplicate
	// cursor usage rows, so its absence also forces EnsureSchema.
	return pgHasIndex(ctx, db, "idx_cursor_usage_events_dedup")
}

// CheckDataVersionCompat rejects PG datasets containing rows written by a
// newer agentsview parser. PG does not have SQLite's global user_version, so
// the highest session data_version is the compatibility marker.
Expand Down
199 changes: 199 additions & 0 deletions internal/postgres/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,23 @@ type schemaProbeRows struct {
next int
}

// schemaProbeQueryError injects a failure into the schema-probe test
// driver: QueryContext returns err for any query whose lowercased text
// contains the lowercased contains fragment.
type schemaProbeQueryError struct {
// contains is the substring matched case-insensitively against the query.
contains string
// err is the error returned instead of rows when the fragment matches.
err error
}

// schemaProbeState is the state shared between a test and the
// schema-probe driver connection. Tests seed the existing* fields and
// queryErrors to describe the simulated database, then inspect what the
// driver recorded.
type schemaProbeState struct {
// mu is held by the connection while it updates informationQueries.
// NOTE(review): presumably it guards the other recorded fields too — confirm.
mu sync.Mutex
// informationQueries counts information_schema.columns queries seen.
informationQueries int
// execs and alterTableExecs presumably record executed statements;
// the recording code is outside this view — TODO confirm.
execs []string
alterTableExecs []string
currentSchema string
existingColumnNames map[string][]string
// existingTables names the tables that information_schema.tables
// probes report as present (one row with value 1).
existingTables map[string]bool
// existingIndexes names the indexes that pg_indexes probes report as
// present (one row with value 1).
existingIndexes map[string]bool
maxDataVersion int
maxDataVersionErr error
// queryErrors is checked first on every query; the first entry whose
// fragment matches makes the query fail with that entry's error.
queryErrors []schemaProbeQueryError
}

var (
Expand Down Expand Up @@ -115,7 +123,43 @@ func (c *schemaProbeConn) QueryContext(
_ context.Context, query string, args []driver.NamedValue,
) (driver.Rows, error) {
normalized := strings.ToLower(query)
for _, queryErr := range c.state.queryErrors {
if strings.Contains(
normalized,
strings.ToLower(queryErr.contains),
) {
return nil, queryErr.err
}
}
switch {
case strings.Contains(normalized, "information_schema.tables"):
name := ""
if len(args) > 0 {
if v, ok := args[0].Value.(string); ok {
name = v
}
}
if c.state.existingTables[name] {
return &schemaProbeRows{
columns: []string{"exists"},
values: [][]driver.Value{{int64(1)}},
}, nil
}
return &schemaProbeRows{columns: []string{"exists"}}, nil
case strings.Contains(normalized, "pg_indexes"):
name := ""
if len(args) > 0 {
if v, ok := args[0].Value.(string); ok {
name = v
}
}
if c.state.existingIndexes[name] {
return &schemaProbeRows{
columns: []string{"exists"},
values: [][]driver.Value{{int64(1)}},
}, nil
}
return &schemaProbeRows{columns: []string{"exists"}}, nil
case strings.Contains(normalized, "information_schema.columns"):
c.state.mu.Lock()
c.state.informationQueries++
Expand Down Expand Up @@ -294,6 +338,161 @@ func TestEnsureSchemaChecksDataVersionBeforeDDL(t *testing.T) {
"EnsureSchema must not mutate PG before data-version refusal")
}

// TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible checks the push fast
// path: when every probe passes, Sync.EnsureSchema must issue no index,
// table, or column DDL, yet must still run the row-level data repairs.
func TestSyncEnsureSchemaSkipsDDLWhenSchemaCompatible(t *testing.T) {
	db, probe := newSchemaProbeDB(t, nil)
	probe.existingTables = map[string]bool{
		"model_pricing":       true,
		"cursor_usage_events": true,
	}
	probe.existingIndexes = map[string]bool{
		"idx_cursor_usage_events_dedup": true,
	}
	s := &Sync{pg: db, schema: "agentsview"}

	err := s.EnsureSchema(context.Background())
	require.NoError(t, err)

	ranSQL := strings.ToLower(probe.executedSQL())

	// None of these DDL fragments may appear on the fast path.
	forbidden := []struct {
		fragment string
		why      string
	}{
		{"create index", "compatible PG schema must skip index DDL"},
		{"alter index", "compatible PG schema must skip index DDL"},
		{"create table", "compatible PG schema must skip table DDL"},
	}
	for _, f := range forbidden {
		assert.NotContains(t, ranSQL, f.fragment, f.why)
	}

	assert.Equal(t, 0, probe.alterTableExecCount(),
		"compatible PG schema must not run column migrations")
	assert.Contains(t, ranSQL, "insert into sync_metadata",
		"compatible PG schema must still run row-level data repairs")
}

// TestCheckSchemaCompatIgnoresPushOnlySchema verifies that the read-only
// compatibility probe still succeeds when the push-only schema elements
// (sessions.owner_marker and the sync_metadata table) are missing.
func TestCheckSchemaCompatIgnoresPushOnlySchema(t *testing.T) {
	db, probe := newSchemaProbeDB(t, nil)

	missingOwnerMarker := schemaProbeQueryError{
		contains: "owner_marker",
		err: errors.New(
			`ERROR: column "owner_marker" does not exist (SQLSTATE 42703)`),
	}
	missingSyncMetadata := schemaProbeQueryError{
		contains: "from sync_metadata",
		err: errors.New(
			`ERROR: relation "sync_metadata" does not exist (SQLSTATE 42P01)`),
	}
	probe.queryErrors = []schemaProbeQueryError{
		missingOwnerMarker,
		missingSyncMetadata,
	}

	err := CheckSchemaCompat(context.Background(), db)
	require.NoError(t, err,
		"read compatibility must not require push-only schema")
}

// TestSyncEnsureSchemaRunsDDLWhenPushMetadataMissing covers a schema
// that is read-compatible, with the push tables and the dedup index
// present, but without the push-only owner_marker column. The fast path
// must then fall back to the full EnsureSchema migration.
func TestSyncEnsureSchemaRunsDDLWhenPushMetadataMissing(t *testing.T) {
	columns := map[string][]string{
		"sessions": {
			"has_total_output_tokens",
			"has_peak_context_tokens",
		},
		"messages": {
			"has_context_tokens",
			"has_output_tokens",
		},
	}
	db, probe := newSchemaProbeDB(t, columns)
	probe.existingTables = map[string]bool{
		"model_pricing":       true,
		"cursor_usage_events": true,
	}
	probe.existingIndexes = map[string]bool{
		"idx_cursor_usage_events_dedup": true,
	}

	// Any query that mentions owner_marker fails as if the column were
	// absent.
	missingOwnerMarker := schemaProbeQueryError{
		contains: "owner_marker",
		err: errors.New(
			`ERROR: column "owner_marker" does not exist (SQLSTATE 42703)`),
	}
	probe.queryErrors = []schemaProbeQueryError{missingOwnerMarker}

	s := &Sync{pg: db, schema: "agentsview"}
	err := s.EnsureSchema(context.Background())
	require.NoError(t, err)

	assert.Greater(t, probe.execCount(), 0,
		"missing push-only column must fall back to migration DDL")
}

// TestSyncEnsureSchemaRunsDDLWhenPushTableMissing covers a schema that
// is read-compatible and has cursor_usage_events but lacks
// model_pricing. The read probe passes, yet a push would fail on
// model_pricing, so the fast path must fall back to EnsureSchema and
// create the missing table.
func TestSyncEnsureSchemaRunsDDLWhenPushTableMissing(t *testing.T) {
	columns := map[string][]string{
		"sessions": {
			"has_total_output_tokens",
			"has_peak_context_tokens",
		},
		"messages": {
			"has_context_tokens",
			"has_output_tokens",
		},
	}
	db, probe := newSchemaProbeDB(t, columns)

	// model_pricing is deliberately left out of the existing tables.
	probe.existingTables = map[string]bool{
		"cursor_usage_events": true,
	}

	s := &Sync{pg: db, schema: "agentsview"}
	err := s.EnsureSchema(context.Background())
	require.NoError(t, err)

	assert.Greater(t, probe.execCount(), 0,
		"missing push-written table must fall back to migration DDL")

	ranSQL := strings.ToLower(probe.executedSQL())
	assert.Contains(t, ranSQL, "create table",
		"fallback must create missing push tables")
}

// TestSyncEnsureSchemaRunsDDLWhenDedupIndexMissing covers a schema whose
// push tables exist but whose cursor dedup unique index does not. The
// read probe passes, yet ON CONFLICT DO NOTHING would not dedup cursor
// usage rows, so the fast path must fall back to EnsureSchema and
// recreate the index.
func TestSyncEnsureSchemaRunsDDLWhenDedupIndexMissing(t *testing.T) {
	columns := map[string][]string{
		"sessions": {
			"has_total_output_tokens",
			"has_peak_context_tokens",
		},
		"messages": {
			"has_context_tokens",
			"has_output_tokens",
		},
	}
	db, probe := newSchemaProbeDB(t, columns)

	// Both tables are present; existingIndexes is left unset so the
	// dedup index probe reports it as missing.
	probe.existingTables = map[string]bool{
		"model_pricing":       true,
		"cursor_usage_events": true,
	}

	s := &Sync{pg: db, schema: "agentsview"}
	err := s.EnsureSchema(context.Background())
	require.NoError(t, err)

	assert.Greater(t, probe.execCount(), 0,
		"missing dedup index must fall back to migration DDL")

	ranSQL := strings.ToLower(probe.executedSQL())
	assert.Contains(t, ranSQL, "idx_cursor_usage_events_dedup",
		"fallback must recreate the cursor dedup index")
}

// TestSyncEnsureSchemaRunsDDLWhenSchemaIncompatible covers a schema on
// which queries mentioning data_version fail. Sync.EnsureSchema must
// then take the full migration path and execute statements.
func TestSyncEnsureSchemaRunsDDLWhenSchemaIncompatible(t *testing.T) {
	columns := map[string][]string{
		"sessions": {
			"has_total_output_tokens",
			"has_peak_context_tokens",
		},
		"messages": {
			"has_context_tokens",
			"has_output_tokens",
		},
	}
	db, probe := newSchemaProbeDB(t, columns)

	// Any query that mentions data_version fails as if the column were
	// absent.
	missingDataVersion := schemaProbeQueryError{
		contains: "data_version",
		err: errors.New(
			`ERROR: column "data_version" does not exist (SQLSTATE 42703)`,
		),
	}
	probe.queryErrors = []schemaProbeQueryError{missingDataVersion}

	s := &Sync{pg: db, schema: "agentsview"}
	err := s.EnsureSchema(context.Background())
	require.NoError(t, err)

	assert.Greater(t, probe.execCount(), 0,
		"incompatible PG schema should fall back to migration DDL")
}

func TestEnsureSchemaCreatesAnalyticsCoveringIndexes(t *testing.T) {
db, state := newSchemaProbeDB(t, map[string][]string{
"sessions": {
Expand Down
Loading