Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ public class TimeFormats {
.appendPattern("'Z'")
.toFormatter();

/**
* Formatter for ISO8601 string representation of a timestamp value (with explicit timezone
* offset).
*/
public static final DateTimeFormatter ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please could you update the docs around this change.

new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.appendLiteral('T')
.append(DateTimeFormatter.ISO_OFFSET_TIME)
.toFormatter();

/** Formatter for SQL string representation of a time value. */
public static final DateTimeFormatter SQL_TIME_FORMAT =
new DateTimeFormatterBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,11 @@ public enum TimestampFormat {
* "yyyy-MM-ddTHH:mm:ss.s{precision}" format, TIMESTAMP_WITH_LOCAL_TIMEZONE in
* "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" and output in the same format.
*/
ISO_8601
ISO_8601,

/**
* Options to specify TIMESTAMP with explicit timezone offset format. It will parse TIMESTAMP in
* "yyyy-MM-ddTHH:mm:ss.s{precision}±HH:mm" format and output in the same format.
*/
ISO_8601_WITH_OFFSET
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ public class JsonFormatOptionsUtil {

public static final String SQL = "SQL";
public static final String ISO_8601 = "ISO-8601";
public static final String ISO_8601_WITH_OFFSET = "ISO-8601-WITH-OFFSET";

public static final Set<String> TIMESTAMP_FORMAT_ENUM =
new HashSet<>(Arrays.asList(SQL, ISO_8601));
new HashSet<>(Arrays.asList(SQL, ISO_8601, ISO_8601_WITH_OFFSET));

// The handling mode of null key for map data
public static final String JSON_MAP_NULL_KEY_MODE_FAIL = "FAIL";
Expand All @@ -65,6 +66,8 @@ public static TimestampFormat getTimestampFormat(ReadableConfig config) {
return TimestampFormat.SQL;
case ISO_8601:
return TimestampFormat.ISO_8601;
case ISO_8601_WITH_OFFSET:
return TimestampFormat.ISO_8601_WITH_OFFSET;
default:
throw new TableException(
String.format(
Expand Down Expand Up @@ -132,13 +135,13 @@ public static void validateEncodingFormatOptions(ReadableConfig tableOptions) {
validateTimestampFormat(tableOptions);
}

/** Validates timestamp format which value should be SQL or ISO-8601. */
/** Validates timestamp format which value should be SQL, ISO-8601, or ISO-8601-WITH-OFFSET. */
static void validateTimestampFormat(ReadableConfig tableOptions) {
String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {
throw new ValidationException(
String.format(
"Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
"Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601, ISO-8601-WITH-OFFSET].",
timestampFormat, TIMESTAMP_FORMAT.key()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
Expand Down Expand Up @@ -278,6 +279,17 @@ private TimestampData convertToTimestamp(JsonParser jp) throws IOException {
case ISO_8601:
parsedTimestamp = ISO8601_TIMESTAMP_FORMAT.parse(jp.getText());
break;
case ISO_8601_WITH_OFFSET:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my AI says
For DST: Document that this format preserves the explicit offset only, not the timezone. Users needing DST-aware handling should use TIMESTAMP_WITH_LOCAL_TIMEZONE with a proper timezone ID (e.g., America/New_York), not just offsets.

For Half-Hour Timezones: Add comprehensive tests:

root.put("timestamp_india", "1990-10-14T12:12:43.123+05:30");
root.put("timestamp_nepal", "1990-10-14T12:12:43.123+05:45");

Consider: Whether output should preserve the original offset instead of always converting to UTC. Current behavior may surprise users expecting offset preservation.

Documentation: Clearly state that this format converts all timestamps to UTC offset (+00:00) on output, regardless of input offset.

parsedTimestamp = ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.parse(jp.getText());
ZoneOffset offset = parsedTimestamp.query(TemporalQueries.offset());
if (offset != null) {
return TimestampData.fromInstant(
LocalDateTime.of(
parsedTimestamp.query(TemporalQueries.localDate()),
parsedTimestamp.query(TemporalQueries.localTime()))
.toInstant(offset));
}
break;
default:
throw new TableException(
String.format(
Expand All @@ -301,6 +313,10 @@ private TimestampData convertToTimestampWithLocalZone(JsonParser jp) throws IOEx
parsedTimestampWithLocalZone =
ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jp.getText());
break;
case ISO_8601_WITH_OFFSET:
parsedTimestampWithLocalZone =
ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.parse(jp.getText());
break;
default:
throw new TableException(
String.format(
Expand All @@ -309,9 +325,11 @@ private TimestampData convertToTimestampWithLocalZone(JsonParser jp) throws IOEx
}
LocalTime localTime = parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
ZoneOffset offset = parsedTimestampWithLocalZone.query(TemporalQueries.offset());

return TimestampData.fromInstant(
LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC));
LocalDateTime.of(localDate, localTime)
.toInstant(offset != null ? offset : ZoneOffset.UTC));
}

private StringData convertToString(JsonParser jp) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
Expand Down Expand Up @@ -196,6 +197,14 @@ private RowDataToJsonConverter createTimestampConverter() {
return mapper.getNodeFactory()
.textNode(SQL_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
};
case ISO_8601_WITH_OFFSET:
return (mapper, reuse, value) -> {
TimestampData timestamp = (TimestampData) value;
return mapper.getNodeFactory()
.textNode(
ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.format(
timestamp.toInstant().atOffset(ZoneOffset.UTC)));
};
default:
throw new TableException(
"Unsupported timestamp format. Validator should have checked that.");
Expand Down Expand Up @@ -224,6 +233,16 @@ private RowDataToJsonConverter createTimestampWithLocalZone() {
.toInstant()
.atOffset(ZoneOffset.UTC)));
};
case ISO_8601_WITH_OFFSET:
return (mapper, reuse, value) -> {
TimestampData timestampWithLocalZone = (TimestampData) value;
return mapper.getNodeFactory()
.textNode(
ISO8601_TIMESTAMP_WITH_OFFSET_FORMAT.format(
timestampWithLocalZone
.toInstant()
.atOffset(ZoneOffset.UTC)));
};
default:
throw new TableException(
"Unsupported timestamp format. Validator should have checked that.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.SMALLINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.api.DataTypes.TINYINT;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -343,4 +344,51 @@ private void innerTestProjectBothRowAndNestedField(
RowData rowData = deserializationSchema.deserialize(serializedJson);
assertThat(rowData).isEqualTo(expected);
}

/** Tests parsing timestamps with explicit timezone offsets. */
@Test
public void testTimestampWithTimezoneOffset() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();

ObjectNode root = objectMapper.createObjectNode();
root.put("timestamp3", "1990-10-14T12:12:43.123+02:00");
root.put("timestamp_with_local_timezone3", "1990-10-14T12:12:43.123-05:00");

byte[] serializedJson = objectMapper.writeValueAsBytes(root);

DataType dataType =
ROW(
FIELD("timestamp3", TIMESTAMP(3)),
FIELD("timestamp_with_local_timezone3", TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)));
RowType schema = (RowType) dataType.getLogicalType();
TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(schema);

DeserializationSchema<RowData> deserializationSchema =
new JsonParserRowDataDeserializationSchema(
schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601_WITH_OFFSET);
open(deserializationSchema);

RowData rowData = deserializationSchema.deserialize(serializedJson);
Row actual = convertToExternal(rowData, dataType);

assertThat(actual.getField(0)).isNotNull();
assertThat(actual.getField(1)).isNotNull();

JsonRowDataSerializationSchema serializationSchema =
new JsonRowDataSerializationSchema(
schema,
TimestampFormat.ISO_8601_WITH_OFFSET,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
true,
false);
open(serializationSchema);

byte[] serialized = serializationSchema.serialize(rowData);
ObjectNode serializedNode = (ObjectNode) objectMapper.readTree(serialized);

assertThat(serializedNode.get("timestamp3").asText()).endsWith("+00:00");
assertThat(serializedNode.get("timestamp_with_local_timezone3").asText())
.endsWith("+00:00");
}
}