Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/vlstorage/netselect/netselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func (s *Storage) RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.
if err != nil {
return err
}
defer nqr.MustReleaseMemory()

search := func(stopCh <-chan struct{}, q *logstorage.Query, writeBlock logstorage.WriteDataBlockFunc) error {
qctxLocal := qctx.WithQuery(q)
Expand Down
1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ according to the following docs:
**Update note 2:** VictoriaLogs no longer provides a Docker image for the `linux/386` platform because the `distroless` base image [doesn't support this platform](https://github.com/GoogleContainerTools/distroless/issues/881). Executable files for `linux/386` platform are still published at [the VictoriaLogs releases page](https://github.com/VictoriaMetrics/VictoriaLogs/releases).

* FEATURE: switch base Docker image from [Alpine](https://www.alpinelinux.org/) to [distroless](https://github.com/GoogleContainerTools/distroless/) for VictoriaLogs, `vlagent` and `vlogscli`. This reduces the image size and attack surface. See [#1228](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1228).
* FEATURE: [querying](https://docs.victoriametrics.com/victorialogs/querying/): protect VictoriaLogs from out-of-memory crashes when many heavy queries run at the same time by limiting the total memory they may use together. A query that exceeds this shared limit now fails with an error instead of risking a crash. See [#1551](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1551).
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) for joining JSON array items stored in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string with the given delimiter. See [#712](https://github.com/VictoriaMetrics/VictoriaLogs/issues/712).
* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): adjust [`_stream` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) value according to the actual log fields. This simplifies importing of previously modified VictoriaLogs-exported data. See [#1122](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1122).

Expand Down
15 changes: 15 additions & 0 deletions docs/victorialogs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,21 @@ These metrics follow the Prometheus exposition format and can be used for monito

**Description:** Current number of queries actively executing. Real-time query processing load when the system approaches the `-search.maxConcurrentRequests` capacity limit.

### vl_query_memory_limit_reached_total
**Type:** Counter

**Description:** Memory reservation attempts rejected because the shared query memory pool is full. Indicates concurrently executed heavy queries have reached the shared memory limit and are failing with an error instead of risking an out-of-memory crash. A single rejected query may increment this more than once, since its workers keep retrying until the query stops.

### vl_query_memory_limit_bytes
**Type:** Gauge

**Description:** Maximum amount of memory all concurrently executed queries may use together for their execution state. It is set to half of the memory available to VictoriaLogs, which is in turn controlled by `-memory.allowedPercent`. Queries are rejected once their combined reserved memory would exceed this limit.

### vl_query_memory_usage_bytes
**Type:** Gauge

**Description:** Current amount of memory reserved from the shared query memory pool by the running queries. Real-time query memory load when the system approaches the [`vl_query_memory_limit_bytes`](https://docs.victoriametrics.com/victorialogs/metrics/#vl_query_memory_limit_bytes) limit.

### vl_concurrent_internalselect_requests_wait_duration
**Type:** Summary

Expand Down
90 changes: 90 additions & 0 deletions lib/logstorage/memory_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package logstorage

import (
"sync"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/metrics"
)

type memoryLimiter struct {
MaxSize uint64

mu sync.Mutex
usage uint64
}

func (ml *memoryLimiter) Get(n uint64) bool {
ml.mu.Lock()
ok := n <= ml.MaxSize && ml.MaxSize-n >= ml.usage
if ok {
ml.usage += n
}
ml.mu.Unlock()

if !ok {
// Count every denied reservation. A single rejected query may bump this more than once,
// since its workers keep trying to reserve until cancellation stops them.
queryMemoryLimitReached.Inc()
}
return ok
}

func (ml *memoryLimiter) getUsage() uint64 {
ml.mu.Lock()
defer ml.mu.Unlock()

return ml.usage
}

func (ml *memoryLimiter) Put(n uint64) {
ml.mu.Lock()
defer ml.mu.Unlock()

if n > ml.usage {
logger.Panicf("BUG: n=%d cannot exceed %d", n, ml.usage)
}
ml.usage -= n
}

var (
queryMemoryLimiter memoryLimiter
queryMemoryLimiterOnce sync.Once
)

var (
queryMemoryLimitReached = metrics.NewCounter(`vl_query_memory_limit_reached_total`)

_ = metrics.NewGauge(`vl_query_memory_limit_bytes`, func() float64 {
return float64(getQueryMemoryLimiter().MaxSize)
})
_ = metrics.NewGauge(`vl_query_memory_usage_bytes`, func() float64 {
return float64(getQueryMemoryLimiter().getUsage())
})
)

func getQueryMemoryLimiter() *memoryLimiter {
queryMemoryLimiterOnce.Do(func() {
// Allow concurrent queries to use up to 50% of memory.Allowed() for their execution state.
//
// Notes on other parts of the system that also consume the heap:
// - indexdb block caches (lib/mergeset): assume ~5%, as VictoriaLogs has much fewer streams than VictoriaMetrics
// - in-memory parts buffering: ~10% per active partition (usually 1 partition unless it's backfilling)
// - per-block scratch buffers for decoding column values during search: ~3%
// Total = 18%
//
// The Go runtime keeps the heap at roughly twice the live set under the default GOGC=100,
// so keeping the peak heap within memory.Allowed() would call for a ~32% query share ((32% + 18%) * 2 = 100%).
//
// That 32% is conservative in practice: these subsystems rarely reach their limits at the same time, and the
// OS page cache (the ~40% of RAM left by -memory.allowedPercent) is evicted under memory pressure, so the
// peak heap can borrow that headroom without an OOM.
//
// We pick 50% instead: before this limiter, a single stateful pipe was already allowed up to 40% of
// memory.Allowed() (0.4 for stats/uniq/top/running_stats, 0.2 for sort/facets/stream_context/...), so the
// shared pool must stay comfortably above 40% to avoid rejecting a single heavy pipe that used to succeed.
queryMemoryLimiter.MaxSize = uint64(float64(memory.Allowed()) * 0.5)
})
return &queryMemoryLimiter
}
56 changes: 56 additions & 0 deletions lib/logstorage/memory_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package logstorage

import (
"testing"
)

func TestMemoryLimiter(t *testing.T) {
var ml memoryLimiter
ml.MaxSize = 100

// Allocate memory
if !ml.Get(10) {
t.Fatalf("cannot get 10 out of %d bytes", ml.MaxSize)
}
if ml.usage != 10 {
t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 10)
}
if !ml.Get(20) {
t.Fatalf("cannot get 20 out of 90 bytes")
}
if ml.usage != 30 {
t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30)
}
if ml.Get(1000) {
t.Fatalf("unexpected get for 1000 bytes")
}
if ml.usage != 30 {
t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30)
}
if ml.Get(71) {
t.Fatalf("unexpected get for 71 bytes")
}
if ml.usage != 30 {
t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30)
}
if !ml.Get(70) {
t.Fatalf("cannot get 70 bytes")
}
if ml.usage != 100 {
t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 100)
}

// Return memory back
ml.Put(10)
ml.Put(70)
if ml.usage != 20 {
t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 20)
}
if !ml.Get(30) {
t.Fatalf("cannot get 30 bytes")
}
ml.Put(50)
if ml.usage != 0 {
t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 0)
}
}
34 changes: 28 additions & 6 deletions lib/logstorage/net_query_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ type NetQueryRunner struct {

// writeBlock is the function for writing the resulting data block.
writeBlock writeBlockResultFunc

// memReserved is the amount of memory reserved by subqueries.
memReserved uint64
}

