diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index d367a44ab7..13b7117cb4 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -25,6 +25,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/metrics" + "github.com/cespare/xxhash/v2" "github.com/valyala/fastjson" "github.com/valyala/quicktemplate" @@ -777,6 +778,9 @@ type tailProcessor struct { perStreamRows map[string][]logRow lastTimestamps map[string]int64 + // lastRowHashes contains xxhash fingerprints of the rows already emitted at lastTimestamps[streamID]. + // It allows emitting not-yet-seen rows at the boundary timestamp while skipping the rows emitted earlier. + lastRowHashes map[string][]uint64 err error } @@ -789,6 +793,7 @@ func newTailProcessor(cancel func(), needSortFields bool) *tailProcessor { perStreamRows: make(map[string][]logRow), lastTimestamps: make(map[string]int64), + lastRowHashes: make(map[string][]uint64), } } @@ -838,26 +843,74 @@ func (tp *tailProcessor) writeBlock(_ uint, db *logstorage.DataBlock) { } } +// hashLogRow returns an xxhash fingerprint of the names and values of row.fields in their current order (row.timestamp is not hashed), plus keyBuf, which serves as scratch space and can be passed to the next call in order to reuse its memory.
+func hashLogRow(keyBuf []byte, row *logRow) (uint64, []byte) { + keyBuf = keyBuf[:0] + for _, f := range row.fields { + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(f.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(f.Value)) + } + return xxhash.Sum64(keyBuf), keyBuf +} + func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { if tp.err != nil { return nil, tp.err } var resultRows []logRow + var keyBuf []byte for streamID, rows := range tp.perStreamRows { sortLogRows(rows) lastTimestamp, ok := tp.lastTimestamps[streamID] + lastHashes := tp.lastRowHashes[streamID] + if ok { - // Skip already written rows - for len(rows) > 0 && rows[0].timestamp <= lastTimestamp { - rows = rows[1:] + filtered := rows[:0] + for i := range rows { + row := &rows[i] + // Ignore everything before the boundary timestamp + if row.timestamp < lastTimestamp { + continue + } + // Ignore rows at the boundary timestamp whose fingerprint is already present in lastHashes + if row.timestamp == lastTimestamp { + var h uint64 + h, keyBuf = hashLogRow(keyBuf, row) + if slices.Contains(lastHashes, h) { + continue + } + } + filtered = append(filtered, *row) } + rows = filtered } - if len(rows) > 0 { - resultRows = append(resultRows, rows...) - tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp + + if len(rows) == 0 { + continue + } + + resultRows = append(resultRows, rows...) + + newLastTS := rows[len(rows)-1].timestamp + tp.lastTimestamps[streamID] = newLastTS + + // Reuse the existing hashes when the boundary timestamp does not advance, + // so rows already emitted at this timestamp remain excluded. Otherwise start a new list. Only fingerprints are stored, so a row with the same timestamp and identical fields arriving in a later call is dropped as already emitted.
+ hashes := lastHashes + if lastTimestamp != newLastTS { + hashes = nil + } + for i := range rows { + row := &rows[i] + if row.timestamp == newLastTS { + var h uint64 + h, keyBuf = hashLogRow(keyBuf, row) + hashes = append(hashes, h) + } } + tp.lastRowHashes[streamID] = hashes } clear(tp.perStreamRows) diff --git a/app/vlselect/logsql/logsql_test.go b/app/vlselect/logsql/logsql_test.go index 0cb30c62a0..d5cf29e3f5 100644 --- a/app/vlselect/logsql/logsql_test.go +++ b/app/vlselect/logsql/logsql_test.go @@ -1,7 +1,10 @@ package logsql import ( + "slices" "testing" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" ) func TestParseExtraFilters_Success(t *testing.T) { @@ -101,3 +104,62 @@ func TestParseExtraStreamFilters_Failure(t *testing.T) { // excess pipe f(`foo | count()`) } + +func TestTailProcessorGetTailRows(t *testing.T) { + tp := newTailProcessor(func() {}, false) + + const streamA = "test-stream-a" + const streamB = "test-stream-b" + const ts = int64(1e9) + + row := func(timestamp int64, msg string) logRow { + return logRow{ + timestamp: timestamp, + fields: []logstorage.Field{{Name: "_msg", Value: msg}}, + } + } + + f := func(streamID string, input []logRow, wantMsgs ...string) { + t.Helper() + tp.perStreamRows[streamID] = input + got, err := tp.getTailRows() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + var gotMsgs []string + for _, r := range got { + for _, field := range r { + if field.Name == "_msg" { + gotMsgs = append(gotMsgs, field.Value) + } + } + } + if !slices.Equal(gotMsgs, wantMsgs) { + t.Fatalf("got msgs %v; want %v", gotMsgs, wantMsgs) + } + } + + // First call for streamA: no boundary timestamp is recorded yet, so row A is emitted. + f(streamA, []logRow{row(ts, "A")}, "A") + + // Same timestamp, new content: A is deduped, B is emitted. + f(streamA, []logRow{row(ts, "A"), row(ts, "B")}, "B") + + // Both seen now: nothing emitted. + f(streamA, []logRow{row(ts, "A"), row(ts, "B")}) + + // Empty input: nothing emitted and the dedup state of streamA is left as is.
+ f(streamA, nil) + + // Multiple new timestamps, unsorted input: emitted in chronological order. + f(streamA, []logRow{row(ts+2, "D"), row(ts+1, "C")}, "C", "D") + + // Boundary is now ts+2 with D seen: A is dropped (older), D is dropped (duplicate). + f(streamA, []logRow{row(ts, "A"), row(ts+2, "D")}) + + // Per-stream state: streamB emits A even though streamA already deduped it. + f(streamB, []logRow{row(ts, "A")}, "A") + + // streamA is unaffected by streamB and still drops its boundary row D. + f(streamA, []logRow{row(ts+2, "D")}) +} diff --git a/docs/victorialogs/CHANGELOG.md b/docs/victorialogs/CHANGELOG.md index df83371bcf..6641daf3ac 100644 --- a/docs/victorialogs/CHANGELOG.md +++ b/docs/victorialogs/CHANGELOG.md @@ -46,6 +46,7 @@ according to the following docs: * BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): return `502 Bad Gateway` from [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) and [`/select/logsql/stats_query_range`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats) when one of `vlstorage` nodes is unavailable in cluster mode. Previously these endpoints could incorrectly return `422 Unprocessable Entity`, which could break retry and failover handling in high-availability setups. See [#1419](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1419). * BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): allow `unpack_json` to parse JSON objects starting with spaces. See [#1416](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1416). * BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): preserve the selected tenant while switching between vmui tabs. See [#1447](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1447).
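+* BUGFIX: [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing): fix silent loss of log entries that share the same `_time` with the last delivered entry of their log stream. Previously every entry with `_time` not exceeding the `_time` of the last delivered entry was skipped, even if it had not been delivered yet. See [#1459](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1459). ## [v1.50.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.50.0)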