Skip to content

Commit ca2e62a

Browse files
BE: Restore SchemaRegistry auto-selection for Avro topics (#1833)
1 parent b68779a commit ca2e62a

6 files changed

Lines changed: 445 additions & 10 deletions

File tree

api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@ public class ClusterSerdes implements Closeable {
3030
private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
3131
Serde.Target type,
3232
Predicate<SerdeInstance> additionalCheck) {
33-
// iterating over serdes in the same order they were added in config
33+
// Pass 1: explicit topic pattern match (user-configured topicKeysPattern / topicValuesPattern).
34+
// Iteration follows config registration order.
3435
for (SerdeInstance serdeInstance : serdes.values()) {
35-
if (!serdeInstance.couldBePreferable(topic, type)) {
36-
continue;
37-
}
3836
var pattern = type == Serde.Target.KEY
3937
? serdeInstance.topicKeyPattern
4038
: serdeInstance.topicValuePattern;
@@ -44,6 +42,8 @@ private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
4442
return Optional.of(serdeInstance);
4543
}
4644
}
45+
46+
// Pass 2: cluster-level explicit default (defaultKeySerde / defaultValueSerde from config).
4747
if (type == Serde.Target.KEY
4848
&& defaultKeySerde != null
4949
&& additionalCheck.test(defaultKeySerde)) {
@@ -54,6 +54,16 @@ private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
5454
&& additionalCheck.test(defaultValueSerde)) {
5555
return Optional.of(defaultValueSerde);
5656
}
57+
58+
// Pass 3: implicit auto-detection. Serdes opt in via couldBePreferable when they have positive
59+
// topic-specific evidence (e.g. SchemaRegistrySerde when the topic has a matching subject).
60+
for (SerdeInstance serdeInstance : serdes.values()) {
61+
if (serdeInstance.couldBePreferable(topic, type)
62+
&& additionalCheck.test(serdeInstance)) {
63+
return Optional.of(serdeInstance);
64+
}
65+
}
66+
5767
return Optional.empty();
5868
}
5969