// NewNetQueryRunner creates a new NetQueryRunner for the given qctx.
//
// runNetQuery is used for running distributed query.
// qctx results are sent to writeNetBlock.
//
// The caller must call MustReleaseMemory on the returned runner when it is no longer needed,
// typically via defer, in order to release the memory reserved for subqueries.
func NewNetQueryRunner(qctx *QueryContext, runNetQuery RunNetQueryFunc, writeNetBlock WriteDataBlockFunc) (*NetQueryRunner, error) {
runQuery := func(qctx *QueryContext, writeBlock writeBlockResultFunc) error {
writeNetBlock := writeBlock.newDataBlockWriter()
Expand All @@ -36,12 +42,16 @@ func NewNetQueryRunner(qctx *QueryContext, runNetQuery RunNetQueryFunc, writeNet

qRemote, pipesLocal := splitQueryToRemoteAndLocal(qctx.Query)

var memReserved uint64

// Eagerly execute all the subqueries for the remote query
// and replace them with the query results directly in qRemote.
// This is needed for proper propagation subquery results to remote storage nodes.
qctxRemote := qctx.WithQuery(qRemote)
qRemote, err := initSubqueries(qctxRemote, runQuery, true)
qRemote, mem, err := initSubqueries(qctxRemote, runQuery, true)
memReserved += mem // subqueries initialization might fail, but some mem is already reserved
if err != nil {
getQueryMemoryLimiter().Put(memReserved)
return nil, err
}

Expand All @@ -54,22 +64,34 @@ func NewNetQueryRunner(qctx *QueryContext, runNetQuery RunNetQueryFunc, writeNet
}
qLocal.pipes = pipesLocal
qctxLocal := qctx.WithQuery(qLocal)
qLocal, err = initSubqueries(qctxLocal, runQuery, false)
qLocal, mem, err = initSubqueries(qctxLocal, runQuery, false)
memReserved += mem
if err != nil {
getQueryMemoryLimiter().Put(memReserved)
return nil, err
}

writeBlock := writeNetBlock.newBlockResultWriter()

nqr := &NetQueryRunner{
qctx: qctx,
qRemote: qRemote,
pipesLocal: qLocal.pipes,
writeBlock: writeBlock,
qctx: qctx,
qRemote: qRemote,
pipesLocal: qLocal.pipes,
writeBlock: writeBlock,
memReserved: memReserved,
}
return nqr, nil
}

// MustReleaseMemory returns the memory reserved for subqueries back to the global query memory limiter.
//
// It must be called after NewNetQueryRunner returns successfully, typically via defer,
// even if Run isn't called; otherwise the reserved memory leaks.
func (nqr *NetQueryRunner) MustReleaseMemory() {
getQueryMemoryLimiter().Put(nqr.memReserved)
nqr.memReserved = 0
}

// Run runs the nqr query.
//
// The concurrency limits the number of concurrent goroutines, which process the query results at the local host.
Expand Down
28 changes: 13 additions & 15 deletions lib/logstorage/pipe_facets.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"unsafe"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"

"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)
Expand Down Expand Up @@ -104,21 +103,16 @@ func (pf *pipeFacets) visitSubqueries(_ func(q *Query)) {
}

func (pf *pipeFacets) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.2)

pfp := &pipeFacetsProcessor{
pf: pf,
concurrency: concurrency,
stopCh: stopCh,
cancel: cancel,
ppNext: ppNext,

maxStateSize: maxStateSize,
}
pfp.shards.Init = func(shard *pipeFacetsProcessorShard) {
shard.pfp = pfp
}
pfp.stateSizeBudget.Store(maxStateSize)

return pfp
}
Expand All @@ -132,8 +126,8 @@ type pipeFacetsProcessor struct {

shards atomicutil.Slice[pipeFacetsProcessorShard]

maxStateSize int64
stateSizeBudget atomic.Int64
memReserved atomic.Int64
memReserveFailed atomic.Bool
}

