Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9b4b270
docs: add AZ-aware cost optimization implementation plan
szibis May 14, 2026
584610a
feat: add AZ-aware config fields to PeerConfig and SelectConfig
szibis May 14, 2026
a36364e
feat: add AZ-aware consistent hash ring with same-AZ preference
szibis May 14, 2026
c60365a
docs: rewrite AZ plan v2 — simpler design, auto-detect, strict/prefer…
szibis May 14, 2026
70656a4
feat: add AZ auto-detection package with env/IMDS/GCP/K8s fallback chain
szibis May 14, 2026
ff75e10
feat: add AZ mode (strict/preferred) and min-peers-per-AZ config
szibis May 14, 2026
bf59e6f
feat: add AZ-aware peer cache client with stats endpoint
szibis May 14, 2026
f45b5d3
feat: add AZ-aware buffer bridge with same-AZ endpoint preference
szibis May 14, 2026
3e77d06
feat: add AZ routing metrics for peer cache and buffer bridge
szibis May 14, 2026
f8cb4f2
feat: wire AZ detection and zone-aware peer discovery at startup
szibis May 14, 2026
8bacece
feat: add AZ-aware Helm defaults — topology spread, NODE_NAME injecti…
szibis May 14, 2026
54c3541
feat: wire AZ detection into traces module
szibis May 14, 2026
e038c88
test: add AZ integration tests for peer cache and detection
szibis May 14, 2026
0a7dd60
test: add E2E tests for AZ-aware routing on docker-compose
szibis May 14, 2026
86e8608
docs: update cross-AZ optimization with implementation details and me…
szibis May 14, 2026
2b2a467
fix: peer AZ auth header mismatch, race in stats endpoint, config mer…
szibis May 14, 2026
060650c
test: add fuzz tests and edge cases for all AZ-aware components
szibis May 14, 2026
a151848
docs: add v0.23.0 changelog entries for AZ-aware cost optimization
szibis May 14, 2026
f27c5df
fix: resolve golangci-lint errcheck, staticcheck, and gosec findings
szibis May 14, 2026
084de3d
fix: move nosec annotation to correct lines for gosec G704
szibis May 14, 2026
bc9a421
fix: handle remaining errcheck findings for w.Write and resp.Body.Close
szibis May 14, 2026
cb4f291
fix: add trace-log correlation and cross-datasource trace links
szibis May 14, 2026
5ea292f
fix: add changelog entries for trace-log correlation and datasource f…
szibis May 14, 2026
4232848
fix: add Tempo datasource with correct VT v2 API path and sync changelog
szibis May 14, 2026
762655e
ci: trigger changelog gate re-check
szibis May 14, 2026
16af662
Merge remote-tracking branch 'origin/main' into fix/e2e-correlation-d…
szibis May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- E2E datagen trace-log correlation — traces now generated first with 70% of logs sharing trace IDs, span IDs, and service context for realistic cross-signal testing
- Grafana datasource log→trace links — added `derivedFields` to all VictoriaLogs and Loki datasources with `trace_id=(\w+)` regex linking to Jaeger trace views
- ClickHouse otel_logs view promoted fields — moved service.name, k8s.*, etc. into LogAttributes map only (removed ResourceAttributes duplication)
- Bump loki-vl-proxy to v1.33.0

## [0.23.0] - 2026-05-14

### Added
Expand Down
213 changes: 128 additions & 85 deletions cmd/datagen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ func main() {
}
}

type traceCtx struct {
traceID string
spanIDs []string
svc string
ns string
env string
region string
node string
host string
baseTime time.Time
}

func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix string, logsCount, tracesCount, hoursBack int, dualWrite bool, vlEndpoint, vtEndpoint string) {
now := time.Now().UTC()
rng := mrand.New(mrand.NewSource(now.UnixNano())) // #nosec G404 -- synthetic test data, not security-sensitive
Expand All @@ -98,83 +110,11 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix
}

