From 267fcd9be2791d595ddd9f33652a99f41ada0186 Mon Sep 17 00:00:00 2001 From: func25 Date: Tue, 2 Jun 2026 09:07:01 +0700 Subject: [PATCH] test --- app/vlstorage/lastnoptimization.go | 46 +++++++++++++---------- app/vlstorage/main.go | 12 ++++-- lib/logstorage/parser.go | 54 ++++++++++----------------- lib/logstorage/parser_test.go | 60 +++++++++++++++++------------- 4 files changed, 88 insertions(+), 84 deletions(-) diff --git a/app/vlstorage/lastnoptimization.go b/app/vlstorage/lastnoptimization.go index dcfd7e11d9..ff15f39bbc 100644 --- a/app/vlstorage/lastnoptimization.go +++ b/app/vlstorage/lastnoptimization.go @@ -7,36 +7,44 @@ import ( "strings" "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" - "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" ) -func runOptimizedLastNResultsQuery(qctx *logstorage.QueryContext, offset, limit uint64, writeBlock logstorage.WriteDataBlockFunc) error { - rows, err := getLastNQueryResults(qctx, offset+limit) +func runOptimizedLastNResultsQuery(qctxSearch, qctxFinal *logstorage.QueryContext, offset, limit uint64, writeBlock logstorage.WriteDataBlockFunc) error { + rowsNeeded := offset + limit + rows, err := getLastNQueryResults(qctxSearch, rowsNeeded+1) if err != nil { return err } if offset >= uint64(len(rows)) { return nil } - rows = rows[offset:] - - var db logstorage.DataBlock - var columns []logstorage.BlockColumn - var values []string - for _, r := range rows { - columns = slicesutil.SetLength(columns, len(r.fields)) - values = slicesutil.SetLength(values, len(r.fields)) - for j, f := range r.fields { - values[j] = f.Value - columns[j].Name = f.Name - columns[j].Values = values[j : j+1] + if rowsNeeded > uint64(len(rows)) { + rowsNeeded = uint64(len(rows)) + } + if hasDuplicateTimestampsAtLastNRange(rows, offset, rowsNeeded) { + return runQueryNoLastNOptimization(qctxFinal, writeBlock) + } + + start := rows[rowsNeeded-1].timestamp + _, end := qctxFinal.Query.GetFilterTimeRange() + qFinal := qctxFinal.Query.CloneWithTimeFilter(qctxFinal.Query.GetTimestamp(), start, end) + return runQueryNoLastNOptimization(qctxFinal.WithQuery(qFinal), writeBlock) +} + +func hasDuplicateTimestampsAtLastNRange(rows []logRow, offset, rowsNeeded uint64) bool { + if offset > 0 && rows[offset-1].timestamp == rows[offset].timestamp { + return true + } + if uint64(len(rows)) > rowsNeeded && rows[rowsNeeded-1].timestamp == rows[rowsNeeded].timestamp { + return true + } + for i := offset + 1; i < rowsNeeded; i++ { + if rows[i-1].timestamp == rows[i].timestamp { + return true } - db.SetColumns(columns) - writeBlock(0, &db) } - return nil + return false } func getLastNQueryResults(qctx *logstorage.QueryContext, limit uint64) ([]logRow, error) { diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index bce1b14577..e0de765f51 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -553,12 +553,16 @@ func (*Storage) MustAddRows(lr *logstorage.LogRows) { // RunQuery runs the given qctx and calls writeBlock for the returned data blocks func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error { - qOpt, offset, limit := qctx.Query.GetLastNResultsQuery() - if qOpt != nil { - qctxOpt := qctx.WithQuery(qOpt) - return runOptimizedLastNResultsQuery(qctxOpt, offset, limit, writeBlock) + qSearch, offset, limit := qctx.Query.GetLastNResultsQuery() + if qSearch != nil { + qctxSearch := qctx.WithQuery(qSearch) + return runOptimizedLastNResultsQuery(qctxSearch, qctx, offset, limit, writeBlock) } + return runQueryNoLastNOptimization(qctx, writeBlock) +} + +func runQueryNoLastNOptimization(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error { if localStorage != nil { return localStorage.RunQuery(qctx, writeBlock) } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 02ac9680b4..24372d0875 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -662,10 +662,10 @@ func (q *Query) CloneWithTimeFilter(timestamp, start, end int64) *Query { return qCopy } -// GetLastNResultsQuery() returns a query for optimized querying of the last results with the biggest _time values with an optional . +// GetLastNResultsQuery returns a query for finding the minimal _time needed for optimized querying of the last results with the biggest _time values with an optional . // -// The returned query is nil if q cannot be used for optimized querying of the last N results. -func (q *Query) GetLastNResultsQuery() (qOpt *Query, offset uint64, limit uint64) { +// The returned qSearch is nil if q cannot be used for optimized querying of the last N results. +func (q *Query) GetLastNResultsQuery() (qSearch *Query, offset uint64, limit uint64) { start, end := q.GetFilterTimeRange() if !CanApplyLastNResultsOptimization(start, end) { // It is faster to execute the query as is on such a small time range. @@ -673,48 +673,32 @@ func (q *Query) GetLastNResultsQuery() (qOpt *Query, offset uint64, limit uint64 } pipes := q.pipes - - // Remember the trailing 'fields' and 'delete' pipes - they are moved in front of `sort` pipe below. - tailPipes := func() []pipe { - for i := len(pipes) - 1; i >= 0; i-- { - switch pipes[i].(type) { - case *pipeFields, *pipeDelete: - // Skip 'fields' and 'delete' pipes. - default: - return pipes[i+1:] - } + lastNPipeIdx := -1 + for i := len(pipes) - 1; i >= 0; i-- { + offsetLocal, limitLocal, ok := getOffsetLimitFromPipe(pipes[i]) + if !ok { + continue } - return pipes - }() - pipes = pipes[:len(pipes)-len(tailPipes)] - if len(pipes) == 0 { - return nil, 0, 0 + lastNPipeIdx = i + offset = offsetLocal + limit = limitLocal + break } - - // The query must end with one of the following pipes in order to be eligible for the optimization: - // - 'sort by (_time desc) offset limit ' - // - 'first by (_time desc)' - // - 'last by (_time)' - pLast := pipes[len(pipes)-1] - offset, limit, ok := getOffsetLimitFromPipe(pLast) - if !ok { + if lastNPipeIdx < 0 { return nil, 0, 0 } - // Remove the `| sort ...` pipe from the query, add tailPipes and verify - // whether it can reliably return last N results with the biggest _time values. - qCopy := q.Clone(q.GetTimestamp()) - if len(qCopy.pipes) != len(q.pipes) { + qSearch = q.Clone(q.GetTimestamp()) + if len(qSearch.pipes) != len(q.pipes) { return nil, 0, 0 } - qCopy.pipes = qCopy.pipes[:len(pipes)-1] - qCopy.pipes = append(qCopy.pipes, tailPipes...) - if !qCopy.CanReturnLastNResults() { + qSearch.pipes = qSearch.pipes[:lastNPipeIdx] + if !qSearch.CanReturnLastNResults() { return nil, 0, 0 } + qSearch.AddPipeFields([]string{"_time"}) - // The query is eligible for last N results optimization. - return qCopy, offset, limit + return qSearch, offset, limit } // CanApplyLastNResultsOptimization returns true if there is sense for applying 'last N' optimization for the query on the time range [start, end] diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 67fae3e4cf..44160ba93d 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -3653,20 +3653,20 @@ func TestQueryGetFilterTimeRange(t *testing.T) { } func TestQueryGetLastNResultsQuery_Success(t *testing.T) { - f := func(qStr, qOptExpected string, offsetExpected, limitExpected uint64) { + f := func(qStr, qSearchExpected string, offsetExpected, limitExpected uint64) { t.Helper() q, err := ParseQuery(qStr) if err != nil { t.Fatalf("cannot parse [%s]: %s", qStr, err) } - qOpt, offset, limit := q.GetLastNResultsQuery() - if qOpt == nil { - t.Fatalf("unexpected nil qOpt") + qSearch, offset, limit := q.GetLastNResultsQuery() + if qSearch == nil { + t.Fatalf("unexpected nil qSearch") } - qOptStr := qOpt.String() - if qOptStr != qOptExpected { - t.Fatalf("unexpected qOptStr; got %q; want %q", qOptStr, qOptExpected) + qSearchStr := qSearch.String() + if qSearchStr != qSearchExpected { + t.Fatalf("unexpected qSearchStr; got %q; want %q", qSearchStr, qSearchExpected) } if offset != offsetExpected { t.Fatalf("unexpected offset; got %d; want %d", offset, offsetExpected) @@ -3676,28 +3676,34 @@ func TestQueryGetLastNResultsQuery_Success(t *testing.T) { } } - f("* | sort (_time) desc limit 10", "*", 0, 10) - f("* | sort (_time) desc offset 5 limit 10", "*", 5, 10) - f("* | sort (_time) desc limit 5 offset 10", "*", 10, 5) - f("* | sort by (_time desc) limit 20", "*", 0, 20) - f("_time:5m error | format 'x' as y | sort by (_time desc) | limit 30", "_time:5m error | format x as y", 0, 30) - f("* | fields _time, x | sort (_time desc) limit 5", "* | fields _time, x", 0, 5) - f("* | delete x, y* | sort (_time desc) limit 5", "* | delete x, y*", 0, 5) + f("* | sort (_time) desc limit 10", "* | fields _time", 0, 10) + f("* | sort (_time) desc offset 5 limit 10", "* | fields _time", 5, 10) + f("* | sort (_time) desc limit 5 offset 10", "* | fields _time", 10, 5) + f("* | sort by (_time desc) limit 20", "* | fields _time", 0, 20) + f("_time:5m error | format 'x' as y | sort by (_time desc) | limit 30", "_time:5m error | format x as y | fields _time", 0, 30) + f("* | fields _time, x | sort (_time desc) limit 5", "* | fields _time, x | fields _time", 0, 5) + f("* | delete x, y* | sort (_time desc) limit 5", "* | delete x, y* | fields _time", 0, 5) // fields pipe after the sort pipe - f("* | sort (_time desc) limit 5 | fields _time, x", "* | fields _time, x", 0, 5) + f("* | sort (_time desc) limit 5 | fields _time, x", "* | fields _time", 0, 5) + f("* | sort (_time desc) limit 5 | fields x", "* | fields _time", 0, 5) // delete pipe after the sort pipe - f("* | sort (_time desc) limit 5 | delete x, y*", "* | delete x, y*", 0, 5) + f("* | sort (_time desc) limit 5 | delete x, y*", "* | fields _time", 0, 5) // multiple keep and rm pipes - f("* | sort (_time desc) limit 5 | keep _time, x | delete x", "* | fields _time, x | delete x", 0, 5) + f("* | sort (_time desc) limit 5 | keep _time, x | delete x", "* | fields _time", 0, 5) + + f("* | sort (_time desc) limit 5 | unpack_json from payload", "* | fields _time", 0, 5) + f("* | sort (_time desc) limit 5 | stats count() rows", "* | fields _time", 0, 5) // first pipe - f(`* | first 10 (_time desc)`, `*`, 0, 10) + f(`* | first 10 (_time desc)`, `* | fields _time`, 0, 10) + f(`* | first 10 (_time desc) | format "x" as y`, `* | fields _time`, 0, 10) // last pipe - f(`* | last 10 (_time)`, `*`, 0, 10) + f(`* | last 10 (_time)`, `* | fields _time`, 0, 10) + f(`* | last 10 (_time) | filter y:z`, `* | fields _time`, 0, 10) } func TestQueryGetLastNResultsQuery_Failure(t *testing.T) { @@ -3708,9 +3714,9 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) { if err != nil { t.Fatalf("cannot parse [%s]: %s", qStr, err) } - qOpt, offset, limit := q.GetLastNResultsQuery() - if qOpt != nil { - t.Fatalf("unexpected non-nil qOpt: [%s]", qOpt) + qSearch, offset, limit := q.GetLastNResultsQuery() + if qSearch != nil { + t.Fatalf("unexpected non-nil qSearch: [%s]", qSearch) } if offset != 0 { t.Fatalf("unexpected offset; got %d; want 0", offset) @@ -3720,7 +3726,7 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) { } } - // No 'sort' pipe at the end + // no last-N pipe f("*") f("foo") f("foo | count()") @@ -3731,9 +3737,11 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) { // sort by multiple fields f("* | sort (_time desc, x)") + f("* | sort (_time desc, x) limit 5") // sort by _time in ascending order f("* | sort (_time)") + f("* | sort (_time) limit 5") f("* | sort (_time desc) desc") // missing limit @@ -3752,9 +3760,9 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) { f("* | rm _time | sort (_time desc) limit 5") f("* | keep x, y | sort (_time desc) limit 5") - // missing _time field after the sort pipe - f("* | sort (_time desc) limit 5 | keep x") - f("* | sort (_time desc) limit 5 | rm _time, x") + // multiple last-N pipes cannot be optimized, since only one time boundary can be selected. + f("* | sort (x desc) limit 5 | sort (_time desc) limit 5") + f("* | sort (_time desc) limit 5 | sort (_time desc) limit 3") // first without descending sorting f("* | first 10 (_time)")