diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a7840a6..82c5b0db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- E2E datagen trace-log correlation — traces now generated first with 70% of logs sharing trace IDs, span IDs, and service context for realistic cross-signal testing +- Grafana datasource log→trace links — added `derivedFields` to all VictoriaLogs and Loki datasources with `trace_id=(\w+)` regex linking to Jaeger trace views +- ClickHouse otel_logs view promoted fields — moved service.name, k8s.*, etc. into LogAttributes map only (removed ResourceAttributes duplication) +- Bump loki-vl-proxy to v1.33.0 + ## [0.23.0] - 2026-05-14 ### Added diff --git a/cmd/datagen/main.go b/cmd/datagen/main.go index 6c920009..0032a3cd 100644 --- a/cmd/datagen/main.go +++ b/cmd/datagen/main.go @@ -87,6 +87,18 @@ func main() { } } +type traceCtx struct { + traceID string + spanIDs []string + svc string + ns string + env string + region string + node string + host string + baseTime time.Time +} + func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix string, logsCount, tracesCount, hoursBack int, dualWrite bool, vlEndpoint, vtEndpoint string) { now := time.Now().UTC() rng := mrand.New(mrand.NewSource(now.UnixNano())) // #nosec G404 -- synthetic test data, not security-sensitive @@ -98,83 +110,11 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix } batchID := randomHex(8) - logsByPartition := make(map[string][]LogRow) - for i := 0; i < logsCount; i++ { - hoursAgo := rng.Intn(hoursBack) + 1 - if hoursBack <= 1 { - hoursAgo = 0 - } - ts := now.Add(-time.Duration(hoursAgo) * time.Hour).Add(time.Duration(rng.Intn(3600)) * time.Second) - if hoursBack <= 1 { - ts = now.Add(-time.Duration(rng.Intn(3600)) * time.Second) - } - svc := services[rng.Intn(len(services))] - ns := namespaces[rng.Intn(len(namespaces))] - env := deployEnvs[rng.Intn(len(deployEnvs))] - region := regions[rng.Intn(len(regions))] - node := k8sNodes[rng.Intn(len(k8sNodes))] - host := hostNames[rng.Intn(len(hostNames))] - lvl := levels[rng.Intn(len(levels))] - pattern := pickPattern(rng) - body, logAttrs := pattern(rng, ts, svc, lvl) - traceID := randomHex(32) - spanID := randomHex(16) - row := LogRow{ - TimestampUnixNano: ts.UnixNano(), - Body: body, - SeverityText: lvl, - SeverityNumber: levelNums[lvl], - ServiceName: svc, - K8sNamespaceName: ns, - K8sPodName: fmt.Sprintf("%s-%s", svc, randomHex(8)), - K8sDeploymentName: svc, - K8sNodeName: node, - DeployEnv: env, - CloudRegion: region, - HostName: host, - TraceID: traceID, - SpanID: spanID, - Stream: fmt.Sprintf("{service.name=%q,k8s.namespace.name=%q,k8s.deployment.name=%q,deployment.environment=%q,cloud.region=%q}", svc, ns, svc, env, region), - StreamID: randomHex(16), - ScopeName: "github.com/reliablyobserve/instrumentation", - ResourceAttributes: map[string]string{ - "service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)), - "telemetry.sdk.name": "opentelemetry", - }, - LogAttributes: logAttrs, - } - - key := partitionKeyBatch(tenantPrefix, "logs", ts, batchID) - logsByPartition[key] = append(logsByPartition[key], row) - } - - for key, rows := range logsByPartition { - data, err := writeLogsParquet(rows) - if err != nil { - log.Printf("ERROR write logs parquet: %v", err) - return - } - if err := upload(ctx, client, bucket, key, data); err != nil { - log.Printf("ERROR upload %s: %v", key, err) - return - } - log.Printf(" uploaded %s (%d rows, %d bytes)", key, len(rows), len(data)) - } - - if dualWrite && vlEndpoint != "" { - var allLogs []LogRow - for _, rows := range logsByPartition { - allLogs = append(allLogs, rows...) - } - if err := pushNDJSON(vlEndpoint, allLogs); err != nil { - log.Printf("WARNING: dual-write to VL failed: %v", err) - } else { - log.Printf(" dual-write: pushed %d logs to VL at %s", len(allLogs), vlEndpoint) - } - } + // ── Phase 1: Generate traces first so logs can reference their IDs ── tracesByPartition := make(map[string][]TraceRow) + var traceContexts []traceCtx numTraces := tracesCount / 3 if numTraces < 1 { numTraces = 1 @@ -196,10 +136,16 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix node := k8sNodes[rng.Intn(len(k8sNodes))] host := hostNames[rng.Intn(len(hostNames))] + tc := traceCtx{ + traceID: traceID, svc: svc, ns: ns, env: env, + region: region, node: node, host: host, baseTime: baseTime, + } + spansPerTrace := 2 + rng.Intn(4) parentSpanID := "" for s := 0; s < spansPerTrace; s++ { spanID := randomHex(16) + tc.spanIDs = append(tc.spanIDs, spanID) startTime := baseTime.Add(time.Duration(s*10) * time.Millisecond) dur := time.Duration(5+rng.Intn(50)) * time.Millisecond endTime := startTime.Add(dur) @@ -230,12 +176,6 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix dbStmt = "GET session:user:" + randomHex(8) } - resAttrs := map[string]string{ - "service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)), - "telemetry.sdk.name": "opentelemetry", - } - spanAttrs := map[string]string{} - row := TraceRow{ TimestampUnixNano: endTime.UnixNano(), StartTimeUnixNano: startTime.UnixNano(), @@ -260,15 +200,19 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix HTTPUrl: httpUrl, DBSystem: dbSystem, DBStatement: dbStmt, - ResourceAttributes: resAttrs, - SpanAttributes: spanAttrs, - ScopeAttributes: map[string]string{}, + ResourceAttributes: map[string]string{ + "service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)), + "telemetry.sdk.name": "opentelemetry", + }, + SpanAttributes: map[string]string{}, + ScopeAttributes: map[string]string{}, } key := partitionKeyBatch(tenantPrefix, "traces", startTime, batchID) tracesByPartition[key] = append(tracesByPartition[key], row) parentSpanID = spanID } + traceContexts = append(traceContexts, tc) } for key, rows := range tracesByPartition { @@ -296,6 +240,105 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix } } + // ── Phase 2: Generate logs — 70% correlated to traces, 30% independent ── + + logsByPartition := make(map[string][]LogRow) + correlatedCount := 0 + for i := 0; i < logsCount; i++ { + var traceID, spanID, svc, ns, env, region, node, host string + var ts time.Time + + correlated := len(traceContexts) > 0 && rng.Float64() < 0.7 + if correlated { + tc := traceContexts[rng.Intn(len(traceContexts))] + traceID = tc.traceID + spanID = tc.spanIDs[rng.Intn(len(tc.spanIDs))] + svc = tc.svc + ns = tc.ns + env = tc.env + region = tc.region + node = tc.node + host = tc.host + ts = tc.baseTime.Add(time.Duration(rng.Intn(10000)-5000) * time.Millisecond) + correlatedCount++ + } else { + hoursAgo := rng.Intn(hoursBack) + 1 + if hoursBack <= 1 { + hoursAgo = 0 + } + ts = now.Add(-time.Duration(hoursAgo) * time.Hour).Add(time.Duration(rng.Intn(3600)) * time.Second) + if hoursBack <= 1 { + ts = now.Add(-time.Duration(rng.Intn(3600)) * time.Second) + } + svc = services[rng.Intn(len(services))] + ns = namespaces[rng.Intn(len(namespaces))] + env = deployEnvs[rng.Intn(len(deployEnvs))] + region = regions[rng.Intn(len(regions))] + node = k8sNodes[rng.Intn(len(k8sNodes))] + host = hostNames[rng.Intn(len(hostNames))] + traceID = randomHex(32) + spanID = randomHex(16) + } + + lvl := levels[rng.Intn(len(levels))] + pattern := pickPattern(rng) + body, logAttrs := pattern(rng, ts, svc, lvl) + body = fmt.Sprintf("%s trace_id=%s span_id=%s", body, traceID, spanID) + + row := LogRow{ + TimestampUnixNano: ts.UnixNano(), + Body: body, + SeverityText: lvl, + SeverityNumber: levelNums[lvl], + ServiceName: svc, + K8sNamespaceName: ns, + K8sPodName: fmt.Sprintf("%s-%s", svc, randomHex(8)), + K8sDeploymentName: svc, + K8sNodeName: node, + DeployEnv: env, + CloudRegion: region, + HostName: host, + TraceID: traceID, + SpanID: spanID, + Stream: fmt.Sprintf("{service.name=%q,k8s.namespace.name=%q,k8s.deployment.name=%q,deployment.environment=%q,cloud.region=%q}", svc, ns, svc, env, region), + StreamID: randomHex(16), + ScopeName: "github.com/reliablyobserve/instrumentation", + ResourceAttributes: map[string]string{ + "service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)), + "telemetry.sdk.name": "opentelemetry", + }, + LogAttributes: logAttrs, + } + + key := partitionKeyBatch(tenantPrefix, "logs", ts, batchID) + logsByPartition[key] = append(logsByPartition[key], row) + } + + for key, rows := range logsByPartition { + data, err := writeLogsParquet(rows) + if err != nil { + log.Printf("ERROR write logs parquet: %v", err) + return + } + if err := upload(ctx, client, bucket, key, data); err != nil { + log.Printf("ERROR upload %s: %v", key, err) + return + } + log.Printf(" uploaded %s (%d rows, %d bytes)", key, len(rows), len(data)) + } + + if dualWrite && vlEndpoint != "" { + var allLogs []LogRow + for _, rows := range logsByPartition { + allLogs = append(allLogs, rows...) + } + if err := pushNDJSON(vlEndpoint, allLogs); err != nil { + log.Printf("WARNING: dual-write to VL failed: %v", err) + } else { + log.Printf(" dual-write: pushed %d logs to VL at %s", len(allLogs), vlEndpoint) + } + } + totalLogs := 0 for _, rows := range logsByPartition { totalLogs += len(rows) @@ -305,8 +348,8 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix totalTraces += len(rows) } - log.Printf("Batch done: %d log rows in %d partitions, %d trace spans in %d partitions", - totalLogs, len(logsByPartition), totalTraces, len(tracesByPartition)) + log.Printf("Batch done: %d log rows (%d correlated) in %d partitions, %d trace spans in %d partitions", + totalLogs, correlatedCount, len(logsByPartition), totalTraces, len(tracesByPartition)) } func partitionKeyBatch(prefix, signal string, ts time.Time, batchID string) string { diff --git a/deployment/docker/Dockerfile.loki-vl-proxy b/deployment/docker/Dockerfile.loki-vl-proxy index 37ed4c54..a6e89b94 100644 --- a/deployment/docker/Dockerfile.loki-vl-proxy +++ b/deployment/docker/Dockerfile.loki-vl-proxy @@ -1,5 +1,5 @@ FROM alpine:3.21 -ARG VERSION=1.31.2 +ARG VERSION=1.33.0 ARG TARGETARCH=amd64 RUN wget -qO /usr/local/bin/loki-vl-proxy \ "https://github.com/ReliablyObserve/loki-vl-proxy/releases/download/v${VERSION}/loki-vl-proxy-linux-${TARGETARCH}" \ diff --git a/deployment/docker/clickhouse/init-s3.sql b/deployment/docker/clickhouse/init-s3.sql index 03d9b5a1..ffa6a4ef 100644 --- a/deployment/docker/clickhouse/init-s3.sql +++ b/deployment/docker/clickhouse/init-s3.sql @@ -19,24 +19,34 @@ SELECT body AS Body, `service.name` AS ServiceName, trace_id AS TraceId, + trace_id AS traceID, span_id AS SpanId, `scope.name` AS ScopeName, '' AS ScopeVersion, '' AS ResourceSchemaUrl, '' AS ScopeSchemaUrl, + `resource.attributes` AS ResourceAttributes, mapConcat( - `resource.attributes`, + `log.attributes`, mapFilter((k, v) -> v != '', mapFromArrays( - ['k8s.namespace.name', 'k8s.pod.name', 'k8s.deployment.name', - 'k8s.node.name', 'deployment.environment', 'cloud.region', 'host.name'], - [`k8s.namespace.name`, `k8s.pod.name`, `k8s.deployment.name`, - `k8s.node.name`, `deployment.environment`, `cloud.region`, `host.name`] + ['level', 'service.name', 'k8s.namespace.name', 'k8s.pod.name', + 'k8s.deployment.name', 'k8s.node.name', 'deployment.environment', + 'cloud.region', 'host.name', 'trace_id', 'span_id', 'scope.name'], + [severity_text, `service.name`, `k8s.namespace.name`, `k8s.pod.name`, + `k8s.deployment.name`, `k8s.node.name`, `deployment.environment`, + `cloud.region`, `host.name`, trace_id, span_id, `scope.name`] ) ) - ) AS ResourceAttributes, - `log.attributes` AS LogAttributes, - CAST(map() AS Map(String, String)) AS ScopeAttributes + ) AS LogAttributes, + CAST(map() AS Map(String, String)) AS ScopeAttributes, + CAST([] AS Array(DateTime64(9))) AS `Events.Timestamp`, + CAST([] AS Array(String)) AS `Events.Name`, + CAST([] AS Array(Map(String, String))) AS `Events.Attributes`, + CAST([] AS Array(String)) AS `Links.TraceId`, + CAST([] AS Array(String)) AS `Links.SpanId`, + CAST([] AS Array(String)) AS `Links.TraceState`, + CAST([] AS Array(Map(String, String))) AS `Links.Attributes` FROM s3( 'http://minio:9000/obs-archive/*/*/logs/dt=*/hour=*/*.parquet', 'minioadmin', 'minioadmin', 'Parquet', diff --git a/deployment/docker/docker-compose-e2e.yml b/deployment/docker/docker-compose-e2e.yml index 763540d6..136ab49c 100644 --- a/deployment/docker/docker-compose-e2e.yml +++ b/deployment/docker/docker-compose-e2e.yml @@ -266,7 +266,7 @@ services: environment: VL_BACKEND_URL: "http://victorialogs:9428" command: - - "-label-style=underscores" + - "-label-style=passthrough" - "-metadata-field-mode=translated" - "-emit-structured-metadata=true" - "-stream-fields=service.name,k8s.namespace.name,k8s.pod.name,k8s.deployment.name,deployment.environment" diff --git a/deployment/docker/grafana/provisioning/datasources/datasources.yaml b/deployment/docker/grafana/provisioning/datasources/datasources.yaml index bddfd131..4b2b8ce4 100644 --- a/deployment/docker/grafana/provisioning/datasources/datasources.yaml +++ b/deployment/docker/grafana/provisioning/datasources/datasources.yaml @@ -13,6 +13,11 @@ datasources: isDefault: true jsonData: maxLines: 1000 + derivedFields: + - datasourceUid: victoriatraces-global + matcherRegex: 'trace_id=(\w+)' + name: traceID + url: "$${__value.raw}" - name: "VictoriaTraces Global (Hot+Cold)" type: jaeger @@ -38,6 +43,11 @@ datasources: url: http://victorialogs:9428 jsonData: maxLines: 1000 + derivedFields: + - datasourceUid: victoriatraces-hot + matcherRegex: 'trace_id=(\w+)' + name: traceID + url: "$${__value.raw}" - name: "VictoriaTraces Hot (Disk 24h)" type: jaeger @@ -63,6 +73,11 @@ datasources: url: http://lakehouse-logs:9428 jsonData: maxLines: 1000 + derivedFields: + - datasourceUid: victoria-lakehouse-traces + matcherRegex: 'trace_id=(\w+)' + name: traceID + url: "$${__value.raw}" - name: "Lakehouse Traces Cold (S3 Jaeger)" type: jaeger @@ -77,6 +92,23 @@ datasources: filterByTraceID: true filterBySpanID: false + # ========================================================================= + # Tempo API — backed by VictoriaTraces for Loki→trace correlation + # ========================================================================= + + - name: "Tempo via VictoriaTraces (Hot+Cold)" + type: tempo + uid: tempo-global + access: proxy + url: http://vtselect:10428/select/tempo + jsonData: + tracesToLogsV2: + datasourceUid: loki-vl-proxy + spanStartTimeShift: "-1h" + spanEndTimeShift: "1h" + filterByTraceID: true + filterBySpanID: false + # ========================================================================= # Loki API — hot+cold routing via loki-vl-proxy # ========================================================================= @@ -89,8 +121,8 @@ datasources: jsonData: maxLines: 1000 derivedFields: - - datasourceUid: victoriatraces-global - matcherRegex: "trace_id=(\\w+)" + - datasourceUid: tempo-global + matcherRegex: 'trace_id=(\w+)' name: traceID url: "$${__value.raw}" @@ -110,7 +142,7 @@ datasources: defaultDatabase: lakehouse defaultTable: logs_raw - - name: "ClickHouse Logs (OTEL S3 Parquet)" + - name: "ClickHouse Logs (S3 Parquet)" type: grafana-clickhouse-datasource uid: clickhouse-logs access: proxy @@ -129,14 +161,8 @@ datasources: timeColumn: Timestamp levelColumn: SeverityText messageColumn: Body - tracesToLogsV2: - datasourceUid: clickhouse-traces - spanStartTimeShift: "-1h" - spanEndTimeShift: "1h" - filterByTraceID: true - filterBySpanID: false - - name: "ClickHouse Traces (OTEL S3 Parquet)" + - name: "ClickHouse Traces (S3 Parquet)" type: grafana-clickhouse-datasource uid: clickhouse-traces access: proxy