Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions app/vtselect/traces/tempo/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ import (
// To avoid polluting the stable Jaeger API implementation, we have temporarily placed all related queries under the tempo directory.
// But they could be ported back to `app/vtselect/traces/query` and unified with queries already there in the future.

// matchedMarkerField is an internal span field set by query 2's 'format if' pipe
// to matchedMarker on spans that satisfy the TraceQL filter. It is read back in
// summarySearchTracesResult to populate spanSets[].spans[], and must not collide
// with any real OTLP span field name.
const (
// matchedMarkerField is the name of the synthetic field added to matching rows.
matchedMarkerField = "_vt_matched"
// matchedMarker is the value written into matchedMarkerField; a row counts as
// matched only when the field equals this value exactly.
matchedMarker = "1"
)

// GetTraceList returns multiple traceIDs and spans of them in []*Row format.
// It searches for traceIDs first, and then searches for the spans of these traceIDs.
// To not miss any spans on the edge, it extends both the start time and end time
Expand All @@ -41,6 +50,12 @@ import (
// 1. input time range: [00:00, 09:00]
// 2. found 20 trace IDs, and adjust time range to: [08:00, 09:00]
// 3. find spans on time range: [08:00-traceMaxDurationWindow, 09:00+traceMaxDurationWindow]
//
// Each returned row carries the matchedMarkerField field when its span satisfied
// the TraceQL filter. This is computed within the same span scan via a 'format if'
// pipe, so the caller can project the matched spans into spanSets[].spans[]
// without an extra query. Non-matching spans are still returned, since trace-level
// metadata (duration, per-service stats) is derived from all spans of the trace.
func GetTraceList(ctx context.Context, cp *tracecommon.CommonParams, filterQuery *traceql.Query, start, end time.Time, limit int64) ([]string, []*tracecommon.Row, error) {
currentTime := time.Now()

Expand All @@ -53,8 +68,13 @@ func GetTraceList(ctx context.Context, cp *tracecommon.CommonParams, filterQuery
return nil, nil, nil
}

// query 2: trace_id:in(traceID, traceID, ...)
qStr := fmt.Sprintf(otelpb.TraceIDField+":in(%s)", strings.Join(traceIDs, ","))
// query 2: trace_id:in(traceID, traceID, ...) | format if (<filter>) "1" as <marker>
// The 'format if' pipe marks every span that satisfies the TraceQL filter, so the
// matched spans can be projected into the response without a second query. Non-matching
// spans are still returned (carrying no marker), since trace-level metadata is derived
// from all spans of the trace.
qStr := fmt.Sprintf("%s:in(%s) | format if (%s) %q as %s",
otelpb.TraceIDField, strings.Join(traceIDs, ","), filterQuery.Filter(), matchedMarker, matchedMarkerField)
q, err := logstorage.ParseQueryAtTimestamp(qStr, currentTime.UnixNano())
if err != nil {
return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
Expand Down Expand Up @@ -117,6 +137,7 @@ func GetTraceList(ctx context.Context, cp *tracecommon.CommonParams, filterQuery
if missingTimeColumn.Load() {
return nil, nil, fmt.Errorf("missing _time column in the result for the query [%s]", q)
}

return traceIDs, rows, nil
}

Expand Down
201 changes: 179 additions & 22 deletions app/vtselect/traces/tempo/tempo.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func processSearchRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
return
}

result, err := searchTraces(ctx, cp, params.q, params.start, params.end, params.limit)
result, err := searchTraces(ctx, cp, params.q, params.start, params.end, params.limit, params.spss)
if err != nil {
httpserver.Errorf(w, r, "cannot get traces list: %s", err)
return
Expand Down Expand Up @@ -496,7 +496,7 @@ func singleFieldQueryHelper(ctx context.Context, q *logstorage.Query, cp *tracec
return resultList, nil
}

func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr string, start, end time.Time, limit int64) ([]traceSummary, error) {
func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr string, start, end time.Time, limit, spss int64) ([]traceSummary, error) {
// transform traceQL into LogsQL as filter. It should contain filter only without any pipe.
filterQuery, err := traceql.ParseQuery(traceQLStr)
if err != nil {
Expand All @@ -516,35 +516,75 @@ func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr
return nil, err
}

result, err := summarySearchTracesResult(ctx, rows, limit)
result, err := summarySearchTracesResult(rows, filterQuery.ReferencedFields(), spss)
if err != nil {
return nil, err
}

return result, nil
}

// spanAttr is a single key/value attribute projected onto a matched span in the
// search response.
type spanAttr struct {
// key is the response attribute key (taken from attrSpec.key by projectAttributes).
key string
// value is the stored field value, after the attrSpec transform when one is set.
value string
}

// matchedSpan is a span that satisfied the TraceQL filter within a trace. It
// carries the span's own intrinsics plus the attributes referenced by the query,
// which Grafana renders as the spans-table columns and uses to deep-link to the
// matched span.
type matchedSpan struct {
// spanID is the span's own ID as read from the row.
spanID string
// name is the span name; it is left empty unless the query referenced the
// name field (see projectName).
name string
// startTimeUnixNano is the span start timestamp in Unix nanoseconds.
startTimeUnixNano int64
// endTimeUnixNano is the span end timestamp in Unix nanoseconds.
endTimeUnixNano int64
// attributes holds only the query-referenced attributes found on the span
// (see projectAttributes); nil when the query references no attributes.
attributes []spanAttr
}

// serviceStat holds per-service span and error counts for a trace.
type serviceStat struct {
// spanCount is incremented once for every span row belonging to the service.
spanCount int
// errorCount is incremented for every span row whose status code equals
// statusCodeError.
errorCount int
}

// traceSummary aggregates every span row of one trace into the data needed for
// a single entry of the search response.
type traceSummary struct {
// traceID identifies the trace this summary belongs to.
traceID string
// rootServiceName, rootTraceName and rootSpanID are copied from the span whose
// parent span ID is empty. rootServiceName starts as a placeholder string
// until such a span is seen.
rootServiceName string
rootTraceName string
rootSpanID string
// startTimeUnixNano is the minimum start time and endTimeUnixNano the maximum
// end time across all spans seen for the trace.
startTimeUnixNano int64
endTimeUnixNano int64
// rootStartTimeUnixNano / rootEndTimeUnixNano are the root span's own bounds,
// used to populate spanSets[0].spans[0].startTimeUnixNano and durationNanos.
// Grafana's Tempo datasource reads durationNanos from the span, not the
// trace-level durationMs, so we must emit it on the synthesized root span.
rootStartTimeUnixNano int64
rootEndTimeUnixNano int64

// matchedSpans are the spans that satisfied the filter, capped at spss.
// matchedCount is the total number of matching spans (independent of the cap),
// reported as spanSet.matched per the Tempo contract.
matchedSpans []matchedSpan
matchedCount int

// serviceStats are per-service span/error counts across every span of the
// trace (not just the matched spans).
serviceStats map[string]serviceStat
}

func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, limit int64) ([]traceSummary, error) {
traceMap := make(map[string]traceSummary)
func summarySearchTracesResult(rows []*tracecommon.Row, referencedFields []string, spss int64) ([]traceSummary, error) {
// Build the VT field name -> response attribute spec once for the referenced
// fields, so each matched span projects only the queried attributes.
attrSpecs := buildAttrSpecs(referencedFields)
nameReferenced := false
for _, f := range referencedFields {
if f == otelpb.NameField {
nameReferenced = true
break
}
}

traceMap := make(map[string]*traceSummary)

for _, row := range rows {
var traceID, serviceName, spanName, spanID, parentSpanID string
var traceID, serviceName, spanName, spanID, parentSpanID, statusCode string
var startTimeUnixNano, endTimeUnixNano int64
var matched bool
var err error
for _, field := range row.Fields {
switch field.Name {
Expand All @@ -558,6 +598,11 @@ func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, lim
spanID = field.Value
case otelpb.ParentSpanIDField:
parentSpanID = field.Value
case otelpb.StatusCodeField:
statusCode = field.Value
case matchedMarkerField:
// set by query 2's 'format if' pipe on spans that satisfy the filter.
matched = field.Value == matchedMarker
case otelpb.StartTimeUnixNanoField:
startTimeUnixNano, err = strconv.ParseInt(field.Value, 10, 64)
if err != nil {
Expand All @@ -580,40 +625,141 @@ func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, lim
// get the summary for this trace
summary, ok := traceMap[traceID]
if !ok {
summary = traceSummary{
summary = &traceSummary{
traceID: traceID,
startTimeUnixNano: math.MaxInt64,
rootServiceName: "<root span not yet received>",
serviceStats: make(map[string]serviceStat),
}
traceMap[traceID] = summary
}

summary.traceID = traceID
summary.startTimeUnixNano = min(summary.startTimeUnixNano, startTimeUnixNano)
summary.endTimeUnixNano = max(summary.endTimeUnixNano, endTimeUnixNano)
// if it's the root span

// per-service stats over every span of the trace.
stat := summary.serviceStats[serviceName]
stat.spanCount++
if statusCode == statusCodeError {
stat.errorCount++
}
summary.serviceStats[serviceName] = stat

// trace-level metadata is taken from the root span (parent span id empty).
if parentSpanID == "" {
summary.rootServiceName = serviceName
summary.rootTraceName = spanName
summary.rootSpanID = spanID
summary.rootStartTimeUnixNano = startTimeUnixNano
summary.rootEndTimeUnixNano = endTimeUnixNano
}
// summary is not a pointer so it must be put back to the map.
traceMap[traceID] = summary

// collect the span into spanSets if it satisfied the filter.
if matched {
summary.matchedCount++
if int64(len(summary.matchedSpans)) < spss {
summary.matchedSpans = append(summary.matchedSpans, matchedSpan{
spanID: spanID,
name: projectName(spanName, nameReferenced),
startTimeUnixNano: startTimeUnixNano,
endTimeUnixNano: endTimeUnixNano,
attributes: projectAttributes(row, attrSpecs),
})
}
}
}

resultList := make([]traceSummary, 0, len(traceMap))
for _, summary := range traceMap {
resultList = append(resultList, summary)
resultList = append(resultList, *summary)
}
return resultList, nil
}

// statusCodeError is the OTEL StatusCode numeric value for error, used to count
// errors in serviceStats. It is kept as a string because it is compared directly
// against the raw value of the span's status code field.
const statusCodeError = "2"

// attrSpec maps a stored VT field name to the response attribute key and an
// optional value transform (e.g. kind/status numeric -> name).
type attrSpec struct {
// vtField is the stored field name looked up on each span row.
vtField string
// key is the attribute key emitted in the response for this field.
key string
// transform, when non-nil, converts the stored value before it is emitted;
// a nil transform means the stored value is emitted unchanged.
transform func(string) string
}

// buildAttrSpecs turns the TraceQL field names referenced by a query into the
// list of attribute specs to project onto every matched span.
//
// For each referenced field, in input order:
//   - the span name field, "duration", "traceDuration" and "nestedSetParent"
//     produce no spec: name is emitted as a top-level span field and the
//     others are not span attributes;
//   - "kind" and "status" read the stored kind / status-code field and convert
//     the stored code into its name;
//   - anything else is resolved to its stored VT field name, with the scope
//     prefix stripped from the response key and no value transform.
//
// The result is never nil; it is empty when nothing needs to be projected.
func buildAttrSpecs(referencedFields []string) []attrSpec {
	out := make([]attrSpec, 0, len(referencedFields))
	for i := range referencedFields {
		name := referencedFields[i]

		var spec attrSpec
		switch {
		case name == otelpb.NameField,
			name == "duration",
			name == "traceDuration",
			name == "nestedSetParent":
			// Not rendered as a span attribute.
			continue
		case name == "kind":
			spec = attrSpec{
				vtField:   otelpb.KindField,
				key:       "kind",
				transform: traceql.KindCodeToName,
			}
		case name == "status":
			spec = attrSpec{
				vtField:   otelpb.StatusCodeField,
				key:       "status",
				transform: traceql.StatusCodeToName,
			}
		default:
			spec = attrSpec{
				vtField: traceql.TraceQLFieldToVTField(name),
				key:     attrDisplayKey(name),
			}
		}
		out = append(out, spec)
	}
	return out
}

// attrDisplayKey derives the response attribute key from a TraceQL field name
// by dropping its scope prefix, mirroring Tempo:
//
//	"resource.service.name" -> "service.name"
//	"span.server.address"   -> "server.address"
//	".server.address"       -> "server.address"
//
// Only the first dot-separated segment is inspected. If it is one of the known
// scopes (resource, span, event, link, instrumentation) or is empty (the
// unscoped ".attr" form), it is removed together with its dot; otherwise the
// name is returned unchanged.
func attrDisplayKey(f string) string {
	scope, rest, hasDot := strings.Cut(f, ".")
	if !hasDot {
		return f
	}
	switch scope {
	case "resource", "span", "event", "link", "instrumentation", "":
		return rest
	}
	return f
}

// projectName decides what to emit as a matched span's name. Tempo includes
// `name` only when the query selected or filtered on it, so the real span name
// is returned when nameReferenced is true and the empty string otherwise.
func projectName(spanName string, nameReferenced bool) string {
	if !nameReferenced {
		return ""
	}
	return spanName
}

// projectAttributes extracts the query-referenced attributes from one span row.
//
// For every spec, in spec order, the first row field whose name equals the
// spec's stored field name is emitted under the spec's key, with the spec's
// transform applied when one is set. Specs with no matching field on the row
// contribute nothing. It returns nil when specs is empty.
func projectAttributes(row *tracecommon.Row, specs []attrSpec) []spanAttr {
	if len(specs) == 0 {
		return nil
	}

	out := make([]spanAttr, 0, len(specs))
	for si := range specs {
		spec := specs[si]
		for fi := range row.Fields {
			fld := row.Fields[fi]
			if fld.Name != spec.vtField {
				continue
			}
			v := fld.Value
			if spec.transform != nil {
				v = spec.transform(v)
			}
			out = append(out, spanAttr{key: spec.key, value: v})
			// Only the first occurrence of the field is projected.
			break
		}
	}
	return out
}

// commonAPIParam holds the request parameters parsed by parseTempoAPIParam.
type commonAPIParam struct {
// q is the raw value of the "q" query argument.
q string
// start and end bound the time range of the request.
start time.Time
end time.Time
// limit is the requested result limit.
limit int64
// spss is the max number of spans per spanset returned in the search
// response (Tempo's spansPerSpanSet). matched still reports the full count.
spss int64
}

// parseTempoAPIParam parses a Tempo request.
Expand All @@ -624,6 +770,7 @@ func parseTempoAPIParam(_ context.Context, r *http.Request, allowDefaultTime boo
start: time.Time{},
end: time.Time{},
limit: 100,
spss: 3, // Tempo's default spansPerSpanSet.
}

if allowDefaultTime {
Expand Down Expand Up @@ -663,6 +810,16 @@ func parseTempoAPIParam(_ context.Context, r *http.Request, allowDefaultTime boo
p.limit = max(0, min(1000, l))
}

spss := q.Get("spss")
if spss != "" {
s, err := strconv.ParseInt(spss, 10, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse spss: %s", spss)
}
// Clamp to [1, 1000] to keep responses bounded.
p.spss = max(1, min(1000, s))
}

p.q = q.Get("q")

return p, nil
Expand Down
Loading