Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions app/vlagent/filecollector/file_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/VictoriaMetrics/VictoriaLogs/app/vlagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaLogs/app/vlagent/tail"
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

var (
Expand Down Expand Up @@ -158,6 +159,14 @@ func startRead(argIdx int, filePath string) {
tailer.StartRead(filePath, proc)
}

// DebugInfo returns debug information of each currently processing file.
func DebugInfo() [][]logstorage.Field {
if tailer == nil {
return nil
}
return tailer.DebugInfo()
}

func Stop() {
if len(*glob) == 0 {
return
Expand Down
13 changes: 13 additions & 0 deletions app/vlagent/filecollector/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type processor struct {
extraFieldsJSONLen int
tenantID logstorage.TenantID

debugFields []logstorage.Field

logRows *logstorage.LogRows

rowsIngestedLocal int
Expand Down Expand Up @@ -73,12 +75,19 @@ func newProcessor(argIdx int, filePath string, storage insertutil.LogRowsStorage
sfs = defaultStreamFields
}

debugFields := efs
debugFields = append(debugFields, logstorage.Field{
Name: "glob_pattern",
Value: glob.GetOptionalArg(argIdx),
})

logRows := logstorage.GetLogRows(sfs, *ignoreFields, *decolorizeFields, efs, *insertutil.DefaultMsgValue)

return &processor{
storage: storage,
extraFieldsJSONLen: logstorage.EstimatedJSONRowLen(efs),
tenantID: getTenantID(argIdx),
debugFields: debugFields,
logRows: logRows,
}
}
Expand Down Expand Up @@ -152,6 +161,10 @@ func (p *processor) flushMetrics() {
var rowsIngestedTotal = metrics.GetOrCreateCounter(fmt.Sprintf("vl_rows_ingested_total{type=%q}", "file_logs"))
var bytesIngestedTotal = metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", "file_logs"))

func (p *processor) DebugInfo() []logstorage.Field {
return p.debugFields
}

func (p *processor) MustClose() {
p.Flush()
logstorage.PutLogRows(p.logRows)
Expand Down
8 changes: 8 additions & 0 deletions app/vlagent/kubernetescollector/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ func Init(tmpDataPath string) {
logger.Infof("started Kubernetes log collector for node %q", currentNodeName)
}

// DebugInfo returns debug information of each currently processing file.
func DebugInfo() [][]logstorage.Field {
if collector == nil {
return nil
}
return collector.tailer.DebugInfo()
}

func Stop() {
if collector != nil {
collector.stop()
Expand Down
10 changes: 10 additions & 0 deletions app/vlagent/kubernetescollector/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type logFileProcessor struct {
commonFields []logstorage.Field
commonFieldsJSONLen int

debugFields []logstorage.Field

// fieldsBuf is used for constructing log fields from commonFields and the actual log line fields before sending them to VictoriaLogs.
fieldsBuf []logstorage.Field

Expand All @@ -91,12 +93,16 @@ func newLogFileProcessor(storage insertutil.LogRowsStorage, commonFields []logst
efs := getExtraFields()
lr := logstorage.GetLogRows(sfs, *ignoreFields, *decolorizeFields, efs, *insertutil.DefaultMsgValue)

debugFields := commonFields
debugFields = append(debugFields, efs...)

return &logFileProcessor{
storage: storage,
lr: lr,
tenantID: getTenantID(),
commonFields: commonFields,
commonFieldsJSONLen: commonFieldsJSONLen,
debugFields: debugFields,
}
}

Expand Down Expand Up @@ -467,6 +473,10 @@ func (lfp *logFileProcessor) flushMetrics() {
lfp.bytesIngestedLocal = 0
}

func (lfp *logFileProcessor) DebugInfo() []logstorage.Field {
return lfp.debugFields
}

func (lfp *logFileProcessor) MustClose() {
lfp.Flush()
lfp.partialCRIStdout.reset()
Expand Down
17 changes: 17 additions & 0 deletions app/vlagent/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package main

import (
"embed"
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaLogs/app/vlagent/filecollector"
"github.com/VictoriaMetrics/VictoriaLogs/app/vlagent/kubernetescollector"
"github.com/VictoriaMetrics/VictoriaLogs/app/vlagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaLogs/app/vlagent/targetsstatus"
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert"
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/insertutil"
)
Expand All @@ -34,6 +37,12 @@ var (
"and -fileCollector.checkpointsPath unless those flags are set explicitly")
)

var (
//go:embed static
staticFiles embed.FS
staticServer = http.FileServer(http.FS(staticFiles))
)

func main() {
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag.CommandLine.SetOutput(os.Stdout)
Expand Down Expand Up @@ -91,11 +100,19 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/victorialogs/vlagent/'>https://docs.victoriametrics.com/victorialogs/vlagent/</a></br>")
fmt.Fprintf(w, "Useful endpoints:</br>")
httpserver.WriteAPIHelp(w, [][2]string{
{"targets", "information about processing files and Kubernetes Pods"},
{"metrics", "available service metrics"},
{"flags", "command-line flags"},
})
return true
}
if r.URL.Path == "/targets" {
return targetsstatus.RequestHandler(w, r)
}
if strings.HasPrefix(r.URL.Path, "/static") {
staticServer.ServeHTTP(w, r)
return true
}
return vlinsert.RequestHandler(w, r)
}

Expand Down
6 changes: 6 additions & 0 deletions app/vlagent/static/css/bootstrap.min.css

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions app/vlagent/tail/logfile_timing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"path/filepath"
"strings"
"testing"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

func BenchmarkReadLinesBigSizeLines(b *testing.B) {
Expand Down Expand Up @@ -62,4 +64,8 @@ func (noopProcessor) TryAddLine(_ []byte) bool {

func (noopProcessor) Flush() {}

func (noopProcessor) DebugInfo() []logstorage.Field {
return nil
}

func (noopProcessor) MustClose() {}
69 changes: 65 additions & 4 deletions app/vlagent/tail/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package tail
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"slices"
"strings"
"sync"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/metrics"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

// Processor processes log lines from a single file.
Expand All @@ -36,13 +40,18 @@ type Processor interface {
// ensuring the accumulated state is propagated without waiting for the next line.
Flush()

// DebugInfo returns diagnostic fields describing the processing target.
// The returned fields may include metadata such Kubernetes labels,
// extra fields, and other implementation-specific details.
DebugInfo() []logstorage.Field

// MustClose releases all resources associated with the Processor and ensures proper cleanup of internal states.
// It must be called after the target log file is deleted or vlagent is shutting down.
MustClose()
}

type Tailer struct {
logFiles map[string]struct{}
logFiles map[string]Processor
logFilesLock sync.Mutex

checkpointsDB *checkpointsDB
Expand All @@ -63,7 +72,7 @@ func Start(checkpointsPath string) *Tailer {
}

return &Tailer{
logFiles: make(map[string]struct{}),
logFiles: make(map[string]Processor),
checkpointsDB: checkpointsDB,
stopCh: make(chan struct{}),
}
Expand All @@ -77,14 +86,15 @@ func (fc *Tailer) StartRead(relPath string, proc Processor) {
}

fc.logFilesLock.Lock()
defer fc.logFilesLock.Unlock()

_, ok := fc.logFiles[absPath]
fc.logFiles[absPath] = struct{}{}
fc.logFilesLock.Unlock()
if ok {
// Already reading from the file.
proc.MustClose()
return
}
fc.logFiles[absPath] = proc

fc.wg.Go(func() {
lf := fc.openLogFile(absPath)
Expand Down Expand Up @@ -335,6 +345,57 @@ func (fc *Tailer) IsTailing(relPath string) bool {
return ok
}

// DebugInfo returns debug information of each currently processing file.
// The fields include information like file name, Kubernetes labels and more.
func (fc *Tailer) DebugInfo() [][]logstorage.Field {
fc.logFilesLock.Lock()
defer fc.logFilesLock.Unlock()

var targets [][]logstorage.Field
for filePath, proc := range fc.logFiles {
debugInfo := slices.Clone(proc.DebugInfo())

n := slices.IndexFunc(debugInfo, func(f logstorage.Field) bool {
return f.Name == "file"
})
if n < 0 {
debugInfo = append(debugInfo, logstorage.Field{
Name: "file",
Value: filePath,
})
}

// Attach checkpoint if exists.
cp, ok := fc.checkpointsDB.get(filePath)
if ok {
debugInfo = append(debugInfo, logstorage.Field{
Name: "file_inode",
Value: fmt.Sprintf("%d", cp.Inode),
})
debugInfo = append(debugInfo, logstorage.Field{
Name: "file_fingerprint",
Value: fmt.Sprintf("%d", cp.Fingerprint),
})
debugInfo = append(debugInfo, logstorage.Field{
Name: "file_offset",
Value: fmt.Sprintf("%d", cp.Offset),
})
}

slices.SortFunc(debugInfo, func(a, b logstorage.Field) int {
return strings.Compare(a.Name, b.Name)
})

targets = append(targets, debugInfo)
}

slices.SortFunc(targets, func(a, b []logstorage.Field) int {
return strings.Compare(a[0].Name, b[0].Name)
})

return targets
}

func (fc *Tailer) Stop() {
close(fc.stopCh)
fc.wg.Wait()
Expand Down
6 changes: 6 additions & 0 deletions app/vlagent/tail/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"testing"
"time"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

func TestTailer(t *testing.T) {
Expand Down Expand Up @@ -320,6 +322,10 @@ func (p *testProcessor) TryAddLine(line []byte) bool {

func (p *testProcessor) Flush() {}

func (p *testProcessor) DebugInfo() []logstorage.Field {
return nil
}

func (p *testProcessor) MustClose() {}

func (p *testProcessor) wait() {
Expand Down
51 changes: 51 additions & 0 deletions app/vlagent/targetsstatus/index.qtpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{% import "github.com/VictoriaMetrics/VictoriaMetrics/lib/htmlcomponents" %}

{% func TargetsPage(data []targetsData) %}
<!DOCTYPE html>
<html lang="en">
<head>
{%= htmlcomponents.CommonHeader() %}
<title>vlagent / targets</title>
</head>
<body>
{%= htmlcomponents.Navbar() %}
<div class="container-fluid py-3">
<h4 class="mb-3">Targets</h4>
{% for _, section := range data %}
<h5 class="mt-4 mb-2 pb-1 border-bottom">{%s section.Category %}</h5>
{% for _, group := range section.Groups %}
<details class="mb-2" open>
<summary class="p-2 bg-light border rounded" id="{%s group.Group %}">
<a href="#{%s group.Group %}" class="text-decoration-none text-dark font-monospace">{%s group.Group %}</a>
<span class="text-secondary fw-normal ms-2">({%d len(group.Targets) %})</span>
</summary>
<div class="ms-3 mt-2">
{% for _, t := range group.Targets %}
<details class="mb-1">
<summary class="font-monospace p-1 border-bottom">{%s t.Preview %}</summary>
<table class="table table-sm table-bordered mt-2">
<thead class="table-light">
<tr>
<th>Field</th>
<th>Value</th>
</tr>
</thead>
<tbody>
{% for _, field := range t.DebugInfo %}
<tr>
<td>{%s field.Name %}</td>
<td class="font-monospace">{%s field.Value %}</td>
</tr>
{% endfor %}
</tbody>
</table>
</details>
{% endfor %}
</div>
</details>
{% endfor %}
{% endfor %}
</div>
</body>
</html>
{% endfunc %}
Loading