diff --git a/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java b/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java index c5e44ef5b..049db2af9 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java @@ -30,11 +30,9 @@ public class ClusterSerdes implements Closeable { private Optional findSerdeByPatternsOrDefault(String topic, Serde.Target type, Predicate additionalCheck) { - // iterating over serdes in the same order they were added in config + // Pass 1: explicit topic pattern match (user-configured topicKeysPattern / topicValuesPattern). + // Iteration follows config registration order. for (SerdeInstance serdeInstance : serdes.values()) { - if (!serdeInstance.couldBePreferable(topic, type)) { - continue; - } var pattern = type == Serde.Target.KEY ? serdeInstance.topicKeyPattern : serdeInstance.topicValuePattern; @@ -44,6 +42,8 @@ private Optional findSerdeByPatternsOrDefault(String topic, return Optional.of(serdeInstance); } } + + // Pass 2: cluster-level explicit default (defaultKeySerde / defaultValueSerde from config). if (type == Serde.Target.KEY && defaultKeySerde != null && additionalCheck.test(defaultKeySerde)) { @@ -54,6 +54,16 @@ private Optional findSerdeByPatternsOrDefault(String topic, && additionalCheck.test(defaultValueSerde)) { return Optional.of(defaultValueSerde); } + + // Pass 3: implicit auto-detection. Serdes opt in via couldBePreferable when they have positive + // topic-specific evidence (e.g. SchemaRegistrySerde when the topic has a matching subject). + for (SerdeInstance serdeInstance : serdes.values()) { + if (serdeInstance.couldBePreferable(topic, type) + && additionalCheck.test(serdeInstance)) { + return Optional.of(serdeInstance); + } + } + return Optional.empty(); } diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index 25f222d71..9d3014383 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -398,7 +398,29 @@ public List getParameters(String topic, Target type) { @Override public boolean couldBePreferable(String topic, Target type) { - return getSchemaSubjects(topic, type).contains(schemaSubject(topic, type)); + // Auto-prefer SR only when the registry has a subject whose name is syntactically tied + // to this topic: either the default -value/-key subject (TopicNameStrategy) + // or a - subject (TopicRecordNameStrategy). For the TopicRecordNameStrategy + // branch we additionally require the portion after the topic prefix to contain no hyphens, + // because Avro/Protobuf/JSON Schema record names (optionally fully qualified) only allow + // [A-Za-z0-9_.] characters. This precisely disambiguates sibling topics sharing a name + // prefix: subject "orders-dlq-value" or "orders-dlq-com.example.OrderCreated" belongs to + // topic "orders-dlq", not "orders", and must not auto-prefer SR for "orders". + // RecordNameStrategy subjects (bare fully qualified names) have no syntactic relationship + // to a topic and the same subject can be used by multiple topics, so we cannot safely + // auto-prefer SR for them either. + String defaultSubject = schemaSubject(topic, type); + String topicPrefix = topic + "-"; + return getSchemaSubjects(topic, type).stream() + .anyMatch(s -> s.equals(defaultSubject) || isTopicRecordSubject(s, topicPrefix)); + } + + private static boolean isTopicRecordSubject(String subject, String topicPrefix) { + if (!subject.startsWith(topicPrefix)) { + return false; + } + String recordName = subject.substring(topicPrefix.length()); + return !recordName.isEmpty() && recordName.indexOf('-') < 0; } private Serializer serializerWithSubject(String topic, Target type, String explicitSubject) { diff --git a/api/src/test/java/io/kafbat/ui/serdes/ClusterSerdesTest.java b/api/src/test/java/io/kafbat/ui/serdes/ClusterSerdesTest.java new file mode 100644 index 000000000..d4e75aa94 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/ClusterSerdesTest.java @@ -0,0 +1,253 @@ +package io.kafbat.ui.serdes; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serde.api.PropertyResolver; +import io.kafbat.ui.serde.api.RecordHeaders; +import io.kafbat.ui.serde.api.SchemaDescription; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.serdes.builtin.StringSerde; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; + +class ClusterSerdesTest { + + private static final String TOPIC = "orders"; + + /** + * Fake serde whose canSerialize / canDeserialize / couldBePreferable are knobbed per test. + * Keeps tests focused on selection logic in ClusterSerdes without standing up a real serde. + */ + static final class StubSerde implements Serde { + private final boolean canSerialize; + private final boolean canDeserialize; + private final boolean couldBePreferable; + + StubSerde(boolean canSerialize, boolean canDeserialize, boolean couldBePreferable) { + this.canSerialize = canSerialize; + this.canDeserialize = canDeserialize; + this.couldBePreferable = couldBePreferable; + } + + static StubSerde alwaysApplicable() { + return new StubSerde(true, true, false); + } + + static StubSerde preferable() { + return new StubSerde(true, true, true); + } + + @Override + public void configure(PropertyResolver serdeProperties, + PropertyResolver kafkaClusterProperties, + PropertyResolver globalProperties) { + } + + @Override + public Optional getDescription() { + return Optional.empty(); + } + + @Override + public Optional getSchema(String topic, Target type) { + return Optional.empty(); + } + + @Override + public boolean canDeserialize(String topic, Target type) { + return canDeserialize; + } + + @Override + public boolean canSerialize(String topic, Target type) { + return canSerialize; + } + + @Override + public boolean couldBePreferable(String topic, Target type) { + return couldBePreferable; + } + + @Override + public Serializer serializer(String topic, Target type) { + return input -> input == null ? new byte[0] : input.getBytes(); + } + + @Override + public Deserializer deserializer(String topic, Target type) { + return (RecordHeaders headers, byte[] data) -> + new DeserializeResult(new String(data), DeserializeResult.Type.STRING, Map.of()); + } + } + + private static SerdeInstance instance(String name, Serde serde, + @Nullable Pattern keyPattern, + @Nullable Pattern valuePattern) { + return new SerdeInstance(name, serde, keyPattern, valuePattern, null); + } + + private static SerdeInstance stringInstance() { + return instance(StringSerde.NAME, new StringSerde(), null, null); + } + + private static ClusterSerdes cluster(LinkedHashMap serdes, + @Nullable SerdeInstance defaultKey, + @Nullable SerdeInstance defaultValue) { + var fallback = new SerdeInstance("Fallback", new StringSerde(), null, null, null); + return new ClusterSerdes(serdes, defaultKey, defaultValue, fallback); + } + + // --- Pass 3 (couldBePreferable) — the regression pin ----------------------------------------- + + @Test + void preferableSerdeIsSelectedOverStringFallbackWhenNoPatternsOrDefaults() { + // Mirrors: auto-configured SchemaRegistry + Avro topic, no defaultValueSerde configured. + // The regression in #1833: this returned StringSerde instead of the preferable SR. + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null)); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo("Preferable"); + } + + @Test + void preferableSerdeIsSelectedForSerializeToo() { + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null)); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForSerialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo("Preferable"); + } + + // --- Default-true on the Serde interface used to be the bug; pin that pass 3 doesn't over-fire. + + @Test + void fallsBackToStringWhenNoSerdeNominatesItself() { + // No serde overrides couldBePreferable to true. With the corrected interface default of + // false, pass 3 finds nothing and we fall through to StringSerde via the caller's .orElse(...) + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("Other", instance("Other", StubSerde.alwaysApplicable(), null, null)); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo(StringSerde.NAME); + } + + // --- Pass 2 (explicit cluster default) -------------------------------------------------------- + + @Test + void explicitDefaultValueSerdeWinsOverPreferable() { + // Even when a serde nominates itself via couldBePreferable, the cluster-level explicit + // default (defaultValueSerde) takes precedence — pass 2 beats pass 3. + var stringInst = stringInstance(); + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInst); + serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null)); + + var cs = cluster(serdes, null, stringInst); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo(StringSerde.NAME); + } + + @Test + void explicitDefaultKeySerdeOnlyAppliesToKeyTarget() { + var stringInst = stringInstance(); + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInst); + serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null)); + + var cs = cluster(serdes, stringInst, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.KEY).getName()) + .isEqualTo(StringSerde.NAME); + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo("Preferable"); + } + + // --- Pass 1 (explicit topic patterns) --------------------------------------------------------- + + @Test + void explicitTopicValuePatternStillMatchesEvenIfCouldBePreferableFalse() { + // Regression pin for the latent dead-skip bug from PR #1650: a serde with couldBePreferable + // returning false must still be selected when its topicValuesPattern matches. + var notPreferable = new StubSerde(true, true, false); + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("Patterned", instance("Patterned", notPreferable, null, Pattern.compile(TOPIC))); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo("Patterned"); + } + + @Test + void explicitTopicPatternBeatsPreferableSerde() { + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("Patterned", instance("Patterned", StubSerde.alwaysApplicable(), null, Pattern.compile(TOPIC))); + serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null)); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo("Patterned"); + } + + @Test + void keyPatternDoesNotMatchValueTarget() { + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("KeyOnly", instance("KeyOnly", StubSerde.alwaysApplicable(), Pattern.compile(TOPIC), null)); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.KEY).getName()) + .isEqualTo("KeyOnly"); + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo(StringSerde.NAME); + } + + // --- Cross-pass guards ------------------------------------------------------------------------ + + @Test + void preferableButCannotDeserializeIsSkipped() { + var preferableButCantDeser = new StubSerde(true, false, true); + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("Preferable", instance("Preferable", preferableButCantDeser, null, null)); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo(StringSerde.NAME); + } + + @Test + void pass3IterationOrderFollowsRegistrationOrder() { + // Two serdes both nominate themselves; the first registered wins. + var serdes = new LinkedHashMap(); + serdes.put(StringSerde.NAME, stringInstance()); + serdes.put("FirstPreferable", instance("FirstPreferable", StubSerde.preferable(), null, null)); + serdes.put("SecondPreferable", instance("SecondPreferable", StubSerde.preferable(), null, null)); + + var cs = cluster(serdes, null, null); + + assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName()) + .isEqualTo("FirstPreferable"); + } +} diff --git a/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java b/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java index 8b559caa0..d5ff14214 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java @@ -117,6 +117,22 @@ void serdeWithCustomNameAndBuiltInClassnameAreExplicitlyConfigured() { verifyPatternsMatch(serdeConfig, explicitlyConfiguredSerde); } + @Test + void defaultValueSerdeIsNullWhenNotConfigured() { + // PR #1650 removed the implicit SchemaRegistry/ProtobufFile fallback that used to be wired + // into the cluster's defaultValueSerde when none was configured. couldBePreferable + pass 3 + // in ClusterSerdes is the replacement. Pin that no implicit default leaks back in here. + ClustersProperties.SerdeConfig serdeConfig = new ClustersProperties.SerdeConfig(); + serdeConfig.setName("BuiltIn1"); + serdeConfig.setTopicKeysPattern("keys"); + serdeConfig.setTopicValuesPattern("vals"); + + var serdes = init(serdeConfig); + + assertThat(serdes.defaultKeySerde).isNull(); + assertThat(serdes.defaultValueSerde).isNull(); + } + private ClusterSerdes init(ClustersProperties.SerdeConfig... serdeConfigs) { return initializer.init(env, createProperties(serdeConfigs), 0); } diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index 8e56cc14f..e8081a40e 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -599,6 +599,131 @@ void bothFlagsEnabledTogether() throws Exception { } } + @Nested + class CouldBePreferableTests { + + @Test + @SneakyThrows + void trueForTopicNameStrategy() { + String topic = "orders"; + registryClient.register(topic + "-value", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable(topic, Serde.Target.VALUE)).isTrue(); + } + + @Test + @SneakyThrows + void trueForTopicNameStrategyKey() { + String topic = "orders"; + registryClient.register(topic + "-key", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable(topic, Serde.Target.KEY)).isTrue(); + } + + @Test + @SneakyThrows + void trueForTopicRecordNameStrategy() { + String topic = "orders"; + // TopicRecordNameStrategy: - + registryClient.register(topic + "-com.example.OrderCreated", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable(topic, Serde.Target.VALUE)).isTrue(); + } + + @Test + @SneakyThrows + void falseForRecordNameStrategy() { + String topic = "orders"; + // RecordNameStrategy subjects have no syntactic relationship to a topic, so we cannot + // safely auto-prefer SR for them — the same subject can be used by multiple topics. + registryClient.register("com.example.OrderCreated", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable(topic, Serde.Target.VALUE)).isFalse(); + } + + @Test + void falseWhenNoSubjects() { + assertThat(serde.couldBePreferable("orders", Serde.Target.VALUE)).isFalse(); + assertThat(serde.couldBePreferable("orders", Serde.Target.KEY)).isFalse(); + } + + @Test + @SneakyThrows + void trueWhenMixedStrategiesIncludeTopicAffiliated() { + String topic = "orders"; + registryClient.register("com.example.OrderCreated", new AvroSchema("\"int\"")); + registryClient.register(topic + "-value", new AvroSchema("\"string\"")); + + assertThat(serde.couldBePreferable(topic, Serde.Target.VALUE)).isTrue(); + } + + @Test + @SneakyThrows + void falseWhenOnlyOppositeTargetSubjectExists() { + String topic = "orders"; + registryClient.register(topic + "-key", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable(topic, Serde.Target.VALUE)).isFalse(); + } + + @Test + @SneakyThrows + void falseForSubjectFromDifferentTopic() { + // Sanity: another topic's TopicNameStrategy subject must not nominate SR for "orders". + registryClient.register("other-topic-value", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable("orders", Serde.Target.VALUE)).isFalse(); + } + + @Test + @SneakyThrows + void falseForSiblingTopicSharingNamePrefix() { + // Regression: subject "orders-dlq-value" belongs to topic "orders-dlq" (TopicNameStrategy), + // not "orders". The startsWith("orders-") check alone would have nominated SR for "orders" + // here; the -key/-value exclusion in the TopicRecordNameStrategy branch prevents that. + registryClient.register("orders-dlq-value", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable("orders", Serde.Target.VALUE)).isFalse(); + } + + @Test + @SneakyThrows + void siblingTopicsResolveIndependently() { + // With both "orders-value" and "orders-dlq-value" registered, each topic should nominate + // SR only for its own subject. + registryClient.register("orders-value", new AvroSchema("\"int\"")); + registryClient.register("orders-dlq-value", new AvroSchema("\"string\"")); + + assertThat(serde.couldBePreferable("orders", Serde.Target.VALUE)).isTrue(); + assertThat(serde.couldBePreferable("orders-dlq", Serde.Target.VALUE)).isTrue(); + } + + @Test + @SneakyThrows + void falseForSiblingTopicTopicRecordNameStrategySubject() { + // Subject "orders-dlq-com.example.OrderCreated" is TopicRecordNameStrategy belonging to + // topic "orders-dlq", not "orders". The portion after the "orders-" prefix is + // "dlq-com.example.OrderCreated" which contains a hyphen and therefore cannot be a valid + // record name, so the subject is correctly attributed only to "orders-dlq". + registryClient.register("orders-dlq-com.example.OrderCreated", new AvroSchema("\"int\"")); + + assertThat(serde.couldBePreferable("orders", Serde.Target.VALUE)).isFalse(); + assertThat(serde.couldBePreferable("orders-dlq", Serde.Target.VALUE)).isTrue(); + } + + @Test + @SneakyThrows + void siblingTopicsWithTopicRecordNameStrategyResolveIndependently() { + // Both topics use TopicRecordNameStrategy with their own record subjects. Each must + // nominate SR only for its own subject and not the sibling's. + registryClient.register("orders-com.example.OrderPlaced", new AvroSchema("\"int\"")); + registryClient.register("orders-dlq-com.example.OrderFailed", new AvroSchema("\"string\"")); + + assertThat(serde.couldBePreferable("orders", Serde.Target.VALUE)).isTrue(); + assertThat(serde.couldBePreferable("orders-dlq", Serde.Target.VALUE)).isTrue(); + } + } + @Nested class GetSchemaSubjectsTests { diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java index 3dbba69d8..353e4d3a9 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java @@ -132,16 +132,25 @@ default List getParameters(String topic, Target type) { } /** - * Indicates whether this serde is a preferable choice for the specified topic and target. - * Implementations can use this to influence serde selection when multiple candidates are available. - * Default implementation returns {@code true}. + * Indicates whether this serde should nominate itself as the preferred default for the specified + * topic/target when no explicit pattern or cluster-level default applies. + *

+ * Implementations should override and return {@code true} only when they have positive + * topic-specific evidence (e.g. a matching schema subject for a SchemaRegistry serde). The + * default implementation returns {@code false} so that serdes opt in to being auto-preferred. + *

+ * Note: this is distinct from {@link #canSerialize(String, Target)} / + * {@link #canDeserialize(String, Target)}, which control whether the serde appears in the + * selectable list at all. {@code couldBePreferable} controls whether it is the *pre-selected* + * default. * * @param topic topic name * @param type {@code Target} for which preference is evaluated. - * @return {@code true} if this serde should be treated as preferable, otherwise {@code false}. + * @return {@code true} if this serde should be auto-preferred for the topic/target, otherwise + * {@code false}. */ default boolean couldBePreferable(String topic, Target type) { - return true; + return false; } /**