batchID := randomHex(8)
logsByPartition := make(map[string][]LogRow)
for i := 0; i < logsCount; i++ {
hoursAgo := rng.Intn(hoursBack) + 1
if hoursBack <= 1 {
hoursAgo = 0
}
ts := now.Add(-time.Duration(hoursAgo) * time.Hour).Add(time.Duration(rng.Intn(3600)) * time.Second)
if hoursBack <= 1 {
ts = now.Add(-time.Duration(rng.Intn(3600)) * time.Second)
}
svc := services[rng.Intn(len(services))]
ns := namespaces[rng.Intn(len(namespaces))]
env := deployEnvs[rng.Intn(len(deployEnvs))]
region := regions[rng.Intn(len(regions))]
node := k8sNodes[rng.Intn(len(k8sNodes))]
host := hostNames[rng.Intn(len(hostNames))]
lvl := levels[rng.Intn(len(levels))]
pattern := pickPattern(rng)
body, logAttrs := pattern(rng, ts, svc, lvl)
traceID := randomHex(32)
spanID := randomHex(16)

row := LogRow{
TimestampUnixNano: ts.UnixNano(),
Body: body,
SeverityText: lvl,
SeverityNumber: levelNums[lvl],
ServiceName: svc,
K8sNamespaceName: ns,
K8sPodName: fmt.Sprintf("%s-%s", svc, randomHex(8)),
K8sDeploymentName: svc,
K8sNodeName: node,
DeployEnv: env,
CloudRegion: region,
HostName: host,
TraceID: traceID,
SpanID: spanID,
Stream: fmt.Sprintf("{service.name=%q,k8s.namespace.name=%q,k8s.deployment.name=%q,deployment.environment=%q,cloud.region=%q}", svc, ns, svc, env, region),
StreamID: randomHex(16),
ScopeName: "github.com/reliablyobserve/instrumentation",
ResourceAttributes: map[string]string{
"service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)),
"telemetry.sdk.name": "opentelemetry",
},
LogAttributes: logAttrs,
}

key := partitionKeyBatch(tenantPrefix, "logs", ts, batchID)
logsByPartition[key] = append(logsByPartition[key], row)
}

for key, rows := range logsByPartition {
data, err := writeLogsParquet(rows)
if err != nil {
log.Printf("ERROR write logs parquet: %v", err)
return
}
if err := upload(ctx, client, bucket, key, data); err != nil {
log.Printf("ERROR upload %s: %v", key, err)
return
}
log.Printf(" uploaded %s (%d rows, %d bytes)", key, len(rows), len(data))
}

if dualWrite && vlEndpoint != "" {
var allLogs []LogRow
for _, rows := range logsByPartition {
allLogs = append(allLogs, rows...)
}
if err := pushNDJSON(vlEndpoint, allLogs); err != nil {
log.Printf("WARNING: dual-write to VL failed: %v", err)
} else {
log.Printf(" dual-write: pushed %d logs to VL at %s", len(allLogs), vlEndpoint)
}
}
// ── Phase 1: Generate traces first so logs can reference their IDs ──

tracesByPartition := make(map[string][]TraceRow)
var traceContexts []traceCtx
numTraces := tracesCount / 3
if numTraces < 1 {
numTraces = 1
Expand All @@ -196,10 +136,16 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix
node := k8sNodes[rng.Intn(len(k8sNodes))]
host := hostNames[rng.Intn(len(hostNames))]

tc := traceCtx{
traceID: traceID, svc: svc, ns: ns, env: env,
region: region, node: node, host: host, baseTime: baseTime,
}

spansPerTrace := 2 + rng.Intn(4)
parentSpanID := ""
for s := 0; s < spansPerTrace; s++ {
spanID := randomHex(16)
tc.spanIDs = append(tc.spanIDs, spanID)
startTime := baseTime.Add(time.Duration(s*10) * time.Millisecond)
dur := time.Duration(5+rng.Intn(50)) * time.Millisecond
endTime := startTime.Add(dur)
Expand Down Expand Up @@ -230,12 +176,6 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix
dbStmt = "GET session:user:" + randomHex(8)
}

resAttrs := map[string]string{
"service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)),
"telemetry.sdk.name": "opentelemetry",
}
spanAttrs := map[string]string{}

row := TraceRow{
TimestampUnixNano: endTime.UnixNano(),
StartTimeUnixNano: startTime.UnixNano(),
Expand All @@ -260,15 +200,19 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix
HTTPUrl: httpUrl,
DBSystem: dbSystem,
DBStatement: dbStmt,
ResourceAttributes: resAttrs,
SpanAttributes: spanAttrs,
ScopeAttributes: map[string]string{},
ResourceAttributes: map[string]string{
"service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)),
"telemetry.sdk.name": "opentelemetry",
},
SpanAttributes: map[string]string{},
ScopeAttributes: map[string]string{},
}

key := partitionKeyBatch(tenantPrefix, "traces", startTime, batchID)
tracesByPartition[key] = append(tracesByPartition[key], row)
parentSpanID = spanID
}
traceContexts = append(traceContexts, tc)
}

