Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ public class ClusterSerdes implements Closeable {
private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
Serde.Target type,
Predicate<SerdeInstance> additionalCheck) {
// iterating over serdes in the same order they were added in config
// Pass 1: explicit topic pattern match (user-configured topicKeysPattern / topicValuesPattern).
// Iteration follows config registration order.
for (SerdeInstance serdeInstance : serdes.values()) {
if (!serdeInstance.couldBePreferable(topic, type)) {
continue;
}
var pattern = type == Serde.Target.KEY
? serdeInstance.topicKeyPattern
: serdeInstance.topicValuePattern;
Expand All @@ -44,6 +42,8 @@ private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
return Optional.of(serdeInstance);
}
}

// Pass 2: cluster-level explicit default (defaultKeySerde / defaultValueSerde from config).
if (type == Serde.Target.KEY
&& defaultKeySerde != null
&& additionalCheck.test(defaultKeySerde)) {
Expand All @@ -54,6 +54,16 @@ private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
&& additionalCheck.test(defaultValueSerde)) {
return Optional.of(defaultValueSerde);
}

// Pass 3: implicit auto-detection. Serdes opt in via couldBePreferable when they have positive
// topic-specific evidence (e.g. SchemaRegistrySerde when the topic has a matching subject).
for (SerdeInstance serdeInstance : serdes.values()) {
if (serdeInstance.couldBePreferable(topic, type)
&& additionalCheck.test(serdeInstance)) {
return Optional.of(serdeInstance);
}
}

