Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ import (
// classification, so historical skill usage is backfilled on
// re-parse.)
//
// (55: Kimi session-level usage events and native step.end model
// backfill. Re-parsing persists estimated usage events for existing
// aggregate-only Kimi sessions and preserves explicit native event
// model names instead of the proxy fallback.)
// (54: Antigravity .db sessions record a schema-fingerprint
// source_version. Re-parsing populates source_version on existing
// Antigravity IDE and CLI rows so "which agy release produced this
Expand All @@ -254,7 +258,7 @@ import (
// (51: Gemini cumulative-to-delta token reparse.)
// (17: Codex <skill> template filtering.)
// (16: <turn_aborted> system messages.)
const dataVersion = 54
const dataVersion = 55

// tokenCoverageRepairStatsKey is presumably the key under which the v1
// token-coverage repair records its state — TODO confirm against the
// code that reads and writes it, which is not visible here.
const tokenCoverageRepairStatsKey = "token_coverage_repair_v1"

Expand Down
6 changes: 3 additions & 3 deletions internal/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,9 @@ func TestMigration_ToolResultEventsTable(t *testing.T) {
"expected tool_result_events table after reopen")
}

func TestCurrentDataVersionSanitizedMessageShape(t *testing.T) {
assert.Equal(t, 54, CurrentDataVersion(),
"Antigravity source_version backfill requires a data version bump")
func TestCurrentDataVersionKimiUsageEvents(t *testing.T) {
assert.Equal(t, 55, CurrentDataVersion(),
"Kimi persisted usage events require a data version bump")
}

func TestInsertMessages_PreservesToolResultEvents(t *testing.T) {
Expand Down
88 changes: 85 additions & 3 deletions internal/parser/kimi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,46 @@ import (
"github.com/tidwall/gjson"
)

// defaultKimiModel is the fallback model name used to price Kimi turns
// and session-level usage events when the session files carry no model
// identifier. Kimi wire logs often omit it; a model named explicitly on
// a native step.end event takes precedence over this fallback.
//
// How the estimate works:
//
//   - When the wire logs do not record which model served a turn, the
//     exact price is unknowable from the logs, because Kimi offers
//     several models concurrently at different rates (e.g.
//     kimi-k2.7-code, kimi-k2.6, kimi-k2.5). We approximate it with a
//     recent Kimi model that has a cleanly matchable rate in the
//     pricing catalog.
//   - Prices come from the LiteLLM catalog (a community-maintained
//     price list), not from Kimi/Moonshot directly, so the rate itself
//     is also an approximation. The cost engine canonicalizes model
//     names before matching (drops the provider prefix before the last
//     "/", lowercases, strips non-alphanumerics) and rejects a catalog
//     entry whose provider conflicts with the model's.
//     "moonshot/kimi-k2.6" therefore resolves to the catalog's
//     "moonshot/kimi-k2.6" rate (currently 0.95 in / 4.0 out per Mtok).
//   - k2.6 rather than k2.7 is deliberate: as of this writing the
//     catalog prices k2.7 only under a cloudflare-namespaced key
//     ("cloudflare/@cf/moonshotai/kimi-k2.7-code"), which the provider
//     conflict rule rejects for a "moonshot/" model. k2.6 has a clean
//     "moonshot/kimi-k2.6" entry and the same 0.95/4.0 rate, so it is
//     the more robust proxy at an identical price. Switch to k2.7 once
//     the catalog gains a cleanly matchable (moonshot/ or unqualified)
//     entry.
//
// This is deliberately an ESTIMATE, not exact billing:
//
//   - When the real per-turn model is unknown, turns served by a
//     cheaper or pricier Kimi model are mis-estimated by the rate gap
//     between them.
//   - Kimi's aggregate logs only expose output tokens, so input and
//     cache tokens are not priced here (see ParseKimiSession). The
//     figure is a floor that tracks output volume, not a full invoice.
//   - Catalog prices drift; we pin to one representative version rather
//     than guessing per-session rates.
//
// Bump this constant when the catalog gains a newer, cleanly matchable
// Kimi model so the estimate keeps tracking a current rate.
const defaultKimiModel = "moonshot/kimi-k2.6"

// DiscoverKimiSessions finds all wire.jsonl files under the Kimi
// sessions directory. It supports two layouts:
//
Expand Down Expand Up @@ -281,7 +321,7 @@ func ParseKimiSession(
currentTS time.Time
pendingTS time.Time

currentModel string
currentModel = defaultKimiModel
)

resetAssistantTurn := func() {
Expand All @@ -308,6 +348,13 @@ func ParseKimiSession(
return
}

// Kimi wire logs often omit the model; fall back to the current
// (defaulted) model so turns can still be priced.
turnModel := pendingModel
if turnModel == "" {
turnModel = currentModel
}

messages = append(messages, ParsedMessage{
Ordinal: ordinal,
Role: RoleAssistant,
Expand All @@ -318,7 +365,7 @@ func ParseKimiSession(
HasToolUse: hasToolUse,
ContentLength: len(content),
ToolCalls: pendingToolCall,
Model: pendingModel,
Model: turnModel,
TokenUsage: pendingTokenUsage,
ContextTokens: pendingContextTokens,
OutputTokens: pendingOutputTokens,
Expand Down Expand Up @@ -480,7 +527,9 @@ func ParseKimiSession(
ordinal++

case "step.end":
if pendingModel == "" {
if model := event.Get("model").Str; model != "" {
pendingModel = model
} else if pendingModel == "" {
pendingModel = currentModel
}
pendingStopReason = event.Get("finishReason").Str
Expand Down Expand Up @@ -736,6 +785,39 @@ func ParseKimiSession(
},
}

// When Kimi wire logs carry only session-level token aggregates
// (StatusUpdate path) rather than per-message step.end usage,
// individual messages have no token_usage JSON and no model, so the
// cost engine prices 0 tokens and the session shows $0.
//
// Emit one session-level usage event so the engine can produce an
// estimate (see defaultKimiModel for the estimation principle).
// Only output tokens are available from these aggregates, so input
// and cache tokens are intentionally left at zero — the resulting
// cost is a lower-bound estimate driven by output volume. Sessions
// whose messages already carry per-message token_usage (native
// step.end protocol) are priced message-by-message and skipped here
// to avoid double counting.
if hasTotalOutputTokens && totalOutputTokens > 0 {
hasPerMessageTokens := false
for _, m := range messages {
if len(m.TokenUsage) > 0 {
hasPerMessageTokens = true
break
}
}
if !hasPerMessageTokens {
sess.UsageEvents = []ParsedUsageEvent{{
SessionID: sess.ID,
Source: "session",
Model: currentModel,
OutputTokens: totalOutputTokens,
OccurredAt: timeString(endTime, startTime),
DedupKey: "kimi:session:" + sessionID,
}}
}
}

return sess, messages, nil
}

Expand Down
68 changes: 68 additions & 0 deletions internal/parser/kimi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,74 @@ func TestParseKimiSession_StatusUpdate(t *testing.T) {
assert.True(t, sess.HasPeakContextTokens)
}

// TestParseKimiSession_SessionLevelTokensEmitUsageEvent verifies that a
// session whose only token data is a session-level StatusUpdate
// aggregate yields exactly one session-scoped usage event carrying the
// defaultKimiModel fallback and the aggregate output token count.
func TestParseKimiSession_SessionLevelTokensEmitUsageEvent(t *testing.T) {
	// StatusUpdate carries only session-level token aggregates; the
	// individual assistant message gets no per-message token_usage.
	// Without a usage event the cost engine would price 0 tokens and
	// show $0, so the parser emits a session-level event defaulted to
	// defaultKimiModel.
	path := writeKimiWireJSONL(t,
		"proj-usage", "sess-usage",
		[]string{
			`{"type": "metadata", "protocol_version": "1.3"}`,
			`{"timestamp": 1704067200.0, "message": {"type": "TurnBegin", "payload": {"user_input": [{"type": "text", "text": "Hello"}]}}}`,
			`{"timestamp": 1704067201.0, "message": {"type": "ContentPart", "payload": {"type": "text", "text": "Hi"}}}`,
			`{"timestamp": 1704067201.5, "message": {"type": "StatusUpdate", "payload": {"context_tokens": 5000, "token_usage": {"output": 42}}}}`,
			`{"timestamp": 1704067202.0, "message": {"type": "TurnEnd", "payload": {}}}`,
		},
	)

	sess, _, err := ParseKimiSession(path, "testproj", "local")
	require.NoError(t, err)
	require.NotNil(t, sess)

	// require.Len matches the length assertions used elsewhere in this
	// file and aborts the test before element 0 is indexed below.
	require.Len(t, sess.UsageEvents, 1)
	ev := sess.UsageEvents[0]
	assert.Equal(t, "kimi:proj-usage:sess-usage", ev.SessionID)
	assert.Equal(t, "session", ev.Source)
	assert.Equal(t, defaultKimiModel, ev.Model)
	assert.Equal(t, 42, ev.OutputTokens)
	assert.Equal(t, "kimi:session:proj-usage:sess-usage", ev.DedupKey)
}

// TestParseKimiSession_PerMessageTokensSkipUsageEvent checks that a
// native-protocol session, whose step.end event already attaches usage
// to the assistant message, produces no session-level usage event.
func TestParseKimiSession_PerMessageTokensSkipUsageEvent(t *testing.T) {
	// With step.begin/step.end wire logs every assistant message carries
	// its own token usage, so the session is priced message-by-message.
	// A session-level event on top of that would count the same tokens
	// twice.
	wireLines := []string{
		`{"type": "metadata", "protocol_version": "1.3"}`,
		`{"type": "turn.prompt", "timestamp": 1704067200.0, "input": [{"type": "text", "text": "Hello"}]}`,
		`{"type": "context.append_loop_event", "timestamp": 1704067201.0, "event": {"type": "content.part", "part": {"type": "text", "text": "Hi"}}}`,
		`{"type": "context.append_loop_event", "timestamp": 1704067202.0, "event": {"type": "step.end", "model": "moonshot/kimi-k2", "finishReason": "stop", "usage": {"output": 42, "inputOther": 100}}}`,
	}
	wirePath := writeKimiWireJSONL(t, "proj-native", "sess-native", wireLines)

	parsed, _, parseErr := ParseKimiSession(wirePath, "testproj", "local")
	require.NoError(t, parseErr)
	require.NotNil(t, parsed)

	assert.Empty(t, parsed.UsageEvents)
}

// TestParseKimiSession_StepEndModelOverridesDefault checks that a model
// named explicitly on a native step.end event is the one recorded on
// the assistant message, rather than the defaultKimiModel fallback.
func TestParseKimiSession_StepEndModelOverridesDefault(t *testing.T) {
	wireLines := []string{
		`{"type": "metadata", "protocol_version": "1.3"}`,
		`{"type": "turn.prompt", "timestamp": 1704067200.0, "input": [{"type": "text", "text": "Hello"}]}`,
		`{"type": "context.append_loop_event", "timestamp": 1704067201.0, "event": {"type": "content.part", "part": {"type": "text", "text": "Hi"}}}`,
		`{"type": "context.append_loop_event", "timestamp": 1704067202.0, "event": {"type": "step.end", "model": "moonshot/kimi-k2", "finishReason": "stop", "usage": {"output": 42, "inputOther": 100}}}`,
	}
	wirePath := writeKimiWireJSONL(t, "proj-native-model", "sess-native-model", wireLines)

	_, parsedMsgs, parseErr := ParseKimiSession(wirePath, "testproj", "local")
	require.NoError(t, parseErr)

	// One user prompt plus one assistant reply; the reply is at index 1.
	require.Len(t, parsedMsgs, 2)
	assistant := parsedMsgs[1]
	assert.Equal(t, "moonshot/kimi-k2", assistant.Model)
}

func TestParseKimiSession_ZeroValuedStatusUpdatePreservesCoverage(t *testing.T) {
path := writeKimiWireJSONL(t,
"proj-zero", "sess-zero",
Expand Down
2 changes: 1 addition & 1 deletion internal/sync/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6036,7 +6036,7 @@ func (e *Engine) processKimi(

return processResult{
results: []parser.ParseResult{
{Session: *sess, Messages: msgs},
{Session: *sess, Messages: msgs, UsageEvents: sess.UsageEvents},
},
}
}
Expand Down