Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/vtselect/traces/tempo/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func GetTraceList(ctx context.Context, cp *tracecommon.CommonParams, filterQuery
return nil, nil, nil
}

// query 2: trace_id:in(traceID, traceID, ...)
qStr := fmt.Sprintf(otelpb.TraceIDField+":in(%s)", strings.Join(traceIDs, ","))
// query 2: {resource_attr:service.name!=""} AND (filter_conditions or parent_span_id:="") AND trace_id:in(traceID, traceID, ...)
qStr := `{resource_attr:service.name!=""} AND (` + filterQuery.String() + ` OR parent_span_id:="") AND ` + otelpb.TraceIDField + `:in(` + strings.Join(traceIDs, ",") + `)`
q, err := logstorage.ParseQueryAtTimestamp(qStr, currentTime.UnixNano())
if err != nil {
return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
Expand Down
109 changes: 76 additions & 33 deletions app/vtselect/traces/tempo/tempo.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,17 @@ func processSearchRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
return
}

result, err := searchTraces(ctx, cp, params.q, params.start, params.end, params.limit)
spss := 3
spssStr := r.URL.Query().Get("spss")
if spssStr != "" {
spss, err = strconv.Atoi(spssStr)
if err != nil {
httpserver.Errorf(w, r, "incorrect spss param:%s , %s", spssStr, err)
return
}
}
Comment thread
jiekun marked this conversation as resolved.

