Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 32 additions & 29 deletions app/vlselect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmalertproxy"
"github.com/VictoriaMetrics/metrics"

Expand Down Expand Up @@ -217,7 +218,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, path string) bool {
ctxWithTimeout, cancel := context.WithTimeout(ctx, d)
defer cancel()

if !incRequestConcurrency(ctxWithTimeout, w, r) {
if !incRequestConcurrency(w, r, d) {
return true
}
defer decRequestConcurrency()
Expand Down Expand Up @@ -262,39 +263,41 @@ func logRequestErrorIfNeeded(ctx context.Context, w http.ResponseWriter, r *http
}
}

func incRequestConcurrency(ctx context.Context, w http.ResponseWriter, r *http.Request) bool {
startTime := time.Now()
stopCh := ctx.Done()
func incRequestConcurrency(w http.ResponseWriter, r *http.Request, queryDuration time.Duration) bool {
select {
case concurrencyLimitCh <- struct{}{}:
return true
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
select {
case concurrencyLimitCh <- struct{}{}:
return true
case <-stopCh:
switch ctx.Err() {
case context.Canceled:
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Infof("client has canceled the pending request after %.3f seconds: remoteAddr=%s, requestURI: %q",
time.Since(startTime).Seconds(), remoteAddr, requestURI)
case context.DeadlineExceeded:
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration=%s; to increase -search.maxConcurrentRequests; "+
"to pass bigger value to 'timeout' query arg",
time.Since(startTime).Seconds(), *maxConcurrentRequests, maxQueueDuration, maxQueryDuration),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)
}
return false
}

startTime := time.Now()

concurrencyLimitReached.Inc()
t := timerpool.Get(min(queryDuration, *maxQueueDuration))
defer timerpool.Put(t)
select {
case concurrencyLimitCh <- struct{}{}:
return true
case <-r.Context().Done():
// The client has closed the connection while the request was queued.
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Infof("client has canceled the pending request after %.3f seconds: remoteAddr=%s, requestURI: %q",
time.Since(startTime).Seconds(), remoteAddr, requestURI)
return false
case <-t.C:
// Either -search.maxQueueDuration or the query execution deadline elapsed while waiting for a free slot.
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration=%s; to increase -search.maxConcurrentRequests; "+
"to pass bigger value to 'timeout' query arg",
time.Since(startTime).Seconds(), *maxConcurrentRequests, maxQueueDuration, maxQueryDuration),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)
return false
}
}

Expand Down
1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ according to the following docs:
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) for joining JSON array items stored in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string with the given delimiter. See [#712](https://github.com/VictoriaMetrics/VictoriaLogs/issues/712).
* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): adjust [`_stream` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) value according to the actual log fields. This simplifies importing of previously modified VictoriaLogs-exported data. See [#1122](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1122).

* BUGFIX: [vlselect](https://docs.victoriametrics.com/victorialogs/): respect `-search.maxQueueDuration` for queued requests. Previously, when the number of concurrent requests exceeded `-search.maxConcurrentRequests`, the extra requests could be queued for much longer than `-search.maxQueueDuration` (up to `-search.maxQueryDuration`) before being rejected, so a few slow queries could stall other requests for everyone. See [#1554](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1554).
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): fix a `vlselect` crash in [cluster mode](https://docs.victoriametrics.com/victorialogs/cluster/) on some valid queries. This happened when a [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) operand had the same name as a function (such as `abs`, `round` or `floor`), or when `from` was used as a separator in the [`split` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#split-pipe). See [#1518](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1518).
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): allow [`filter` pipes](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) without the `filter` prefix when the filter starts with a non-word token or the `not` keyword, such as `... | !foo`, `... | {host="x"}`, `... | >5`, `... | =foo` or `... | not foo`. These were unintentionally rejected with `unexpected pipe` in [v1.51.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.51.0). A non-word token (and the `not` operator) cannot clash with a pipe name, so it is unambiguously a filter. See [#1522](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1522).
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): reject `field*` wildcards in the comma-separated (no parens) form of [`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe), [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) and [`unroll`](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe) pipes, consistently with their parens form, which already rejects wildcards. Previously a query such as `uniq by foo*` was accepted but could not be parsed back from its string representation. See [#1487](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1487).
Expand Down