diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java index cee7fac5c7ee..0935366d9918 100644 --- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java @@ -1,92 +1,96 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.inputformat.protobuf; - -import com.github.os72.protobuf.dynamic.DynamicSchema; -import com.google.common.base.Preconditions; -import com.google.protobuf.Descriptors; -import com.google.protobuf.DynamicMessage; -import com.google.protobuf.Message; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.stream.StreamMessageDecoder; - - -//TODO: Add support for Schema Registry -public class ProtoBufMessageDecoder implements StreamMessageDecoder { - public static final String DESCRIPTOR_FILE_PATH = "descriptorFile"; - public static final String PROTO_CLASS_NAME = "protoClassName"; - - private ProtoBufRecordExtractor _recordExtractor; - private String _protoClassName; - private Message.Builder _builder; - - @Override - public void init(Map props, Set fieldsToRead, String topicName) - throws Exception { - Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH), - "Protocol Buffer schema descriptor file must be provided"); - - _protoClassName = props.getOrDefault(PROTO_CLASS_NAME, ""); - InputStream descriptorFileInputStream = ProtoBufUtils.getDescriptorFileInputStream( - props.get(DESCRIPTOR_FILE_PATH)); - Descriptors.Descriptor descriptor = buildProtoBufDescriptor(descriptorFileInputStream); - _recordExtractor = new ProtoBufRecordExtractor(); - _recordExtractor.init(fieldsToRead, null); - DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor); - _builder = dynamicMessage.newBuilderForType(); - } - - private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin) - throws IOException { - try { - DynamicSchema dynamicSchema = DynamicSchema.parseFrom(fin); - - if (!StringUtils.isEmpty(_protoClassName)) { - return dynamicSchema.getMessageDescriptor(_protoClassName); - } else { - return dynamicSchema.getMessageDescriptor(dynamicSchema.getMessageTypes().toArray(new String[]{})[0]); - } - } catch (Descriptors.DescriptorValidationException e) { - throw new IOException("Descriptor file validation failed", e); - } - } - - @Override - public GenericRow decode(byte[] payload, GenericRow destination) { - return decode(payload, 0, payload.length, destination); - } - - @Override - public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { - Message message; - try { - message = _builder.mergeFrom(payload, offset, length).build(); - } catch (Exception e) { - throw new RuntimeException("Caught exception while decoding protobuf message", e); - } finally { - _builder.clear(); - } - return _recordExtractor.extract(message, destination); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.protobuf; + +import com.github.os72.protobuf.dynamic.DynamicSchema; +import com.google.common.base.Preconditions; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageDecoder; + + +/// This decoder is intentionally file-based and does not support schema registry. +/// For Confluent Schema Registry-backed descriptor resolution, use +/// `KafkaConfluentSchemaRegistryProtoBufMessageDecoder` instead. + +public class ProtoBufMessageDecoder implements StreamMessageDecoder { + public static final String DESCRIPTOR_FILE_PATH = "descriptorFile"; + public static final String PROTO_CLASS_NAME = "protoClassName"; + + private ProtoBufRecordExtractor _recordExtractor; + private String _protoClassName; + private Message.Builder _builder; + + @Override + public void init(Map props, Set fieldsToRead, String topicName) throws Exception { + Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH), + "Property '%s' must be specified for ProtoBufMessageDecoder. " + + "If you are using Confluent Schema Registry, use " + + "KafkaConfluentSchemaRegistryProtoBufMessageDecoder instead.", + DESCRIPTOR_FILE_PATH); + + _protoClassName = props.getOrDefault(PROTO_CLASS_NAME, ""); + InputStream descriptorFileInputStream = ProtoBufUtils.getDescriptorFileInputStream(props.get(DESCRIPTOR_FILE_PATH)); + Descriptors.Descriptor descriptor = buildProtoBufDescriptor(descriptorFileInputStream); + _recordExtractor = new ProtoBufRecordExtractor(); + _recordExtractor.init(fieldsToRead, null); + DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor); + _builder = dynamicMessage.newBuilderForType(); + } + + private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin) + throws IOException { + try { + DynamicSchema dynamicSchema = DynamicSchema.parseFrom(fin); + + if (!StringUtils.isEmpty(_protoClassName)) { + return dynamicSchema.getMessageDescriptor(_protoClassName); + } else { + return dynamicSchema.getMessageDescriptor(dynamicSchema.getMessageTypes().toArray(new String[]{})[0]); + } + } catch (Descriptors.DescriptorValidationException e) { + throw new IOException("Descriptor file validation failed", e); + } + } + + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + return decode(payload, 0, payload.length, destination); + } + + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + Message message; + try { + message = _builder.mergeFrom(payload, offset, length).build(); + } catch (Exception e) { + throw new RuntimeException("Caught exception while decoding protobuf message", e); + } finally { + _builder.clear(); + } + return _recordExtractor.extract(message, destination); + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoderTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoderTest.java index 205111926898..b6b4a85d4d05 100644 --- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoderTest.java @@ -1,157 +1,158 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pinot.plugin.inputformat.protobuf; - -import com.google.common.collect.ImmutableSet; -import java.net.URL; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.testng.annotations.Test; - -import static org.apache.pinot.plugin.inputformat.protobuf.ProtoBufTestDataGenerator.*; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - - -public class ProtoBufMessageDecoderTest { - - @Test - public void testHappyCase() - throws Exception { - Map decoderProps = new HashMap<>(); - URL descriptorFile = getClass().getClassLoader().getResource("sample.desc"); - decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); - ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); - messageDecoder.init(decoderProps, getFieldsInSampleRecord(), ""); - Sample.SampleRecord sampleRecord = getSampleRecordMessage(); - GenericRow destination = new GenericRow(); - messageDecoder.decode(sampleRecord.toByteArray(), destination); - assertNotNull(destination.getValue("email")); - assertNotNull(destination.getValue("name")); - assertNotNull(destination.getValue("id")); - - assertEquals(destination.getValue("email"), "foobar@hello.com"); - assertEquals(destination.getValue("name"), "Alice"); - assertEquals(destination.getValue("id"), 18); - } - - @Test - public void testWithClassName() - throws Exception { - Map decoderProps = new HashMap<>(); - URL descriptorFile = getClass().getClassLoader().getResource("sample.desc"); - decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); - decoderProps.put("protoClassName", "SampleRecord"); - ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); - messageDecoder.init(decoderProps, getFieldsInSampleRecord(), ""); - Sample.SampleRecord sampleRecord = getSampleRecordMessage(); - GenericRow destination = new GenericRow(); - messageDecoder.decode(sampleRecord.toByteArray(), destination); - assertNotNull(destination.getValue("email")); - assertNotNull(destination.getValue("name")); - assertNotNull(destination.getValue("id")); - - assertEquals(destination.getValue("email"), "foobar@hello.com"); - assertEquals(destination.getValue("name"), "Alice"); - assertEquals(destination.getValue("id"), 18); - } - - @Test - public void testComplexClass() - throws Exception { - Map decoderProps = new HashMap<>(); - URL descriptorFile = getClass().getClassLoader().getResource("complex_types.desc"); - decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); - ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); - messageDecoder.init(decoderProps, getSourceFieldsForComplexType(), ""); - Map inputRecord = createComplexTypeRecord(); - GenericRow destination = new GenericRow(); - messageDecoder.decode(getComplexTypeObject(inputRecord).toByteArray(), destination); - - for (String col : getSourceFieldsForComplexType()) { - assertNotNull(destination.getValue(col)); - } - - for (String col : getSourceFieldsForComplexType()) { - if (col.contains("field")) { - assertEquals(destination.getValue(col), inputRecord.get(col)); - } - } - } - - @Test - public void testNestedMessageClass() - throws Exception { - Map decoderProps = new HashMap<>(); - URL descriptorFile = getClass().getClassLoader().getResource("complex_types.desc"); - decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); - decoderProps.put("protoClassName", "TestMessage.NestedMessage"); - ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); - - ComplexTypes.TestMessage.NestedMessage nestedMessage = - ComplexTypes.TestMessage.NestedMessage.newBuilder().setNestedStringField("hello").setNestedIntField(42).build(); - - messageDecoder.init(decoderProps, ImmutableSet.of(NESTED_STRING_FIELD, NESTED_INT_FIELD), ""); - GenericRow destination = new GenericRow(); - messageDecoder.decode(nestedMessage.toByteArray(), destination); - - assertNotNull(destination.getValue(NESTED_STRING_FIELD)); - assertNotNull(destination.getValue(NESTED_INT_FIELD)); - - assertEquals(destination.getValue(NESTED_STRING_FIELD), "hello"); - assertEquals(destination.getValue(NESTED_INT_FIELD), 42); - } - - @Test - public void testCompositeMessage() - throws Exception { - Map decoderProps = new HashMap<>(); - URL descriptorFile = getClass().getClassLoader().getResource("composite_types.desc"); - decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); - ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); - Set sourceFields = getSourceFieldsForComplexType(); - sourceFields.addAll(getFieldsInSampleRecord()); - - messageDecoder.init(decoderProps, ImmutableSet.of("test_message", "sample_record"), ""); - Map inputRecord = createComplexTypeRecord(); - GenericRow destination = new GenericRow(); - ComplexTypes.TestMessage testMessage = getComplexTypeObject(inputRecord); - Sample.SampleRecord sampleRecord = getSampleRecordMessage(); - CompositeTypes.CompositeMessage compositeMessage = - CompositeTypes.CompositeMessage.newBuilder().setTestMessage(testMessage).setSampleRecord(sampleRecord).build(); - messageDecoder.decode(compositeMessage.toByteArray(), destination); - - assertNotNull(destination.getValue("test_message")); - - for (String col : getSourceFieldsForComplexType()) { - assertNotNull(((Map) destination.getValue("test_message")).get(col)); - } - - assertNotNull(((Map) destination.getValue("sample_record")).get("email")); - assertNotNull(((Map) destination.getValue("sample_record")).get("name")); - assertNotNull(((Map) destination.getValue("sample_record")).get("id")); - - assertEquals(((Map) destination.getValue("sample_record")).get("email"), "foobar@hello.com"); - assertEquals(((Map) destination.getValue("sample_record")).get("name"), "Alice"); - assertEquals(((Map) destination.getValue("sample_record")).get("id"), 18); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.plugin.inputformat.protobuf; + +import com.google.common.collect.ImmutableSet; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.testng.annotations.Test; + +import static org.apache.pinot.plugin.inputformat.protobuf.ProtoBufTestDataGenerator.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + + +public class ProtoBufMessageDecoderTest { + + @Test + public void testHappyCase() + throws Exception { + Map decoderProps = new HashMap<>(); + URL descriptorFile = getClass().getClassLoader().getResource("sample.desc"); + decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); + ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); + messageDecoder.init(decoderProps, getFieldsInSampleRecord(), ""); + Sample.SampleRecord sampleRecord = getSampleRecordMessage(); + GenericRow destination = new GenericRow(); + messageDecoder.decode(sampleRecord.toByteArray(), destination); + assertNotNull(destination.getValue("email")); + assertNotNull(destination.getValue("name")); + assertNotNull(destination.getValue("id")); + + assertEquals(destination.getValue("email"), "foobar@hello.com"); + assertEquals(destination.getValue("name"), "Alice"); + assertEquals(destination.getValue("id"), 18); + } + + @Test + public void testWithClassName() + throws Exception { + Map decoderProps = new HashMap<>(); + URL descriptorFile = getClass().getClassLoader().getResource("sample.desc"); + decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); + decoderProps.put("protoClassName", "SampleRecord"); + ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); + messageDecoder.init(decoderProps, getFieldsInSampleRecord(), ""); + Sample.SampleRecord sampleRecord = getSampleRecordMessage(); + GenericRow destination = new GenericRow(); + messageDecoder.decode(sampleRecord.toByteArray(), destination); + assertNotNull(destination.getValue("email")); + assertNotNull(destination.getValue("name")); + assertNotNull(destination.getValue("id")); + + assertEquals(destination.getValue("email"), "foobar@hello.com"); + assertEquals(destination.getValue("name"), "Alice"); + assertEquals(destination.getValue("id"), 18); + } + + @Test + public void testComplexClass() + throws Exception { + Map decoderProps = new HashMap<>(); + URL descriptorFile = getClass().getClassLoader().getResource("complex_types.desc"); + decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); + ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); + messageDecoder.init(decoderProps, getSourceFieldsForComplexType(), ""); + Map inputRecord = createComplexTypeRecord(); + GenericRow destination = new GenericRow(); + messageDecoder.decode(getComplexTypeObject(inputRecord).toByteArray(), destination); + + for (String col : getSourceFieldsForComplexType()) { + assertNotNull(destination.getValue(col)); + } + + for (String col : getSourceFieldsForComplexType()) { + if (col.contains("field")) { + assertEquals(destination.getValue(col), inputRecord.get(col)); + } + } + } + + @Test + public void testNestedMessageClass() + throws Exception { + Map decoderProps = new HashMap<>(); + URL descriptorFile = getClass().getClassLoader().getResource("complex_types.desc"); + decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); + decoderProps.put("protoClassName", "TestMessage.NestedMessage"); + ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); + + ComplexTypes.TestMessage.NestedMessage nestedMessage = + ComplexTypes.TestMessage.NestedMessage.newBuilder().setNestedStringField("hello").setNestedIntField(42).build(); + + messageDecoder.init(decoderProps, ImmutableSet.of(NESTED_STRING_FIELD, NESTED_INT_FIELD), ""); + GenericRow destination = new GenericRow(); + messageDecoder.decode(nestedMessage.toByteArray(), destination); + + assertNotNull(destination.getValue(NESTED_STRING_FIELD)); + assertNotNull(destination.getValue(NESTED_INT_FIELD)); + + assertEquals(destination.getValue(NESTED_STRING_FIELD), "hello"); + assertEquals(destination.getValue(NESTED_INT_FIELD), 42); + } + + @Test + public void testCompositeMessage() + throws Exception { + Map decoderProps = new HashMap<>(); + URL descriptorFile = getClass().getClassLoader().getResource("composite_types.desc"); + decoderProps.put("descriptorFile", descriptorFile.toURI().toString()); + ProtoBufMessageDecoder messageDecoder = new ProtoBufMessageDecoder(); + Set sourceFields = getSourceFieldsForComplexType(); + sourceFields.addAll(getFieldsInSampleRecord()); + + messageDecoder.init(decoderProps, ImmutableSet.of("test_message", "sample_record"), ""); + Map inputRecord = createComplexTypeRecord(); + GenericRow destination = new GenericRow(); + ComplexTypes.TestMessage testMessage = getComplexTypeObject(inputRecord); + Sample.SampleRecord sampleRecord = getSampleRecordMessage(); + CompositeTypes.CompositeMessage compositeMessage = + CompositeTypes.CompositeMessage.newBuilder().setTestMessage(testMessage).setSampleRecord(sampleRecord).build(); + messageDecoder.decode(compositeMessage.toByteArray(), destination); + + assertNotNull(destination.getValue("test_message")); + + for (String col : getSourceFieldsForComplexType()) { + assertNotNull(((Map) destination.getValue("test_message")).get(col)); + } + + assertNotNull(((Map) destination.getValue("sample_record")).get("email")); + assertNotNull(((Map) destination.getValue("sample_record")).get("name")); + assertNotNull(((Map) destination.getValue("sample_record")).get("id")); + + assertEquals(((Map) destination.getValue("sample_record")).get("email"), "foobar@hello.com"); + assertEquals(((Map) destination.getValue("sample_record")).get("name"), "Alice"); + assertEquals(((Map) destination.getValue("sample_record")).get("id"), 18); + } + +}