diff --git a/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java index 402b6fe9f64d3b..1c38347caf0a3b 100644 --- a/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java +++ b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java @@ -63,6 +63,17 @@ public class TimeFormats { .appendPattern("'Z'") .toFormatter(); + /** + * Formatter for ISO8601 string representation of a timestamp value (with explicit timezone + * offset). + */ + public static final DateTimeFormatter ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT = + new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral('T') + .append(DateTimeFormatter.ISO_OFFSET_TIME) + .toFormatter(); + /** Formatter for SQL string representation of a time value. */ public static final DateTimeFormatter SQL_TIME_FORMAT = new DateTimeFormatterBuilder() diff --git a/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java index 0e3655d059c627..86c35db75cb5a6 100644 --- a/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java +++ b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java @@ -35,5 +35,11 @@ public enum TimestampFormat { * "yyyy-MM-ddTHH:mm:ss.s{precision}" format, TIMESTAMP_WITH_LOCAL_TIMEZONE in * "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" and output in the same format. */ - ISO_8601 + ISO_8601, + + /** + * Options to specify TIMESTAMP with explicit timezone offset format. It will parse TIMESTAMP in + * "yyyy-MM-ddTHH:mm:ss.s{precision}±HH:mm" format and output in the same format. + */ + ISO_8601_WITH_OFFSET } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptionsUtil.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptionsUtil.java index 60a953d0848dca..2a0c4b79c63847 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptionsUtil.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptionsUtil.java @@ -45,9 +45,10 @@ public class JsonFormatOptionsUtil { public static final String SQL = "SQL"; public static final String ISO_8601 = "ISO-8601"; + public static final String ISO_8601_WITH_OFFSET = "ISO-8601-WITH-OFFSET"; public static final Set TIMESTAMP_FORMAT_ENUM = - new HashSet<>(Arrays.asList(SQL, ISO_8601)); + new HashSet<>(Arrays.asList(SQL, ISO_8601, ISO_8601_WITH_OFFSET)); // The handling mode of null key for map data public static final String JSON_MAP_NULL_KEY_MODE_FAIL = "FAIL"; @@ -65,6 +66,8 @@ public static TimestampFormat getTimestampFormat(ReadableConfig config) { return TimestampFormat.SQL; case ISO_8601: return TimestampFormat.ISO_8601; + case ISO_8601_WITH_OFFSET: + return TimestampFormat.ISO_8601_WITH_OFFSET; default: throw new TableException( String.format( @@ -132,13 +135,13 @@ public static void validateEncodingFormatOptions(ReadableConfig tableOptions) { validateTimestampFormat(tableOptions); } - /** Validates timestamp format which value should be SQL or ISO-8601. */ + /** Validates timestamp format which value should be SQL, ISO-8601, or ISO-8601-WITH-OFFSET. */ static void validateTimestampFormat(ReadableConfig tableOptions) { String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT); if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) { throw new ValidationException( String.format( - "Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].", + "Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601, ISO-8601-WITH-OFFSET].", timestampFormat, TIMESTAMP_FORMAT.key())); } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java index ea5ced4c9058c3..92b901ff07ee1e 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java @@ -62,6 +62,7 @@ import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT; import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT; import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT; @@ -278,6 +279,17 @@ private TimestampData convertToTimestamp(JsonParser jp) throws IOException { case ISO_8601: parsedTimestamp = ISO8601_TIMESTAMP_FORMAT.parse(jp.getText()); break; + case ISO_8601_WITH_OFFSET: + parsedTimestamp = ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.parse(jp.getText()); + ZoneOffset offset = parsedTimestamp.query(TemporalQueries.offset()); + if (offset != null) { + return TimestampData.fromInstant( + LocalDateTime.of( + parsedTimestamp.query(TemporalQueries.localDate()), + parsedTimestamp.query(TemporalQueries.localTime())) + .toInstant(offset)); + } + break; default: throw new TableException( String.format( @@ -301,6 +313,10 @@ private TimestampData convertToTimestampWithLocalZone(JsonParser jp) throws IOEx parsedTimestampWithLocalZone = ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jp.getText()); break; + case ISO_8601_WITH_OFFSET: + parsedTimestampWithLocalZone = + ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.parse(jp.getText()); + break; default: throw new TableException( String.format( @@ -309,9 +325,11 @@ private TimestampData convertToTimestampWithLocalZone(JsonParser jp) throws IOEx } LocalTime localTime = parsedTimestampWithLocalZone.query(TemporalQueries.localTime()); LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate()); + ZoneOffset offset = parsedTimestampWithLocalZone.query(TemporalQueries.offset()); return TimestampData.fromInstant( - LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC)); + LocalDateTime.of(localDate, localTime) + .toInstant(offset != null ? offset : ZoneOffset.UTC)); } private StringData convertToString(JsonParser jp) throws IOException { diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java index 11ab01a35e8a40..fbc75ddd573517 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java @@ -49,6 +49,7 @@ import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT; import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT; import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT; @@ -196,6 +197,14 @@ private RowDataToJsonConverter createTimestampConverter() { return mapper.getNodeFactory() .textNode(SQL_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime())); }; + case ISO_8601_WITH_OFFSET: + return (mapper, reuse, value) -> { + TimestampData timestamp = (TimestampData) value; + return mapper.getNodeFactory() + .textNode( + ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.format( + timestamp.toInstant().atOffset(ZoneOffset.UTC))); + }; default: throw new TableException( "Unsupported timestamp format. Validator should have checked that."); @@ -224,6 +233,16 @@ private RowDataToJsonConverter createTimestampWithLocalZone() { .toInstant() .atOffset(ZoneOffset.UTC))); }; + case ISO_8601_WITH_OFFSET: + return (mapper, reuse, value) -> { + TimestampData timestampWithLocalZone = (TimestampData) value; + return mapper.getNodeFactory() + .textNode( + ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.format( + timestampWithLocalZone + .toInstant() + .atOffset(ZoneOffset.UTC))); + }; default: throw new TableException( "Unsupported timestamp format. Validator should have checked that."); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java index 0d7509b2fb29f8..029f020a0114c6 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java @@ -49,6 +49,7 @@ import static org.apache.flink.table.api.DataTypes.ROW; import static org.apache.flink.table.api.DataTypes.SMALLINT; import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.DataTypes.TINYINT; import static org.assertj.core.api.Assertions.assertThat; @@ -343,4 +344,51 @@ private void innerTestProjectBothRowAndNestedField( RowData rowData = deserializationSchema.deserialize(serializedJson); assertThat(rowData).isEqualTo(expected); } + + /** Tests parsing timestamps with explicit timezone offsets. */ + @Test + public void testTimestampWithTimezoneOffset() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + + ObjectNode root = objectMapper.createObjectNode(); + root.put("timestamp3", "1990-10-14T12:12:43.123+02:00"); + root.put("timestamp_with_local_timezone3", "1990-10-14T12:12:43.123-05:00"); + + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + DataType dataType = + ROW( + FIELD("timestamp3", TIMESTAMP(3)), + FIELD("timestamp_with_local_timezone3", TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))); + RowType schema = (RowType) dataType.getLogicalType(); + TypeInformation resultTypeInfo = InternalTypeInfo.of(schema); + + DeserializationSchema deserializationSchema = + new JsonParserRowDataDeserializationSchema( + schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601_WITH_OFFSET); + open(deserializationSchema); + + RowData rowData = deserializationSchema.deserialize(serializedJson); + Row actual = convertToExternal(rowData, dataType); + + assertThat(actual.getField(0)).isNotNull(); + assertThat(actual.getField(1)).isNotNull(); + + JsonRowDataSerializationSchema serializationSchema = + new JsonRowDataSerializationSchema( + schema, + TimestampFormat.ISO_8601_WITH_OFFSET, + JsonFormatOptions.MapNullKeyMode.LITERAL, + "null", + true, + false); + open(serializationSchema); + + byte[] serialized = serializationSchema.serialize(rowData); + ObjectNode serializedNode = (ObjectNode) objectMapper.readTree(serialized); + + assertThat(serializedNode.get("timestamp3").asText()).endsWith("+00:00"); + assertThat(serializedNode.get("timestamp_with_local_timezone3").asText()) + .endsWith("+00:00"); + } }