From c7ae53b3de1cf00b6adb9fd89adde4c1da7cf959 Mon Sep 17 00:00:00 2001 From: HIK <1840107239@qq.com> Date: Fri, 26 Jun 2026 17:54:06 +0800 Subject: [PATCH 1/2] feat(kimi): estimate session cost from aggregate token usage Kimi wire logs carry token counts only as session-level aggregates (StatusUpdate), so individual messages have no per-message token_usage and no model identifier. The usage/cost engine priced 0 tokens and Kimi sessions never appeared in the cost views. Emit one session-level ParsedUsageEvent when a Kimi session exposes only aggregate output tokens, defaulting the model to a recent, cleanly matchable Kimi catalog entry (moonshot/kimi-k2.6) so the cost engine can produce an estimate. The estimate is a lower bound: prices come from the LiteLLM catalog rather than Kimi directly, the wire logs do not record which model served each turn, and only output tokens are exposed by the aggregates (input and cache tokens are not priced). Sessions whose messages already carry per-message token_usage (native step.end protocol) are priced message-by-message and skipped to avoid double counting. processKimi previously dropped parser-emitted usage events; pass them through so the events reach the store. --- internal/parser/kimi.go | 84 +++++++++++++++++++++++++++++++++++- internal/parser/kimi_test.go | 51 ++++++++++++++++++++++ internal/sync/engine.go | 2 +- 3 files changed, 134 insertions(+), 3 deletions(-) diff --git a/internal/parser/kimi.go b/internal/parser/kimi.go index d5e98c6c3..c033beef7 100644 --- a/internal/parser/kimi.go +++ b/internal/parser/kimi.go @@ -12,6 +12,46 @@ import ( "github.com/tidwall/gjson" ) +// defaultKimiModel is the model name used to price Kimi turns when the +// session files carry no model identifier (Kimi wire logs omit it). +// +// How the estimate works: +// +// - The wire logs do not record which model served each turn, and Kimi +// offers several models concurrently at different rates (e.g. +// kimi-k2.7-code, kimi-k2.6, kimi-k2.5), so the exact price is +// unknowable from the logs. We approximate it with a recent Kimi +// model that has a cleanly matchable rate in the pricing catalog. +// - Prices come from the LiteLLM catalog (a community-maintained price +// list), not from Kimi/Moonshot directly, so the rate itself is also +// an approximation. The cost engine canonicalizes model names before +// matching (drops the provider prefix before the last "/", lowercases, +// strips non-alphanumerics) and rejects a catalog entry whose provider +// conflicts with the model's. "moonshot/kimi-k2.6" therefore resolves +// to the catalog's "moonshot/kimi-k2.6" rate (currently 0.95 in / +// 4.0 out per Mtok). +// - k2.6 rather than k2.7 is deliberate: as of this writing the catalog +// prices k2.7 only under a cloudflare-namespaced key +// ("cloudflare/@cf/moonshotai/kimi-k2.7-code"), which the provider +// conflict rule rejects for a "moonshot/" model. k2.6 has a clean +// "moonshot/kimi-k2.6" entry and the same 0.95/4.0 rate, so it is the +// more robust proxy at an identical price. Switch to k2.7 once the +// catalog gains a cleanly matchable (moonshot/ or unqualified) entry. +// +// This is deliberately an ESTIMATE, not exact billing: +// +// - The real per-turn model is unknown; turns served by a cheaper or +// pricier Kimi model are mis-estimated by the rate gap between them. +// - Kimi's aggregate logs only expose output tokens, so input and cache +// tokens are not priced here (see ParseKimiSession). The figure is a +// floor that tracks output volume, not a full invoice. +// - Catalog prices drift; we pin to one representative version rather +// than guessing per-session rates. +// +// Bump this constant when the catalog gains a newer, cleanly matchable +// Kimi model so the estimate keeps tracking a current rate. +const defaultKimiModel = "moonshot/kimi-k2.6" + // DiscoverKimiSessions finds all wire.jsonl files under the Kimi // sessions directory. It supports two layouts: // @@ -281,7 +321,7 @@ func ParseKimiSession( currentTS time.Time pendingTS time.Time - currentModel string + currentModel = defaultKimiModel ) resetAssistantTurn := func() { @@ -308,6 +348,13 @@ func ParseKimiSession( return } + // Kimi wire logs often omit the model; fall back to the current + // (defaulted) model so turns can still be priced. + turnModel := pendingModel + if turnModel == "" { + turnModel = currentModel + } + messages = append(messages, ParsedMessage{ Ordinal: ordinal, Role: RoleAssistant, @@ -318,7 +365,7 @@ func ParseKimiSession( HasToolUse: hasToolUse, ContentLength: len(content), ToolCalls: pendingToolCall, - Model: pendingModel, + Model: turnModel, TokenUsage: pendingTokenUsage, ContextTokens: pendingContextTokens, OutputTokens: pendingOutputTokens, @@ -736,6 +783,39 @@ func ParseKimiSession( }, } + // When Kimi wire logs carry only session-level token aggregates + // (StatusUpdate path) rather than per-message step.end usage, + // individual messages have no token_usage JSON and no model, so the + // cost engine prices 0 tokens and the session shows $0. + // + // Emit one session-level usage event so the engine can produce an + // estimate (see defaultKimiModel for the estimation principle). + // Only output tokens are available from these aggregates, so input + // and cache tokens are intentionally left at zero — the resulting + // cost is a lower-bound estimate driven by output volume. Sessions + // whose messages already carry per-message token_usage (native + // step.end protocol) are priced message-by-message and skipped here + // to avoid double counting. + if hasTotalOutputTokens && totalOutputTokens > 0 { + hasPerMessageTokens := false + for _, m := range messages { + if len(m.TokenUsage) > 0 { + hasPerMessageTokens = true + break + } + } + if !hasPerMessageTokens { + sess.UsageEvents = []ParsedUsageEvent{{ + SessionID: sess.ID, + Source: "session", + Model: currentModel, + OutputTokens: totalOutputTokens, + OccurredAt: timeString(endTime, startTime), + DedupKey: "kimi:session:" + sessionID, + }} + } + } + return sess, messages, nil } diff --git a/internal/parser/kimi_test.go b/internal/parser/kimi_test.go index 00b85796a..8d25b1702 100644 --- a/internal/parser/kimi_test.go +++ b/internal/parser/kimi_test.go @@ -250,6 +250,57 @@ func TestParseKimiSession_StatusUpdate(t *testing.T) { assert.True(t, sess.HasPeakContextTokens) } +func TestParseKimiSession_SessionLevelTokensEmitUsageEvent(t *testing.T) { + // StatusUpdate carries only session-level token aggregates; the + // individual assistant message gets no per-message token_usage. + // Without a usage event the cost engine would price 0 tokens and + // show $0, so the parser emits a session-level event defaulted to + // defaultKimiModel. + path := writeKimiWireJSONL(t, + "proj-usage", "sess-usage", + []string{ + `{"type": "metadata", "protocol_version": "1.3"}`, + `{"timestamp": 1704067200.0, "message": {"type": "TurnBegin", "payload": {"user_input": [{"type": "text", "text": "Hello"}]}}}`, + `{"timestamp": 1704067201.0, "message": {"type": "ContentPart", "payload": {"type": "text", "text": "Hi"}}}`, + `{"timestamp": 1704067201.5, "message": {"type": "StatusUpdate", "payload": {"context_tokens": 5000, "token_usage": {"output": 42}}}}`, + `{"timestamp": 1704067202.0, "message": {"type": "TurnEnd", "payload": {}}}`, + }, + ) + + sess, _, err := ParseKimiSession(path, "testproj", "local") + require.NoError(t, err) + require.NotNil(t, sess) + + require.Equal(t, 1, len(sess.UsageEvents)) + ev := sess.UsageEvents[0] + assert.Equal(t, "kimi:proj-usage:sess-usage", ev.SessionID) + assert.Equal(t, "session", ev.Source) + assert.Equal(t, defaultKimiModel, ev.Model) + assert.Equal(t, 42, ev.OutputTokens) + assert.Equal(t, "kimi:session:proj-usage:sess-usage", ev.DedupKey) +} + +func TestParseKimiSession_PerMessageTokensSkipUsageEvent(t *testing.T) { + // The native wire protocol (step.begin/step.end) attaches token + // usage to each assistant message, so the per-message path already + // prices the session. No session-level usage event is needed and + // emitting one would double-count tokens. + path := writeKimiWireJSONL(t, + "proj-native", "sess-native", + []string{ + `{"type": "metadata", "protocol_version": "1.3"}`, + `{"type": "turn.prompt", "timestamp": 1704067200.0, "input": [{"type": "text", "text": "Hello"}]}`, + `{"type": "context.append_loop_event", "timestamp": 1704067201.0, "event": {"type": "content.part", "part": {"type": "text", "text": "Hi"}}}`, + `{"type": "context.append_loop_event", "timestamp": 1704067202.0, "event": {"type": "step.end", "model": "moonshot/kimi-k2", "finishReason": "stop", "usage": {"output": 42, "inputOther": 100}}}`, + }, + ) + + sess, _, err := ParseKimiSession(path, "testproj", "local") + require.NoError(t, err) + require.NotNil(t, sess) + assert.Empty(t, sess.UsageEvents) +} + func TestParseKimiSession_ZeroValuedStatusUpdatePreservesCoverage(t *testing.T) { path := writeKimiWireJSONL(t, "proj-zero", "sess-zero", diff --git a/internal/sync/engine.go b/internal/sync/engine.go index 2a103c910..85e4dc2cb 100644 --- a/internal/sync/engine.go +++ b/internal/sync/engine.go @@ -6036,7 +6036,7 @@ func (e *Engine) processKimi( return processResult{ results: []parser.ParseResult{ - {Session: *sess, Messages: msgs}, + {Session: *sess, Messages: msgs, UsageEvents: sess.UsageEvents}, }, } } From 03e3bd9cc5aa81ce65a7ffa543c14c6f28f8a05a Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 26 Jun 2026 14:06:35 -0500 Subject: [PATCH 2/2] fix(kimi): resync usage estimates with native models Kimi usage event persistence needs a parser data-version bump so unchanged existing archives are reparsed and backfilled through the normal sync path. Without the bump, sessions already stamped current would keep showing no estimated cost until their source file changed.\n\nNative step.end records can also carry the exact model even when no prior config.update exists. Preserving that explicit event model avoids pricing per-message usage rows with the proxy fallback model. --- internal/db/db.go | 6 +++++- internal/db/db_test.go | 6 +++--- internal/parser/kimi.go | 4 +++- internal/parser/kimi_test.go | 17 +++++++++++++++++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/internal/db/db.go b/internal/db/db.go index 0de31902d..8c7c2b9dd 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -241,6 +241,10 @@ import ( // classification, so historical skill usage is backfilled on // re-parse.) // +// (55: Kimi session-level usage events and native step.end model +// backfill. Re-parsing persists estimated usage events for existing +// aggregate-only Kimi sessions and preserves explicit native event +// model names instead of the proxy fallback.) // (54: Antigravity .db sessions record a schema-fingerprint // source_version. Re-parsing populates source_version on existing // Antigravity IDE and CLI rows so "which agy release produced this @@ -254,7 +258,7 @@ import ( // (51: Gemini cumulative-to-delta token reparse.) // (17: Codex template filtering.) // (16: system messages.) -const dataVersion = 54 +const dataVersion = 55 const tokenCoverageRepairStatsKey = "token_coverage_repair_v1" diff --git a/internal/db/db_test.go b/internal/db/db_test.go index def5330f4..a92393e2b 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -695,9 +695,9 @@ func TestMigration_ToolResultEventsTable(t *testing.T) { "expected tool_result_events table after reopen") } -func TestCurrentDataVersionSanitizedMessageShape(t *testing.T) { - assert.Equal(t, 54, CurrentDataVersion(), - "Antigravity source_version backfill requires a data version bump") +func TestCurrentDataVersionKimiUsageEvents(t *testing.T) { + assert.Equal(t, 55, CurrentDataVersion(), + "Kimi persisted usage events require a data version bump") } func TestInsertMessages_PreservesToolResultEvents(t *testing.T) { diff --git a/internal/parser/kimi.go b/internal/parser/kimi.go index c033beef7..0514fe38a 100644 --- a/internal/parser/kimi.go +++ b/internal/parser/kimi.go @@ -527,7 +527,9 @@ func ParseKimiSession( ordinal++ case "step.end": - if pendingModel == "" { + if model := event.Get("model").Str; model != "" { + pendingModel = model + } else if pendingModel == "" { pendingModel = currentModel } pendingStopReason = event.Get("finishReason").Str diff --git a/internal/parser/kimi_test.go b/internal/parser/kimi_test.go index 8d25b1702..fa5288af3 100644 --- a/internal/parser/kimi_test.go +++ b/internal/parser/kimi_test.go @@ -301,6 +301,23 @@ func TestParseKimiSession_PerMessageTokensSkipUsageEvent(t *testing.T) { assert.Empty(t, sess.UsageEvents) } +func TestParseKimiSession_StepEndModelOverridesDefault(t *testing.T) { + path := writeKimiWireJSONL(t, + "proj-native-model", "sess-native-model", + []string{ + `{"type": "metadata", "protocol_version": "1.3"}`, + `{"type": "turn.prompt", "timestamp": 1704067200.0, "input": [{"type": "text", "text": "Hello"}]}`, + `{"type": "context.append_loop_event", "timestamp": 1704067201.0, "event": {"type": "content.part", "part": {"type": "text", "text": "Hi"}}}`, + `{"type": "context.append_loop_event", "timestamp": 1704067202.0, "event": {"type": "step.end", "model": "moonshot/kimi-k2", "finishReason": "stop", "usage": {"output": 42, "inputOther": 100}}}`, + }, + ) + + _, msgs, err := ParseKimiSession(path, "testproj", "local") + require.NoError(t, err) + require.Len(t, msgs, 2) + assert.Equal(t, "moonshot/kimi-k2", msgs[1].Model) +} + func TestParseKimiSession_ZeroValuedStatusUpdatePreservesCoverage(t *testing.T) { path := writeKimiWireJSONL(t, "proj-zero", "sess-zero",