type pipeFacetsProcessorShard struct {
Expand Down Expand Up @@ -334,25 +328,29 @@ func (pfp *pipeFacetsProcessor) writeBlock(workerID uint, br *blockResult) {
shard := pfp.shards.Get(workerID)

for shard.stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
remaining := pfp.stateSizeBudget.Add(-stateSizeBudgetChunk)
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining+stateSizeBudgetChunk >= 0 {
// Reserve more budget for the state size from the global query memory limiter.
if !getQueryMemoryLimiter().Get(stateSizeBudgetChunk) {
// The query memory limiter is exhausted. Stop processing data in order to avoid OOM crash.
if pfp.memReserveFailed.CompareAndSwap(false, true) {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
pfp.cancel()
}
return
}
pfp.memReserved.Add(stateSizeBudgetChunk)
shard.stateSizeBudget += stateSizeBudgetChunk
}

shard.writeBlock(br)
}

func (pfp *pipeFacetsProcessor) flush() error {
if n := pfp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pfp.pf.String(), pfp.maxStateSize/(1<<20))
defer func() {
getQueryMemoryLimiter().Put(uint64(pfp.memReserved.Load()))
}()

if pfp.memReserveFailed.Load() {
return fmt.Errorf("cannot calculate [%s]: not enough memory in the shared query memory pool", pfp.pf.String())
}

// merge state across shards
Expand Down
9 changes: 5 additions & 4 deletions lib/logstorage/pipe_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,14 @@ func (pj *pipeJoin) visitSubqueries(visitFunc func(q *Query)) {
}
}

func (pj *pipeJoin) initJoinMap(getJoinRows getJoinRowsFunc) (pipe, error) {
func (pj *pipeJoin) initJoinMap(getJoinRows getJoinRowsFunc) (pipe, uint64, error) {
var mem uint64
rows := pj.rows
if rows == nil {
var err error
rows, err = getJoinRows(pj.q)
rows, mem, err = getJoinRows(pj.q)
if err != nil {
return nil, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err)
return nil, 0, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err)
}
}

Expand Down Expand Up @@ -143,7 +144,7 @@ func (pj *pipeJoin) initJoinMap(getJoinRows getJoinRowsFunc) (pipe, error) {
pjNew.q = nil
pjNew.rows = rows
pjNew.m = m
return &pjNew, nil
return &pjNew, mem, nil
}

func (pj *pipeJoin) updateNeededFields(pf *prefixfilter.Filter) {
Expand Down
Loading