From 22dc422610c442ba79bda4c1575f704319949390 Mon Sep 17 00:00:00 2001 From: Cuong Le Date: Wed, 27 May 2026 11:47:56 +0700 Subject: [PATCH 1/2] app/vlselect/logsql: fix silent log loss in live tail when entries share the same _time --- app/vlselect/logsql/logsql.go | 58 +++++++++++++++++++++++++++--- app/vlselect/logsql/logsql_test.go | 55 ++++++++++++++++++++++++++++ docs/victorialogs/CHANGELOG.md | 1 + 3 files changed, 109 insertions(+), 5 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index d367a44ab7..f2e8047221 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -25,6 +25,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/metrics" + "github.com/cespare/xxhash/v2" "github.com/valyala/fastjson" "github.com/valyala/quicktemplate" @@ -777,6 +778,11 @@ type tailProcessor struct { perStreamRows map[string][]logRow lastTimestamps map[string]int64 + // lastRowHashes contains xxhash fingerprints of rows already emitted at lastTimestamps[streamID]. + // Used to dedup rows that share the boundary timestamp but differ in content. + lastRowHashes map[string]map[uint64]struct{} + + keyBuf []byte err error } @@ -789,6 +795,7 @@ func newTailProcessor(cancel func(), needSortFields bool) *tailProcessor { perStreamRows: make(map[string][]logRow), lastTimestamps: make(map[string]int64), + lastRowHashes: make(map[string]map[uint64]struct{}), } } @@ -838,6 +845,16 @@ func (tp *tailProcessor) writeBlock(_ uint, db *logstorage.DataBlock) { } } +// hashLogRow returns an xxhash fingerprint of row's fields. +func (tp *tailProcessor) hashLogRow(row logRow) uint64 { + tp.keyBuf = tp.keyBuf[:0] + for _, f := range row.fields { + tp.keyBuf = encoding.MarshalBytes(tp.keyBuf, bytesutil.ToUnsafeBytes(f.Name)) + tp.keyBuf = encoding.MarshalBytes(tp.keyBuf, bytesutil.ToUnsafeBytes(f.Value)) + } + return xxhash.Sum64(tp.keyBuf) +} + func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { if tp.err != nil { return nil, tp.err @@ -848,16 +865,47 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { sortLogRows(rows) lastTimestamp, ok := tp.lastTimestamps[streamID] + lastHashes := tp.lastRowHashes[streamID] + if ok { - // Skip already written rows - for len(rows) > 0 && rows[0].timestamp <= lastTimestamp { + // Drop rows already emitted before the boundary timestamp. + for len(rows) > 0 && rows[0].timestamp < lastTimestamp { rows = rows[1:] } + // Drop rows at the boundary whose exact content was already emitted. + filtered := rows[:0] + for _, row := range rows { + if row.timestamp == lastTimestamp { + if _, seen := lastHashes[tp.hashLogRow(row)]; seen { + continue + } + } + filtered = append(filtered, row) + } + rows = filtered } - if len(rows) > 0 { - resultRows = append(resultRows, rows...) - tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp + + if len(rows) == 0 { + continue + } + + resultRows = append(resultRows, rows...) + + newLastTS := rows[len(rows)-1].timestamp + tp.lastTimestamps[streamID] = newLastTS + + // Reuse the existing hash set when the boundary timestamp does not advance, + // so rows already emitted at this timestamp remain excluded. Otherwise reset. + hashes := lastHashes + if !ok || lastTimestamp != newLastTS { + hashes = make(map[uint64]struct{}) + } + for _, row := range rows { + if row.timestamp == newLastTS { + hashes[tp.hashLogRow(row)] = struct{}{} + } } + tp.lastRowHashes[streamID] = hashes } clear(tp.perStreamRows) diff --git a/app/vlselect/logsql/logsql_test.go b/app/vlselect/logsql/logsql_test.go index 0cb30c62a0..45e2127a8c 100644 --- a/app/vlselect/logsql/logsql_test.go +++ b/app/vlselect/logsql/logsql_test.go @@ -1,7 +1,10 @@ package logsql import ( + "slices" "testing" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" ) func TestParseExtraFilters_Success(t *testing.T) { @@ -101,3 +104,55 @@ func TestParseExtraStreamFilters_Failure(t *testing.T) { // excess pipe f(`foo | count()`) } + +func TestTailProcessorGetTailRows(t *testing.T) { + tp := newTailProcessor(func() {}, false) + + const streamID = "test-stream" + const ts = int64(1e9) + + row := func(timestamp int64, msg string) logRow { + return logRow{ + timestamp: timestamp, + fields: []logstorage.Field{{Name: "_msg", Value: msg}}, + } + } + + f := func(input []logRow, wantMsgs ...string) { + t.Helper() + tp.perStreamRows[streamID] = input + got, err := tp.getTailRows() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + var gotMsgs []string + for _, r := range got { + for _, field := range r { + if field.Name == "_msg" { + gotMsgs = append(gotMsgs, field.Value) + } + } + } + if !slices.Equal(gotMsgs, wantMsgs) { + t.Fatalf("got msgs %v; want %v", gotMsgs, wantMsgs) + } + } + + // First time: row A is emitted. + f([]logRow{row(ts, "A")}, "A") + + // Same timestamp, new content: A is deduped, B is emitted. + f([]logRow{row(ts, "A"), row(ts, "B")}, "B") + + // Both seen now: nothing emitted. + f([]logRow{row(ts, "A"), row(ts, "B")}) + + // Empty input: nothing emitted. + f(nil) + + // Multiple new timestamps, unsorted input: emitted in chronological order. + f([]logRow{row(ts+2, "D"), row(ts+1, "C")}, "C", "D") + + // Boundary is now ts+2 with D seen: A is dropped (older), D is dropped (duplicate). + f([]logRow{row(ts, "A"), row(ts+2, "D")}) +} diff --git a/docs/victorialogs/CHANGELOG.md b/docs/victorialogs/CHANGELOG.md index df83371bcf..6641daf3ac 100644 --- a/docs/victorialogs/CHANGELOG.md +++ b/docs/victorialogs/CHANGELOG.md @@ -46,6 +46,7 @@ according to the following docs: * BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): return `502 Bad Gateway` from [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) and [`/select/logsql/stats_query_range`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats) when one of `vlstorage` nodes is unavailable in cluster mode. Previously these endpoints could incorrectly return `422 Unprocessable Entity`, which could break retry and failover handling in high-availability setups. See [#1419](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1419). * BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): allow `unpack_json` to parse JSON objects starting with spaces. See [#1416](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1416). * BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): preserve the selected tenant while switching between vmui tabs. See [#1447](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1447). +* BUGFIX: [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing): fix silent log loss when multiple log entries share the same `_time`. See [#1459](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1459). ## [v1.50.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.50.0) From 7a3d3d47a159bef15dc673fe0ca06ff65ad27a6a Mon Sep 17 00:00:00 2001 From: Cuong Le Date: Thu, 25 Jun 2026 11:06:51 +0700 Subject: [PATCH 2/2] app/vlselect/logsql: simplify live tail dedup state --- app/vlselect/logsql/logsql.go | 51 ++++++++++++++++-------------- app/vlselect/logsql/logsql_test.go | 23 +++++++++----- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index f2e8047221..13b7117cb4 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -780,9 +780,7 @@ type tailProcessor struct { lastTimestamps map[string]int64 // lastRowHashes contains xxhash fingerprints of rows already emitted at lastTimestamps[streamID]. // Used to dedup rows that share the boundary timestamp but differ in content. - lastRowHashes map[string]map[uint64]struct{} - - keyBuf []byte + lastRowHashes map[string][]uint64 err error } @@ -795,7 +793,7 @@ func newTailProcessor(cancel func(), needSortFields bool) *tailProcessor { perStreamRows: make(map[string][]logRow), lastTimestamps: make(map[string]int64), - lastRowHashes: make(map[string]map[uint64]struct{}), + lastRowHashes: make(map[string][]uint64), } } @@ -846,13 +844,13 @@ func (tp *tailProcessor) writeBlock(_ uint, db *logstorage.DataBlock) { } // hashLogRow returns an xxhash fingerprint of row's fields. -func (tp *tailProcessor) hashLogRow(row logRow) uint64 { - tp.keyBuf = tp.keyBuf[:0] +func hashLogRow(keyBuf []byte, row *logRow) (uint64, []byte) { + keyBuf = keyBuf[:0] for _, f := range row.fields { - tp.keyBuf = encoding.MarshalBytes(tp.keyBuf, bytesutil.ToUnsafeBytes(f.Name)) - tp.keyBuf = encoding.MarshalBytes(tp.keyBuf, bytesutil.ToUnsafeBytes(f.Value)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(f.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(f.Value)) } - return xxhash.Sum64(tp.keyBuf) + return xxhash.Sum64(keyBuf), keyBuf } func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { @@ -861,6 +859,7 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { } var resultRows []logRow + var keyBuf []byte for streamID, rows := range tp.perStreamRows { sortLogRows(rows) @@ -868,19 +867,22 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { lastHashes := tp.lastRowHashes[streamID] if ok { - // Drop rows already emitted before the boundary timestamp. - for len(rows) > 0 && rows[0].timestamp < lastTimestamp { - rows = rows[1:] - } - // Drop rows at the boundary whose exact content was already emitted. filtered := rows[:0] - for _, row := range rows { + for i := range rows { + row := &rows[i] + // Ignore everything before the boundary timestamp + if row.timestamp < lastTimestamp { + continue + } + // Ignore rows with the same boundary timestamp, same content if row.timestamp == lastTimestamp { - if _, seen := lastHashes[tp.hashLogRow(row)]; seen { + var h uint64 + h, keyBuf = hashLogRow(keyBuf, row) + if slices.Contains(lastHashes, h) { continue } } - filtered = append(filtered, row) + filtered = append(filtered, *row) } rows = filtered } @@ -894,15 +896,18 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { newLastTS := rows[len(rows)-1].timestamp tp.lastTimestamps[streamID] = newLastTS - // Reuse the existing hash set when the boundary timestamp does not advance, - // so rows already emitted at this timestamp remain excluded. Otherwise reset. + // Reuse the existing hashes when the boundary timestamp does not advance, + // so rows already emitted at this timestamp remain excluded. Otherwise start a new list. hashes := lastHashes - if !ok || lastTimestamp != newLastTS { - hashes = make(map[uint64]struct{}) + if lastTimestamp != newLastTS { + hashes = nil } - for _, row := range rows { + for i := range rows { + row := &rows[i] if row.timestamp == newLastTS { - hashes[tp.hashLogRow(row)] = struct{}{} + var h uint64 + h, keyBuf = hashLogRow(keyBuf, row) + hashes = append(hashes, h) } } tp.lastRowHashes[streamID] = hashes diff --git a/app/vlselect/logsql/logsql_test.go b/app/vlselect/logsql/logsql_test.go index 45e2127a8c..d5cf29e3f5 100644 --- a/app/vlselect/logsql/logsql_test.go +++ b/app/vlselect/logsql/logsql_test.go @@ -108,7 +108,8 @@ func TestParseExtraStreamFilters_Failure(t *testing.T) { func TestTailProcessorGetTailRows(t *testing.T) { tp := newTailProcessor(func() {}, false) - const streamID = "test-stream" + const streamA = "test-stream-a" + const streamB = "test-stream-b" const ts = int64(1e9) row := func(timestamp int64, msg string) logRow { @@ -118,7 +119,7 @@ func TestTailProcessorGetTailRows(t *testing.T) { } } - f := func(input []logRow, wantMsgs ...string) { + f := func(streamID string, input []logRow, wantMsgs ...string) { t.Helper() tp.perStreamRows[streamID] = input got, err := tp.getTailRows() @@ -139,20 +140,26 @@ func TestTailProcessorGetTailRows(t *testing.T) { } // First time: row A is emitted. - f([]logRow{row(ts, "A")}, "A") + f(streamA, []logRow{row(ts, "A")}, "A") // Same timestamp, new content: A is deduped, B is emitted. - f([]logRow{row(ts, "A"), row(ts, "B")}, "B") + f(streamA, []logRow{row(ts, "A"), row(ts, "B")}, "B") // Both seen now: nothing emitted. - f([]logRow{row(ts, "A"), row(ts, "B")}) + f(streamA, []logRow{row(ts, "A"), row(ts, "B")}) // Empty input: nothing emitted. - f(nil) + f(streamA, nil) // Multiple new timestamps, unsorted input: emitted in chronological order. - f([]logRow{row(ts+2, "D"), row(ts+1, "C")}, "C", "D") + f(streamA, []logRow{row(ts+2, "D"), row(ts+1, "C")}, "C", "D") // Boundary is now ts+2 with D seen: A is dropped (older), D is dropped (duplicate). - f([]logRow{row(ts, "A"), row(ts+2, "D")}) + f(streamA, []logRow{row(ts, "A"), row(ts+2, "D")}) + + // Per-stream state: streamB emits A even though streamA already deduped it. + f(streamB, []logRow{row(ts, "A")}, "A") + + // streamA is unaffected by streamB and still drops its boundary row D. + f(streamA, []logRow{row(ts+2, "D")}) }