Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/core/configmaps/observability.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ metadata:
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
annotations:
knative.dev/example-checksum: "0270bb17"
knative.dev/example-checksum: "afa5507a"
data:
_example: |
################################
Expand Down Expand Up @@ -58,6 +58,11 @@ data:
# If a zero or negative value is passed the default reporting OTel period is used (60 secs).
metrics-export-interval: 60s

# metrics.attributes.deny is a comma-separated list of metric attribute keys to filter
# out from all metrics. This can help prevent OOM issues caused by unbounded
# metric cardinality in production (e.g. cloudevents.type, messaging.destination.name).
metrics.attributes.deny: ""

# sink-event-error-reporting.enable whether the adapter reports a kube event to the CRD indicating
# a failure to send a cloud event to the sink.
sink-event-error-reporting.enable: "false"
Expand Down
22 changes: 21 additions & 1 deletion pkg/observability/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package observability
import (
"context"
"fmt"
"strings"

configmap "knative.dev/pkg/configmap/parser"
pkgo11y "knative.dev/pkg/observability"
Expand All @@ -32,6 +33,9 @@ const (
// DefaultEnableSinkEventErrorReporting is used to set the default sink event error reporting value
DefaultEnableSinkEventErrorReporting = false

// MetricAttributesDenyListKey is the CM key for a comma-separated list of metric attribute keys to filter out
MetricAttributesDenyListKey = "metrics.attributes.deny"

// DefaultMetricsPort is the default port used for prometheus metrics if the prometheus protocol is used
DefaultMetricsPort = 9092
)
Expand All @@ -50,6 +54,10 @@ type Config struct {
// EnableSinkEventErrorReporting specifies whether we should emit a k8s
// event when delivery to a sink fails
EnableSinkEventErrorReporting bool `json:"enableSinkEventErrorReporting"`

// MetricAttributesDenyList is a list of metric attribute keys to filter out
// from kn.eventing.* metrics (e.g. cloudevents.type, messaging.destination.name)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment should be updated (not specific to kn.eventing.* anymore)

MetricAttributesDenyList []string `json:"metricAttributesDenyList,omitempty"`
}

func DefaultConfig() *Config {
Expand All @@ -74,7 +82,19 @@ func NewFromMap(m map[string]string) (*Config, error) {
c.BaseConfig.Metrics.Endpoint = fmt.Sprintf(":%d", DefaultMetricsPort)
}

err := configmap.Parse(m, configmap.As(EnableSinkEventErrorReportingKey, &c.EnableSinkEventErrorReporting))
if v, ok := m[MetricAttributesDenyListKey]; ok && v != "" {
parts := strings.Split(v, ",")
c.MetricAttributesDenyList = make([]string, 0, len(parts))
for _, p := range parts {
if t := strings.TrimSpace(p); t != "" {
c.MetricAttributesDenyList = append(c.MetricAttributesDenyList, t)
}
}
}

err := configmap.Parse(m,
configmap.As(EnableSinkEventErrorReportingKey, &c.EnableSinkEventErrorReporting),
)
if err != nil {
fmt.Printf("failed to parse enable-sink-error-reporting: %s\n", err.Error())
return c, err
Expand Down
15 changes: 12 additions & 3 deletions pkg/observability/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
)

func TestNewFromMap(t *testing.T) {
configWithOverride := DefaultConfig()
configWithOverride.EnableSinkEventErrorReporting = true
configWithSinkEventErrorReporting := DefaultConfig()
configWithSinkEventErrorReporting.EnableSinkEventErrorReporting = true

configWithDenyList := DefaultConfig()
configWithDenyList.MetricAttributesDenyList = []string{"cloudevents.type", "messaging.destination.name"}

testCases := map[string]struct {
m map[string]string
Expand All @@ -39,7 +42,13 @@ func TestNewFromMap(t *testing.T) {
m: map[string]string{
EnableSinkEventErrorReportingKey: "true",
},
want: configWithOverride,
want: configWithSinkEventErrorReporting,
},
"metric attributes deny list": {
m: map[string]string{
MetricAttributesDenyListKey: "cloudevents.type, messaging.destination.name",
},
want: configWithDenyList,
},
"valid keys, invalid sink event error reporting value": {
m: map[string]string{
Expand Down
21 changes: 20 additions & 1 deletion pkg/observability/otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -60,10 +61,15 @@ func SetupObservabilityOrDie(

otelResource := resource.Default(component)

meterOpts := []metric.Option{metric.WithResource(otelResource)}
if len(cfg.MetricAttributesDenyList) > 0 {
meterOpts = append(meterOpts, metric.WithView(metricAttributesDenyFilter(cfg.MetricAttributesDenyList)))
}

meterProvider, err := metrics.NewMeterProvider(
ctx,
cfg.Metrics,
metric.WithResource(otelResource),
meterOpts...,
)
if err != nil {
logger.Fatalw("failed to set up meter provider", zap.Error(err))
Expand Down Expand Up @@ -155,3 +161,16 @@ func GetObservabilityConfig(ctx context.Context) (*observability.Config, error)

return configmap.Parse(cm)
}

func metricAttributesDenyFilter(denyList []string) metric.View {
keys := make([]attribute.Key, len(denyList))
for i, k := range denyList {
keys[i] = attribute.Key(k)
}
return metric.NewView(
metric.Instrument{Name: "*"},
metric.Stream{
AttributeFilter: attribute.NewDenyKeysFilter(keys...),
},
)
}
57 changes: 57 additions & 0 deletions pkg/observability/otel/otel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2026 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package otel

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
)

func TestMetricAttributesDenyFilter(t *testing.T) {
view := metricAttributesDenyFilter([]string{"cloudevents.type", "messaging.destination.name"})

stream, ok := view(metric.Instrument{Name: "kn.eventing.dispatch.duration"})
assert.True(t, ok, "view should match kn.eventing.* instruments")
assert.NotNil(t, stream.AttributeFilter)

denied := []attribute.KeyValue{
attribute.String("cloudevents.type", "com.example.event"),
attribute.String("messaging.destination.name", "my-destination"),
}
for _, kv := range denied {
assert.False(t, stream.AttributeFilter(kv), "attribute %s should be denied", kv.Key)
}

allowed := []attribute.KeyValue{
attribute.String("messaging.system", "knative"),
attribute.Int("http.response.status_code", 200),
}
for _, kv := range allowed {
assert.True(t, stream.AttributeFilter(kv), "attribute %s should be allowed", kv.Key)
}
}

func TestMetricAttributesDenyFilterMatchesAllInstruments(t *testing.T) {
view := metricAttributesDenyFilter([]string{"cloudevents.type"})

stream, ok := view(metric.Instrument{Name: "http.server.request.duration"})
assert.True(t, ok, "view should match all instruments")
assert.NotNil(t, stream.AttributeFilter)
}
Loading