api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,29 @@ public List<SerdeParameter> getParameters(String topic, Target type) {
398398

399399
@Override
400400
public boolean couldBePreferable(String topic, Target type) {
401-
return getSchemaSubjects(topic, type).contains(schemaSubject(topic, type));
401+
// Auto-prefer SR only when the registry has a subject whose name is syntactically tied
402+
// to this topic: either the default <topic>-value/<topic>-key subject (TopicNameStrategy)
403+
// or a <topic>-<record> subject (TopicRecordNameStrategy). For the TopicRecordNameStrategy
404+
// branch we additionally require the portion after the topic prefix to contain no hyphens,
405+
// because Avro/Protobuf/JSON Schema record names (optionally fully qualified) only allow
406+
// [A-Za-z0-9_.] characters. This precisely disambiguates sibling topics sharing a name
407+
// prefix: subject "orders-dlq-value" or "orders-dlq-com.example.OrderCreated" belongs to
408+
// topic "orders-dlq", not "orders", and must not auto-prefer SR for "orders".
409+
// RecordNameStrategy subjects (bare fully qualified names) have no syntactic relationship
410+
// to a topic and the same subject can be used by multiple topics, so we cannot safely
411+
// auto-prefer SR for them either.
412+
String defaultSubject = schemaSubject(topic, type);
413+
String topicPrefix = topic + "-";
414+
return getSchemaSubjects(topic, type).stream()
415+
.anyMatch(s -> s.equals(defaultSubject) || isTopicRecordSubject(s, topicPrefix));
416+
}
417+
418+
private static boolean isTopicRecordSubject(String subject, String topicPrefix) {
419+
if (!subject.startsWith(topicPrefix)) {
420+
return false;
421+
}
422+
String recordName = subject.substring(topicPrefix.length());
423+
return !recordName.isEmpty() && recordName.indexOf('-') < 0;
402424
}
403425

404426
private Serializer serializerWithSubject(String topic, Target type, String explicitSubject) {
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
package io.kafbat.ui.serdes;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.kafbat.ui.serde.api.DeserializeResult;
6+
import io.kafbat.ui.serde.api.PropertyResolver;
7+
import io.kafbat.ui.serde.api.RecordHeaders;
8+
import io.kafbat.ui.serde.api.SchemaDescription;
9+
import io.kafbat.ui.serde.api.Serde;
10+
import io.kafbat.ui.serdes.builtin.StringSerde;
11+
import java.util.LinkedHashMap;
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import java.util.regex.Pattern;
15+
import javax.annotation.Nullable;
16+
import org.junit.jupiter.api.Test;
17+
18+
class ClusterSerdesTest {
19+
20+
private static final String TOPIC = "orders";
21+
22+
/**
23+
* Fake serde whose canSerialize / canDeserialize / couldBePreferable are knobbed per test.
24+
* Keeps tests focused on selection logic in ClusterSerdes without standing up a real serde.
25+
*/
26+
static final class StubSerde implements Serde {
27+
private final boolean canSerialize;
28+
private final boolean canDeserialize;
29+
private final boolean couldBePreferable;
30+
31+
StubSerde(boolean canSerialize, boolean canDeserialize, boolean couldBePreferable) {
32+
this.canSerialize = canSerialize;
33+
this.canDeserialize = canDeserialize;
34+
this.couldBePreferable = couldBePreferable;
35+
}
36+
37+
static StubSerde alwaysApplicable() {
38+
return new StubSerde(true, true, false);
39+
}
40+
41+
static StubSerde preferable() {
42+
return new StubSerde(true, true, true);
43+
}
44+
45+
@Override
46+
public void configure(PropertyResolver serdeProperties,
47+
PropertyResolver kafkaClusterProperties,
48+
PropertyResolver globalProperties) {
49+
}
50+
51+
@Override
52+
public Optional<String> getDescription() {
53+
return Optional.empty();
54+
}
55+
56+
@Override
57+
public Optional<SchemaDescription> getSchema(String topic, Target type) {
58+
return Optional.empty();
59+
}
60+
61+
@Override
62+
public boolean canDeserialize(String topic, Target type) {
63+
return canDeserialize;
64+
}
65+
66+
@Override
67+
public boolean canSerialize(String topic, Target type) {
68+
return canSerialize;
69+
}
70+
71+
@Override
72+
public boolean couldBePreferable(String topic, Target type) {
73+
return couldBePreferable;
74+
}
75+
76+
@Override
77+
public Serializer serializer(String topic, Target type) {
78+
return input -> input == null ? new byte[0] : input.getBytes();
79+
}
80+
81+
@Override
82+
public Deserializer deserializer(String topic, Target type) {
83+
return (RecordHeaders headers, byte[] data) ->
84+
new DeserializeResult(new String(data), DeserializeResult.Type.STRING, Map.of());
85+
}
86+
}
87+
88+
private static SerdeInstance instance(String name, Serde serde,
89+
@Nullable Pattern keyPattern,
90+
@Nullable Pattern valuePattern) {
91+
return new SerdeInstance(name, serde, keyPattern, valuePattern, null);
92+
}
93+
94+
private static SerdeInstance stringInstance() {
95+
return instance(StringSerde.NAME, new StringSerde(), null, null);
96+
}
97+
98+
private static ClusterSerdes cluster(LinkedHashMap<String, SerdeInstance> serdes,
99+
@Nullable SerdeInstance defaultKey,
100+
@Nullable SerdeInstance defaultValue) {
101+
var fallback = new SerdeInstance("Fallback", new StringSerde(), null, null, null);
102+
return new ClusterSerdes(serdes, defaultKey, defaultValue, fallback);
103+
}
104+
105+
// --- Pass 3 (couldBePreferable) — the regression pin -----------------------------------------
106+
107+
@Test
108+
void preferableSerdeIsSelectedOverStringFallbackWhenNoPatternsOrDefaults() {
109+
// Mirrors: auto-configured SchemaRegistry + Avro topic, no defaultValueSerde configured.
110+
// The regression in #1833: this returned StringSerde instead of the preferable SR.
111+
var serdes = new LinkedHashMap<String, SerdeInstance>();
112+
serdes.put(StringSerde.NAME, stringInstance());
113+
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));
114+
115+
var cs = cluster(serdes, null, null);
116+
117+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
118+
.isEqualTo("Preferable");
119+
}
120+
121+
@Test
122+
void preferableSerdeIsSelectedForSerializeToo() {
123+
var serdes = new LinkedHashMap<String, SerdeInstance>();
124+
serdes.put(StringSerde.NAME, stringInstance());
125+
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));
126+
127+
var cs = cluster(serdes, null, null);
128+
129+
assertThat(cs.suggestSerdeForSerialize(TOPIC, Serde.Target.VALUE).getName())
130+
.isEqualTo("Preferable");
131+
}
132+
133+
// --- Default-true on the Serde interface used to be the bug; pin that pass 3 doesn't over-fire.
134+
135+
@Test
136+
void fallsBackToStringWhenNoSerdeNominatesItself() {
137+
// No serde overrides couldBePreferable to true. With the corrected interface default of
138+
// false, pass 3 finds nothing and we fall through to StringSerde via the caller's .orElse(...)
139+
var serdes = new LinkedHashMap<String, SerdeInstance>();
140+
serdes.put(StringSerde.NAME, stringInstance());
141+
serdes.put("Other", instance("Other", StubSerde.alwaysApplicable(), null, null));
142+
143+
var cs = cluster(serdes, null, null);
144+
145+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
146+
.isEqualTo(StringSerde.NAME);
147+
}
148+
149+
// --- Pass 2 (explicit cluster default) --------------------------------------------------------
150+
151+
@Test
152+
void explicitDefaultValueSerdeWinsOverPreferable() {
153+
// Even when a serde nominates itself via couldBePreferable, the cluster-level explicit
154+
// default (defaultValueSerde) takes precedence — pass 2 beats pass 3.
155+
var stringInst = stringInstance();
156+
var serdes = new LinkedHashMap<String, SerdeInstance>();
157+
serdes.put(StringSerde.NAME, stringInst);
158+
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));
159+
160+
var cs = cluster(serdes, null, stringInst);
161+
162+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
163+
.isEqualTo(StringSerde.NAME);
164+
}
165+
166+
@Test
167+
void explicitDefaultKeySerdeOnlyAppliesToKeyTarget() {
168+
var stringInst = stringInstance();
169+
var serdes = new LinkedHashMap<String, SerdeInstance>();
170+
serdes.put(StringSerde.NAME, stringInst);
171+
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));
172+
173+
var cs = cluster(serdes, stringInst, null);
174+
175+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.KEY).getName())
176+
.isEqualTo(StringSerde.NAME);
177+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
178+
.isEqualTo("Preferable");
179+
}
180+
181+
// --- Pass 1 (explicit topic patterns) ---------------------------------------------------------
182+
183+
@Test
184+
void explicitTopicValuePatternStillMatchesEvenIfCouldBePreferableFalse() {
185+
// Regression pin for the latent dead-skip bug from PR #1650: a serde with couldBePreferable
186+
// returning false must still be selected when its topicValuesPattern matches.
187+
var notPreferable = new StubSerde(true, true, false);
188+
var serdes = new LinkedHashMap<String, SerdeInstance>();
189+
serdes.put(StringSerde.NAME, stringInstance());
190+
serdes.put("Patterned", instance("Patterned", notPreferable, null, Pattern.compile(TOPIC)));
191+
192+
var cs = cluster(serdes, null, null);
193+
194+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
195+
.isEqualTo("Patterned");
196+
}
197+
198+
@Test
199+
void explicitTopicPatternBeatsPreferableSerde() {
200+
var serdes = new LinkedHashMap<String, SerdeInstance>();
201+
serdes.put(StringSerde.NAME, stringInstance());
202+
serdes.put("Patterned", instance("Patterned", StubSerde.alwaysApplicable(), null, Pattern.compile(TOPIC)));
203+
serdes.put("Preferable", instance("Preferable", StubSerde.preferable(), null, null));
204+
205+
var cs = cluster(serdes, null, null);
206+
207+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
208+
.isEqualTo("Patterned");
209+
}
210+
211+
@Test
212+
void keyPatternDoesNotMatchValueTarget() {
213+
var serdes = new LinkedHashMap<String, SerdeInstance>();
214+
serdes.put(StringSerde.NAME, stringInstance());
215+
serdes.put("KeyOnly", instance("KeyOnly", StubSerde.alwaysApplicable(), Pattern.compile(TOPIC), null));
216+
217+
var cs = cluster(serdes, null, null);
218+
219+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.KEY).getName())
220+
.isEqualTo("KeyOnly");
221+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
222+
.isEqualTo(StringSerde.NAME);
223+
}
224+
225+
// --- Cross-pass guards ------------------------------------------------------------------------
226+
227+
@Test
228+
void preferableButCannotDeserializeIsSkipped() {
229+
var preferableButCantDeser = new StubSerde(true, false, true);
230+
var serdes = new LinkedHashMap<String, SerdeInstance>();
231+
serdes.put(StringSerde.NAME, stringInstance());
232+
serdes.put("Preferable", instance("Preferable", preferableButCantDeser, null, null));
233+
234+
var cs = cluster(serdes, null, null);
235+
236+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
237+
.isEqualTo(StringSerde.NAME);
238+
}
239+
240+
@Test
241+
void pass3IterationOrderFollowsRegistrationOrder() {
242+
// Two serdes both nominate themselves; the first registered wins.
243+
var serdes = new LinkedHashMap<String, SerdeInstance>();
244+
serdes.put(StringSerde.NAME, stringInstance());
245+
serdes.put("FirstPreferable", instance("FirstPreferable", StubSerde.preferable(), null, null));
246+
serdes.put("SecondPreferable", instance("SecondPreferable", StubSerde.preferable(), null, null));
247+
248+
var cs = cluster(serdes, null, null);
249+
250+
assertThat(cs.suggestSerdeForDeserialize(TOPIC, Serde.Target.VALUE).getName())
251+
.isEqualTo("FirstPreferable");
252+
}
253+
}

