Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 27 additions & 19 deletions app/vlstorage/lastnoptimization.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,44 @@ import (
"strings"
"sync"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

func runOptimizedLastNResultsQuery(qctx *logstorage.QueryContext, offset, limit uint64, writeBlock logstorage.WriteDataBlockFunc) error {
rows, err := getLastNQueryResults(qctx, offset+limit)
func runOptimizedLastNResultsQuery(qctxSearch, qctxFinal *logstorage.QueryContext, offset, limit uint64, writeBlock logstorage.WriteDataBlockFunc) error {
rowsNeeded := offset + limit
rows, err := getLastNQueryResults(qctxSearch, rowsNeeded+1)
if err != nil {
return err
}
if offset >= uint64(len(rows)) {
return nil
}
rows = rows[offset:]

var db logstorage.DataBlock
var columns []logstorage.BlockColumn
var values []string
for _, r := range rows {
columns = slicesutil.SetLength(columns, len(r.fields))
values = slicesutil.SetLength(values, len(r.fields))
for j, f := range r.fields {
values[j] = f.Value
columns[j].Name = f.Name
columns[j].Values = values[j : j+1]
if rowsNeeded > uint64(len(rows)) {
rowsNeeded = uint64(len(rows))
}
if hasDuplicateTimestampsAtLastNRange(rows, offset, rowsNeeded) {
return runQueryNoLastNOptimization(qctxFinal, writeBlock)
}

start := rows[rowsNeeded-1].timestamp
_, end := qctxFinal.Query.GetFilterTimeRange()
qFinal := qctxFinal.Query.CloneWithTimeFilter(qctxFinal.Query.GetTimestamp(), start, end)
return runQueryNoLastNOptimization(qctxFinal.WithQuery(qFinal), writeBlock)
}

func hasDuplicateTimestampsAtLastNRange(rows []logRow, offset, rowsNeeded uint64) bool {
if offset > 0 && rows[offset-1].timestamp == rows[offset].timestamp {
return true
}
if uint64(len(rows)) > rowsNeeded && rows[rowsNeeded-1].timestamp == rows[rowsNeeded].timestamp {
return true
}
for i := offset + 1; i < rowsNeeded; i++ {
if rows[i-1].timestamp == rows[i].timestamp {
return true
}
db.SetColumns(columns)
writeBlock(0, &db)
}
return nil
return false
}

func getLastNQueryResults(qctx *logstorage.QueryContext, limit uint64) ([]logRow, error) {
Expand Down
12 changes: 8 additions & 4 deletions app/vlstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,12 +553,16 @@ func (*Storage) MustAddRows(lr *logstorage.LogRows) {

// RunQuery runs the given qctx and calls writeBlock for the returned data blocks
func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
qOpt, offset, limit := qctx.Query.GetLastNResultsQuery()
if qOpt != nil {
qctxOpt := qctx.WithQuery(qOpt)
return runOptimizedLastNResultsQuery(qctxOpt, offset, limit, writeBlock)
qSearch, offset, limit := qctx.Query.GetLastNResultsQuery()
if qSearch != nil {
qctxSearch := qctx.WithQuery(qSearch)
return runOptimizedLastNResultsQuery(qctxSearch, qctx, offset, limit, writeBlock)
}

return runQueryNoLastNOptimization(qctx, writeBlock)
}

func runQueryNoLastNOptimization(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
if localStorage != nil {
return localStorage.RunQuery(qctx, writeBlock)
}
Expand Down
54 changes: 19 additions & 35 deletions lib/logstorage/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,59 +662,43 @@ func (q *Query) CloneWithTimeFilter(timestamp, start, end int64) *Query {
return qCopy
}

// GetLastNResultsQuery() returns a query for optimized querying of the last <limit> results with the biggest _time values with an optional <offset>.
// GetLastNResultsQuery returns a query for finding the minimal _time needed for optimized querying of the last <limit> results with the biggest _time values with an optional <offset>.
//
// The returned query is nil if q cannot be used for optimized querying of the last N results.
func (q *Query) GetLastNResultsQuery() (qOpt *Query, offset uint64, limit uint64) {
// The returned qSearch is nil if q cannot be used for optimized querying of the last N results.
func (q *Query) GetLastNResultsQuery() (qSearch *Query, offset uint64, limit uint64) {
start, end := q.GetFilterTimeRange()
if !CanApplyLastNResultsOptimization(start, end) {
// It is faster to execute the query as is on such a small time range.
return nil, 0, 0
}

pipes := q.pipes

// Remember the trailing 'fields' and 'delete' pipes - they are moved in front of `sort` pipe below.
tailPipes := func() []pipe {
for i := len(pipes) - 1; i >= 0; i-- {
switch pipes[i].(type) {
case *pipeFields, *pipeDelete:
// Skip 'fields' and 'delete' pipes.
default:
return pipes[i+1:]
}
lastNPipeIdx := -1
for i := len(pipes) - 1; i >= 0; i-- {
offsetLocal, limitLocal, ok := getOffsetLimitFromPipe(pipes[i])
if !ok {
continue
}
return pipes
}()
pipes = pipes[:len(pipes)-len(tailPipes)]
if len(pipes) == 0 {
return nil, 0, 0
lastNPipeIdx = i
offset = offsetLocal
limit = limitLocal
break
}

// The query must end with one of the following pipes in order to be eligible for the optimization:
// - 'sort by (_time desc) offset <offset> limit <limit>'
// - 'first <limit> by (_time desc)'
// - 'last <limit> by (_time)'
pLast := pipes[len(pipes)-1]
offset, limit, ok := getOffsetLimitFromPipe(pLast)
if !ok {
if lastNPipeIdx < 0 {
return nil, 0, 0
}

// Remove the `| sort ...` pipe from the query, add tailPipes and verify
// whether it can reliably return last N results with the biggest _time values.
qCopy := q.Clone(q.GetTimestamp())
if len(qCopy.pipes) != len(q.pipes) {
qSearch = q.Clone(q.GetTimestamp())
if len(qSearch.pipes) != len(q.pipes) {
return nil, 0, 0
}
qCopy.pipes = qCopy.pipes[:len(pipes)-1]
qCopy.pipes = append(qCopy.pipes, tailPipes...)
if !qCopy.CanReturnLastNResults() {
qSearch.pipes = qSearch.pipes[:lastNPipeIdx]
if !qSearch.CanReturnLastNResults() {
return nil, 0, 0
}
qSearch.AddPipeFields([]string{"_time"})

// The query is eligible for last N results optimization.
return qCopy, offset, limit
return qSearch, offset, limit
}

// CanApplyLastNResultsOptimization returns true if it makes sense to apply the 'last N' optimization to the query on the time range [start, end]
Expand Down
60 changes: 34 additions & 26 deletions lib/logstorage/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3653,20 +3653,20 @@ func TestQueryGetFilterTimeRange(t *testing.T) {
}

func TestQueryGetLastNResultsQuery_Success(t *testing.T) {
f := func(qStr, qOptExpected string, offsetExpected, limitExpected uint64) {
f := func(qStr, qSearchExpected string, offsetExpected, limitExpected uint64) {
t.Helper()

q, err := ParseQuery(qStr)
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
qOpt, offset, limit := q.GetLastNResultsQuery()
if qOpt == nil {
t.Fatalf("unexpected nil qOpt")
qSearch, offset, limit := q.GetLastNResultsQuery()
if qSearch == nil {
t.Fatalf("unexpected nil qSearch")
}
qOptStr := qOpt.String()
if qOptStr != qOptExpected {
t.Fatalf("unexpected qOptStr; got %q; want %q", qOptStr, qOptExpected)
qSearchStr := qSearch.String()
if qSearchStr != qSearchExpected {
t.Fatalf("unexpected qSearchStr; got %q; want %q", qSearchStr, qSearchExpected)
}
if offset != offsetExpected {
t.Fatalf("unexpected offset; got %d; want %d", offset, offsetExpected)
Expand All @@ -3676,28 +3676,34 @@ func TestQueryGetLastNResultsQuery_Success(t *testing.T) {
}
}

f("* | sort (_time) desc limit 10", "*", 0, 10)
f("* | sort (_time) desc offset 5 limit 10", "*", 5, 10)
f("* | sort (_time) desc limit 5 offset 10", "*", 10, 5)
f("* | sort by (_time desc) limit 20", "*", 0, 20)
f("_time:5m error | format 'x' as y | sort by (_time desc) | limit 30", "_time:5m error | format x as y", 0, 30)
f("* | fields _time, x | sort (_time desc) limit 5", "* | fields _time, x", 0, 5)
f("* | delete x, y* | sort (_time desc) limit 5", "* | delete x, y*", 0, 5)
f("* | sort (_time) desc limit 10", "* | fields _time", 0, 10)
f("* | sort (_time) desc offset 5 limit 10", "* | fields _time", 5, 10)
f("* | sort (_time) desc limit 5 offset 10", "* | fields _time", 10, 5)
f("* | sort by (_time desc) limit 20", "* | fields _time", 0, 20)
f("_time:5m error | format 'x' as y | sort by (_time desc) | limit 30", "_time:5m error | format x as y | fields _time", 0, 30)
f("* | fields _time, x | sort (_time desc) limit 5", "* | fields _time, x | fields _time", 0, 5)
f("* | delete x, y* | sort (_time desc) limit 5", "* | delete x, y* | fields _time", 0, 5)

// fields pipe after the sort pipe
f("* | sort (_time desc) limit 5 | fields _time, x", "* | fields _time, x", 0, 5)
f("* | sort (_time desc) limit 5 | fields _time, x", "* | fields _time", 0, 5)
f("* | sort (_time desc) limit 5 | fields x", "* | fields _time", 0, 5)

// delete pipe after the sort pipe
f("* | sort (_time desc) limit 5 | delete x, y*", "* | delete x, y*", 0, 5)
f("* | sort (_time desc) limit 5 | delete x, y*", "* | fields _time", 0, 5)

// multiple keep and rm pipes
f("* | sort (_time desc) limit 5 | keep _time, x | delete x", "* | fields _time, x | delete x", 0, 5)
f("* | sort (_time desc) limit 5 | keep _time, x | delete x", "* | fields _time", 0, 5)

f("* | sort (_time desc) limit 5 | unpack_json from payload", "* | fields _time", 0, 5)
f("* | sort (_time desc) limit 5 | stats count() rows", "* | fields _time", 0, 5)

// first pipe
f(`* | first 10 (_time desc)`, `*`, 0, 10)
f(`* | first 10 (_time desc)`, `* | fields _time`, 0, 10)
f(`* | first 10 (_time desc) | format "x" as y`, `* | fields _time`, 0, 10)

// last pipe
f(`* | last 10 (_time)`, `*`, 0, 10)
f(`* | last 10 (_time)`, `* | fields _time`, 0, 10)
f(`* | last 10 (_time) | filter y:z`, `* | fields _time`, 0, 10)
}

func TestQueryGetLastNResultsQuery_Failure(t *testing.T) {
Expand All @@ -3708,9 +3714,9 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) {
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
qOpt, offset, limit := q.GetLastNResultsQuery()
if qOpt != nil {
t.Fatalf("unexpected non-nil qOpt: [%s]", qOpt)
qSearch, offset, limit := q.GetLastNResultsQuery()
if qSearch != nil {
t.Fatalf("unexpected non-nil qSearch: [%s]", qSearch)
}
if offset != 0 {
t.Fatalf("unexpected offset; got %d; want 0", offset)
Expand All @@ -3720,7 +3726,7 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) {
}
}

// No 'sort' pipe at the end
// no last-N pipe
f("*")
f("foo")
f("foo | count()")
Expand All @@ -3731,9 +3737,11 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) {

// sort by multiple fields
f("* | sort (_time desc, x)")
f("* | sort (_time desc, x) limit 5")

// sort by _time in ascending order
f("* | sort (_time)")
f("* | sort (_time) limit 5")
f("* | sort (_time desc) desc")

// missing limit
Expand All @@ -3752,9 +3760,9 @@ func TestQueryGetLastNResultsQuery_Failure(t *testing.T) {
f("* | rm _time | sort (_time desc) limit 5")
f("* | keep x, y | sort (_time desc) limit 5")

// missing _time field after the sort pipe
f("* | sort (_time desc) limit 5 | keep x")
f("* | sort (_time desc) limit 5 | rm _time, x")
// multiple last-N pipes cannot be optimized, since only one time boundary can be selected.
f("* | sort (x desc) limit 5 | sort (_time desc) limit 5")
f("* | sort (_time desc) limit 5 | sort (_time desc) limit 3")

// first without descending sorting
f("* | first 10 (_time)")
Expand Down