Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 59 additions & 6 deletions app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"github.com/valyala/fastjson"
"github.com/valyala/quicktemplate"

Expand Down Expand Up @@ -777,6 +778,9 @@ type tailProcessor struct {

perStreamRows map[string][]logRow
lastTimestamps map[string]int64
// lastRowHashes contains xxhash fingerprints of rows already emitted at lastTimestamps[streamID].
// Used to dedup rows that share the boundary timestamp but differ in content.
lastRowHashes map[string][]uint64

err error
}
Expand All @@ -789,6 +793,7 @@ func newTailProcessor(cancel func(), needSortFields bool) *tailProcessor {

perStreamRows: make(map[string][]logRow),
lastTimestamps: make(map[string]int64),
lastRowHashes: make(map[string][]uint64),
}
}

Expand Down Expand Up @@ -838,26 +843,74 @@ func (tp *tailProcessor) writeBlock(_ uint, db *logstorage.DataBlock) {
}
}

// hashLogRow returns an xxhash fingerprint of row's fields.
func hashLogRow(keyBuf []byte, row *logRow) (uint64, []byte) {
keyBuf = keyBuf[:0]
for _, f := range row.fields {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(f.Name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(f.Value))
}
return xxhash.Sum64(keyBuf), keyBuf
}

func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) {
if tp.err != nil {
return nil, tp.err
}

var resultRows []logRow
var keyBuf []byte
for streamID, rows := range tp.perStreamRows {
sortLogRows(rows)

lastTimestamp, ok := tp.lastTimestamps[streamID]
lastHashes := tp.lastRowHashes[streamID]

if ok {
// Skip already written rows
for len(rows) > 0 && rows[0].timestamp <= lastTimestamp {
rows = rows[1:]
filtered := rows[:0]
for i := range rows {
row := &rows[i]
// Ignore everything before the boundary timestamp
if row.timestamp < lastTimestamp {
continue
}
// Ignore rows with the same boundary timestamp, same content
if row.timestamp == lastTimestamp {
var h uint64
h, keyBuf = hashLogRow(keyBuf, row)
if slices.Contains(lastHashes, h) {
continue
}
}
filtered = append(filtered, *row)
}
rows = filtered
}
if len(rows) > 0 {
resultRows = append(resultRows, rows...)
tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp

if len(rows) == 0 {
continue
}

resultRows = append(resultRows, rows...)

newLastTS := rows[len(rows)-1].timestamp
tp.lastTimestamps[streamID] = newLastTS

// Reuse the existing hashes when the boundary timestamp does not advance,
// so rows already emitted at this timestamp remain excluded. Otherwise start a new list.
hashes := lastHashes
if lastTimestamp != newLastTS {
hashes = nil
}
for i := range rows {
row := &rows[i]
if row.timestamp == newLastTS {
var h uint64
h, keyBuf = hashLogRow(keyBuf, row)
hashes = append(hashes, h)
}
}
tp.lastRowHashes[streamID] = hashes
}
clear(tp.perStreamRows)

Expand Down
62 changes: 62 additions & 0 deletions app/vlselect/logsql/logsql_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package logsql

import (
"slices"
"testing"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

func TestParseExtraFilters_Success(t *testing.T) {
Expand Down Expand Up @@ -101,3 +104,62 @@ func TestParseExtraStreamFilters_Failure(t *testing.T) {
// excess pipe
f(`foo | count()`)
}

func TestTailProcessorGetTailRows(t *testing.T) {
tp := newTailProcessor(func() {}, false)

const streamA = "test-stream-a"
const streamB = "test-stream-b"
const ts = int64(1e9)

row := func(timestamp int64, msg string) logRow {
return logRow{
timestamp: timestamp,
fields: []logstorage.Field{{Name: "_msg", Value: msg}},
}
}

f := func(streamID string, input []logRow, wantMsgs ...string) {
t.Helper()
tp.perStreamRows[streamID] = input
got, err := tp.getTailRows()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
var gotMsgs []string
for _, r := range got {
for _, field := range r {
if field.Name == "_msg" {
gotMsgs = append(gotMsgs, field.Value)
}
}
}
if !slices.Equal(gotMsgs, wantMsgs) {
t.Fatalf("got msgs %v; want %v", gotMsgs, wantMsgs)
}
}

// First time: row A is emitted.
f(streamA, []logRow{row(ts, "A")}, "A")

// Same timestamp, new content: A is deduped, B is emitted.
f(streamA, []logRow{row(ts, "A"), row(ts, "B")}, "B")

// Both seen now: nothing emitted.
f(streamA, []logRow{row(ts, "A"), row(ts, "B")})

// Empty input: nothing emitted.
f(streamA, nil)

// Multiple new timestamps, unsorted input: emitted in chronological order.
f(streamA, []logRow{row(ts+2, "D"), row(ts+1, "C")}, "C", "D")

// Boundary is now ts+2 with D seen: A is dropped (older), D is dropped (duplicate).
f(streamA, []logRow{row(ts, "A"), row(ts+2, "D")})

// Per-stream state: streamB emits A even though streamA already deduped it.
f(streamB, []logRow{row(ts, "A")}, "A")

// streamA is unaffected by streamB and still drops its boundary row D.
f(streamA, []logRow{row(ts+2, "D")})
}
1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ according to the following docs:
* BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): return `502 Bad Gateway` from [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) and [`/select/logsql/stats_query_range`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats) when one of `vlstorage` nodes is unavailable in cluster mode. Previously these endpoints could incorrectly return `422 Unprocessable Entity`, which could break retry and failover handling in high-availability setups. See [#1419](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1419).
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): allow `unpack_json` to parse JSON objects starting with spaces. See [#1416](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1416).
* BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): preserve the selected tenant while switching between vmui tabs. See [#1447](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1447).
* BUGFIX: [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing): fix silent log loss when multiple log entries share the same `_time`. See [#1459](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1459).

## [v1.50.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.50.0)

Expand Down