From 869da753a72472480f94eff5e0a06437ecbf9a64 Mon Sep 17 00:00:00 2001 From: Cuong Le Date: Fri, 26 Jun 2026 23:32:01 +0700 Subject: [PATCH 1/5] lib/logstorage: limit the total memory used by concurrently executed queries --- lib/logstorage/memory_limiter.go | 47 ++++++++++ lib/logstorage/memory_limiter_test.go | 56 ++++++++++++ lib/logstorage/pipe_facets.go | 28 +++--- lib/logstorage/pipe_running_stats.go | 29 +++---- lib/logstorage/pipe_sort.go | 28 +++--- lib/logstorage/pipe_sort_topk.go | 28 +++--- lib/logstorage/pipe_stats.go | 29 +++---- lib/logstorage/pipe_stream_context.go | 118 ++++++++++++++++---------- lib/logstorage/pipe_top.go | 28 +++--- lib/logstorage/pipe_uniq.go | 28 +++--- 10 files changed, 268 insertions(+), 151 deletions(-) create mode 100644 lib/logstorage/memory_limiter.go create mode 100644 lib/logstorage/memory_limiter_test.go diff --git a/lib/logstorage/memory_limiter.go b/lib/logstorage/memory_limiter.go new file mode 100644 index 0000000000..b6a76d6857 --- /dev/null +++ b/lib/logstorage/memory_limiter.go @@ -0,0 +1,47 @@ +package logstorage + +import ( + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" +) + +type memoryLimiter struct { + MaxSize uint64 + + mu sync.Mutex + usage uint64 +} + +func (ml *memoryLimiter) Get(n uint64) bool { + ml.mu.Lock() + ok := n <= ml.MaxSize && ml.MaxSize-n >= ml.usage + if ok { + ml.usage += n + } + ml.mu.Unlock() + return ok +} + +func (ml *memoryLimiter) Put(n uint64) { + ml.mu.Lock() + if n > ml.usage { + logger.Panicf("BUG: n=%d cannot exceed %d", n, ml.usage) + } + ml.usage -= n + ml.mu.Unlock() +} + +var ( + queryMemoryLimiter memoryLimiter + queryMemoryLimiterOnce sync.Once +) + +func getQueryMemoryLimiter() *memoryLimiter { + queryMemoryLimiterOnce.Do(func() { + // Allocate 90% of allowed memory for query execution. + queryMemoryLimiter.MaxSize = uint64(float64(memory.Allowed()) * 0.9) + }) + return &queryMemoryLimiter +} diff --git a/lib/logstorage/memory_limiter_test.go b/lib/logstorage/memory_limiter_test.go new file mode 100644 index 0000000000..20c74be397 --- /dev/null +++ b/lib/logstorage/memory_limiter_test.go @@ -0,0 +1,56 @@ +package logstorage + +import ( + "testing" +) + +func TestMemoryLimiter(t *testing.T) { + var ml memoryLimiter + ml.MaxSize = 100 + + // Allocate memory + if !ml.Get(10) { + t.Fatalf("cannot get 10 out of %d bytes", ml.MaxSize) + } + if ml.usage != 10 { + t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 10) + } + if !ml.Get(20) { + t.Fatalf("cannot get 20 out of 90 bytes") + } + if ml.usage != 30 { + t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30) + } + if ml.Get(1000) { + t.Fatalf("unexpected get for 1000 bytes") + } + if ml.usage != 30 { + t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30) + } + if ml.Get(71) { + t.Fatalf("unexpected get for 71 bytes") + } + if ml.usage != 30 { + t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30) + } + if !ml.Get(70) { + t.Fatalf("cannot get 70 bytes") + } + if ml.usage != 100 { + t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 100) + } + + // Return memory back + ml.Put(10) + ml.Put(70) + if ml.usage != 20 { + t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 20) + } + if !ml.Get(30) { + t.Fatalf("cannot get 30 bytes") + } + ml.Put(50) + if ml.usage != 0 { + t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 0) + } +} diff --git a/lib/logstorage/pipe_facets.go b/lib/logstorage/pipe_facets.go index bd51193c59..23cb2bec2b 100644 --- a/lib/logstorage/pipe_facets.go +++ b/lib/logstorage/pipe_facets.go @@ -9,7 +9,6 @@ import ( "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" ) @@ -104,21 +103,16 @@ func (pf *pipeFacets) visitSubqueries(_ func(q *Query)) { } func (pf *pipeFacets) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.2) - pfp := &pipeFacetsProcessor{ pf: pf, concurrency: concurrency, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } pfp.shards.Init = func(shard *pipeFacetsProcessorShard) { shard.pfp = pfp } - pfp.stateSizeBudget.Store(maxStateSize) return pfp } @@ -132,8 +126,8 @@ type pipeFacetsProcessor struct { shards atomicutil.Slice[pipeFacetsProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool } type pipeFacetsProcessorShard struct { @@ -334,16 +328,16 @@ func (pfp *pipeFacetsProcessor) writeBlock(workerID uint, br *blockResult) { shard := pfp.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := pfp.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + if pfp.budgetExceeded.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. pfp.cancel() } return } + pfp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -351,8 +345,12 @@ func (pfp *pipeFacetsProcessor) writeBlock(workerID uint, br *blockResult) { } func (pfp *pipeFacetsProcessor) flush() error { - if n := pfp.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pfp.pf.String(), pfp.maxStateSize/(1<<20)) + defer func() { + getQueryMemoryLimiter().Put(uint64(pfp.budgetUsed.Load())) + }() + + if pfp.budgetExceeded.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pfp.pf.String(), pfp.budgetUsed.Load()/(1<<20)) } // merge state across shards diff --git a/lib/logstorage/pipe_running_stats.go b/lib/logstorage/pipe_running_stats.go index 76c9a2009b..93558462f5 100644 --- a/lib/logstorage/pipe_running_stats.go +++ b/lib/logstorage/pipe_running_stats.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" @@ -131,19 +130,13 @@ func (ps *pipeRunningStats) visitSubqueries(visitFunc func(q *Query)) { } func (ps *pipeRunningStats) newPipeProcessor(_ int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.4) - psp := &pipeRunningStatsProcessor{ ps: ps, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } - psp.stateSizeBudget.Store(maxStateSize) - return psp } @@ -155,8 +148,8 @@ type pipeRunningStatsProcessor struct { shards atomicutil.Slice[pipeRunningStatsProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool } type pipeRunningStatsProcessorShard struct { @@ -203,16 +196,16 @@ func (psp *pipeRunningStatsProcessor) writeBlock(workerID uint, br *blockResult) shard := psp.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + if psp.budgetExceeded.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. psp.cancel() } return } + psp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -220,8 +213,12 @@ func (psp *pipeRunningStatsProcessor) writeBlock(workerID uint, br *blockResult) } func (psp *pipeRunningStatsProcessor) flush() error { - if n := psp.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20)) + defer func() { + getQueryMemoryLimiter().Put(uint64(psp.budgetUsed.Load())) + }() + + if psp.budgetExceeded.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.budgetUsed.Load()/(1<<20)) } getKeyForRow := func(row []Field) string { diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index df2b9002b8..34941e5c7f 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -13,7 +13,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" "github.com/valyala/quicktemplate" @@ -172,20 +171,15 @@ func (ps *pipeSort) addPartitionByTime(step int64) { } func newPipeSortProcessor(ps *pipeSort, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.2) - psp := &pipeSortProcessor{ ps: ps, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } psp.shards.Init = func(shard *pipeSortProcessorShard) { shard.ps = ps } - psp.stateSizeBudget.Store(maxStateSize) return psp } @@ -198,8 +192,8 @@ type pipeSortProcessor struct { shards atomicutil.Slice[pipeSortProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool } type pipeSortProcessorShard struct { @@ -457,16 +451,16 @@ func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) { shard := psp.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + if psp.budgetExceeded.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. psp.cancel() } return } + psp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -474,8 +468,12 @@ func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) { } func (psp *pipeSortProcessor) flush() error { - if n := psp.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20)) + defer func() { + getQueryMemoryLimiter().Put(uint64(psp.budgetUsed.Load())) + }() + + if psp.budgetExceeded.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.budgetUsed.Load()/(1<<20)) } if needStop(psp.stopCh) { diff --git a/lib/logstorage/pipe_sort_topk.go b/lib/logstorage/pipe_sort_topk.go index bc46185004..1973b15802 100644 --- a/lib/logstorage/pipe_sort_topk.go +++ b/lib/logstorage/pipe_sort_topk.go @@ -13,26 +13,20 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" ) func newPipeTopkProcessor(ps *pipeSort, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.2) - ptp := &pipeTopkProcessor{ ps: ps, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } ptp.shards.Init = func(shard *pipeTopkProcessorShard) { shard.ps = ps } - ptp.stateSizeBudget.Store(maxStateSize) return ptp } @@ -45,8 +39,8 @@ type pipeTopkProcessor struct { shards atomicutil.Slice[pipeTopkProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool } type pipeTopkProcessorShard struct { @@ -362,16 +356,16 @@ func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { shard := ptp.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + if ptp.budgetExceeded.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. ptp.cancel() } return } + ptp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -379,8 +373,12 @@ func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { } func (ptp *pipeTopkProcessor) flush() error { - if n := ptp.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.ps.String(), ptp.maxStateSize/(1<<20)) + defer func() { + getQueryMemoryLimiter().Put(uint64(ptp.budgetUsed.Load())) + }() + + if ptp.budgetExceeded.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.ps.String(), ptp.budgetUsed.Load()/(1<<20)) } if needStop(ptp.stopCh) { diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 717c2cf215..3d0f67eb36 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -13,7 +13,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" @@ -428,24 +427,18 @@ func (ps *pipeStats) initRateFuncsFromTimeBucket() bool { const stateSizeBudgetChunk = 1 << 20 func (ps *pipeStats) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.4) - psp := &pipeStatsProcessor{ ps: ps, concurrency: concurrency, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } psp.shards.Init = func(shard *pipeStatsProcessorShard) { shard.psp = psp shard.init() } - psp.stateSizeBudget.Store(maxStateSize) - return psp } @@ -458,8 +451,8 @@ type pipeStatsProcessor struct { shards atomicutil.Slice[pipeStatsProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool errLock sync.Mutex err error @@ -1104,16 +1097,16 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { shard := psp.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + if psp.budgetExceeded.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. psp.cancel() } return } + psp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -1125,12 +1118,16 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { } func (psp *pipeStatsProcessor) flush() error { + defer func() { + getQueryMemoryLimiter().Put(uint64(psp.budgetUsed.Load())) + }() + if psp.err != nil { return psp.err } - if n := psp.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20)) + if psp.budgetExceeded.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.budgetUsed.Load()/(1<<20)) } // Merge states across shards in parallel diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index cdae9f0033..f61b3bb2fe 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -14,7 +14,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" ) @@ -100,20 +99,15 @@ func (pc *pipeStreamContext) visitSubqueries(_ func(q *Query)) { } func (pc *pipeStreamContext) newPipeProcessor(_ int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.2) - pcp := &pipeStreamContextProcessor{ pc: pc, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } pcp.shards.Init = func(shard *pipeStreamContextProcessorShard) { shard.pc = pc } - pcp.stateSizeBudget.Store(maxStateSize) return pcp } @@ -126,8 +120,26 @@ type pipeStreamContextProcessor struct { shards atomicutil.Slice[pipeStreamContextProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool +} + +func (pcp *pipeStreamContextProcessor) reserveMemory(n int) bool { + if getQueryMemoryLimiter().Get(uint64(n)) { + pcp.budgetUsed.Add(int64(n)) + return true + } + // The limiter is exhausted. Stop processing data in order to avoid OOM crash. + pcp.budgetExceeded.Store(true) + return false +} + +func (pcp *pipeStreamContextProcessor) memoryLimitError() error { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pcp.pc.String(), pcp.budgetUsed.Load()/(1<<20)) +} + +func (pcp *pipeStreamContextProcessor) memoryLimitForSurroundingLogsError(n int) error { + return fmt.Errorf("the query memory pool can't provide more than %dMB for fetching surrounding logs of %d matching logs", pcp.budgetUsed.Load()/(1<<20), n) } type timeRange struct { @@ -135,21 +147,24 @@ type timeRange struct { end int64 } -func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRows []streamContextRow, stateSizeBudget int) ([][]*streamContextRow, error) { +func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRows []streamContextRow) ([][]*streamContextRow, error) { neededTimestamps := make([]int64, len(neededRows)) - stateSizeBudget -= int(unsafe.Sizeof(neededTimestamps[0])) * len(neededTimestamps) + memUsed := int(unsafe.Sizeof(neededTimestamps[0])) * len(neededTimestamps) + if !pcp.reserveMemory(memUsed) { + return nil, pcp.memoryLimitError() + } + for i := range neededRows { neededTimestamps[i] = neededRows[i].timestamp } slices.Sort(neededTimestamps) - trs, stateSize, err := pcp.getTimeRangesForStreamRowss(streamID, neededTimestamps, stateSizeBudget) + trs, err := pcp.getTimeRangesForStreamRowss(streamID, neededTimestamps) if err != nil { return nil, fmt.Errorf("cannot obtain time ranges for the needed timestamps: %w", err) } - stateSizeBudget -= stateSize - rowss, err := pcp.getStreamRowssByTimeRanges(streamID, neededTimestamps, trs, stateSizeBudget) + rowss, err := pcp.getStreamRowssByTimeRanges(streamID, neededTimestamps, trs) if err != nil { return nil, fmt.Errorf("cannot obtain stream rows for the selected time ranges: %w", err) } @@ -164,21 +179,21 @@ func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRow return rowss, nil } -func (pcp *pipeStreamContextProcessor) getTimeRangesForStreamRowss(streamID string, neededTimestamps []int64, stateSizeBudget int) ([]timeRange, int, error) { +func (pcp *pipeStreamContextProcessor) getTimeRangesForStreamRowss(streamID string, neededTimestamps []int64) ([]timeRange, error) { // construct the query for selecting only timestamps across all the logs for the given streamID tr := pcp.getTimeRangeForNeededTimestamps(neededTimestamps) timeFilter := getTimeFilter(tr.start, tr.end) qStr := fmt.Sprintf("_stream_id:%s %s | fields _time", streamID, timeFilter) - rowss, stateSize, err := pcp.executeQuery(streamID, qStr, neededTimestamps, stateSizeBudget) + rowss, err := pcp.executeQuery(streamID, qStr, neededTimestamps) if err != nil { - return nil, 0, err + return nil, err } trs := make([]timeRange, len(rowss)) newStateSize := int(unsafe.Sizeof(trs[0])) * len(rowss) - if stateSize+newStateSize > stateSizeBudget { - return nil, 0, fmt.Errorf("more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs", stateSizeBudget/(1<<20), len(neededTimestamps)) + if !pcp.reserveMemory(newStateSize) { + return nil, pcp.memoryLimitForSurroundingLogsError(len(neededTimestamps)) } for i, rows := range rowss { if len(rows) == 0 { @@ -203,7 +218,7 @@ func (pcp *pipeStreamContextProcessor) getTimeRangesForStreamRowss(streamID stri end: maxTimestamp, } } - return trs, newStateSize, nil + return trs, nil } func (pcp *pipeStreamContextProcessor) getTimeRangeForNeededTimestamps(neededTimestamps []int64) timeRange { @@ -226,7 +241,7 @@ func (pcp *pipeStreamContextProcessor) getTimeRangeForNeededTimestamps(neededTim return tr } -func (pcp *pipeStreamContextProcessor) getStreamRowssByTimeRanges(streamID string, neededTimestamps []int64, trs []timeRange, stateSizeBudget int) ([][]*streamContextRow, error) { +func (pcp *pipeStreamContextProcessor) getStreamRowssByTimeRanges(streamID string, neededTimestamps []int64, trs []timeRange) ([][]*streamContextRow, error) { // construct the query for selecting rows on the given tr for the given streamID qStr := "_stream_id:" + streamID minTimestamp := int64(math.MaxInt64) @@ -252,7 +267,7 @@ func (pcp *pipeStreamContextProcessor) getStreamRowssByTimeRanges(streamID strin } qStr += toFieldsFilters(pcp.pc.fieldsFilter) - rowss, _, err := pcp.executeQuery(streamID, qStr, neededTimestamps, stateSizeBudget) + rowss, err := pcp.executeQuery(streamID, qStr, neededTimestamps) if err != nil { return nil, err } @@ -265,7 +280,7 @@ func getTimeFilter(start, end int64) string { return fmt.Sprintf("_time:[%s, %s]", startStr, endStr) } -func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neededTimestamps []int64, stateSizeBudget int) ([][]*streamContextRow, int, error) { +func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neededTimestamps []int64) ([][]*streamContextRow, error) { q, err := ParseQuery(qStr) if err != nil { logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err) @@ -283,7 +298,7 @@ func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neede } } - stateSize := 0 + stateSizeBudget := 0 ctxWithCancel, cancel := contextutil.NewStopChanContext(pcp.stopCh) defer cancel() @@ -292,11 +307,12 @@ func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neede mu.Lock() defer mu.Unlock() - if stateSize > stateSizeBudget { + if pcp.budgetExceeded.Load() { cancel() return } + stateSize := 0 for i := range contextRows { if needStop(pcp.stopCh) { return @@ -319,6 +335,15 @@ func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neede stateSize += contextRows[i].update(br, j, timestamp) } } + + stateSizeBudget -= stateSize + for stateSizeBudget < 0 { + if !pcp.reserveMemory(stateSizeBudgetChunk) { + cancel() + return + } + stateSizeBudget += stateSizeBudgetChunk + } } tenantID, ok := getTenantIDFromStreamIDString(streamID) @@ -329,10 +354,10 @@ func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neede qctxOrig := pcp.pc.qctx qctx := NewQueryContext(ctxWithCancel, qctxOrig.QueryStats, []TenantID{tenantID}, q, qctxOrig.AllowPartialResponse, qctxOrig.HiddenFieldsFilters) if err := pcp.pc.runQuery(qctx, writeBlock); err != nil { - return nil, 0, err + return nil, err } - if stateSize > stateSizeBudget { - return nil, 0, fmt.Errorf("more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs", stateSizeBudget/(1<<20), len(neededTimestamps)) + if pcp.budgetExceeded.Load() { + return nil, pcp.memoryLimitForSurroundingLogsError(len(neededTimestamps)) } rowss := make([][]*streamContextRow, len(contextRows)) @@ -342,7 +367,7 @@ func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neede rows = append(rows, ctx.rowsAfter...) rowss[i] = rows } - return rowss, stateSize, nil + return rowss, nil } func deduplicateStreamRowss(streamRowss [][]*streamContextRow) [][]*streamContextRow { @@ -616,16 +641,11 @@ func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult shard := pcp.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := pcp.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { - // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. - pcp.cancel() - } + if !pcp.reserveMemory(stateSizeBudgetChunk) { + pcp.cancel() return } + pcp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -633,14 +653,16 @@ func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult } func (pcp *pipeStreamContextProcessor) flush() error { - n := pcp.stateSizeBudget.Load() - if n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20)) - } - if n > math.MaxInt { - logger.Panicf("BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d", math.MaxInt, n) + // Return all the memory still reserved from the query memory pool on exit. + // This covers the matching rows accumulated during writeBlock plus any per-stream + // memory not released yet when flush returns early on an error. + defer func() { + getQueryMemoryLimiter().Put(uint64(pcp.budgetUsed.Load())) + }() + + if pcp.budgetExceeded.Load() { + return pcp.memoryLimitError() } - stateSizeBudget := int(n) // merge state across shards shards := pcp.shards.All() @@ -678,13 +700,15 @@ func (pcp *pipeStreamContextProcessor) flush() error { // write output contexts in the ascending order of rows streamIDs := getStreamIDsSortedByMinRowTimestamp(m) for _, streamID := range streamIDs { + budgetBefore := pcp.budgetUsed.Load() + rows := m[streamID] if len(rows) > pipeStreamContextMaxRowsPerStream { return fmt.Errorf("too many logs from a single stream passed to 'stream_context': %d; the maximum supported number of logs, which can be passed to 'stream_context' is %d; "+ "narrow down the matching logs with additional filters according to https://docs.victoriametrics.com/victorialogs/logsql/#filters", len(rows), pipeStreamContextMaxRowsPerStream) } - streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget) + streamRowss, err := pcp.getStreamRowss(streamID, rows) if err != nil { return err } @@ -703,6 +727,12 @@ func (pcp *pipeStreamContextProcessor) flush() error { wctx.writeRow(fields) } } + + // Return the memory reserved for fetching the surrounding logs of this stream, + // since these logs are already written to the output and aren't needed anymore. + streamBudget := pcp.budgetUsed.Load() - budgetBefore + getQueryMemoryLimiter().Put(uint64(streamBudget)) + pcp.budgetUsed.Add(-streamBudget) } wctx.flush() diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index a7de02b838..71b565db64 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -13,7 +13,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" ) @@ -107,21 +106,16 @@ func (pt *pipeTop) visitSubqueries(_ func(q *Query)) { } func (pt *pipeTop) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.4) - ptp := &pipeTopProcessor{ pt: pt, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } ptp.shards.Init = func(shard *pipeTopProcessorShard) { shard.pt = pt shard.m.init(uint(concurrency), "", &shard.stateSizeBudget) } - ptp.stateSizeBudget.Store(maxStateSize) return ptp } @@ -134,8 +128,8 @@ type pipeTopProcessor struct { shards atomicutil.Slice[pipeTopProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool } type pipeTopProcessorShard struct { @@ -279,16 +273,16 @@ func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) { shard := ptp.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + if ptp.budgetExceeded.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. ptp.cancel() } return } + ptp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -296,8 +290,12 @@ func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) { } func (ptp *pipeTopProcessor) flush() error { - if n := ptp.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20)) + defer func() { + getQueryMemoryLimiter().Put(uint64(ptp.budgetUsed.Load())) + }() + + if ptp.budgetExceeded.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.pt.String(), ptp.budgetUsed.Load()/(1<<20)) } // merge state across shards in parallel diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index bf6e170c80..9c56166137 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" ) @@ -87,21 +86,16 @@ func (pu *pipeUniq) visitSubqueries(_ func(q *Query)) { } func (pu *pipeUniq) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.4) - pup := &pipeUniqProcessor{ pu: pu, stopCh: stopCh, cancel: cancel, ppNext: ppNext, - - maxStateSize: maxStateSize, } pup.shards.Init = func(shard *pipeUniqProcessorShard) { shard.pu = pu shard.m.init(uint(concurrency), pu.filter, &shard.stateSizeBudget) } - pup.stateSizeBudget.Store(maxStateSize) return pup } @@ -114,8 +108,8 @@ type pipeUniqProcessor struct { shards atomicutil.Slice[pipeUniqProcessorShard] - maxStateSize int64 - stateSizeBudget atomic.Int64 + budgetUsed atomic.Int64 + budgetExceeded atomic.Bool } type pipeUniqProcessorShard struct { @@ -245,16 +239,16 @@ func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { shard := pup.shards.Get(workerID) for shard.stateSizeBudget < 0 { - // steal some budget for the state size from the global budget. - remaining := pup.stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. - if remaining+stateSizeBudgetChunk >= 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + if pup.budgetExceeded.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. pup.cancel() } return } + pup.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -264,8 +258,12 @@ func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { } func (pup *pipeUniqProcessor) flush() error { - if n := pup.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20)) + defer func() { + getQueryMemoryLimiter().Put(uint64(pup.budgetUsed.Load())) + }() + + if pup.budgetExceeded.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pup.pu.String(), pup.budgetUsed.Load()/(1<<20)) } // merge state across shards in parallel From 5956604600c007fa5888e6081bca809b32cca4a1 Mon Sep 17 00:00:00 2001 From: Cuong Le Date: Wed, 1 Jul 2026 11:19:22 +0700 Subject: [PATCH 2/5] lib/logstorage: limit memory of join and union subqueries via the global query memory limiter --- app/vlstorage/netselect/netselect.go | 1 + lib/logstorage/memory_limiter.go | 19 +++++- lib/logstorage/net_query_runner.go | 34 ++++++++-- lib/logstorage/pipe_facets.go | 14 ++-- lib/logstorage/pipe_join.go | 9 +-- lib/logstorage/pipe_running_stats.go | 14 ++-- lib/logstorage/pipe_sort.go | 14 ++-- lib/logstorage/pipe_sort_topk.go | 14 ++-- lib/logstorage/pipe_stats.go | 14 ++-- lib/logstorage/pipe_stream_context.go | 27 ++++---- lib/logstorage/pipe_top.go | 14 ++-- lib/logstorage/pipe_union.go | 9 +-- lib/logstorage/pipe_uniq.go | 14 ++-- lib/logstorage/storage.go | 3 +- lib/logstorage/storage_search.go | 98 +++++++++++++++++---------- 15 files changed, 183 insertions(+), 115 deletions(-) diff --git a/app/vlstorage/netselect/netselect.go b/app/vlstorage/netselect/netselect.go index 55ae0551e8..5ddb8f020b 100644 --- a/app/vlstorage/netselect/netselect.go +++ b/app/vlstorage/netselect/netselect.go @@ -434,6 +434,7 @@ func (s *Storage) RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage. if err != nil { return err } + defer nqr.MustReleaseMemory() search := func(stopCh <-chan struct{}, q *logstorage.Query, writeBlock logstorage.WriteDataBlockFunc) error { qctxLocal := qctx.WithQuery(q) diff --git a/lib/logstorage/memory_limiter.go b/lib/logstorage/memory_limiter.go index b6a76d6857..07645c47d3 100644 --- a/lib/logstorage/memory_limiter.go +++ b/lib/logstorage/memory_limiter.go @@ -26,11 +26,12 @@ func (ml *memoryLimiter) Get(n uint64) bool { func (ml *memoryLimiter) Put(n uint64) { ml.mu.Lock() + defer ml.mu.Unlock() + if n > ml.usage { logger.Panicf("BUG: n=%d cannot exceed %d", n, ml.usage) } ml.usage -= n - ml.mu.Unlock() } var ( @@ -40,8 +41,20 @@ var ( func getQueryMemoryLimiter() *memoryLimiter { queryMemoryLimiterOnce.Do(func() { - // Allocate 90% of allowed memory for query execution. - queryMemoryLimiter.MaxSize = uint64(float64(memory.Allowed()) * 0.9) + // Allow concurrent queries to use up to 50% of memory.Allowed() for their execution state. + // + // The other ~25% of memory.Allowed() goes to subsystems this limiter cannot account for: + // - indexdb block caches (lib/mergeset): ~10%. They are capped higher, but VictoriaLogs + // keeps the number of streams low, so in practice they stay small. + // - in-memory parts buffering freshly ingested logs before they are flushed to disk: + // ~10% per active partition (see getMaxInmemoryPartSize). + // - per-block scratch buffers for decoding column values during search: ~3%, bounded by + // the number of concurrent block searches (see partitionSearchConcurrencyLimitCh). + // + // That leaves the live set around 75% of memory.Allowed(). The Go runtime keeps the heap at + // roughly twice the live set under the default GOGC=100, so a higher query share would risk + // OOM when heavy queries and ingestion run at the same time. + queryMemoryLimiter.MaxSize = uint64(float64(memory.Allowed()) * 0.5) }) return &queryMemoryLimiter } diff --git a/lib/logstorage/net_query_runner.go b/lib/logstorage/net_query_runner.go index 89e0e81221..2f21a4b3fb 100644 --- a/lib/logstorage/net_query_runner.go +++ b/lib/logstorage/net_query_runner.go @@ -22,12 +22,18 @@ type NetQueryRunner struct { // writeBlock is the function for writing the resulting data block. writeBlock writeBlockResultFunc + + // memReserved is the amount of memory reserved by subqueries + memReserved uint64 } // NewNetQueryRunner creates a new NetQueryRunner for the given qctx. // // runNetQuery is used for running distributed query. // qctx results are sent to writeNetBlock. +// +// The caller must call MustReleaseMemory on the returned runner when it is no longer needed, +// typically via defer, in order to release the memory reserved for subqueries. func NewNetQueryRunner(qctx *QueryContext, runNetQuery RunNetQueryFunc, writeNetBlock WriteDataBlockFunc) (*NetQueryRunner, error) { runQuery := func(qctx *QueryContext, writeBlock writeBlockResultFunc) error { writeNetBlock := writeBlock.newDataBlockWriter() @@ -36,12 +42,16 @@ func NewNetQueryRunner(qctx *QueryContext, runNetQuery RunNetQueryFunc, writeNet qRemote, pipesLocal := splitQueryToRemoteAndLocal(qctx.Query) + var memReserved uint64 + // Eagerly execute all the subqueries for the remote query // and replace them with the query results directly in qRemote. // This is needed for proper propagation subquery results to remote storage nodes. qctxRemote := qctx.WithQuery(qRemote) - qRemote, err := initSubqueries(qctxRemote, runQuery, true) + qRemote, mem, err := initSubqueries(qctxRemote, runQuery, true) + memReserved += mem // subqueries initialization might fail, but some mem is already reserved if err != nil { + getQueryMemoryLimiter().Put(memReserved) return nil, err } @@ -54,22 +64,34 @@ func NewNetQueryRunner(qctx *QueryContext, runNetQuery RunNetQueryFunc, writeNet } qLocal.pipes = pipesLocal qctxLocal := qctx.WithQuery(qLocal) - qLocal, err = initSubqueries(qctxLocal, runQuery, false) + qLocal, mem, err = initSubqueries(qctxLocal, runQuery, false) + memReserved += mem if err != nil { + getQueryMemoryLimiter().Put(memReserved) return nil, err } writeBlock := writeNetBlock.newBlockResultWriter() nqr := &NetQueryRunner{ - qctx: qctx, - qRemote: qRemote, - pipesLocal: qLocal.pipes, - writeBlock: writeBlock, + qctx: qctx, + qRemote: qRemote, + pipesLocal: qLocal.pipes, + writeBlock: writeBlock, + memReserved: memReserved, } return nqr, nil } +// MustReleaseMemory returns the memory reserved for subqueries back to the global query memory limiter. +// +// It must be called after NewNetQueryRunner returns successfully, typically via defer, +// even if Run isn't called; otherwise the reserved memory leaks. +func (nqr *NetQueryRunner) MustReleaseMemory() { + getQueryMemoryLimiter().Put(nqr.memReserved) + nqr.memReserved = 0 +} + // Run runs the nqr query. // // The concurrency limits the number of concurrent goroutines, which process the query results at the local host. diff --git a/lib/logstorage/pipe_facets.go b/lib/logstorage/pipe_facets.go index 23cb2bec2b..032403dac1 100644 --- a/lib/logstorage/pipe_facets.go +++ b/lib/logstorage/pipe_facets.go @@ -126,8 +126,8 @@ type pipeFacetsProcessor struct { shards atomicutil.Slice[pipeFacetsProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool } type pipeFacetsProcessorShard struct { @@ -331,13 +331,13 @@ func (pfp *pipeFacetsProcessor) writeBlock(workerID uint, br *blockResult) { // Reserve more budget for the state size from the global query memory limiter. if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. - if pfp.budgetExceeded.CompareAndSwap(false, true) { + if pfp.memReserveFailed.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. pfp.cancel() } return } - pfp.budgetUsed.Add(stateSizeBudgetChunk) + pfp.memReserved.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -346,11 +346,11 @@ func (pfp *pipeFacetsProcessor) writeBlock(workerID uint, br *blockResult) { func (pfp *pipeFacetsProcessor) flush() error { defer func() { - getQueryMemoryLimiter().Put(uint64(pfp.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(pfp.memReserved.Load())) }() - if pfp.budgetExceeded.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pfp.pf.String(), pfp.budgetUsed.Load()/(1<<20)) + if pfp.memReserveFailed.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pfp.pf.String(), pfp.memReserved.Load()/(1<<20)) } // merge state across shards diff --git a/lib/logstorage/pipe_join.go b/lib/logstorage/pipe_join.go index 3dd11b4e7a..21ac6948f8 100644 --- a/lib/logstorage/pipe_join.go +++ b/lib/logstorage/pipe_join.go @@ -98,13 +98,14 @@ func (pj *pipeJoin) visitSubqueries(visitFunc func(q *Query)) { } } -func (pj *pipeJoin) initJoinMap(getJoinRows getJoinRowsFunc) (pipe, error) { +func (pj *pipeJoin) initJoinMap(getJoinRows getJoinRowsFunc) (pipe, uint64, error) { + var mem uint64 rows := pj.rows if rows == nil { var err error - rows, err = getJoinRows(pj.q) + rows, mem, err = getJoinRows(pj.q) if err != nil { - return nil, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err) + return nil, 0, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err) } } @@ -143,7 +144,7 @@ func (pj *pipeJoin) initJoinMap(getJoinRows getJoinRowsFunc) (pipe, error) { pjNew.q = nil pjNew.rows = rows pjNew.m = m - return &pjNew, nil + return &pjNew, mem, nil } func (pj *pipeJoin) updateNeededFields(pf *prefixfilter.Filter) { diff --git a/lib/logstorage/pipe_running_stats.go b/lib/logstorage/pipe_running_stats.go index 93558462f5..c427ab67cd 100644 --- a/lib/logstorage/pipe_running_stats.go +++ b/lib/logstorage/pipe_running_stats.go @@ -148,8 +148,8 @@ type pipeRunningStatsProcessor struct { shards atomicutil.Slice[pipeRunningStatsProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool } type pipeRunningStatsProcessorShard struct { @@ -199,13 +199,13 @@ func (psp *pipeRunningStatsProcessor) writeBlock(workerID uint, br *blockResult) // Reserve more budget for the state size from the global query memory limiter. if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. - if psp.budgetExceeded.CompareAndSwap(false, true) { + if psp.memReserveFailed.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. psp.cancel() } return } - psp.budgetUsed.Add(stateSizeBudgetChunk) + psp.memReserved.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -214,11 +214,11 @@ func (psp *pipeRunningStatsProcessor) writeBlock(workerID uint, br *blockResult) func (psp *pipeRunningStatsProcessor) flush() error { defer func() { - getQueryMemoryLimiter().Put(uint64(psp.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(psp.memReserved.Load())) }() - if psp.budgetExceeded.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.budgetUsed.Load()/(1<<20)) + if psp.memReserveFailed.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.memReserved.Load()/(1<<20)) } getKeyForRow := func(row []Field) string { diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 34941e5c7f..ee9412eb9a 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -192,8 +192,8 @@ type pipeSortProcessor struct { shards atomicutil.Slice[pipeSortProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool } type pipeSortProcessorShard struct { @@ -454,13 +454,13 @@ func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) { // Reserve more budget for the state size from the global query memory limiter. if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. - if psp.budgetExceeded.CompareAndSwap(false, true) { + if psp.memReserveFailed.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. psp.cancel() } return } - psp.budgetUsed.Add(stateSizeBudgetChunk) + psp.memReserved.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -469,11 +469,11 @@ func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) { func (psp *pipeSortProcessor) flush() error { defer func() { - getQueryMemoryLimiter().Put(uint64(psp.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(psp.memReserved.Load())) }() - if psp.budgetExceeded.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.budgetUsed.Load()/(1<<20)) + if psp.memReserveFailed.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.memReserved.Load()/(1<<20)) } if needStop(psp.stopCh) { diff --git a/lib/logstorage/pipe_sort_topk.go b/lib/logstorage/pipe_sort_topk.go index 1973b15802..5d167a9c66 100644 --- a/lib/logstorage/pipe_sort_topk.go +++ b/lib/logstorage/pipe_sort_topk.go @@ -39,8 +39,8 @@ type pipeTopkProcessor struct { shards atomicutil.Slice[pipeTopkProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool } type pipeTopkProcessorShard struct { @@ -359,13 +359,13 @@ func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { // Reserve more budget for the state size from the global query memory limiter. if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. - if ptp.budgetExceeded.CompareAndSwap(false, true) { + if ptp.memReserveFailed.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. ptp.cancel() } return } - ptp.budgetUsed.Add(stateSizeBudgetChunk) + ptp.memReserved.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -374,11 +374,11 @@ func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { func (ptp *pipeTopkProcessor) flush() error { defer func() { - getQueryMemoryLimiter().Put(uint64(ptp.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(ptp.memReserved.Load())) }() - if ptp.budgetExceeded.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.ps.String(), ptp.budgetUsed.Load()/(1<<20)) + if ptp.memReserveFailed.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.ps.String(), ptp.memReserved.Load()/(1<<20)) } if needStop(ptp.stopCh) { diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 3d0f67eb36..d9e74506bb 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -451,8 +451,8 @@ type pipeStatsProcessor struct { shards atomicutil.Slice[pipeStatsProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool errLock sync.Mutex err error @@ -1100,13 +1100,13 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { // Reserve more budget for the state size from the global query memory limiter. if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. - if psp.budgetExceeded.CompareAndSwap(false, true) { + if psp.memReserveFailed.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. psp.cancel() } return } - psp.budgetUsed.Add(stateSizeBudgetChunk) + psp.memReserved.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -1119,15 +1119,15 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { func (psp *pipeStatsProcessor) flush() error { defer func() { - getQueryMemoryLimiter().Put(uint64(psp.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(psp.memReserved.Load())) }() if psp.err != nil { return psp.err } - if psp.budgetExceeded.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.budgetUsed.Load()/(1<<20)) + if psp.memReserveFailed.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.memReserved.Load()/(1<<20)) } // Merge states across shards in parallel diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index f61b3bb2fe..f49fa92b2d 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -120,26 +120,26 @@ type pipeStreamContextProcessor struct { shards atomicutil.Slice[pipeStreamContextProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool } func (pcp *pipeStreamContextProcessor) reserveMemory(n int) bool { if getQueryMemoryLimiter().Get(uint64(n)) { - pcp.budgetUsed.Add(int64(n)) + pcp.memReserved.Add(int64(n)) return true } // The limiter is exhausted. Stop processing data in order to avoid OOM crash. - pcp.budgetExceeded.Store(true) + pcp.memReserveFailed.Store(true) return false } func (pcp *pipeStreamContextProcessor) memoryLimitError() error { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pcp.pc.String(), pcp.budgetUsed.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pcp.pc.String(), pcp.memReserved.Load()/(1<<20)) } func (pcp *pipeStreamContextProcessor) memoryLimitForSurroundingLogsError(n int) error { - return fmt.Errorf("the query memory pool can't provide more than %dMB for fetching surrounding logs of %d matching logs", pcp.budgetUsed.Load()/(1<<20), n) + return fmt.Errorf("the query memory pool can't provide more than %dMB for fetching surrounding logs of %d matching logs", pcp.memReserved.Load()/(1<<20), n) } type timeRange struct { @@ -307,7 +307,7 @@ func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neede mu.Lock() defer mu.Unlock() - if pcp.budgetExceeded.Load() { + if pcp.memReserveFailed.Load() { cancel() return } @@ -356,7 +356,7 @@ func (pcp *pipeStreamContextProcessor) executeQuery(streamID, qStr string, neede if err := pcp.pc.runQuery(qctx, writeBlock); err != nil { return nil, err } - if pcp.budgetExceeded.Load() { + if pcp.memReserveFailed.Load() { return nil, pcp.memoryLimitForSurroundingLogsError(len(neededTimestamps)) } @@ -645,7 +645,6 @@ func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult pcp.cancel() return } - pcp.budgetUsed.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -657,10 +656,10 @@ func (pcp *pipeStreamContextProcessor) flush() error { // This covers the matching rows accumulated during writeBlock plus any per-stream // memory not released yet when flush returns early on an error. defer func() { - getQueryMemoryLimiter().Put(uint64(pcp.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(pcp.memReserved.Load())) }() - if pcp.budgetExceeded.Load() { + if pcp.memReserveFailed.Load() { return pcp.memoryLimitError() } @@ -700,7 +699,7 @@ func (pcp *pipeStreamContextProcessor) flush() error { // write output contexts in the ascending order of rows streamIDs := getStreamIDsSortedByMinRowTimestamp(m) for _, streamID := range streamIDs { - budgetBefore := pcp.budgetUsed.Load() + budgetBefore := pcp.memReserved.Load() rows := m[streamID] if len(rows) > pipeStreamContextMaxRowsPerStream { @@ -730,9 +729,9 @@ func (pcp *pipeStreamContextProcessor) flush() error { // Return the memory reserved for fetching the surrounding logs of this stream, // since these logs are already written to the output and aren't needed anymore. - streamBudget := pcp.budgetUsed.Load() - budgetBefore + streamBudget := pcp.memReserved.Load() - budgetBefore getQueryMemoryLimiter().Put(uint64(streamBudget)) - pcp.budgetUsed.Add(-streamBudget) + pcp.memReserved.Add(-streamBudget) } wctx.flush() diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index 71b565db64..e4e096227d 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -128,8 +128,8 @@ type pipeTopProcessor struct { shards atomicutil.Slice[pipeTopProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool } type pipeTopProcessorShard struct { @@ -276,13 +276,13 @@ func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) { // Reserve more budget for the state size from the global query memory limiter. if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. - if ptp.budgetExceeded.CompareAndSwap(false, true) { + if ptp.memReserveFailed.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. ptp.cancel() } return } - ptp.budgetUsed.Add(stateSizeBudgetChunk) + ptp.memReserved.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -291,11 +291,11 @@ func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) { func (ptp *pipeTopProcessor) flush() error { defer func() { - getQueryMemoryLimiter().Put(uint64(ptp.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(ptp.memReserved.Load())) }() - if ptp.budgetExceeded.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.pt.String(), ptp.budgetUsed.Load()/(1<<20)) + if ptp.memReserveFailed.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.pt.String(), ptp.memReserved.Load()/(1<<20)) } // merge state across shards in parallel diff --git a/lib/logstorage/pipe_union.go b/lib/logstorage/pipe_union.go index 107d3d7233..40a0271182 100644 --- a/lib/logstorage/pipe_union.go +++ b/lib/logstorage/pipe_union.go @@ -30,17 +30,18 @@ type pipeUnion struct { runQuery runUnionQueryFunc } -func (pu *pipeUnion) initUnionQuery(qctx *QueryContext, runQuery runUnionQueryFunc, eagerExecute bool) (pipe, error) { +func (pu *pipeUnion) initUnionQuery(qctx *QueryContext, runQuery runUnionQueryFunc, eagerExecute bool) (pipe, uint64, error) { + var memReserved uint64 rows := pu.rows if eagerExecute && rows == nil { qctxLocal := qctx.WithQuery(pu.q) var err error - rows, err = getRows(qctxLocal, func(qctx *QueryContext, writeBlock writeBlockResultFunc) error { + rows, memReserved, err = getRows(qctxLocal, func(qctx *QueryContext, writeBlock writeBlockResultFunc) error { return runQuery(qctx.Context, qctx.Query, writeBlock) }) if err != nil { - return nil, fmt.Errorf("cannot execute query at pipe [%s]: %w", pu, err) + return nil, 0, fmt.Errorf("cannot execute query at pipe [%s]: %w", pu, err) } } @@ -51,7 +52,7 @@ func (pu *pipeUnion) initUnionQuery(qctx *QueryContext, runQuery runUnionQueryFu puNew.rows = rows puNew.runQuery = runQuery - return &puNew, nil + return &puNew, memReserved, nil } func (pu *pipeUnion) String() string { diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 9c56166137..4baed36884 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -108,8 +108,8 @@ type pipeUniqProcessor struct { shards atomicutil.Slice[pipeUniqProcessorShard] - budgetUsed atomic.Int64 - budgetExceeded atomic.Bool + memReserved atomic.Int64 + memReserveFailed atomic.Bool } type pipeUniqProcessorShard struct { @@ -242,13 +242,13 @@ func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { // Reserve more budget for the state size from the global query memory limiter. if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. - if pup.budgetExceeded.CompareAndSwap(false, true) { + if pup.memReserveFailed.CompareAndSwap(false, true) { // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. pup.cancel() } return } - pup.budgetUsed.Add(stateSizeBudgetChunk) + pup.memReserved.Add(stateSizeBudgetChunk) shard.stateSizeBudget += stateSizeBudgetChunk } @@ -259,11 +259,11 @@ func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { func (pup *pipeUniqProcessor) flush() error { defer func() { - getQueryMemoryLimiter().Put(uint64(pup.budgetUsed.Load())) + getQueryMemoryLimiter().Put(uint64(pup.memReserved.Load())) }() - if pup.budgetExceeded.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pup.pu.String(), pup.budgetUsed.Load()/(1<<20)) + if pup.memReserveFailed.Load() { + return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pup.pu.String(), pup.memReserved.Load()/(1<<20)) } // merge state across shards in parallel diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index 80819a8d51..6fba316eac 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -1003,7 +1003,8 @@ func (s *Storage) processDeleteTask(ctx context.Context, dt *DeleteTask) bool { qctx := NewQueryContext(ctx, &qs, dt.TenantIDs, q, false, nil) // Initialize subqueries - qNew, err := initSubqueries(qctx, s.runQuery, false) + qNew, memReserved, err := initSubqueries(qctx, s.runQuery, false) + defer getQueryMemoryLimiter().Put(memReserved) if err != nil { logger.Errorf("cannot process delete task with task_id=%q while initializing subqueries: %s; retrying later", dt.TaskID, err) return false diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 7ead24a81c..3e2f057ad1 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -17,7 +17,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" @@ -217,7 +216,9 @@ func (s *Storage) RunQuery(qctx *QueryContext, writeBlock WriteDataBlockFunc) er type runQueryFunc func(qctx *QueryContext, writeBlock writeBlockResultFunc) error func (s *Storage) runQuery(qctx *QueryContext, writeBlock writeBlockResultFunc) error { - qNew, err := initSubqueries(qctx, s.runQuery, false) + qNew, memReserved, err := initSubqueries(qctx, s.runQuery, false) + // Release the reserved memory even on error, since initSubqueries may reserve some before failing. + defer getQueryMemoryLimiter().Put(memReserved) if err != nil { return err } @@ -361,10 +362,14 @@ func (s *Storage) GetFieldNames(qctx *QueryContext, filter string) ([]ValueWithH return s.runValuesWithHitsQuery(qctxNew) } -func getRows(qctx *QueryContext, runQuery runQueryFunc) ([][]Field, error) { - maxStateSize := int64(float64(memory.Allowed()) * 0.2) - var stateSizeBudget atomic.Int64 - stateSizeBudget.Add(maxStateSize) +// getRows runs runQuery and returns its rows along with the memory reserved for holding them. +// +// The reserved memory is taken from the global query memory limiter. The caller owns it and must +// release it via getQueryMemoryLimiter().Put once the rows are no longer needed. On error getRows +// releases the reservation itself and returns 0. +func getRows(qctx *QueryContext, runQuery runQueryFunc) ([][]Field, uint64, error) { + var memReserved atomic.Uint64 + var memReserveFailed atomic.Bool type rowsShard struct { rows [][]Field @@ -378,14 +383,15 @@ func getRows(qctx *QueryContext, runQuery runQueryFunc) ([][]Field, error) { } shard := shards.Get(workerID) - if shard.stateSizeBudget <= 0 { - // steal some budget for the state size from the global budget. - remaining := stateSizeBudget.Add(-stateSizeBudgetChunk) - if remaining < 0 { - // The state size is too big. Stop processing data in order to avoid OOM crash. + for shard.stateSizeBudget < 0 { + // Reserve more budget for the state size from the global query memory limiter. + if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) { + // The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash. + memReserveFailed.Store(true) return } shard.stateSizeBudget += stateSizeBudgetChunk + memReserved.Add(stateSizeBudgetChunk) } cs := br.getColumns() @@ -421,11 +427,14 @@ func getRows(qctx *QueryContext, runQuery runQueryFunc) ([][]Field, error) { } if err := runQuery(qctx, writeBlockResult); err != nil { - return nil, err + getQueryMemoryLimiter().Put(memReserved.Load()) + return nil, 0, err } - if stateSizeBudget.Load() < 0 { - return nil, fmt.Errorf("cannot load rows for [%s] because they occupy more than %dMB of memory", qctx.Query, maxStateSize/(1<<20)) + mem := memReserved.Load() + if memReserveFailed.Load() { + getQueryMemoryLimiter().Put(mem) + return nil, 0, fmt.Errorf("cannot load rows for [%s]; the query memory pool can't provide more than %dMB for it", qctx.Query, mem/(1<<20)) } var rows [][]Field @@ -433,7 +442,7 @@ func getRows(qctx *QueryContext, runQuery runQueryFunc) ([][]Field, error) { rows = append(rows, shard.rows...) } - return rows, nil + return rows, mem, nil } func marshalStrings(dst []byte, a []string) []byte { @@ -794,35 +803,46 @@ func (s *Storage) runValuesWithHitsQuery(qctx *QueryContext) ([]ValueWithHits, e return results, nil } -func initSubqueries(qctx *QueryContext, runQuery runQueryFunc, eagerExecute bool) (*Query, error) { +// initSubqueries initializes the join, union and in(...) subqueries at qctx.Query and returns the updated query. +// +// The returned uint64 is the amount of memory reserved from the global query memory limiter for the +// initialized subqueries. The caller must release it via getQueryMemoryLimiter().Put after the query +// finishes, even on error, since some memory may already be reserved when initialization fails. +func initSubqueries(qctx *QueryContext, runQuery runQueryFunc, eagerExecute bool) (*Query, uint64, error) { + var memReserved uint64 + var mem uint64 + getFieldValues := func(q *Query, fieldName string) ([]string, error) { qctxLocal := qctx.WithQuery(q) return getFieldValuesGeneric(qctxLocal, runQuery, fieldName) } qNew, err := initFilterInValues(qctx.Query, getFieldValues) if err != nil { - return nil, fmt.Errorf("cannot initialize `in` subqueries: %w", err) + return nil, memReserved, fmt.Errorf("cannot initialize `in` subqueries: %w", err) } - getJoinRows := func(q *Query) ([][]Field, error) { + getJoinRows := func(q *Query) ([][]Field, uint64, error) { qctxLocal := qctx.WithQuery(q) return getRows(qctxLocal, runQuery) } - qNew, err = initJoinMaps(qNew, getJoinRows) + qNew, mem, err = initJoinMaps(qNew, getJoinRows) + memReserved += mem if err != nil { - return nil, fmt.Errorf("cannot initialize `join` subqueries: %w", err) + return nil, memReserved, fmt.Errorf("cannot initialize `join` subqueries: %w", err) } runUnionQuery := func(ctx context.Context, q *Query, writeBlock writeBlockResultFunc) error { qctxLocal := qctx.WithContextAndQuery(ctx, q) return runQuery(qctxLocal, writeBlock) } - qNew, err = initUnionQueries(qctx, qNew, runUnionQuery, eagerExecute) + qNew, mem, err = initUnionQueries(qctx, qNew, runUnionQuery, eagerExecute) + memReserved += mem if err != nil { - return nil, fmt.Errorf("cannot initialize 'union' subqueries: %w", err) + return nil, memReserved, fmt.Errorf("cannot initialize 'union' subqueries: %w", err) } - return initStreamContextPipes(qctx, qNew, runQuery) + qNew, err = initStreamContextPipes(qctx, qNew, runQuery) + return qNew, memReserved, err } func initStreamContextPipes(qctx *QueryContext, q *Query, runQuery runQueryFunc) (*Query, error) { @@ -900,19 +920,27 @@ type inValuesCache struct { type runUnionQueryFunc func(ctx context.Context, q *Query, writeBlock writeBlockResultFunc) error -func initUnionQueries(qctx *QueryContext, q *Query, runUnionQuery runUnionQueryFunc, eagerExecute bool) (*Query, error) { +// initUnionQueries initializes the union subqueries at q and returns the updated query +// along with the memory reserved for them from the global query memory limiter. +// +// On error the returned reserved-memory amount may still be greater than 0, so the caller +// must release it via getQueryMemoryLimiter().Put in all cases. +func initUnionQueries(qctx *QueryContext, q *Query, runUnionQuery runUnionQueryFunc, eagerExecute bool) (*Query, uint64, error) { if !hasUnionPipes(q.pipes) { - return q, nil + return q, 0, nil } + var memReserved uint64 pipesNew := make([]pipe, len(q.pipes)) for i, p := range q.pipes { if pu, ok := p.(*pipeUnion); ok { var err error - p, err = pu.initUnionQuery(qctx, runUnionQuery, eagerExecute) + var mem uint64 + p, mem, err = pu.initUnionQuery(qctx, runUnionQuery, eagerExecute) if err != nil { - return nil, err + return nil, memReserved, err } + memReserved += mem } pipesNew[i] = p } @@ -920,7 +948,7 @@ func initUnionQueries(qctx *QueryContext, q *Query, runUnionQuery runUnionQueryF qNew := q.cloneShallow() qNew.pipes = pipesNew - return qNew, nil + return qNew, memReserved, nil } func hasUnionPipes(pipes []pipe) bool { @@ -932,21 +960,23 @@ func hasUnionPipes(pipes []pipe) bool { return false } -type getJoinRowsFunc func(q *Query) ([][]Field, error) +type getJoinRowsFunc func(q *Query) ([][]Field, uint64, error) -func initJoinMaps(q *Query, getJoinRows getJoinRowsFunc) (*Query, error) { +func initJoinMaps(q *Query, getJoinRows getJoinRowsFunc) (*Query, uint64, error) { if !hasJoinPipes(q.pipes) { - return q, nil + return q, 0, nil } + var memReserved uint64 pipesNew := make([]pipe, len(q.pipes)) for i, p := range q.pipes { if pj, ok := p.(*pipeJoin); ok { - pNew, err := pj.initJoinMap(getJoinRows) + pNew, mem, err := pj.initJoinMap(getJoinRows) if err != nil { - return nil, err + return nil, memReserved, err } p = pNew + memReserved += mem } pipesNew[i] = p } @@ -954,7 +984,7 @@ func initJoinMaps(q *Query, getJoinRows getJoinRowsFunc) (*Query, error) { qNew := q.cloneShallow() qNew.pipes = pipesNew - return qNew, nil + return qNew, memReserved, nil } func hasJoinPipes(pipes []pipe) bool { From da01e02063c4dee82f7613c1d44b4a495e5a7e7b Mon Sep 17 00:00:00 2001 From: Cuong Le Date: Wed, 1 Jul 2026 20:33:46 +0700 Subject: [PATCH 3/5] lib/logstorage: add tests and simplify error messages for the global query memory limiter --- docs/victorialogs/CHANGELOG.md | 1 + lib/logstorage/memory_limiter.go | 25 ++++--- lib/logstorage/pipe_facets.go | 2 +- lib/logstorage/pipe_running_stats.go | 2 +- lib/logstorage/pipe_sort.go | 2 +- lib/logstorage/pipe_sort_topk.go | 2 +- lib/logstorage/pipe_stats.go | 2 +- lib/logstorage/pipe_stream_context.go | 4 +- lib/logstorage/pipe_top.go | 2 +- lib/logstorage/pipe_uniq.go | 2 +- lib/logstorage/pipe_utils_test.go | 12 ++++ lib/logstorage/storage_search.go | 2 +- lib/logstorage/storage_search_test.go | 93 +++++++++++++++++++++++++++ 13 files changed, 131 insertions(+), 20 deletions(-) diff --git a/docs/victorialogs/CHANGELOG.md b/docs/victorialogs/CHANGELOG.md index 5cfffe1d15..b1c0412be3 100644 --- a/docs/victorialogs/CHANGELOG.md +++ b/docs/victorialogs/CHANGELOG.md @@ -27,6 +27,7 @@ according to the following docs: **Update note 2:** VictoriaLogs no longer provides a Docker image for the `linux/386` platform because the `distroless` base image [doesn't support this platform](https://github.com/GoogleContainerTools/distroless/issues/881). Executable files for `linux/386` platform are still published at [the VictoriaLogs releases page](https://github.com/VictoriaMetrics/VictoriaLogs/releases). * FEATURE: switch base Docker image from [Alpine](https://www.alpinelinux.org/) to [distroless](https://github.com/GoogleContainerTools/distroless/) for VictoriaLogs, `vlagent` and `vlogscli`. This reduces the image size and attack surface. See [#1228](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1228). +* FEATURE: [querying](https://docs.victoriametrics.com/victorialogs/querying/): protect VictoriaLogs from out-of-memory crashes when many heavy queries run at the same time by limiting the total memory they may use together. A query that exceeds this shared limit now fails with an error instead of risking a crash. See [#1551](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1551). * FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) for joining JSON array items stored in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string with the given delimiter. See [#712](https://github.com/VictoriaMetrics/VictoriaLogs/issues/712). * FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): adjust [`_stream` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) value according to the actual log fields. This simplifies importing of previously modified VictoriaLogs-exported data. See [#1122](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1122). diff --git a/lib/logstorage/memory_limiter.go b/lib/logstorage/memory_limiter.go index 07645c47d3..951487c80a 100644 --- a/lib/logstorage/memory_limiter.go +++ b/lib/logstorage/memory_limiter.go @@ -43,17 +43,22 @@ func getQueryMemoryLimiter() *memoryLimiter { queryMemoryLimiterOnce.Do(func() { // Allow concurrent queries to use up to 50% of memory.Allowed() for their execution state. // - // The other ~25% of memory.Allowed() goes to subsystems this limiter cannot account for: - // - indexdb block caches (lib/mergeset): ~10%. They are capped higher, but VictoriaLogs - // keeps the number of streams low, so in practice they stay small. - // - in-memory parts buffering freshly ingested logs before they are flushed to disk: - // ~10% per active partition (see getMaxInmemoryPartSize). - // - per-block scratch buffers for decoding column values during search: ~3%, bounded by - // the number of concurrent block searches (see partitionSearchConcurrencyLimitCh). + // Notes on other parts of the system that also consume the heap: + // - indexdb block caches (lib/mergeset): assume ~5%, as VictoriaLogs has much fewer streams than VictoriaMetrics + // - in-memory parts buffering: ~10% per active partition (usually 1 partition unless it's backfilling) + // - per-block scratch buffers for decoding column values during search: ~3% + // Total = 18% // - // That leaves the live set around 75% of memory.Allowed(). The Go runtime keeps the heap at - // roughly twice the live set under the default GOGC=100, so a higher query share would risk - // OOM when heavy queries and ingestion run at the same time. + // The Go runtime keeps the heap at roughly twice the live set under the default GOGC=100, + // so keeping the peak heap within memory.Allowed() would call for a ~32% query share ((32% + 18%) * 2 = 100%). + // + // That 32% is conservative in practice: these subsystems rarely reach their limits at the same time, and the + // OS page cache (the ~40% of RAM left by -memory.allowedPercent) is evicted under memory pressure, so the + // peak heap can borrow that headroom without an OOM. + // + // We pick 50% instead: before this limiter, a single stateful pipe was already allowed up to 40% of + // memory.Allowed() (0.4 for stats/uniq/top/running_stats, 0.2 for sort/facets/stream_context/...), so the + // shared pool must stay comfortably above 40% to avoid rejecting a single heavy pipe that used to succeed. queryMemoryLimiter.MaxSize = uint64(float64(memory.Allowed()) * 0.5) }) return &queryMemoryLimiter diff --git a/lib/logstorage/pipe_facets.go b/lib/logstorage/pipe_facets.go index 032403dac1..90191c879d 100644 --- a/lib/logstorage/pipe_facets.go +++ b/lib/logstorage/pipe_facets.go @@ -350,7 +350,7 @@ func (pfp *pipeFacetsProcessor) flush() error { }() if pfp.memReserveFailed.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pfp.pf.String(), pfp.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", pfp.pf.String()) } // merge state across shards diff --git a/lib/logstorage/pipe_running_stats.go b/lib/logstorage/pipe_running_stats.go index c427ab67cd..926d934f12 100644 --- a/lib/logstorage/pipe_running_stats.go +++ b/lib/logstorage/pipe_running_stats.go @@ -218,7 +218,7 @@ func (psp *pipeRunningStatsProcessor) flush() error { }() if psp.memReserveFailed.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", psp.ps.String()) } getKeyForRow := func(row []Field) string { diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index ee9412eb9a..fb1c4af1b2 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -473,7 +473,7 @@ func (psp *pipeSortProcessor) flush() error { }() if psp.memReserveFailed.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", psp.ps.String()) } if needStop(psp.stopCh) { diff --git a/lib/logstorage/pipe_sort_topk.go b/lib/logstorage/pipe_sort_topk.go index 5d167a9c66..cf3d72c2ae 100644 --- a/lib/logstorage/pipe_sort_topk.go +++ b/lib/logstorage/pipe_sort_topk.go @@ -378,7 +378,7 @@ func (ptp *pipeTopkProcessor) flush() error { }() if ptp.memReserveFailed.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.ps.String(), ptp.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", ptp.ps.String()) } if needStop(ptp.stopCh) { diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index d9e74506bb..739a40d9c3 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -1127,7 +1127,7 @@ func (psp *pipeStatsProcessor) flush() error { } if psp.memReserveFailed.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", psp.ps.String(), psp.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", psp.ps.String()) } // Merge states across shards in parallel diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index f49fa92b2d..109ef026e7 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -135,11 +135,11 @@ func (pcp *pipeStreamContextProcessor) reserveMemory(n int) bool { } func (pcp *pipeStreamContextProcessor) memoryLimitError() error { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pcp.pc.String(), pcp.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", pcp.pc.String()) } func (pcp *pipeStreamContextProcessor) memoryLimitForSurroundingLogsError(n int) error { - return fmt.Errorf("the query memory pool can't provide more than %dMB for fetching surrounding logs of %d matching logs", pcp.memReserved.Load()/(1<<20), n) + return fmt.Errorf("not enough memory in the shared query memory pool for fetching surrounding logs of %d matching logs", n) } type timeRange struct { diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index e4e096227d..58e89e8bd3 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -295,7 +295,7 @@ func (ptp *pipeTopProcessor) flush() error { }() if ptp.memReserveFailed.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", ptp.pt.String(), ptp.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", ptp.pt.String()) } // merge state across shards in parallel diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 4baed36884..d792f51df2 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -263,7 +263,7 @@ func (pup *pipeUniqProcessor) flush() error { }() if pup.memReserveFailed.Load() { - return fmt.Errorf("cannot calculate [%s]; the query memory pool can't provide more than %dMB for it", pup.pu.String(), pup.memReserved.Load()/(1<<20)) + return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", pup.pu.String()) } // merge state across shards in parallel diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go index 17af89d4bd..9291fe47ae 100644 --- a/lib/logstorage/pipe_utils_test.go +++ b/lib/logstorage/pipe_utils_test.go @@ -70,9 +70,21 @@ func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Fiel pp.flush() + // The pipe must release all the memory it reserved from the query memory limiter. + if n := getQueryMemoryUsage(); n != 0 { + t.Fatalf("unexpected query memory usage after pipe [%s]; got %d bytes; want 0", pipeStr, n) + } + ppTest.expectRows(t, rowsExpected) } +func getQueryMemoryUsage() uint64 { + ml := getQueryMemoryLimiter() + ml.mu.Lock() + defer ml.mu.Unlock() + return ml.usage +} + func newTestBlockResultWriter(workersCount int, ppNext pipeProcessor) *testBlockResultWriter { return &testBlockResultWriter{ workersCount: workersCount, diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 3e2f057ad1..39b7208a3d 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -434,7 +434,7 @@ func getRows(qctx *QueryContext, runQuery runQueryFunc) ([][]Field, uint64, erro mem := memReserved.Load() if memReserveFailed.Load() { getQueryMemoryLimiter().Put(mem) - return nil, 0, fmt.Errorf("cannot load rows for [%s]; the query memory pool can't provide more than %dMB for it", qctx.Query, mem/(1<<20)) + return nil, 0, fmt.Errorf("cannot load rows for [%s]: not enough memory in the shared query memory pool", qctx.Query) } var rows [][]Field diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 9a56f39982..4d995d6ba7 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -1713,3 +1713,96 @@ func newTestQueryContext(tenantIDs []TenantID, q *Query) *QueryContext { qs := &QueryStats{} return NewQueryContext(context.Background(), qs, tenantIDs, q, false, nil) } + +func TestStorageRunQueryMemoryLimiter(t *testing.T) { + // This test reads and mutates the process-global query memory limiter, so it cannot run in parallel. + + path := t.Name() + s := MustOpenStorage(path, &StorageConfig{Retention: 24 * time.Hour}) + defer func() { + s.MustClose() + fs.MustRemoveDir(path) + }() + + // Fill the storage with several streams, each split across several blocks, so the queries actually + // reserve memory: a pipe borrows from the limiter only after it has processed more than one block. + const streamsCount = 10 + const blocksPerStream = 4 + const rowsPerBlock = 10 + + tenantID := TenantID{AccountID: 0, ProjectID: 1} + streamTags := []string{"instance"} + baseTimestamp := time.Now().UnixNano() - 3600*1e9 + + var fields []Field + for j := range streamsCount { + for k := range blocksPerStream { + lr := GetLogRows(streamTags, nil, nil, nil, "") + for m := range rowsPerBlock { + timestamp := baseTimestamp + int64(m)*1e9 + int64(k) + fields = append(fields[:0], Field{ + Name: "instance", + Value: fmt.Sprintf("host-%d", j), + }, Field{ + Name: "_msg", + Value: fmt.Sprintf("log message %d at block %d", m, k), + }) + lr.mustAdd(tenantID, timestamp, fields) + } + s.MustAddRows(lr) + PutLogRows(lr) + } + } + s.DebugFlush() + + runQuery := func(query string) error { + q := mustParseQuery(query) + qctx := newTestQueryContext([]TenantID{tenantID}, q) + return s.RunQuery(qctx, func(_ uint, _ *DataBlock) {}) + } + + t.Run("released", func(t *testing.T) { + queries := []string{ + `* | stats by (instance) count() x | join on (instance) (* | stats by (instance) count() y)`, + `{instance="host-1"} | union ({instance="host-2"}) | count() hits`, + `'log message' | stream_context before 2`, + } + for _, query := range queries { + if err := runQuery(query); err != nil { + t.Fatalf("unexpected error for query [%s]: %s", query, err) + } + if n := getQueryMemoryUsage(); n != 0 { + t.Fatalf("unexpected query memory usage after query [%s]; got %d bytes; want 0", query, n) + } + } + }) + + t.Run("limit-exceeded", func(t *testing.T) { + // Shrink the limiter to zero, so every attempt to reserve memory fails. + ml := getQueryMemoryLimiter() + origMaxSize := ml.MaxSize + ml.MaxSize = 0 + defer func() { + ml.MaxSize = origMaxSize + }() + + queries := []string{ + `* | sort by (_msg)`, + `* | stats by (instance) count() x | join on (instance) (*)`, + `'log message' | stream_context before 2`, + } + for _, query := range queries { + err := runQuery(query) + if err == nil { + t.Fatalf("expecting non-nil error for query [%s]", query) + } + if !strings.Contains(err.Error(), "not enough memory in the shared query memory pool") { + t.Fatalf("unexpected error for query [%s]: %s", query, err) + } + } + + if n := getQueryMemoryUsage(); n != 0 { + t.Fatalf("unexpected query memory usage after the queries; got %d bytes; want 0", n) + } + }) +} From 8692cd9ebf4378361f3de4288756568b1f88db57 Mon Sep 17 00:00:00 2001 From: Cuong Le Date: Wed, 1 Jul 2026 21:36:07 +0700 Subject: [PATCH 4/5] lib/logstorage: expose metrics for the global query memory limiter --- lib/logstorage/memory_limiter.go | 23 +++++++++++++++++++++++ lib/logstorage/pipe_utils_test.go | 9 +-------- lib/logstorage/storage_search_test.go | 4 ++-- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/lib/logstorage/memory_limiter.go b/lib/logstorage/memory_limiter.go index 951487c80a..6479f42cf0 100644 --- a/lib/logstorage/memory_limiter.go +++ b/lib/logstorage/memory_limiter.go @@ -5,6 +5,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/metrics" ) type memoryLimiter struct { @@ -21,9 +22,20 @@ func (ml *memoryLimiter) Get(n uint64) bool { ml.usage += n } ml.mu.Unlock() + if !ok { + // Count every denied reservation. A single rejected query may bump this more than once, + // since its workers keep trying to reserve until the query is cancelled. + queryMemoryLimitReached.Inc() + } return ok } +func (ml *memoryLimiter) getUsage() uint64 { + ml.mu.Lock() + defer ml.mu.Unlock() + return ml.usage +} + func (ml *memoryLimiter) Put(n uint64) { ml.mu.Lock() defer ml.mu.Unlock() @@ -39,6 +51,17 @@ var ( queryMemoryLimiterOnce sync.Once ) +var ( + queryMemoryLimitReached = metrics.NewCounter(`vl_query_memory_limit_reached_total`) + + _ = metrics.NewGauge(`vl_query_memory_limit_bytes`, func() float64 { + return float64(getQueryMemoryLimiter().MaxSize) + }) + _ = metrics.NewGauge(`vl_query_memory_usage_bytes`, func() float64 { + return float64(getQueryMemoryLimiter().getUsage()) + }) +) + func getQueryMemoryLimiter() *memoryLimiter { queryMemoryLimiterOnce.Do(func() { // Allow concurrent queries to use up to 50% of memory.Allowed() for their execution state. diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go index 9291fe47ae..ceeb83ee49 100644 --- a/lib/logstorage/pipe_utils_test.go +++ b/lib/logstorage/pipe_utils_test.go @@ -71,20 +71,13 @@ func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Fiel pp.flush() // The pipe must release all the memory it reserved from the query memory limiter. - if n := getQueryMemoryUsage(); n != 0 { + if n := getQueryMemoryLimiter().getUsage(); n != 0 { t.Fatalf("unexpected query memory usage after pipe [%s]; got %d bytes; want 0", pipeStr, n) } ppTest.expectRows(t, rowsExpected) } -func getQueryMemoryUsage() uint64 { - ml := getQueryMemoryLimiter() - ml.mu.Lock() - defer ml.mu.Unlock() - return ml.usage -} - func newTestBlockResultWriter(workersCount int, ppNext pipeProcessor) *testBlockResultWriter { return &testBlockResultWriter{ workersCount: workersCount, diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 4d995d6ba7..3855693936 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -1771,7 +1771,7 @@ func TestStorageRunQueryMemoryLimiter(t *testing.T) { if err := runQuery(query); err != nil { t.Fatalf("unexpected error for query [%s]: %s", query, err) } - if n := getQueryMemoryUsage(); n != 0 { + if n := getQueryMemoryLimiter().getUsage(); n != 0 { t.Fatalf("unexpected query memory usage after query [%s]; got %d bytes; want 0", query, n) } } @@ -1801,7 +1801,7 @@ func TestStorageRunQueryMemoryLimiter(t *testing.T) { } } - if n := getQueryMemoryUsage(); n != 0 { + if n := getQueryMemoryLimiter().getUsage(); n != 0 { t.Fatalf("unexpected query memory usage after the queries; got %d bytes; want 0", n) } }) From 85018c27a4685b7f2d07ff3993404db16b9cf5c5 Mon Sep 17 00:00:00 2001 From: Cuong Le Date: Wed, 1 Jul 2026 22:05:05 +0700 Subject: [PATCH 5/5] lib/logstorage: expose and document metrics for the global query memory limiter --- docs/victorialogs/metrics.md | 15 +++++++++++++++ lib/logstorage/memory_limiter.go | 4 +++- lib/logstorage/net_query_runner.go | 2 +- lib/logstorage/storage.go | 1 + 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/docs/victorialogs/metrics.md b/docs/victorialogs/metrics.md index 63aaa14b7f..1426bc1407 100644 --- a/docs/victorialogs/metrics.md +++ b/docs/victorialogs/metrics.md @@ -304,6 +304,21 @@ These metrics follow the Prometheus exposition format and can be used for monito **Description:** Current number of queries actively executing. Real-time query processing load when the system approaches the `-search.maxConcurrentRequests` capacity limit. +### vl_query_memory_limit_reached_total +**Type:** Counter + +**Description:** Memory reservation attempts rejected because the shared query memory pool is full. Indicates concurrently executed heavy queries have reached the shared memory limit and are failing with an error instead of risking an out-of-memory crash. A single rejected query may increment this more than once, since its workers keep retrying until the query stops. + +### vl_query_memory_limit_bytes +**Type:** Gauge + +**Description:** Maximum amount of memory all concurrently executed queries may use together for their execution state. It is set to half of the memory available to VictoriaLogs, which is in turn controlled by `-memory.allowedPercent`. Queries are rejected once their combined reserved memory would exceed this limit. + +### vl_query_memory_usage_bytes +**Type:** Gauge + +**Description:** Current amount of memory reserved from the shared query memory pool by the running queries. Real-time query memory load when the system approaches the [`vl_query_memory_limit_bytes`](https://docs.victoriametrics.com/victorialogs/metrics/#vl_query_memory_limit_bytes) limit. + ### vl_concurrent_internalselect_requests_wait_duration **Type:** Summary diff --git a/lib/logstorage/memory_limiter.go b/lib/logstorage/memory_limiter.go index 6479f42cf0..a8071d0f40 100644 --- a/lib/logstorage/memory_limiter.go +++ b/lib/logstorage/memory_limiter.go @@ -22,9 +22,10 @@ func (ml *memoryLimiter) Get(n uint64) bool { ml.usage += n } ml.mu.Unlock() + if !ok { // Count every denied reservation. A single rejected query may bump this more than once, - // since its workers keep trying to reserve until the query is cancelled. + // since its workers keep trying to reserve until cancellation stops them. queryMemoryLimitReached.Inc() } return ok @@ -33,6 +34,7 @@ func (ml *memoryLimiter) Get(n uint64) bool { func (ml *memoryLimiter) getUsage() uint64 { ml.mu.Lock() defer ml.mu.Unlock() + return ml.usage } diff --git a/lib/logstorage/net_query_runner.go b/lib/logstorage/net_query_runner.go index 2f21a4b3fb..48d20930a5 100644 --- a/lib/logstorage/net_query_runner.go +++ b/lib/logstorage/net_query_runner.go @@ -23,7 +23,7 @@ type NetQueryRunner struct { // writeBlock is the function for writing the resulting data block. writeBlock writeBlockResultFunc - // memReserved is the amount of memory reserved by subqueries + // memReserved is the amount of memory reserved by subqueries. memReserved uint64 } diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index 6fba316eac..5028b3dd60 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -1004,6 +1004,7 @@ func (s *Storage) processDeleteTask(ctx context.Context, dt *DeleteTask) bool { // Initialize subqueries qNew, memReserved, err := initSubqueries(qctx, s.runQuery, false) + // Release the reserved memory even on error, since initSubqueries may reserve some before failing. defer getQueryMemoryLimiter().Put(memReserved) if err != nil { logger.Errorf("cannot process delete task with task_id=%q while initializing subqueries: %s; retrying later", dt.TaskID, err)