Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/vtselect/traces/tempo/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func GetTraceList(ctx context.Context, cp *tracecommon.CommonParams, filterQuery
return nil, nil, nil
}

// query 2: trace_id:in(traceID, traceID, ...)
qStr := fmt.Sprintf(otelpb.TraceIDField+":in(%s)", strings.Join(traceIDs, ","))
// query 2: {resource_attr:service.name!=""} AND (filter_conditions or parent_span_id:="") AND trace_id:in(traceID, traceID, ...)
qStr := `{resource_attr:service.name!=""} AND (` + filterQuery.String() + ` OR parent_span_id:="") AND ` + otelpb.TraceIDField + `:in(` + strings.Join(traceIDs, ",") + `)`
q, err := logstorage.ParseQueryAtTimestamp(qStr, currentTime.UnixNano())
if err != nil {
return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
Expand Down
101 changes: 71 additions & 30 deletions app/vtselect/traces/tempo/tempo.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,17 @@ func processSearchRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
return
}

result, err := searchTraces(ctx, cp, params.q, params.start, params.end, params.limit)
spss := 3
spssStr := r.URL.Query().Get("spss")
if spssStr != "" {
spss, err = strconv.Atoi(spssStr)
if err != nil {
httpserver.Errorf(w, r, "incorrect spss param:%s , %s", spssStr, err)
return
}
}
Comment thread
jiekun marked this conversation as resolved.

result, err := searchTraces(ctx, cp, params.q, params.start, params.end, params.limit, spss)
if err != nil {
httpserver.Errorf(w, r, "cannot get traces list: %s", err)
return
Expand Down Expand Up @@ -535,7 +545,7 @@ func singleFieldQueryHelper(ctx context.Context, q *logstorage.Query, cp *tracec
return resultList, nil
}

func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr string, start, end time.Time, limit int) ([]traceSummary, error) {
func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr string, start, end time.Time, limit, spss int) ([]traceSummary, error) {
// transform traceQL into LogsQL as filter. It should contain filter only without any pipe.
filterQuery, err := traceql.ParseQuery(traceQLStr)
if err != nil {
Expand All @@ -555,7 +565,7 @@ func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr
return nil, err
}

result, err := summarySearchTracesResult(ctx, rows, limit)
result, err := summarySearchTracesResult(ctx, rows, spss)
if err != nil {
return nil, err
}
Expand All @@ -564,18 +574,24 @@ func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr
}

type traceSummary struct {
rootSpan spanSummary
spanSet []spanSummary
}

type spanSummary struct {
traceID string
rootServiceName string
rootTraceName string
rootSpanID string
spanID string
parentSpanID string
name string
serviceName string
startTimeUnixNano int64
endTimeUnixNano int64
// rootStartTimeUnixNano / rootEndTimeUnixNano are the root span's own bounds,
// used to populate spanSets[0].spans[0].startTimeUnixNano and durationNanos.
// Grafana's Tempo datasource reads durationNanos from the span, not the
// trace-level durationMs, so we must emit it on the synthesized root span.
rootStartTimeUnixNano int64
rootEndTimeUnixNano int64
attributes []attribute
}

type attribute struct {
key string
vStr string
}

func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, limit int) ([]traceSummary, error) {
Expand All @@ -585,6 +601,7 @@ func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, lim
var traceID, serviceName, spanName, spanID, parentSpanID string
var startTimeUnixNano, endTimeUnixNano int64
var err error
var attributes []attribute
for _, field := range row.Fields {
switch field.Name {
case otelpb.ResourceAttrServiceName:
Expand All @@ -608,37 +625,61 @@ func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, lim
return nil, err
}
default:
continue
// span attributes
v := strings.Clone(field.Value)
if strings.HasPrefix(field.Name, otelpb.ResourceAttrPrefix) { // resource attributes
attributes = append(attributes, attribute{
key: strings.TrimPrefix(field.Name, otelpb.ResourceAttrPrefix),
vStr: v,
})
} else if strings.HasPrefix(field.Name, otelpb.SpanAttrPrefixField) {
attributes = append(attributes, attribute{
key: strings.TrimPrefix(field.Name, otelpb.SpanAttrPrefixField),
vStr: v,
})
}
}
}

if traceID == "" {
return nil, fmt.Errorf("trace ID not found for a span %v", row)
}

