Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"github.com/valyala/fastjson"
"github.com/valyala/quicktemplate"

Expand Down Expand Up @@ -777,6 +778,11 @@ type tailProcessor struct {

perStreamRows map[string][]logRow
lastTimestamps map[string]int64
// lastRowHashes contains xxhash fingerprints of rows already emitted at lastTimestamps[streamID].
// Used to dedup rows that share the boundary timestamp but differ in content.
lastRowHashes map[string]map[uint64]struct{}
Comment thread
cuongleqq marked this conversation as resolved.
Outdated

keyBuf []byte

err error
}
Expand All @@ -789,6 +795,7 @@ func newTailProcessor(cancel func(), needSortFields bool) *tailProcessor {

perStreamRows: make(map[string][]logRow),
lastTimestamps: make(map[string]int64),
lastRowHashes: make(map[string]map[uint64]struct{}),
}
}

Expand Down Expand Up @@ -838,6 +845,16 @@ func (tp *tailProcessor) writeBlock(_ uint, db *logstorage.DataBlock) {
}
}

// hashLogRow returns an xxhash fingerprint of row's fields.
func (tp *tailProcessor) hashLogRow(row logRow) uint64 {
Comment thread
cuongleqq marked this conversation as resolved.
Outdated
tp.keyBuf = tp.keyBuf[:0]
Comment thread
cuongleqq marked this conversation as resolved.
Outdated
for _, f := range row.fields {
tp.keyBuf = encoding.MarshalBytes(tp.keyBuf, bytesutil.ToUnsafeBytes(f.Name))
tp.keyBuf = encoding.MarshalBytes(tp.keyBuf, bytesutil.ToUnsafeBytes(f.Value))
}
return xxhash.Sum64(tp.keyBuf)
}

func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) {
if tp.err != nil {
return nil, tp.err
Expand All @@ -848,16 +865,47 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) {
sortLogRows(rows)

lastTimestamp, ok := tp.lastTimestamps[streamID]
lastHashes := tp.lastRowHashes[streamID]

if ok {
// Skip already written rows
for len(rows) > 0 && rows[0].timestamp <= lastTimestamp {
// Drop rows already emitted before the boundary timestamp.
for len(rows) > 0 && rows[0].timestamp < lastTimestamp {
rows = rows[1:]
}
// Drop rows at the boundary whose exact content was already emitted.
filtered := rows[:0]
for _, row := range rows {
if row.timestamp == lastTimestamp {
if _, seen := lastHashes[tp.hashLogRow(row)]; seen {
Comment thread
cuongleqq marked this conversation as resolved.
Outdated
continue
}
}
filtered = append(filtered, row)
}
rows = filtered
}
if len(rows) > 0 {
resultRows = append(resultRows, rows...)
tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp

if len(rows) == 0 {
continue
}

resultRows = append(resultRows, rows...)

newLastTS := rows[len(rows)-1].timestamp
tp.lastTimestamps[streamID] = newLastTS

// Reuse the existing hash set when the boundary timestamp does not advance,
// so rows already emitted at this timestamp remain excluded. Otherwise reset.
hashes := lastHashes
if !ok || lastTimestamp != newLastTS {
Comment thread
cuongleqq marked this conversation as resolved.
Outdated
hashes = make(map[uint64]struct{})
}
for _, row := range rows {
if row.timestamp == newLastTS {
hashes[tp.hashLogRow(row)] = struct{}{}
}
}
tp.lastRowHashes[streamID] = hashes
}
clear(tp.perStreamRows)

Expand Down
55 changes: 55 additions & 0 deletions app/vlselect/logsql/logsql_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package logsql

import (
"slices"
"testing"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

func TestParseExtraFilters_Success(t *testing.T) {
Expand Down Expand Up @@ -101,3 +104,55 @@ func TestParseExtraStreamFilters_Failure(t *testing.T) {
// excess pipe
f(`foo | count()`)
}

func TestTailProcessorGetTailRows(t *testing.T) {
tp := newTailProcessor(func() {}, false)

const streamID = "test-stream"
const ts = int64(1e9)

row := func(timestamp int64, msg string) logRow {
return logRow{
timestamp: timestamp,
fields: []logstorage.Field{{Name: "_msg", Value: msg}},
}
}

f := func(input []logRow, wantMsgs ...string) {
t.Helper()
tp.perStreamRows[streamID] = input
got, err := tp.getTailRows()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
var gotMsgs []string
for _, r := range got {
for _, field := range r {
if field.Name == "_msg" {
gotMsgs = append(gotMsgs, field.Value)
}
}
}
if !slices.Equal(gotMsgs, wantMsgs) {
t.Fatalf("got msgs %v; want %v", gotMsgs, wantMsgs)
}
}

// First time: row A is emitted.
f([]logRow{row(ts, "A")}, "A")

// Same timestamp, new content: A is deduped, B is emitted.
f([]logRow{row(ts, "A"), row(ts, "B")}, "B")

// Both seen now: nothing emitted.
f([]logRow{row(ts, "A"), row(ts, "B")})

// Empty input: nothing emitted.
f(nil)

// Multiple new timestamps, unsorted input: emitted in chronological order.
f([]logRow{row(ts+2, "D"), row(ts+1, "C")}, "C", "D")

// Boundary is now ts+2 with D seen: A is dropped (older), D is dropped (duplicate).
f([]logRow{row(ts, "A"), row(ts+2, "D")})
}
1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ according to the following docs:
* BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): return `502 Bad Gateway` from [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) and [`/select/logsql/stats_query_range`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats) when one of `vlstorage` nodes is unavailable in cluster mode. Previously these endpoints could incorrectly return `422 Unprocessable Entity`, which could break retry and failover handling in high-availability setups. See [#1419](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1419).
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): allow `unpack_json` to parse JSON objects starting with spaces. See [#1416](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1416).
* BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): preserve the selected tenant while switching between vmui tabs. See [#1447](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1447).
* BUGFIX: [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing): fix silent log loss when multiple log entries share the same `_time`. See [#1459](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1459).

## [v1.50.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.50.0)

Expand Down