for key, rows := range tracesByPartition {
Expand Down Expand Up @@ -296,6 +240,105 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix
}
}

// ── Phase 2: Generate logs — 70% correlated to traces, 30% independent ──

logsByPartition := make(map[string][]LogRow)
correlatedCount := 0
for i := 0; i < logsCount; i++ {
var traceID, spanID, svc, ns, env, region, node, host string
var ts time.Time

correlated := len(traceContexts) > 0 && rng.Float64() < 0.7
if correlated {
tc := traceContexts[rng.Intn(len(traceContexts))]
traceID = tc.traceID
spanID = tc.spanIDs[rng.Intn(len(tc.spanIDs))]
svc = tc.svc
ns = tc.ns
env = tc.env
region = tc.region
node = tc.node
host = tc.host
ts = tc.baseTime.Add(time.Duration(rng.Intn(10000)-5000) * time.Millisecond)
correlatedCount++
} else {
hoursAgo := rng.Intn(hoursBack) + 1
if hoursBack <= 1 {
hoursAgo = 0
}
ts = now.Add(-time.Duration(hoursAgo) * time.Hour).Add(time.Duration(rng.Intn(3600)) * time.Second)
if hoursBack <= 1 {
ts = now.Add(-time.Duration(rng.Intn(3600)) * time.Second)
}
svc = services[rng.Intn(len(services))]
ns = namespaces[rng.Intn(len(namespaces))]
env = deployEnvs[rng.Intn(len(deployEnvs))]
region = regions[rng.Intn(len(regions))]
node = k8sNodes[rng.Intn(len(k8sNodes))]
host = hostNames[rng.Intn(len(hostNames))]
traceID = randomHex(32)
spanID = randomHex(16)
}

lvl := levels[rng.Intn(len(levels))]
pattern := pickPattern(rng)
body, logAttrs := pattern(rng, ts, svc, lvl)
body = fmt.Sprintf("%s trace_id=%s span_id=%s", body, traceID, spanID)

row := LogRow{
TimestampUnixNano: ts.UnixNano(),
Body: body,
SeverityText: lvl,
SeverityNumber: levelNums[lvl],
ServiceName: svc,
K8sNamespaceName: ns,
K8sPodName: fmt.Sprintf("%s-%s", svc, randomHex(8)),
K8sDeploymentName: svc,
K8sNodeName: node,
DeployEnv: env,
CloudRegion: region,
HostName: host,
TraceID: traceID,
SpanID: spanID,
Stream: fmt.Sprintf("{service.name=%q,k8s.namespace.name=%q,k8s.deployment.name=%q,deployment.environment=%q,cloud.region=%q}", svc, ns, svc, env, region),
StreamID: randomHex(16),
ScopeName: "github.com/reliablyobserve/instrumentation",
ResourceAttributes: map[string]string{
"service.version": fmt.Sprintf("1.%d.0", rng.Intn(10)),
"telemetry.sdk.name": "opentelemetry",
},
LogAttributes: logAttrs,
}

key := partitionKeyBatch(tenantPrefix, "logs", ts, batchID)
logsByPartition[key] = append(logsByPartition[key], row)
}

for key, rows := range logsByPartition {
data, err := writeLogsParquet(rows)
if err != nil {
log.Printf("ERROR write logs parquet: %v", err)
return
}
if err := upload(ctx, client, bucket, key, data); err != nil {
log.Printf("ERROR upload %s: %v", key, err)
return
}
log.Printf(" uploaded %s (%d rows, %d bytes)", key, len(rows), len(data))
}

if dualWrite && vlEndpoint != "" {
var allLogs []LogRow
for _, rows := range logsByPartition {
allLogs = append(allLogs, rows...)
}
if err := pushNDJSON(vlEndpoint, allLogs); err != nil {
log.Printf("WARNING: dual-write to VL failed: %v", err)
} else {
log.Printf(" dual-write: pushed %d logs to VL at %s", len(allLogs), vlEndpoint)
}
}

