diff --git a/internal/parser/cortex.go b/internal/parser/cortex.go index 03ddf537d..c8dce0331 100644 --- a/internal/parser/cortex.go +++ b/internal/parser/cortex.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "os" - "path/filepath" "regexp" "strings" "time" @@ -335,11 +334,11 @@ func parseCortexTimestamps(_ string) map[string]time.Time { return make(map[string]time.Time) } -// ParseCortexSession parses a Cortex session from its .json metadata -// file. If the file contains an embedded "history" array, it is used -// directly. If no history is embedded (the split-file format), the -// companion .history.jsonl file is read instead. -func ParseCortexSession( +// parseSession parses a Cortex session from its .json metadata file. If the +// file contains an embedded "history" array, it is used directly. If no history +// is embedded (the split-file format), the companion .history.jsonl file is +// read instead. +func (p *cortexProvider) parseSession( path, machine string, ) (*ParsedSession, []ParsedMessage, error) { info, err := os.Stat(path) @@ -511,59 +510,3 @@ func IsCortexSessionFile(name string) bool { stem := strings.TrimSuffix(name, ".json") return IsValidSessionID(stem) } - -// DiscoverCortexSessions finds all primary session metadata files -// in the Cortex conversations directory (~/.snowflake/cortex/conversations). -// Backup files (*.back.*.json) are silently skipped. Both embedded-history -// sessions (.json with a "history" key) and split sessions -// (.json + .history.jsonl) are returned as a single entry -// pointing to the .json metadata file. -func DiscoverCortexSessions( - conversationsDir string, -) []DiscoveredFile { - if conversationsDir == "" { - return nil - } - - entries, err := os.ReadDir(conversationsDir) - if err != nil { - return nil - } - - var files []DiscoveredFile - for _, entry := range entries { - if entry.IsDir() { - continue - } - name := entry.Name() - if !IsCortexSessionFile(name) { - continue - } - files = append(files, DiscoveredFile{ - Path: filepath.Join(conversationsDir, name), - Agent: AgentCortex, - }) - } - - return files -} - -// FindCortexSourceFile locates a Cortex session file by UUID. Accepts -// both the raw UUID and the prefixed "cortex:" form. Returns the -// path to the .json metadata file if found, otherwise "". -func FindCortexSourceFile( - conversationsDir, sessionID string, -) string { - // Strip "cortex:" prefix before validation — callers may - // pass the full prefixed ID. - sessionID = strings.TrimPrefix(sessionID, "cortex:") - if conversationsDir == "" || !IsValidSessionID(sessionID) { - return "" - } - - candidate := filepath.Join(conversationsDir, sessionID+".json") - if _, err := os.Stat(candidate); err == nil { - return candidate - } - return "" -} diff --git a/internal/parser/cortex_provider.go b/internal/parser/cortex_provider.go new file mode 100644 index 000000000..d201f2761 --- /dev/null +++ b/internal/parser/cortex_provider.go @@ -0,0 +1,301 @@ +package parser + +import ( + "context" + "crypto/sha256" + "fmt" + "os" + "path/filepath" + "strings" +) + +var _ Provider = (*cortexProvider)(nil) + +type cortexProviderFactory struct { + def AgentDef +} + +func newCortexProviderFactory(def AgentDef) ProviderFactory { + return cortexProviderFactory{def: cloneAgentDef(def)} +} + +func (f cortexProviderFactory) Definition() AgentDef { + return cloneAgentDef(f.def) +} + +func (f cortexProviderFactory) Capabilities() Capabilities { + return cortexProviderCapabilities() +} + +func (f cortexProviderFactory) NewProvider(cfg ProviderConfig) Provider { + cfg = cfg.Clone() + return &cortexProvider{ + ProviderBase: ProviderBase{ + Def: cloneAgentDef(f.def), + Caps: cortexProviderCapabilities(), + Config: cfg, + }, + sources: newCortexSourceSet(cfg.Roots), + } +} + +type cortexProvider struct { + ProviderBase + sources JSONLSourceSet +} + +func (p *cortexProvider) Discover(ctx context.Context) ([]SourceRef, error) { + return p.sources.Discover(ctx) +} + +func (p *cortexProvider) WatchPlan(ctx context.Context) (WatchPlan, error) { + plan, err := p.sources.WatchPlan(ctx) + if err != nil { + return WatchPlan{}, err + } + for i := range plan.Roots { + plan.Roots[i].IncludeGlobs = append( + plan.Roots[i].IncludeGlobs, + "*.history.jsonl", + ) + } + return plan, nil +} + +func (p *cortexProvider) SourcesForChangedPath( + ctx context.Context, + req ChangedPathRequest, +) ([]SourceRef, error) { + sources, err := p.sources.SourcesForChangedPath(ctx, req) + if err != nil || len(sources) > 0 { + return sources, err + } + if source, ok, err := p.sourceForHistoryCompanion(ctx, req); err != nil { + return nil, err + } else if ok { + return []SourceRef{source}, nil + } + return nil, nil +} + +func (p *cortexProvider) FindSource( + ctx context.Context, + req FindSourceRequest, +) (SourceRef, bool, error) { + return p.sources.FindSource(ctx, providerFindRequestWithRawSessionID(p.Def, req)) +} + +func (p *cortexProvider) Fingerprint( + ctx context.Context, + source SourceRef, +) (SourceFingerprint, error) { + if err := ctx.Err(); err != nil { + return SourceFingerprint{}, err + } + path, ok, err := p.sources.pathFromSource(ctx, source) + if err != nil { + return SourceFingerprint{}, err + } + if !ok { + return SourceFingerprint{}, fmt.Errorf("cortex source path unavailable") + } + info, err := os.Stat(path) + if err != nil { + return SourceFingerprint{}, fmt.Errorf("stat %s: %w", path, err) + } + if info.IsDir() { + return SourceFingerprint{}, fmt.Errorf("stat %s: source is a directory", path) + } + fingerprint := SourceFingerprint{ + Key: firstNonEmptyJSONLString( + source.FingerprintKey, + source.Key, + path, + ), + Size: info.Size(), + MTimeNS: info.ModTime().UnixNano(), + } + + h := sha256.New() + if err := addCortexFingerprintPart(h, "metadata", path, info); err != nil { + return SourceFingerprint{}, err + } + historyPath := cortexHistoryCompanionPath(path) + if historyInfo, ok, err := cortexCompanionInfo(historyPath); err != nil { + return SourceFingerprint{}, err + } else if ok && historyInfo != nil { + fingerprint.Size += historyInfo.Size() + mtime := historyInfo.ModTime().UnixNano() + if mtime > fingerprint.MTimeNS { + fingerprint.MTimeNS = mtime + } + if err := addCortexFingerprintPart(h, "history", historyPath, historyInfo); err != nil { + return SourceFingerprint{}, err + } + } + fingerprint.Hash = fmt.Sprintf("%x", h.Sum(nil)) + return fingerprint, nil +} + +func (p *cortexProvider) Parse( + ctx context.Context, + req ParseRequest, +) (ParseOutcome, error) { + if err := ctx.Err(); err != nil { + return ParseOutcome{}, err + } + path, ok, err := p.sources.pathFromSource(ctx, req.Source) + if err != nil { + return ParseOutcome{}, err + } + if !ok { + return ParseOutcome{}, fmt.Errorf("cortex source path unavailable") + } + machine := firstNonEmptyJSONLString(req.Machine, p.Config.Machine) + sess, msgs, err := p.parseSession(path, machine) + if err != nil { + return ParseOutcome{}, err + } + if sess == nil { + return ParseOutcome{ + ResultSetComplete: true, + SkipReason: SkipNoSession, + }, nil + } + if req.Fingerprint.Hash != "" { + sess.File.Hash = req.Fingerprint.Hash + } + return ParseOutcome{ + Results: []ParseResultOutcome{{ + Result: ParseResult{ + Session: *sess, + Messages: msgs, + }, + DataVersion: DataVersionCurrent, + }}, + ResultSetComplete: true, + }, nil +} + +func newCortexSourceSet(roots []string) JSONLSourceSet { + return newJSONLSourceSet(AgentCortex, roots, + withExtensions(".json"), + withFollowSymlinkFiles(), + withIncludePath(isCortexSourcePath), + withSessionIDFromPath(cortexSessionIDFromPath), + withProjectHint(func(root, path string) string { return "" }), + ) +} + +func (p *cortexProvider) sourceForHistoryCompanion( + ctx context.Context, + req ChangedPathRequest, +) (SourceRef, bool, error) { + if req.Path == "" { + return SourceRef{}, false, nil + } + path := filepath.Clean(req.Path) + for _, root := range p.sources.roots { + if req.WatchRoot != "" && !samePath(req.WatchRoot, root) { + continue + } + source, ok, err := cortexSourceForHistoryCompanion(ctx, p.sources, root, path) + if err != nil { + return SourceRef{}, false, err + } + if ok { + return source, true, nil + } + } + return SourceRef{}, false, nil +} + +func cortexSourceForHistoryCompanion( + ctx context.Context, + sources JSONLSourceSet, + root string, + path string, +) (SourceRef, bool, error) { + root = filepath.Clean(root) + if !samePath(filepath.Dir(path), root) { + return SourceRef{}, false, nil + } + stem, ok := strings.CutSuffix(filepath.Base(path), ".history.jsonl") + if !ok || !IsCortexSessionFile(stem+".json") { + return SourceRef{}, false, nil + } + metadataPath := filepath.Join(root, stem+".json") + if source, ok, err := sources.sourceForPath(ctx, metadataPath); err != nil { + return SourceRef{}, false, err + } else if ok { + return source, true, nil + } + return SourceRef{}, false, nil +} + +func isCortexSourcePath(root, path string) bool { + if !samePath(filepath.Dir(path), filepath.Clean(root)) { + return false + } + return IsCortexSessionFile(filepath.Base(path)) +} + +func cortexSessionIDFromPath(root, path string) string { + if !isCortexSourcePath(root, path) { + return "" + } + return strings.TrimSuffix(filepath.Base(path), ".json") +} + +func cortexHistoryCompanionPath(path string) string { + return strings.TrimSuffix(path, ".json") + ".history.jsonl" +} + +func cortexCompanionInfo(path string) (os.FileInfo, bool, error) { + info, err := os.Stat(path) + if os.IsNotExist(err) { + return nil, false, nil + } + if err != nil { + return nil, false, fmt.Errorf("stat %s: %w", path, err) + } + if info.IsDir() { + return nil, false, nil + } + return info, true, nil +} + +func addCortexFingerprintPart( + h interface{ Write([]byte) (int, error) }, + label string, + path string, + info os.FileInfo, +) error { + hash, err := hashJSONLSourceFile(path) + if err != nil { + return err + } + _, _ = fmt.Fprintf( + h, + "%s:%s:%d:%d:%s\n", + label, + filepath.Base(path), + info.Size(), + info.ModTime().UnixNano(), + hash, + ) + return nil +} + +func cortexProviderCapabilities() Capabilities { + return Capabilities{ + Source: jsonlFileProviderSourceCapabilities(), + Content: ContentCapabilities{ + FirstMessage: CapabilitySupported, + SessionName: CapabilitySupported, + Cwd: CapabilitySupported, + ToolCalls: CapabilitySupported, + ToolResults: CapabilitySupported, + }, + } +} diff --git a/internal/parser/cortex_provider_test.go b/internal/parser/cortex_provider_test.go new file mode 100644 index 000000000..3d89c137c --- /dev/null +++ b/internal/parser/cortex_provider_test.go @@ -0,0 +1,218 @@ +package parser + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCortexProviderFactoryReplacesLegacyAdapter(t *testing.T) { + factory, ok := ProviderFactoryByType(AgentCortex) + require.True(t, ok) + require.NotNil(t, factory) + + caps := factory.Capabilities() + assert.Equal(t, CapabilitySupported, caps.Source.DiscoverSources) + assert.Equal(t, CapabilitySupported, caps.Source.WatchSources) + assert.Equal(t, CapabilitySupported, caps.Source.ClassifyChangedPath) + assert.Equal(t, CapabilitySupported, caps.Source.FindSource) + assert.Equal(t, CapabilitySupported, caps.Source.CompositeFingerprint) + assert.Equal(t, CapabilitySupported, caps.Content.FirstMessage) + assert.Equal(t, CapabilitySupported, caps.Content.SessionName) + assert.Equal(t, CapabilitySupported, caps.Content.Cwd) + assert.Equal(t, CapabilitySupported, caps.Content.ToolCalls) + assert.Equal(t, CapabilitySupported, caps.Content.ToolResults) + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{t.TempDir()}, + Machine: "devbox", + }) + require.True(t, ok) + require.NotNil(t, provider) +} + +func TestCortexProviderSourceMethods(t *testing.T) { + root := t.TempDir() + otherID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + sourcePath := filepath.Join(root, cortexTestUUID+".json") + otherPath := filepath.Join(root, otherID+".json") + writeSourceFile(t, sourcePath, minimalCortexSession(cortexTestUUID)) + writeSourceFile(t, otherPath, minimalCortexSession(otherID)) + writeSourceFile(t, filepath.Join(root, cortexTestUUID+".history.jsonl"), "{}\n") + writeSourceFile(t, filepath.Join(root, cortexTestUUID+".back.123.json"), "{}\n") + writeSourceFile(t, filepath.Join(root, "has spaces.json"), "{}\n") + writeSourceFile(t, filepath.Join(root, "nested", cortexTestUUID+".json"), "{}\n") + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + discovered, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, discovered, 2) + assert.Equal(t, []string{sourcePath, otherPath}, sourceDisplayPaths(discovered)) + assert.Equal(t, []string{"", ""}, sourceProjects(discovered)) + + plan, err := provider.WatchPlan(context.Background()) + require.NoError(t, err) + require.Len(t, plan.Roots, 1) + assert.Equal(t, root, plan.Roots[0].Path) + assert.False(t, plan.Roots[0].Recursive) + assert.Equal(t, []string{"*.json", "*.history.jsonl"}, plan.Roots[0].IncludeGlobs) + + found, ok, err := provider.FindSource(context.Background(), FindSourceRequest{ + FullSessionID: "host~cortex:" + cortexTestUUID, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, sourcePath, found.DisplayPath) + + fingerprint, err := provider.Fingerprint(context.Background(), found) + require.NoError(t, err) + assert.Equal(t, sourcePath, fingerprint.Key) + assert.NotZero(t, fingerprint.Size) + assert.NotZero(t, fingerprint.MTimeNS) + + found, ok, err = provider.FindSource(context.Background(), FindSourceRequest{ + StoredFilePath: otherPath, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, otherPath, found.DisplayPath) + + _, ok, err = provider.FindSource(context.Background(), FindSourceRequest{ + RawSessionID: "../" + cortexTestUUID, + }) + require.NoError(t, err) + assert.False(t, ok) + + require.NoError(t, os.Remove(sourcePath)) + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: sourcePath, EventKind: "remove", WatchRoot: root}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) +} + +func TestCortexProviderClassifiesAndFingerprintsHistoryCompanion(t *testing.T) { + root := t.TempDir() + sourcePath := filepath.Join(root, cortexTestUUID+".json") + historyPath := filepath.Join(root, cortexTestUUID+".history.jsonl") + writeSourceFile(t, sourcePath, `{ + "session_id":"`+cortexTestUUID+`", + "working_directory":"/home/user/project" + }`) + writeSourceFile( + t, + historyPath, + `{"role":"user","id":"m1","content":[{"type":"text","text":"from history"}]}`+"\n", + ) + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: historyPath, EventKind: "write", WatchRoot: root}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) + assert.Equal(t, sourcePath, changed[0].FingerprintKey) + + before, err := provider.Fingerprint(context.Background(), changed[0]) + require.NoError(t, err) + assert.Equal(t, sourcePath, before.Key) + assert.NotEmpty(t, before.Hash) + + writeSourceFile( + t, + historyPath, + `{"role":"user","id":"m1","content":[{"type":"text","text":"updated history"}]}`+"\n", + ) + after, err := provider.Fingerprint(context.Background(), changed[0]) + require.NoError(t, err) + assert.Equal(t, sourcePath, after.Key) + assert.NotEqual(t, before.Hash, after.Hash) + assert.NotEqual(t, before.Size, after.Size) + + require.NoError(t, os.Remove(historyPath)) + changed, err = provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: historyPath, EventKind: "remove", WatchRoot: root}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) +} + +func TestCortexProviderSourceMethodsFollowSymlinkedSessionFile(t *testing.T) { + root := t.TempDir() + targetRoot := t.TempDir() + sourcePath := filepath.Join(root, cortexTestUUID+".json") + targetPath := filepath.Join(targetRoot, cortexTestUUID+".json") + writeSourceFile(t, targetPath, minimalCortexSession(cortexTestUUID)) + if err := os.Symlink(targetPath, sourcePath); err != nil { + t.Skipf("symlink not supported: %v", err) + } + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + discovered, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, discovered, 1) + assert.Equal(t, sourcePath, discovered[0].DisplayPath) + + found, ok, err := provider.FindSource(context.Background(), FindSourceRequest{ + FullSessionID: "host~cortex:" + cortexTestUUID, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, sourcePath, found.DisplayPath) +} + +func TestCortexProviderParse(t *testing.T) { + root := t.TempDir() + sourcePath := filepath.Join(root, cortexTestUUID+".json") + writeSourceFile(t, sourcePath, minimalCortexSession(cortexTestUUID)) + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + sources, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, sources, 1) + + outcome, err := provider.Parse(context.Background(), ParseRequest{ + Source: sources[0], + Fingerprint: SourceFingerprint{Key: sourcePath, Hash: "abc123"}, + }) + require.NoError(t, err) + require.True(t, outcome.ResultSetComplete) + require.Len(t, outcome.Results, 1) + result := outcome.Results[0] + assert.Equal(t, DataVersionCurrent, result.DataVersion) + assert.Equal(t, "cortex:"+cortexTestUUID, result.Result.Session.ID) + assert.Equal(t, "project", result.Result.Session.Project) + assert.Equal(t, "devbox", result.Result.Session.Machine) + assert.Equal(t, "abc123", result.Result.Session.File.Hash) + assert.Equal(t, "Test session", result.Result.Session.SessionName) + assert.Len(t, result.Result.Messages, 2) +} diff --git a/internal/parser/cortex_test.go b/internal/parser/cortex_test.go index a1191619f..061038a61 100644 --- a/internal/parser/cortex_test.go +++ b/internal/parser/cortex_test.go @@ -1,6 +1,7 @@ package parser import ( + "context" "os" "path/filepath" "runtime" @@ -37,11 +38,23 @@ func minimalCortexSession(sessionID string) string { }` } +func parseCortexSessionForTest( + t *testing.T, + path, machine string, +) (*ParsedSession, []ParsedMessage, error) { + t.Helper() + provider, ok := NewProvider(AgentCortex, ProviderConfig{Machine: machine}) + require.True(t, ok) + cortex, ok := provider.(*cortexProvider) + require.True(t, ok) + return cortex.parseSession(path, machine) +} + func TestParseCortexSession_Basic(t *testing.T) { content := minimalCortexSession(cortexTestUUID) path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -61,7 +74,7 @@ func TestParseCortexSession_EmptySessionID(t *testing.T) { content := `{"session_id": "", "history": []}` path := createTestFile(t, "empty.json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) assert.Nil(t, sess) assert.Nil(t, msgs) @@ -92,7 +105,7 @@ func TestParseCortexSession_SkipsInternalBlocks(t *testing.T) { }` path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -137,7 +150,7 @@ func TestParseCortexSession_ToolUse(t *testing.T) { }` path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -177,7 +190,7 @@ func TestParseCortexSession_SplitHistoryJSONL(t *testing.T) { histPath := filepath.Join(dir, uuid+".history.jsonl") require.NoError(t, os.WriteFile(histPath, []byte(lines), 0o644)) - sess, msgs, err := ParseCortexSession(metaPath, "local") + sess, msgs, err := parseCortexSessionForTest(t, metaPath, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -210,7 +223,7 @@ func TestParseCortexSession_SplitHistoryReadError(t *testing.T) { require.NoError(t, os.Chmod(histPath, 0o000)) t.Cleanup(func() { os.Chmod(histPath, 0o644) }) - _, _, err := ParseCortexSession(metaPath, "local") + _, _, err := parseCortexSessionForTest(t, metaPath, "local") require.Error(t, err, "non-ENOENT read error should propagate") assert.Contains(t, err.Error(), "read history") } @@ -229,7 +242,7 @@ func TestParseCortexSession_SplitHistoryMissing(t *testing.T) { metaPath := filepath.Join(dir, uuid+".json") require.NoError(t, os.WriteFile(metaPath, []byte(meta), 0o644)) - sess, msgs, err := ParseCortexSession(metaPath, "local") + sess, msgs, err := parseCortexSessionForTest(t, metaPath, "local") require.NoError(t, err) assert.Nil(t, sess, "missing JSONL should silently skip") assert.Nil(t, msgs) @@ -260,7 +273,7 @@ func TestParseCortexSession_FirstUserTurnSystemOnly(t *testing.T) { }` path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -318,16 +331,25 @@ func TestDiscoverCortexSessions(t *testing.T) { filepath.Join(dir, name), []byte(""), 0o644)) } - files := DiscoverCortexSessions(dir) - require.Len(t, files, 2) - for _, f := range files { - assert.Equal(t, AgentCortex, f.Agent) - } + provider, ok := NewProvider(AgentCortex, ProviderConfig{Roots: []string{dir}}) + require.True(t, ok) + sources, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, sources, 2) + assert.Equal(t, []string{ + filepath.Join(dir, cortexTestUUID+".json"), + filepath.Join(dir, uuid2+".json"), + }, sourceDisplayPaths(sources)) } func TestDiscoverCortexSessions_EmptyDir(t *testing.T) { - assert.Nil(t, DiscoverCortexSessions("")) - assert.Nil(t, DiscoverCortexSessions("/nonexistent")) + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{"", "/nonexistent"}, + }) + require.True(t, ok) + sources, err := provider.Discover(context.Background()) + require.NoError(t, err) + assert.Empty(t, sources) } func TestFindCortexSourceFile(t *testing.T) { @@ -349,8 +371,21 @@ func TestFindCortexSourceFile(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := FindCortexSourceFile(tt.dir, tt.sessionID) - assert.Equal(t, tt.want, got) + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{tt.dir}, + }) + require.True(t, ok) + source, ok, err := provider.FindSource( + context.Background(), + FindSourceRequest{RawSessionID: tt.sessionID}, + ) + require.NoError(t, err) + if tt.want == "" { + assert.False(t, ok) + return + } + require.True(t, ok) + assert.Equal(t, tt.want, source.DisplayPath) }) } } diff --git a/internal/parser/provider.go b/internal/parser/provider.go index c2a56bd04..4b2d7e76a 100644 --- a/internal/parser/provider.go +++ b/internal/parser/provider.go @@ -351,6 +351,8 @@ func providerFactoryForDef(def AgentDef) ProviderFactory { return newAmpProviderFactory(def) case AgentCommandCode: return newCommandCodeProviderFactory(def) + case AgentCortex: + return newCortexProviderFactory(def) case AgentDeepSeekTUI: return newDeepSeekTUIProviderFactory(def) case AgentIflow: diff --git a/internal/parser/provider_migration.go b/internal/parser/provider_migration.go index aa1b8e323..4a1164d78 100644 --- a/internal/parser/provider_migration.go +++ b/internal/parser/provider_migration.go @@ -43,7 +43,7 @@ var providerMigrationModes = map[AgentType]ProviderMigrationMode{ AgentChatGPT: ProviderMigrationLegacyOnly, AgentKiro: ProviderMigrationLegacyOnly, AgentKiroIDE: ProviderMigrationLegacyOnly, - AgentCortex: ProviderMigrationLegacyOnly, + AgentCortex: ProviderMigrationProviderAuthoritative, AgentHermes: ProviderMigrationLegacyOnly, AgentWorkBuddy: ProviderMigrationProviderAuthoritative, AgentForge: ProviderMigrationLegacyOnly, diff --git a/internal/parser/types.go b/internal/parser/types.go index 3b4fc8997..888bfba7e 100644 --- a/internal/parser/types.go +++ b/internal/parser/types.go @@ -439,10 +439,8 @@ var Registry = []AgentDef{ DefaultDirs: []string{ ".snowflake/cortex/conversations", }, - IDPrefix: "cortex:", - FileBased: true, - DiscoverFunc: DiscoverCortexSessions, - FindSourceFunc: FindCortexSourceFile, + IDPrefix: "cortex:", + FileBased: true, }, { Type: AgentHermes, diff --git a/internal/sync/classify_cortex_test.go b/internal/sync/classify_cortex_test.go index 68e2f64d7..6f3ddc9da 100644 --- a/internal/sync/classify_cortex_test.go +++ b/internal/sync/classify_cortex_test.go @@ -21,11 +21,15 @@ func TestClassifyOnePath_Cortex(t *testing.T) { require.NoError(t, os.WriteFile(jsonlPath, []byte("{}"), 0o644)) eng := &Engine{ + db: openTestDB(t), agentDirs: map[parser.AgentType][]string{ parser.AgentCortex: {dir}, }, + providerFactories: providerFactoryMap(parser.ProviderFactories()), + providerMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCortex: parser.ProviderMigrationProviderAuthoritative, + }, } - geminiMap := make(map[string]map[string]string) tests := []struct { name string @@ -71,9 +75,14 @@ func TestClassifyOnePath_Cortex(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, ok := eng.classifyOnePath(tt.path, geminiMap) - assert.Equal(t, tt.want, ok) - if ok { + files := eng.classifyPaths([]string{tt.path}) + if !tt.want { + assert.Empty(t, files) + return + } + require.Len(t, files, 1) + got := files[0] + if tt.want { assert.Equal(t, tt.agent, got.Agent) assert.Equal(t, tt.retPath, got.Path) } diff --git a/internal/sync/cortex_integration_test.go b/internal/sync/cortex_integration_test.go new file mode 100644 index 000000000..abbe36ee4 --- /dev/null +++ b/internal/sync/cortex_integration_test.go @@ -0,0 +1,70 @@ +package sync_test + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.kenn.io/agentsview/internal/dbtest" + "go.kenn.io/agentsview/internal/parser" + "go.kenn.io/agentsview/internal/sync" +) + +func TestSyncAllSinceCortexHistoryUpdateTriggersResync(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + cortexDir := t.TempDir() + testDB := dbtest.OpenTestDB(t) + engine := sync.NewEngine(testDB, sync.EngineConfig{ + AgentDirs: map[parser.AgentType][]string{ + parser.AgentCortex: {cortexDir}, + }, + Machine: "local", + }) + + uuid := "11111111-2222-3333-4444-555555555555" + metaPath := filepath.Join(cortexDir, uuid+".json") + historyPath := filepath.Join(cortexDir, uuid+".history.jsonl") + require.NoError(t, os.WriteFile(metaPath, []byte(cortexSyncMeta(uuid)), 0o644)) + require.NoError(t, os.WriteFile(historyPath, []byte(cortexSyncHistory("Before cutoff")), 0o644)) + + baseTime := time.Unix(1_781_475_200, 0) + require.NoError(t, os.Chtimes(metaPath, baseTime, baseTime)) + require.NoError(t, os.Chtimes(historyPath, baseTime, baseTime)) + + engine.SyncPaths([]string{metaPath}) + assertMessageContent(t, testDB, "cortex:"+uuid, "Before cutoff", "ack") + + cutoff := baseTime.Add(500 * time.Millisecond) + historyTime := baseTime.Add(time.Second) + require.NoError(t, os.WriteFile(historyPath, []byte(cortexSyncHistory("After cutoff")), 0o644)) + require.NoError(t, os.Chtimes(historyPath, historyTime, historyTime)) + + stats := engine.SyncAllSince(context.Background(), cutoff, nil) + require.Equal(t, 1, stats.Synced, "synced = %d, want 1", stats.Synced) + assertMessageContent(t, testDB, "cortex:"+uuid, "After cutoff", "ack") +} + +func cortexSyncMeta(uuid string) string { + return `{ + "session_id": "` + uuid + `", + "title": "Cortex split history", + "working_directory": "/home/user/cortex-project", + "created_at": "2024-06-01T10:00:00Z", + "last_updated": "2024-06-01T10:05:00Z" +}` +} + +func cortexSyncHistory(prompt string) string { + return strings.Join([]string{ + `{"role":"user","id":"m1","content":[{"type":"text","text":"` + prompt + `"}]}`, + `{"role":"assistant","id":"m2","content":[{"type":"text","text":"ack"}]}`, + }, "\n") + "\n" +} diff --git a/internal/sync/engine.go b/internal/sync/engine.go index e6f4830bb..6ed17725b 100644 --- a/internal/sync/engine.go +++ b/internal/sync/engine.go @@ -1416,43 +1416,6 @@ func (e *Engine) classifyOnePath( } } - // Cortex: /.json - // or: /.history.jsonl → remap to .json - for _, cortexDir := range e.agentDirs[parser.AgentCortex] { - if cortexDir == "" { - continue - } - if rel, ok := isUnder(cortexDir, path); ok { - if strings.Count(rel, sep) != 0 { - continue - } - name := filepath.Base(rel) - - // .history.jsonl companion → remap to .json metadata. - if stem, ok := strings.CutSuffix( - name, ".history.jsonl", - ); ok { - jsonPath := filepath.Join( - cortexDir, stem+".json", - ) - if parser.IsCortexSessionFile(stem + ".json") { - return parser.DiscoveredFile{ - Path: jsonPath, - Agent: parser.AgentCortex, - }, true - } - continue - } - - if parser.IsCortexSessionFile(name) { - return parser.DiscoveredFile{ - Path: path, - Agent: parser.AgentCortex, - }, true - } - } - } - // Antigravity IDE: /conversations/.db (+ -wal, -shm). // annotations/.pbtxt and brain//* sidecar events are // handled in classifyPaths via classifyAntigravitySidecarPath, @@ -2829,7 +2792,7 @@ func (e *Engine) syncAllLocked( if !since.IsZero() { all = e.dedupeClaudeDiscoveredFiles(all) - all = e.filterFilesByMtime(all, since) + all = e.filterFilesByMtime(ctx, all, since) } all = dedupeDiscoveredFiles(all) @@ -3336,13 +3299,15 @@ func (e *Engine) recordSyncFinished() { // dropped). The cost is one stat per file — acceptable for // polling use cases where most files will be skipped. func (e *Engine) filterFilesByMtime( - files []parser.DiscoveredFile, cutoff time.Time, + ctx context.Context, + files []parser.DiscoveredFile, + cutoff time.Time, ) []parser.DiscoveredFile { cutoffNs := cutoff.UnixNano() out := files[:0] codexIndexRefresh := make(map[string][]parser.DiscoveredFile) for _, f := range files { - mtime, err := discoveredFileMtime(f) + mtime, err := e.discoveredFileEffectiveMtime(ctx, f) if err != nil { out = append(out, f) continue @@ -3374,6 +3339,53 @@ func (e *Engine) filterFilesByMtime( return out } +func (e *Engine) discoveredFileEffectiveMtime( + ctx context.Context, + file parser.DiscoveredFile, +) (int64, error) { + if file.ProviderSource != nil && file.ProviderProcess { + if mtime, ok, err := e.providerFingerprintMtime(ctx, file); err != nil { + return 0, err + } else if ok { + return mtime, nil + } + } + return discoveredFileMtime(file) +} + +func (e *Engine) providerFingerprintMtime( + ctx context.Context, + file parser.DiscoveredFile, +) (int64, bool, error) { + if file.ProviderSource == nil { + return 0, false, nil + } + factory, ok := e.providerFactories[file.Agent] + if !ok || factory == nil { + return 0, false, nil + } + source := *file.ProviderSource + if source.Provider != "" && source.Provider != file.Agent { + return 0, false, fmt.Errorf( + "provider source mismatch for %s: %s", + file.Agent, + source.Provider, + ) + } + provider := factory.NewProvider(parser.ProviderConfig{ + Roots: e.agentDirs[file.Agent], + Machine: e.machine, + }) + fingerprint, err := provider.Fingerprint(ctx, source) + if err != nil { + return 0, false, err + } + if fingerprint.MTimeNS == 0 { + return 0, false, nil + } + return fingerprint.MTimeNS, true, nil +} + func discoveredFileMtime( file parser.DiscoveredFile, ) (int64, error) { @@ -4460,8 +4472,6 @@ func (e *Engine) processFile( res = e.processKiro(file, info) case parser.AgentKiroIDE: res = e.processKiroIDE(file, info) - case parser.AgentCortex: - res = e.processCortex(file, info) case parser.AgentHermes: res = e.processHermes(file, info) case parser.AgentVibe: @@ -6505,35 +6515,6 @@ func (e *Engine) processKiroIDE( } } -func (e *Engine) processCortex( - file parser.DiscoveredFile, info os.FileInfo, -) processResult { - if e.shouldSkipByPath(file.Path, info) { - return processResult{skip: true} - } - - sess, msgs, err := parser.ParseCortexSession( - file.Path, e.machine, - ) - if err != nil { - return processResult{err: err} - } - if sess == nil { - return processResult{} - } - - hash, err := ComputeFileHash(file.Path) - if err == nil { - sess.File.Hash = hash - } - - return processResult{ - results: []parser.ParseResult{ - {Session: *sess, Messages: msgs}, - }, - } -} - func (e *Engine) processHermes( file parser.DiscoveredFile, info os.FileInfo, ) processResult { @@ -8096,6 +8077,7 @@ func shouldReplaceFullParseMessages( pw.sess.Agent == parser.AgentAntigravity || pw.sess.Agent == parser.AgentAntigravityCLI || pw.sess.Agent == parser.AgentQwenPaw || + pw.sess.Agent == parser.AgentCortex || // Vibe pairs later tool-result carrier records back to an // earlier assistant tool call. An incremental append would // only add the new ordinals and leave the existing tool call's