result, err := searchTraces(ctx, cp, params.q, params.start, params.end, params.limit, spss)
if err != nil {
httpserver.Errorf(w, r, "cannot get traces list: %s", err)
return
Expand Down Expand Up @@ -535,7 +545,7 @@ func singleFieldQueryHelper(ctx context.Context, q *logstorage.Query, cp *tracec
return resultList, nil
}

func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr string, start, end time.Time, limit int) ([]traceSummary, error) {
func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr string, start, end time.Time, limit, spss int) ([]traceSummary, error) {
// transform traceQL into LogsQL as filter. It should contain filter only without any pipe.
filterQuery, err := traceql.ParseQuery(traceQLStr)
if err != nil {
Expand All @@ -555,7 +565,7 @@ func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr
return nil, err
}

result, err := summarySearchTracesResult(ctx, rows, limit)
result, err := summarySearchTracesResult(ctx, rows, spss)
if err != nil {
return nil, err
}
Expand All @@ -564,18 +574,24 @@ func searchTraces(ctx context.Context, cp *tracecommon.CommonParams, traceQLStr
}

type traceSummary struct {
rootSpan spanSummary
spanSet []spanSummary
}

type spanSummary struct {
traceID string
rootServiceName string
rootTraceName string
rootSpanID string
spanID string
parentSpanID string
name string
serviceName string
startTimeUnixNano int64
endTimeUnixNano int64
// rootStartTimeUnixNano / rootEndTimeUnixNano are the root span's own bounds,
// used to populate spanSets[0].spans[0].startTimeUnixNano and durationNanos.
// Grafana's Tempo datasource reads durationNanos from the span, not the
// trace-level durationMs, so we must emit it on the synthesized root span.
rootStartTimeUnixNano int64
rootEndTimeUnixNano int64
attributes []attribute
}

type attribute struct {
key string
vStr string
}

func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, limit int) ([]traceSummary, error) {
Expand All @@ -585,6 +601,7 @@ func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, lim
var traceID, serviceName, spanName, spanID, parentSpanID string
var startTimeUnixNano, endTimeUnixNano int64
var err error
var attributes []attribute
for _, field := range row.Fields {
switch field.Name {
case otelpb.ResourceAttrServiceName:
Expand All @@ -608,37 +625,62 @@ func summarySearchTracesResult(ctx context.Context, rows []*tracecommon.Row, lim
return nil, err
}
default:
continue
// span attributes
v := strings.Clone(field.Value)
if strings.HasPrefix(field.Name, otelpb.ResourceAttrPrefix) { // resource attributes
attributes = append(attributes, attribute{
key: strings.TrimPrefix(field.Name, otelpb.ResourceAttrPrefix),
vStr: v,
})
} else if strings.HasPrefix(field.Name, otelpb.SpanAttrPrefixField) {
attributes = append(attributes, attribute{
key: strings.TrimPrefix(field.Name, otelpb.SpanAttrPrefixField),
vStr: v,
})
}
}
}

if traceID == "" {
return nil, fmt.Errorf("trace ID not found for a span %v", row)
}

// get the summary for this trace
summary, ok := traceMap[traceID]
// get the trace summary for this trace
trace, ok := traceMap[traceID]
if !ok {
summary = traceSummary{
startTimeUnixNano: math.MaxInt64,
rootServiceName: "<root span not yet received>",
trace = traceSummary{
rootSpan: spanSummary{
startTimeUnixNano: math.MaxInt64,
serviceName: "<root span not yet received>",
},
}
traceMap[traceID] = summary
}

summary.traceID = traceID
summary.startTimeUnixNano = min(summary.startTimeUnixNano, startTimeUnixNano)
summary.endTimeUnixNano = max(summary.endTimeUnixNano, endTimeUnixNano)
span := spanSummary{
traceID: traceID,
spanID: spanID,
parentSpanID: parentSpanID,
name: spanName,
serviceName: serviceName,
startTimeUnixNano: startTimeUnixNano,
endTimeUnixNano: endTimeUnixNano,
attributes: attributes,
}

trace.rootSpan.traceID = traceID
trace.rootSpan.startTimeUnixNano = min(trace.rootSpan.startTimeUnixNano, span.startTimeUnixNano)
trace.rootSpan.endTimeUnixNano = max(trace.rootSpan.endTimeUnixNano, span.endTimeUnixNano)

// if it's the root span
if parentSpanID == "" {
summary.rootServiceName = serviceName
summary.rootTraceName = spanName
summary.rootSpanID = spanID
summary.rootStartTimeUnixNano = startTimeUnixNano
summary.rootEndTimeUnixNano = endTimeUnixNano
trace.rootSpan = span
}
if len(trace.spanSet) < limit {
Comment thread
jiekun marked this conversation as resolved.
trace.spanSet = append(trace.spanSet, span)
}
// summary is not a pointer so it must be put back to the map.
traceMap[traceID] = summary

// trace is not a pointer so it must be put back to the map.
traceMap[traceID] = trace
}

resultList := make([]traceSummary, 0, len(traceMap))
Expand Down Expand Up @@ -698,11 +740,12 @@ func parseTempoAPIParam(_ context.Context, r *http.Request, allowDefaultTime boo
if err != nil {
return nil, fmt.Errorf("cannot parse limit: %s, err: %v", limit, err)
}
// Let's limit this to [0, *tracecommon.TraceMaxTraces] to prevent users from specifying an excessively large value.
if l < 0 || l > maxLimit {
return nil, fmt.Errorf("limit %d out of range [0, %d]", l, maxLimit)
// Let's limit this to (0, *tracecommon.TraceMaxTraces] to prevent users from specifying an excessively large value.
if l > maxLimit {
p.limit = maxLimit
} else if l > 0 {
p.limit = l
}
p.limit = l
}

p.q = q.Get("q")
Expand Down
49 changes: 28 additions & 21 deletions app/vtselect/traces/tempo/tempo.qtpl
Original file line number Diff line number Diff line change
Expand Up @@ -132,32 +132,39 @@

{% func summaryJson(summary traceSummary) %}
{
"traceID":{%q= summary.traceID %},
"rootServiceName":{%q= summary.rootServiceName %},
"rootTraceName":{%q= summary.rootTraceName %},
"startTimeUnixNano":{%dl= summary.startTimeUnixNano %},
"durationMs":{% if summary.endTimeUnixNano > 0 && summary.startTimeUnixNano > 0 %}{%dl= (summary.endTimeUnixNano - summary.startTimeUnixNano) / 1e6 %}{% else %}0{% endif %},
"spanSet":{%= spanSetJSON(summary) %},
"spanSets":[{%= spanSetJSON(summary) %}]
"traceID":{%q= summary.rootSpan.traceID %},
"rootServiceName":{%q= summary.rootSpan.serviceName %},
"rootTraceName":{%q= summary.rootSpan.name %},
"startTimeUnixNano":{%dl= summary.rootSpan.startTimeUnixNano %},
"durationMs":{% if summary.rootSpan.endTimeUnixNano > 0 && summary.rootSpan.startTimeUnixNano > 0 %}{%dl= (summary.rootSpan.endTimeUnixNano - summary.rootSpan.startTimeUnixNano) / 1e6 %}{% else %}0{% endif %},
"spanSets":[{%= spanSetJSON(summary.spanSet) %}]
Comment thread
jiekun marked this conversation as resolved.
}
{% endfunc %}

{% func spanSetJSON(summary traceSummary) %}
{% func spanSetJSON(spanSet []spanSummary) %}
{
"spans":[
{% if summary.rootSpanID != "" %}
{
"spanID":{%q= summary.rootSpanID %},
"startTimeUnixNano":{%q= strconv.FormatInt(summary.rootStartTimeUnixNano, 10) %},
"durationNanos":{%q= strconv.FormatInt(summary.rootEndTimeUnixNano - summary.rootStartTimeUnixNano, 10) %},
"attributes":[
{"key":"service.name","value":{"stringValue":{%q= summary.rootServiceName %}}},
{"key":"nestedSetParent","value":{"intValue":"-1"}}
]
}
{% endif %}
],
"matched":{% if summary.rootSpanID != "" %}1{% else %}0{% endif %}
{% if len(spanSet) > 0 %}
{%= spanSummaryJson(spanSet[0]) %}
{% for _, span := range spanSet[1:] %}
,{%= spanSummaryJson(span) %}
{% endfor %}
{% endif %}
],
"matched": {%d= len(spanSet) %}
Comment thread
jiekun marked this conversation as resolved.
}
{% endfunc %}

{% func spanSummaryJson(span spanSummary) %}
{
"spanID":{%q= span.spanID %},
"startTimeUnixNano":{%dl= span.startTimeUnixNano %},
"durationNanos":{% if span.endTimeUnixNano > 0 && span.startTimeUnixNano > 0 %}{%dl= span.endTimeUnixNano - span.startTimeUnixNano %}{% else %}0{% endif %},
"attributes":[
{"key":"service.name","value":{"stringValue":{%q= span.serviceName %}}},
{"key":"name","value":{"stringValue":{%q= span.name %}}},
{"key":"nestedSetParent","value":{"intValue": {% if span.parentSpanID == "" %}"-1"{% else %}"0"{% endif %}}}
]
}
{% endfunc %}

Expand Down
Loading