// get the summary for this trace
summary, ok := traceMap[traceID]
// get the trace summary for this trace
trace, ok := traceMap[traceID]
if !ok {
summary = traceSummary{
startTimeUnixNano: math.MaxInt64,
rootServiceName: "<root span not yet received>",
trace = traceSummary{
rootSpan: spanSummary{
startTimeUnixNano: math.MaxInt64,
serviceName: "<root span not yet received>",
},
}
traceMap[traceID] = summary
}

summary.traceID = traceID
summary.startTimeUnixNano = min(summary.startTimeUnixNano, startTimeUnixNano)
summary.endTimeUnixNano = max(summary.endTimeUnixNano, endTimeUnixNano)
span := spanSummary{
traceID: traceID,
spanID: spanID,
parentSpanID: parentSpanID,
name: spanName,
serviceName: serviceName,
startTimeUnixNano: startTimeUnixNano,
endTimeUnixNano: endTimeUnixNano,
attributes: attributes,
}

trace.rootSpan.traceID = traceID
trace.rootSpan.startTimeUnixNano = min(trace.rootSpan.startTimeUnixNano, span.startTimeUnixNano)
trace.rootSpan.endTimeUnixNano = max(trace.rootSpan.endTimeUnixNano, span.endTimeUnixNano)

// if it's the root span
if parentSpanID == "" {
summary.rootServiceName = serviceName
summary.rootTraceName = spanName
summary.rootSpanID = spanID
summary.rootStartTimeUnixNano = startTimeUnixNano
summary.rootEndTimeUnixNano = endTimeUnixNano
trace.rootSpan = span
} else if len(trace.spanSet) < limit {
trace.spanSet = append(trace.spanSet, span)
}
// summary is not a pointer so it must be put back to the map.
traceMap[traceID] = summary

// trace is not a pointer so it must be put back to the map.
traceMap[traceID] = trace
}

resultList := make([]traceSummary, 0, len(traceMap))
Expand Down Expand Up @@ -700,7 +741,7 @@ func parseTempoAPIParam(_ context.Context, r *http.Request, allowDefaultTime boo
}
// Let's limit this to [0, *tracecommon.TraceMaxTraces] to prevent users from specifying an excessively large value.
if l < 0 || l > maxLimit {
return nil, fmt.Errorf("limit %d out of range [0, %d]", l, maxLimit)
l = maxLimit
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
}
p.limit = l
}
Expand Down
49 changes: 28 additions & 21 deletions app/vtselect/traces/tempo/tempo.qtpl
Original file line number Diff line number Diff line change
Expand Up @@ -132,35 +132,42 @@

{% func summaryJson(summary traceSummary) %}
{
"traceID":{%q= summary.traceID %},
"rootServiceName":{%q= summary.rootServiceName %},
"rootTraceName":{%q= summary.rootTraceName %},
"startTimeUnixNano":{%dl= summary.startTimeUnixNano %},
"durationMs":{% if summary.endTimeUnixNano > 0 && summary.startTimeUnixNano > 0 %}{%dl= (summary.endTimeUnixNano - summary.startTimeUnixNano) / 1e6 %}{% else %}0{% endif %},
"spanSet":{%= spanSetJSON(summary) %},
"spanSets":[{%= spanSetJSON(summary) %}]
"traceID":{%q= summary.rootSpan.traceID %},
"rootServiceName":{%q= summary.rootSpan.serviceName %},
"rootTraceName":{%q= summary.rootSpan.name %},
"startTimeUnixNano":{%dl= summary.rootSpan.startTimeUnixNano %},
"durationMs":{% if summary.rootSpan.endTimeUnixNano > 0 && summary.rootSpan.startTimeUnixNano > 0 %}{%dl= (summary.rootSpan.endTimeUnixNano - summary.rootSpan.startTimeUnixNano) / 1e6 %}{% else %}0{% endif %},
"spanSets":[{%= spanSetJSON(summary.spanSet) %}]
Comment thread
jiekun marked this conversation as resolved.
}
{% endfunc %}

{% func spanSetJSON(summary traceSummary) %}
{% func spanSetJSON(spanSet []spanSummary) %}
{
"spans":[
{% if summary.rootSpanID != "" %}
{
"spanID":{%q= summary.rootSpanID %},
"startTimeUnixNano":{%q= strconv.FormatInt(summary.rootStartTimeUnixNano, 10) %},
"durationNanos":{%q= strconv.FormatInt(summary.rootEndTimeUnixNano - summary.rootStartTimeUnixNano, 10) %},
"attributes":[
{"key":"service.name","value":{"stringValue":{%q= summary.rootServiceName %}}},
{"key":"nestedSetParent","value":{"intValue":"-1"}}
]
}
{% endif %}
],
"matched":{% if summary.rootSpanID != "" %}1{% else %}0{% endif %}
{% if len(spanSet) > 0 %}
{%= spanSummaryJson(spanSet[0]) %}
{% for _, span := range spanSet[1:] %}
,{%= spanSummaryJson(span) %}
{% endfor %}
{% endif %}
],
"matched": {%d= len(spanSet) %}
Comment thread
jiekun marked this conversation as resolved.
}
{% endfunc %}

{% func spanSummaryJson(span spanSummary) %}
{
"spanID":{%q= span.spanID %},
"startTimeUnixNano":{%dl= span.startTimeUnixNano %},
"durationNanos":{% if span.endTimeUnixNano > 0 && span.startTimeUnixNano > 0 %}{%dl= span.endTimeUnixNano - span.startTimeUnixNano %}{% else %}0{% endif %},
"attributes":[
{"key":"service.name","value":{"stringValue":{%q= span.serviceName %}}},
{"key":"name","value":{"stringValue":{%q= span.name %}}}
]
}
{% endfunc %}


{% comment %}
TraceByIDV1JSON renders the Tempo /api/traces/<trace_id> (v1) JSON response:
the bare Trace message, whose resource spans are nested under the "batches" key.
Expand Down
Loading