return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,29 @@ public List<SerdeParameter> getParameters(String topic, Target type) {

@Override
public boolean couldBePreferable(String topic, Target type) {
return getSchemaSubjects(topic, type).contains(schemaSubject(topic, type));
// Auto-prefer SR only when the registry has a subject whose name is syntactically tied
// to this topic: either the default <topic>-value/<topic>-key subject (TopicNameStrategy)
// or a <topic>-<record> subject (TopicRecordNameStrategy). For the TopicRecordNameStrategy
// branch we additionally require the portion after the topic prefix to contain no hyphens,
// because Avro/Protobuf/JSON Schema record names (optionally fully qualified) only allow
// [A-Za-z0-9_.] characters. This precisely disambiguates sibling topics sharing a name
// prefix: subject "orders-dlq-value" or "orders-dlq-com.example.OrderCreated" belongs to
// topic "orders-dlq", not "orders", and must not auto-prefer SR for "orders".
// RecordNameStrategy subjects (bare fully qualified names) have no syntactic relationship
// to a topic and the same subject can be used by multiple topics, so we cannot safely
// auto-prefer SR for them either.
String defaultSubject = schemaSubject(topic, type);
String topicPrefix = topic + "-";
return getSchemaSubjects(topic, type).stream()
.anyMatch(s -> s.equals(defaultSubject) || isTopicRecordSubject(s, topicPrefix));
}

private static boolean isTopicRecordSubject(String subject, String topicPrefix) {
if (!subject.startsWith(topicPrefix)) {
return false;
}
String recordName = subject.substring(topicPrefix.length());
return !recordName.isEmpty() && recordName.indexOf('-') < 0;
}

private Serializer serializerWithSubject(String topic, Target type, String explicitSubject) {
Expand Down
253 changes: 253 additions & 0 deletions api/src/test/java/io/kafbat/ui/serdes/ClusterSerdesTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package io.kafbat.ui.serdes;

import static org.assertj.core.api.Assertions.assertThat;

import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.PropertyResolver;
import io.kafbat.ui.serde.api.RecordHeaders;
import io.kafbat.ui.serde.api.SchemaDescription;
import io.kafbat.ui.serde.api.Serde;
import io.kafbat.ui.serdes.builtin.StringSerde;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.junit.jupiter.api.Test;

class ClusterSerdesTest {

private static final String TOPIC = "orders";

/**
* Fake serde whose canSerialize / canDeserialize / couldBePreferable are knobbed per test.
* Keeps tests focused on selection logic in ClusterSerdes without standing up a real serde.
*/
static final class StubSerde implements Serde {
private final boolean canSerialize;
private final boolean canDeserialize;
private final boolean couldBePreferable;

StubSerde(boolean canSerialize, boolean canDeserialize, boolean couldBePreferable) {
this.canSerialize = canSerialize;
this.canDeserialize = canDeserialize;
this.couldBePreferable = couldBePreferable;
}

static StubSerde alwaysApplicable() {
return new StubSerde(true, true, false);
}

static StubSerde preferable() {
return new StubSerde(true, true, true);
}

@Override
public void configure(PropertyResolver serdeProperties,

Check failure on line 46 in api/src/test/java/io/kafbat/ui/serdes/ClusterSerdesTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=kafbat_kafka-ui&issues=AZ4HoKQI68xee83T1wfo&open=AZ4HoKQI68xee83T1wfo&pullRequest=1842
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
}
Comment on lines +46 to +49

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add an explicit no-op note in StubSerde.configure.

Line 46 overrides configure(...) with an empty body; this is valid for a test stub, but it currently triggers Sonar and can fail CI quality gates.

Suggested patch
     `@Override`
     public void configure(PropertyResolver serdeProperties,
                           PropertyResolver kafkaClusterProperties,
                           PropertyResolver globalProperties) {
+      // no-op: test stub intentionally does not require configuration
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
}
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
// no-op: test stub intentionally does not require configuration
}
🧰 Tools
🪛 GitHub Check: SonarCloud Code Analysis

[failure] 46-46: Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=kafbat_kafka-ui&issues=AZ4HoKQI68xee83T1wfo&open=AZ4HoKQI68xee83T1wfo&pullRequest=1842

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@api/src/test/java/io/kafbat/ui/serdes/ClusterSerdesTest.java` around lines 46
- 49, The override of configure(...) in StubSerde should include an explicit
no-op note to satisfy static analysis; update the
StubSerde.configure(PropertyResolver serdeProperties, PropertyResolver
kafkaClusterProperties, PropertyResolver globalProperties) method to keep the
empty implementation but add a clear comment stating it is intentionally a no-op
(e.g., "no-op for test stub" or "intentionally left blank") so Sonar/CI
recognizes it as deliberate and not an accidental omission.


@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}

@Override
public boolean canDeserialize(String topic, Target type) {
return canDeserialize;
}

@Override
public boolean canSerialize(String topic, Target type) {
return canSerialize;
}

@Override
public boolean couldBePreferable(String topic, Target type) {
return couldBePreferable;
}

@Override
public Serializer serializer(String topic, Target type) {
return input -> input == null ? new byte[0] : input.getBytes();
}

@Override
public Deserializer deserializer(String topic, Target type) {
return (RecordHeaders headers, byte[] data) ->
new DeserializeResult(new String(data), DeserializeResult.Type.STRING, Map.of());
}
}

private static SerdeInstance instance(String name, Serde serde,
@Nullable Pattern keyPattern,
@Nullable Pattern valuePattern) {
return new SerdeInstance(name, serde, keyPattern, valuePattern, null);
}

private static SerdeInstance stringInstance() {
return instance(StringSerde.NAME, new StringSerde(), null, null);
}

private static ClusterSerdes cluster(LinkedHashMap<String, SerdeInstance> serdes,
@Nullable SerdeInstance defaultKey,
@Nullable SerdeInstance defaultValue) {
var fallback = new SerdeInstance("Fallback", new StringSerde(), null, null, null);
return new ClusterSerdes(serdes, defaultKey, defaultValue, fallback);
}

// --- Pass 3 (couldBePreferable) — the regression pin -----------------------------------------

@Test
void preferableSerdeIsSelectedOverStringFallbackWhenNoPatternsOrDefaults() {
// Mirrors: auto-configured SchemaRegistry + Avro topic, no defaultValueSerde configured.
// The regression in #1833: this returned StringSerde instead of the preferable SR.
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo("Preferable");
}

@Test
void preferableSerdeIsSelectedForSerializeToo() {
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForSerialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo("Preferable");
}

// --- Default-true on the Serde interface used to be the bug; pin that pass 3 doesn't over-fire.

@Test
void fallsBackToStringWhenNoSerdeNominatesItself() {
// No serde overrides couldBePreferable to true. With the corrected interface default of
// false, pass 3 finds nothing and we fall through to StringSerde via the caller's .orElse(...)
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("Other", instance("Other", StubSerde.alwaysApplicable(), null, null));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo(StringSerde.NAME);
}

// --- Pass 2 (explicit cluster default) --------------------------------------------------------

@Test
void explicitDefaultValueSerdeWinsOverPreferable() {
// Even when a serde nominates itself via couldBePreferable, the cluster-level explicit
// default (defaultValueSerde) takes precedence — pass 2 beats pass 3.
var stringInst = stringInstance();
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInst);
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));

var cs = cluster(serdes, null, stringInst);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo(StringSerde.NAME);
}

@Test
void explicitDefaultKeySerdeOnlyAppliesToKeyTarget() {
var stringInst = stringInstance();
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInst);
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));

var cs = cluster(serdes, stringInst, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.KEY).getName())
.isEqualTo(StringSerde.NAME);
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo("Preferable");
}

// --- Pass 1 (explicit topic patterns) ---------------------------------------------------------

@Test
void explicitTopicValuePatternStillMatchesEvenIfCouldBePreferableFalse() {
// Regression pin for the latent dead-skip bug from PR #1650: a serde with couldBePreferable
// returning false must still be selected when its topicValuesPattern matches.
var notPreferable = new StubSerde(true, true, false);
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("Patterned", instance("Patterned", notPreferable, null, Pattern.compile(TOPIC)));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo("Patterned");
}

@Test
void explicitTopicPatternBeatsPreferableSerde() {
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("Patterned", instance("Patterned", StubSerde.alwaysApplicable(), null, Pattern.compile(TOPIC)));
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo("Patterned");
}

@Test
void keyPatternDoesNotMatchValueTarget() {
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("KeyOnly", instance("KeyOnly", StubSerde.alwaysApplicable(), Pattern.compile(TOPIC), null));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.KEY).getName())
.isEqualTo("KeyOnly");
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo(StringSerde.NAME);
}

// --- Cross-pass guards ------------------------------------------------------------------------

@Test
void preferableButCannotDeserializeIsSkipped() {
var preferableButCantDeser = new StubSerde(true, false, true);
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("Preferable", instance("Preferable", preferableButCantDeser, null, null));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo(StringSerde.NAME);
}

@Test
void pass3IterationOrderFollowsRegistrationOrder() {
// Two serdes both nominate themselves; the first registered wins.
var serdes = new LinkedHashMap<String, SerdeInstance>();
serdes.put(StringSerde.NAME, stringInstance());
serdes.put("FirstPreferable", instance("FirstPreferable", StubSerde.preferable(), null, null));
serdes.put("SecondPreferable", instance("SecondPreferable", StubSerde.preferable(), null, null));

var cs = cluster(serdes, null, null);

assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
.isEqualTo("FirstPreferable");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ void serdeWithCustomNameAndBuiltInClassnameAreExplicitlyConfigured() {
verifyPatternsMatch(serdeConfig, explicitlyConfiguredSerde);
}

@Test
void defaultValueSerdeIsNullWhenNotConfigured() {
// PR #1650 removed the implicit SchemaRegistry/ProtobufFile fallback that used to be wired
// into the cluster's defaultValueSerde when none was configured. couldBePreferable + pass 3
// in ClusterSerdes is the replacement. Pin that no implicit default leaks back in here.
ClustersProperties.SerdeConfig serdeConfig = new ClustersProperties.SerdeConfig();
serdeConfig.setName("BuiltIn1");
serdeConfig.setTopicKeysPattern("keys");
serdeConfig.setTopicValuesPattern("vals");

var serdes = init(serdeConfig);

assertThat(serdes.defaultKeySerde).isNull();
assertThat(serdes.defaultValueSerde).isNull();
}

private ClusterSerdes init(ClustersProperties.SerdeConfig... serdeConfigs) {
return initializer.init(env, createProperties(serdeConfigs), 0);
}
Expand Down
Loading
Loading