Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class KafkaConfigSanitizer {

private static final String[] REGEX_PARTS = {"*", "$", "^", "+"};

private static final Pattern CONFIG_PROVIDER_REFERENCE =
Pattern.compile("\\$\\{[^}:]+:([^}:]+:)?[^}]+}");

private static final List<String> DEFAULT_PATTERNS_TO_SANITIZE = ImmutableList.<String>builder()
.addAll(kafkaConfigKeysToSanitize())
.add(
Expand Down Expand Up @@ -72,12 +75,27 @@ private static Set<String> kafkaConfigKeysToSanitize() {
public Object sanitize(String key, @Nullable Object value) {
for (Pattern pattern : sanitizeKeysPatterns) {
if (pattern.matcher(key).matches()) {
if (isConfigProviderReference(value)) {
return value;
}
return SANITIZED_VALUE;
}
}
return value;
}

/**
* Checks if config value is an externalized secret / config provider indirection: ${provider:[path:]key}.
* Such a value is only a reference resolved at runtime, not an actual
* secret, so it must not be masked (masking would clobber the reference on re-submit).
* @param value config value
* @return true if provider reference, false otherwise
*/
private static boolean isConfigProviderReference(@Nullable Object value) {
return value instanceof CharSequence charsequence
&& CONFIG_PROVIDER_REFERENCE.matcher(charsequence).find();
}

public Map<String, Object> sanitizeConnectorConfig(@Nullable Map<String, Object> original) {
var result = new HashMap<String, Object>(); //null-values supporting map!
if (original != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,22 @@ void obfuscateCredentialsWithDefinedPatterns() {
assertThat(sanitizer.sanitize("consumer.kafka.ui", "secret")).isEqualTo("******");
assertThat(sanitizer.sanitize("this.is.test.credentials", "secret")).isEqualTo("******");
assertThat(sanitizer.sanitize("this.is.not.credential", "not.credential"))
.isEqualTo("not.credential");
.isEqualTo("not.credential");
assertThat(sanitizer.sanitize("database.password", "no longer credential"))
.isEqualTo("no longer credential");
.isEqualTo("no longer credential");
}

@Test
void doNotObfuscateConfigProviderReferences() {
final var sanitizer = new KafkaConfigSanitizer(true, List.of());
assertThat(sanitizer.sanitize("database.password", "${file:/data/secrets.properties:db-password}"))
.isEqualTo("${file:/data/secrets.properties:db-password}");
assertThat(sanitizer.sanitize("password", "${vault:secret/data/connector:password}"))
.isEqualTo("${vault:secret/data/connector:password}");
assertThat(sanitizer.sanitize("aws.secret.access.key", "${env:AWS_SECRET}"))
.isEqualTo("${env:AWS_SECRET}");
assertThat(sanitizer.sanitize("password", "${notAReference}")).isEqualTo("******");
assertThat(sanitizer.sanitize("password", "plain-secret")).isEqualTo("******");
}

@Test
Expand Down
Loading