totalLogs := 0
for _, rows := range logsByPartition {
totalLogs += len(rows)
Expand All @@ -305,8 +348,8 @@ func generateBatch(ctx context.Context, client *s3.Client, bucket, tenantPrefix
totalTraces += len(rows)
}

log.Printf("Batch done: %d log rows in %d partitions, %d trace spans in %d partitions",
totalLogs, len(logsByPartition), totalTraces, len(tracesByPartition))
log.Printf("Batch done: %d log rows (%d correlated) in %d partitions, %d trace spans in %d partitions",
totalLogs, correlatedCount, len(logsByPartition), totalTraces, len(tracesByPartition))
}

func partitionKeyBatch(prefix, signal string, ts time.Time, batchID string) string {
Expand Down
2 changes: 1 addition & 1 deletion deployment/docker/Dockerfile.loki-vl-proxy
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM alpine:3.21
ARG VERSION=1.31.2
ARG VERSION=1.33.0
ARG TARGETARCH=amd64
RUN wget -qO /usr/local/bin/loki-vl-proxy \
"https://github.com/ReliablyObserve/loki-vl-proxy/releases/download/v${VERSION}/loki-vl-proxy-linux-${TARGETARCH}" \
Expand Down
26 changes: 18 additions & 8 deletions deployment/docker/clickhouse/init-s3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,34 @@ SELECT
body AS Body,
`service.name` AS ServiceName,
trace_id AS TraceId,
trace_id AS traceID,
span_id AS SpanId,
`scope.name` AS ScopeName,
'' AS ScopeVersion,
'' AS ResourceSchemaUrl,
'' AS ScopeSchemaUrl,
`resource.attributes` AS ResourceAttributes,
mapConcat(
`resource.attributes`,
`log.attributes`,
mapFilter((k, v) -> v != '',
mapFromArrays(
['k8s.namespace.name', 'k8s.pod.name', 'k8s.deployment.name',
'k8s.node.name', 'deployment.environment', 'cloud.region', 'host.name'],
[`k8s.namespace.name`, `k8s.pod.name`, `k8s.deployment.name`,
`k8s.node.name`, `deployment.environment`, `cloud.region`, `host.name`]
['level', 'service.name', 'k8s.namespace.name', 'k8s.pod.name',
'k8s.deployment.name', 'k8s.node.name', 'deployment.environment',
'cloud.region', 'host.name', 'trace_id', 'span_id', 'scope.name'],
[severity_text, `service.name`, `k8s.namespace.name`, `k8s.pod.name`,
`k8s.deployment.name`, `k8s.node.name`, `deployment.environment`,
`cloud.region`, `host.name`, trace_id, span_id, `scope.name`]
)
)
) AS ResourceAttributes,
`log.attributes` AS LogAttributes,
CAST(map() AS Map(String, String)) AS ScopeAttributes
) AS LogAttributes,
CAST(map() AS Map(String, String)) AS ScopeAttributes,
CAST([] AS Array(DateTime64(9))) AS `Events.Timestamp`,
CAST([] AS Array(String)) AS `Events.Name`,
CAST([] AS Array(Map(String, String))) AS `Events.Attributes`,
CAST([] AS Array(String)) AS `Links.TraceId`,
CAST([] AS Array(String)) AS `Links.SpanId`,
CAST([] AS Array(String)) AS `Links.TraceState`,
CAST([] AS Array(Map(String, String))) AS `Links.Attributes`
FROM s3(
'http://minio:9000/obs-archive/*/*/logs/dt=*/hour=*/*.parquet',
'minioadmin', 'minioadmin', 'Parquet',
Expand Down
2 changes: 1 addition & 1 deletion deployment/docker/docker-compose-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ services:
environment:
VL_BACKEND_URL: "http://victorialogs:9428"
command:
- "-label-style=underscores"
- "-label-style=passthrough"
- "-metadata-field-mode=translated"
- "-emit-structured-metadata=true"
- "-stream-fields=service.name,k8s.namespace.name,k8s.pod.name,k8s.deployment.name,deployment.environment"
Expand Down
Loading
Loading