diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index 05a6b05dbea..4947938d8e5 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -150,6 +150,14 @@ def _proto_value_to_transport_value(proto_value: ValueProto) -> Any: if val_attr in ("uuid_set_val", "time_uuid_set_val"): return list(getattr(proto_value, val_attr).val) + # UnixTimestamp values are stored as int64 (epoch seconds) in proto. + # Return them directly to avoid feast_value_type_to_python_type + # converting to datetime objects which are not JSON-serializable. + if val_attr == "unix_timestamp_val": + return getattr(proto_value, val_attr) + if val_attr in ("unix_timestamp_list_val", "unix_timestamp_set_val"): + return list(getattr(proto_value, val_attr).val) + return feast_value_type_to_python_type(proto_value) _STATUS_MAP = { diff --git a/sdk/python/tests/unit/infra/online_store/test_remote_online_store.py b/sdk/python/tests/unit/infra/online_store/test_remote_online_store.py index 13c1392fe22..3babb384f6f 100644 --- a/sdk/python/tests/unit/infra/online_store/test_remote_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_remote_online_store.py @@ -12,7 +12,7 @@ from feast.online_response import OnlineResponse from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.types import Float32, Int64, String +from feast.types import Float32, Int64, String, UnixTimestamp from feast.value_type import ValueType @@ -661,3 +661,121 @@ def test_all_base_class_params_forwarded( assert expected_keys.issubset(req_body.keys()), ( f"Missing keys in req_body: {expected_keys - req_body.keys()}" ) + + +class TestProtoValueToTransportValue: + """Tests for RemoteOnlineStore._proto_value_to_transport_value.""" + + def test_unix_timestamp_val_returns_raw_int(self): + """UnixTimestamp scalar should return the raw int64 epoch seconds.""" + proto_val = ValueProto(unix_timestamp_val=1700000000) + result = RemoteOnlineStore._proto_value_to_transport_value(proto_val) + assert result == 1700000000 + assert isinstance(result, int) + + def test_unix_timestamp_list_val_returns_int_list(self): + """UnixTimestamp list should return a list of int64 epoch seconds.""" + proto_val = ValueProto() + proto_val.unix_timestamp_list_val.val.extend([1700000000, 1700000001]) + result = RemoteOnlineStore._proto_value_to_transport_value(proto_val) + assert result == [1700000000, 1700000001] + assert all(isinstance(v, int) for v in result) + + def test_unix_timestamp_set_val_returns_flat_list(self): + """UnixTimestamp set should return a list (not a set) of int64 + epoch seconds, so the value is JSON-serializable.""" + proto_val = ValueProto() + proto_val.unix_timestamp_set_val.val.extend([1700000000, 1700000001]) + result = RemoteOnlineStore._proto_value_to_transport_value(proto_val) + assert result == [1700000000, 1700000001] + assert isinstance(result, list) + + def test_unix_timestamp_val_null_sentinel_returns_raw_int(self): + """The NULL_TIMESTAMP sentinel should still be passed through as + a raw int64 — it is up to the server to interpret it.""" + proto_val = ValueProto(unix_timestamp_val=-9223372036854775808) + result = RemoteOnlineStore._proto_value_to_transport_value(proto_val) + assert result == -9223372036854775808 + assert isinstance(result, int) + + def test_empty_proto_returns_none(self): + """A ValueProto with no val set should return None.""" + proto_val = ValueProto() + result = RemoteOnlineStore._proto_value_to_transport_value(proto_val) + assert result is None + + +class TestRemoteOnlineStoreWriteBatch: + """Tests for RemoteOnlineStore.online_write_batch.""" + + @pytest.fixture + def remote_store(self): + return RemoteOnlineStore() + + @pytest.fixture + def config(self): + return RepoConfig( + project="test_project", + online_store=RemoteOnlineStoreConfig( + type="remote", path="http://localhost:6566" + ), + registry="dummy_registry", + ) + + @pytest.fixture + def feature_view(self): + entity = Entity( + name="user_id", description="User ID", value_type=ValueType.INT64 + ) + source = FileSource(path="test.parquet", timestamp_field="event_timestamp") + return FeatureView( + name="test_feature_view", + entities=[entity], + ttl=timedelta(days=1), + schema=[ + Field(name="user_id", dtype=Int64), + Field(name="feature1", dtype=String), + Field(name="feature2", dtype=UnixTimestamp), + ], + source=source, + ) + + @patch("feast.infra.online_stores.remote.post_remote_online_write") + def test_unix_timestamp_value_serialized_as_int( + self, mock_post, remote_store, config, feature_view + ): + """online_write_batch should send int64 epoch seconds in the + DataFrame for UnixTimestamp features.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_post.return_value = mock_response + + entity_key = EntityKeyProto( + join_keys=["user_id"], + entity_values=[ValueProto(int64_val=42)], + ) + feature_values = { + "feature1": ValueProto(string_val="hello"), + "feature2": ValueProto(unix_timestamp_val=1700000000), + } + event_ts = datetime(2023, 11, 15, 0, 0, 0) + created_ts = datetime(2023, 11, 14, 0, 0, 0) + data = [(entity_key, feature_values, event_ts, created_ts)] + + remote_store.online_write_batch( + config=config, table=feature_view, data=data, progress=None + ) + + mock_post.assert_called_once() + req_body = mock_post.call_args[1]["req_body"] + df = req_body["df"] + + # UnixTimestamp feature value must be a raw int, not a datetime + assert df["feature2"] == [1700000000] + assert isinstance(df["feature2"][0], int) + + # Other feature types should remain unchanged + assert df["feature1"] == ["hello"] + + # Event timestamps should be ISO strings as before + assert df["event_timestamp"] == ["2023-11-15T00:00:00"]