api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,22 @@ void serdeWithCustomNameAndBuiltInClassnameAreExplicitlyConfigured() {
117117
verifyPatternsMatch(serdeConfig, explicitlyConfiguredSerde);
118118
}
119119

120+
@Test
121+
void defaultValueSerdeIsNullWhenNotConfigured() {
122+
// PR #1650 removed the implicit SchemaRegistry/ProtobufFile fallback that used to be wired
123+
// into the cluster's defaultValueSerde when none was configured. couldBePreferable + pass 3
124+
// in ClusterSerdes is the replacement. Pin that no implicit default leaks back in here.
125+
ClustersProperties.SerdeConfig serdeConfig = new ClustersProperties.SerdeConfig();
126+
serdeConfig.setName("BuiltIn1");
127+
serdeConfig.setTopicKeysPattern("keys");
128+
serdeConfig.setTopicValuesPattern("vals");
129+
130+
var serdes = init(serdeConfig);
131+
132+
assertThat(serdes.defaultKeySerde).isNull();
133+
assertThat(serdes.defaultValueSerde).isNull();
134+
}
135+
120136
private ClusterSerdes init(ClustersProperties.SerdeConfig... serdeConfigs) {
121137
return initializer.init(env, createProperties(serdeConfigs), 0);
122138
}

0 commit comments

Comments
 (0)