diff --git a/docs/guides/metrics.md b/docs/guides/metrics.md index 0a8a0a8c4..591cd670d 100644 --- a/docs/guides/metrics.md +++ b/docs/guides/metrics.md @@ -71,6 +71,16 @@ These metrics provide a breakdown of the overall request statuses, helping users - **Definition**: The average time taken to generate each output token, including the first token. - **Use Case**: Provides a detailed view of the model's token generation efficiency. +### Time To Last Round Trip + +- **Definition**: For the realtime WebSocket backend (`openai_websocket`), the time in milliseconds from the last packet sent to the server to the last token received. +- **Use Case**: Measures the tail latency of a streaming exchange (how long the final output lags the final input). + +### Average Round-Trip Time (Avg RTT) + +- **Definition**: For the realtime WebSocket backend (`openai_websocket`), the mean timestamp of received tokens minus the mean timestamp of sent packets, in milliseconds. +- **Use Case**: Estimates the average send-to-receive lag across a request. The value is approximate because it assumes sent packets and received tokens are spread evenly over time. + ## Statistical Summaries GuideLLM provides detailed statistical summaries for each of the above metrics using the `StatusDistributionSummary` and `DistributionSummary` models. 
These summaries include the following statistics: diff --git a/src/guidellm/backends/openai/websocket.py b/src/guidellm/backends/openai/websocket.py index ae723eafa..f5d5c8e06 100644 --- a/src/guidellm/backends/openai/websocket.py +++ b/src/guidellm/backends/openai/websocket.py @@ -80,6 +80,9 @@ def _record_content_tokens( if content_tokens <= 0: return False + request_info.timings.token_received_sum += iter_time + request_info.timings.token_received_count += 1 + if request_info.timings.first_token_iteration is None: request_info.timings.first_token_iteration = iter_time request_info.timings.token_iterations = 0 @@ -92,6 +95,18 @@ def _record_content_tokens( return False +def _record_request_sent(request_info: RequestInfo) -> None: + """ + Record the timestamp of one outbound WebSocket frame for round-trip metrics. + + :param request_info: Mutable timing state for the in-flight request. + """ + sent_time = time.time() + request_info.timings.last_request_sent = sent_time + request_info.timings.request_sent_sum += sent_time + request_info.timings.request_sent_count += 1 + + def _load_ws_event(raw: str) -> dict[str, Any]: """Parse a JSON WebSocket text frame; raise RuntimeError on invalid JSON.""" try: @@ -374,8 +389,9 @@ async def resolve( # type: ignore[override, misc] # noqa: C901, PLR0912, PLR09 self, request: GenerationRequest, request_info: RequestInfo, - history: list[tuple[GenerationRequest, GenerationResponse | None]] - | None = None, + history: ( + list[tuple[GenerationRequest, GenerationResponse | None]] | None + ) = None, ) -> AsyncIterator[tuple[GenerationResponse | None, RequestInfo]]: """ Stream one realtime transcription over WebSocket for a single audio column. 
@@ -451,18 +467,22 @@ async def resolve( # type: ignore[override, misc] # noqa: C901, PLR0912, PLR09 f"Expected session.created, got {first_event.get('type')!r}" ) await ws.send(_json_text(session_update)) + _record_request_sent(request_info) for b64_chunk in chunks: await ws.send( _json_text( {"type": "input_audio_buffer.append", "audio": b64_chunk} ) ) + _record_request_sent(request_info) await ws.send( _json_text({"type": "input_audio_buffer.commit", "final": False}) ) + _record_request_sent(request_info) await ws.send( _json_text({"type": "input_audio_buffer.commit", "final": True}) ) + _record_request_sent(request_info) ignored_events = 0 while True: diff --git a/src/guidellm/benchmark/outputs/console.py b/src/guidellm/benchmark/outputs/console.py index 1e0ade6be..2c19b4331 100644 --- a/src/guidellm/benchmark/outputs/console.py +++ b/src/guidellm/benchmark/outputs/console.py @@ -480,6 +480,16 @@ def print_request_latency_table(self, report: GenerativeBenchmarksReport): group="TPOT", name="ms", ) + columns.add_stats( + benchmark.metrics.time_to_last_round_trip_ms, + group="Last RT", + name="ms", + ) + columns.add_stats( + benchmark.metrics.avg_round_trip_time_ms, + group="Avg RTT", + name="ms", + ) headers, values = columns.get_table_data() self.console.print("\n") diff --git a/src/guidellm/benchmark/outputs/csv.py b/src/guidellm/benchmark/outputs/csv.py index 35b5b5895..1ae183d4c 100644 --- a/src/guidellm/benchmark/outputs/csv.py +++ b/src/guidellm/benchmark/outputs/csv.py @@ -454,6 +454,20 @@ def _add_request_latency_metrics( "Inter Token Latency", "ms", ) + self._add_stats_for_metric( + headers, + values, + benchmark.metrics.time_to_last_round_trip_ms, + "Time To Last Round Trip", + "ms", + ) + self._add_stats_for_metric( + headers, + values, + benchmark.metrics.avg_round_trip_time_ms, + "Avg Round Trip Time", + "ms", + ) def _add_server_throughput_metrics( self, diff --git a/src/guidellm/benchmark/schemas/accumulator.py 
b/src/guidellm/benchmark/schemas/accumulator.py index 74224307a..3ea2914b5 100644 --- a/src/guidellm/benchmark/schemas/accumulator.py +++ b/src/guidellm/benchmark/schemas/accumulator.py @@ -495,6 +495,14 @@ class GenerativeMetricsAccumulator(StandardBaseModel): default_factory=RunningMetricStats, description="Accumulated time to first token statistics in milliseconds", ) + time_to_last_round_trip_ms: RunningMetricStats = Field( + default_factory=RunningMetricStats, + description="Accumulated websocket last round-trip latency in milliseconds", + ) + avg_round_trip_time_ms: RunningMetricStats = Field( + default_factory=RunningMetricStats, + description="Accumulated websocket average round-trip time in milliseconds", + ) time_to_first_output_token_ms: RunningMetricStats = Field( default_factory=RunningMetricStats, description="Accumulated time to first content token stats in ms", @@ -539,6 +547,12 @@ def update_estimate(self, stats: GenerativeRequestStats, duration: float): self.time_to_first_token_ms.update_estimate( stats.time_to_first_token_ms, duration=duration ) + self.time_to_last_round_trip_ms.update_estimate( + stats.time_to_last_round_trip_ms, duration=duration + ) + self.avg_round_trip_time_ms.update_estimate( + stats.avg_round_trip_time_ms, duration=duration + ) self.time_to_first_output_token_ms.update_estimate( stats.time_to_first_output_token_ms, duration=duration ) diff --git a/src/guidellm/benchmark/schemas/metrics.py b/src/guidellm/benchmark/schemas/metrics.py index 40ba5b48a..765def28b 100644 --- a/src/guidellm/benchmark/schemas/metrics.py +++ b/src/guidellm/benchmark/schemas/metrics.py @@ -813,6 +813,18 @@ class GenerativeMetrics(StandardBaseDict): inter_token_latency_ms: StatusDistributionSummary = Field( description="Distribution of inter-token latencies in milliseconds" ) + time_to_last_round_trip_ms: StatusDistributionSummary = Field( + description=( + "Distribution of websocket last-round-trip latencies in milliseconds " + "(last received token 
minus last sent packet)" + ) + ) + avg_round_trip_time_ms: StatusDistributionSummary = Field( + description=( + "Distribution of approximate websocket average round-trip times in " + "milliseconds (mean received minus mean sent)" + ) + ) prompt_tokens_per_second: StatusDistributionSummary = Field( description="Distribution of prompt token processing rates" ) @@ -889,10 +901,12 @@ def compile(cls, accumulator: GenerativeBenchmarkAccumulator) -> GenerativeMetri ), request_concurrency=StatusDistributionSummary.concurrency_distribution_from_timings_function( function=( - lambda req: (req.request_start_time, req.request_end_time) - if req.request_start_time is not None - and req.request_end_time is not None - else None + lambda req: ( + (req.request_start_time, req.request_end_time) + if req.request_start_time is not None + and req.request_end_time is not None + else None + ) ), successful=successful, incomplete=incomplete, @@ -937,6 +951,18 @@ def compile(cls, accumulator: GenerativeBenchmarkAccumulator) -> GenerativeMetri incomplete=incomplete, errored=errored, ), + time_to_last_round_trip_ms=StatusDistributionSummary.from_values_function( + function=lambda req: req.time_to_last_round_trip_ms or 0.0, + successful=successful, + incomplete=incomplete, + errored=errored, + ), + avg_round_trip_time_ms=StatusDistributionSummary.from_values_function( + function=lambda req: req.avg_round_trip_time_ms or 0.0, + successful=successful, + incomplete=incomplete, + errored=errored, + ), time_to_first_output_token_ms=StatusDistributionSummary.from_values_function( function=lambda req: req.time_to_first_output_token_ms or 0.0, successful=successful, diff --git a/src/guidellm/schemas/info.py b/src/guidellm/schemas/info.py index 2df99557d..04007295b 100644 --- a/src/guidellm/schemas/info.py +++ b/src/guidellm/schemas/info.py @@ -78,6 +78,35 @@ class RequestTimings(StandardBaseDict): token_iterations: int = Field( default=0, ) + last_request_sent: float | None = Field( + default=None, 
+ description=( + "Unix timestamp of the last packet sent to the server, used for " + "round-trip metrics (openai_websocket backend)" + ), + ) + request_sent_sum: float = Field( + default=0.0, + description=( + "Sum of sent-packet timestamps for mean round-trip estimation " + "(openai_websocket backend)" + ), + ) + request_sent_count: int = Field( + default=0, + description="Number of packets sent to the server (openai_websocket backend)", + ) + token_received_sum: float = Field( + default=0.0, + description=( + "Sum of received content-token timestamps for mean round-trip " + "estimation (openai_websocket backend)" + ), + ) + token_received_count: int = Field( + default=0, + description="Number of content tokens received (openai_websocket backend)", + ) request_end: float | None = Field( default=None, description="Unix timestamp when the backend completed processing the request", diff --git a/src/guidellm/schemas/request_stats.py b/src/guidellm/schemas/request_stats.py index e685710c2..6af758b32 100644 --- a/src/guidellm/schemas/request_stats.py +++ b/src/guidellm/schemas/request_stats.py @@ -162,6 +162,45 @@ def time_to_first_token_ms(self) -> float | None: return 1000 * (first_token - start) + @computed_field # type: ignore[misc] + @property + def time_to_last_round_trip_ms(self) -> float | None: + """ + Time from the last sent packet to the last received token in milliseconds. + + Only populated by the websocket backend, which records send timestamps; + None for backends that do not record sends. + + :return: Last round-trip latency in milliseconds, or None if unavailable + """ + last_received = self.info.timings.last_token_iteration + last_sent = self.info.timings.last_request_sent + if last_received is None or last_sent is None: + return None + + return 1000 * (last_received - last_sent) + + @computed_field # type: ignore[misc] + @property + def avg_round_trip_time_ms(self) -> float | None: + """ + Approximate average round-trip time in milliseconds. 
+ + Computed as the mean of received content-token timestamps minus the mean + of sent-packet timestamps. This is an approximation that assumes sent + packets and received tokens align uniformly in time. Only populated by + the websocket backend; None otherwise. + + :return: Average round-trip time in milliseconds, or None if unavailable + """ + timings = self.info.timings + if timings.request_sent_count <= 0 or timings.token_received_count <= 0: + return None + + mean_sent = timings.request_sent_sum / timings.request_sent_count + mean_received = timings.token_received_sum / timings.token_received_count + return 1000 * (mean_received - mean_sent) + @computed_field # type: ignore[misc] @property def time_per_output_token_ms(self) -> float | None: diff --git a/tests/unit/backends/openai/test_realtime_ws.py b/tests/unit/backends/openai/test_realtime_ws.py index 506836975..68fd79b9f 100644 --- a/tests/unit/backends/openai/test_realtime_ws.py +++ b/tests/unit/backends/openai/test_realtime_ws.py @@ -656,3 +656,126 @@ def test_openai_websocket_backend_args_invalid_request_format_rejected() -> None target="http://localhost:8000", request_format="nope", ) + + +@pytest.mark.asyncio +async def test_resolve_records_round_trip_timings() -> None: + """Round-trip send/receive timestamps are recorded during resolve. 
+ + ## WRITTEN BY AI ## + """ + + async def handler(ws: object) -> None: + await ws.send( + json.dumps({"type": "session.created", "id": "sess-rtt", "created": 0}) + ) + while True: + msg = await ws.recv() + data = json.loads(msg if isinstance(msg, str) else msg.decode()) + if data.get("type") == "input_audio_buffer.commit" and data.get("final"): + break + await ws.send(json.dumps({"type": "transcription.delta", "delta": "hi"})) + await ws.send( + json.dumps( + { + "type": "transcription.done", + "text": "hi", + "usage": { + "prompt_tokens": 5, + "completion_tokens": 1, + "total_tokens": 6, + }, + } + ) + ) + + async with serve(handler, "127.0.0.1", 0) as server: + port = server.sockets[0].getsockname()[1] + be = _make_ws_backend( + target=f"http://127.0.0.1:{port}", + model="m", + validate_backend=False, + ) + await be.process_startup() + req = GenerationRequest( + request_id="rtt", + columns={ + "audio_column": [ + {"audio": b"fake", "format": "mp3", "file_name": "f.mp3"} + ] + }, + ) + info = RequestInfo(timings=RequestTimings()) + async for _ in be.resolve(req, info): + pass + await be.process_shutdown() + + t = info.timings + # one patched chunk -> session.update + 1 append + 2 commits = 4 sends + assert t.request_sent_count == 4 + assert t.last_request_sent is not None + assert t.request_sent_sum > 0 + # one non-empty delta received + assert t.token_received_count == 1 + assert t.token_received_sum > 0 + # tokens arrive after the last packet is sent -> last round trip >= 0 + assert t.last_token_iteration is not None + assert t.last_token_iteration >= t.last_request_sent + + +@pytest.mark.asyncio +async def test_resolve_records_round_trip_timings_done_only() -> None: + """Send/receive timings are recorded when text arrives only on done. 
+ + ## WRITTEN BY AI ## + """ + + async def handler(ws: object) -> None: + await ws.send( + json.dumps({"type": "session.created", "id": "sess-done", "created": 0}) + ) + while True: + msg = await ws.recv() + data = json.loads(msg if isinstance(msg, str) else msg.decode()) + if data.get("type") == "input_audio_buffer.commit" and data.get("final"): + break + await ws.send( + json.dumps( + { + "type": "transcription.done", + "text": "hello", + "usage": { + "prompt_tokens": 5, + "completion_tokens": 1, + "total_tokens": 6, + }, + } + ) + ) + + async with serve(handler, "127.0.0.1", 0) as server: + port = server.sockets[0].getsockname()[1] + be = _make_ws_backend( + target=f"http://127.0.0.1:{port}", + model="m", + validate_backend=False, + ) + await be.process_startup() + req = GenerationRequest( + request_id="rtt-done", + columns={ + "audio_column": [ + {"audio": b"fake", "format": "mp3", "file_name": "f.mp3"} + ] + }, + ) + info = RequestInfo(timings=RequestTimings()) + async for _ in be.resolve(req, info): + pass + await be.process_shutdown() + + t = info.timings + assert t.request_sent_count == 4 + assert t.last_request_sent is not None + assert t.token_received_count == 1 + assert t.token_received_sum > 0 diff --git a/tests/unit/benchmark/schemas/test_metrics.py b/tests/unit/benchmark/schemas/test_metrics.py index 16c70fb04..e6bb9879f 100644 --- a/tests/unit/benchmark/schemas/test_metrics.py +++ b/tests/unit/benchmark/schemas/test_metrics.py @@ -8,6 +8,7 @@ import pytest from guidellm.benchmark.schemas.metrics import ( + GenerativeMetrics, GenerativeMetricsSummary, GenerativeToolCallMetricsSummary, ) @@ -15,6 +16,7 @@ GenerativeRequestStats, RequestInfo, RequestTimings, + StatusDistributionSummary, UsageMetrics, ) @@ -162,3 +164,52 @@ def test_tool_call_summary_compile_no_tool_calls(self): assert summary.mixed_tokens is not None assert summary.mixed_tokens.input is None assert summary.mixed_tokens.output is None + + +@pytest.mark.sanity +def 
test_round_trip_metrics_compile(): + """ + WebSocket round-trip metrics expose GenerativeMetrics fields and compile + into StatusDistributionSummary distributions from request timings. + + ## WRITTEN BY AI ## + """ + timings = RequestTimings( + resolve_start=0.0, + resolve_end=2.0, + request_start=0.0, + request_end=2.0, + last_request_sent=0.2, + last_token_iteration=0.5, + request_sent_sum=0.3, # mean 0.1 over 3 sends + request_sent_count=3, + token_received_sum=1.2, # mean 0.4 over 3 receives + token_received_count=3, + ) + stats = GenerativeRequestStats( + request_id="rtt", + info=RequestInfo(request_id="rtt", status="completed", timings=timings), + input_metrics=UsageMetrics(), + output_metrics=UsageMetrics(), + ) + + # Schema exposes the new metric fields. + assert "time_to_last_round_trip_ms" in GenerativeMetrics.model_fields + assert "avg_round_trip_time_ms" in GenerativeMetrics.model_fields + + # The same compile expressions used in GenerativeMetrics.compile(). + last_round_trip = StatusDistributionSummary.from_values_function( + function=lambda req: req.time_to_last_round_trip_ms or 0.0, + successful=[stats], + incomplete=[], + errored=[], + ) + avg_round_trip = StatusDistributionSummary.from_values_function( + function=lambda req: req.avg_round_trip_time_ms or 0.0, + successful=[stats], + incomplete=[], + errored=[], + ) + + assert last_round_trip.successful.mean == pytest.approx(300.0, abs=0.1) + assert avg_round_trip.successful.mean == pytest.approx(300.0, abs=0.1) diff --git a/tests/unit/schemas/test_request_stats.py b/tests/unit/schemas/test_request_stats.py index 5bdc9dd44..8ed8808b1 100644 --- a/tests/unit/schemas/test_request_stats.py +++ b/tests/unit/schemas/test_request_stats.py @@ -284,6 +284,8 @@ def test_class_signatures(self): "output_tokens_per_second", "iter_tokens_per_iteration", "output_tokens_per_iteration", + "time_to_last_round_trip_ms", + "avg_round_trip_time_ms", ): assert hasattr(GenerativeRequestStats, prop_name) @@ -722,6 +724,78 
@@ def test_time_to_first_output_token_ms_none(self): assert stats.time_to_first_output_token_ms is None + @pytest.mark.sanity + def test_time_to_last_round_trip_ms(self): + """ + Last round trip = last received-token time minus last sent-packet time. + + ## WRITTEN BY AI ## + """ + info = RequestInfo(request_id="ttlrt", status="completed") + info.timings.request_start = 0.0 + info.timings.last_request_sent = 0.2 + info.timings.last_token_iteration = 0.5 + info.timings.request_end = 0.6 + info.timings.resolve_end = 0.6 + + stats = GenerativeRequestStats( + request_id="ttlrt", + info=info, + input_metrics=UsageMetrics(text_tokens=5), + output_metrics=UsageMetrics(text_tokens=10), + ) + + assert stats.time_to_last_round_trip_ms == pytest.approx(300.0, abs=0.1) + + @pytest.mark.sanity + def test_avg_round_trip_time_ms(self): + """ + Avg RTT (approximate) = mean(received times) minus mean(sent times). + + ## WRITTEN BY AI ## + """ + info = RequestInfo(request_id="avgrtt", status="completed") + info.timings.request_start = 0.0 + info.timings.request_sent_sum = 0.3 # mean 0.1 over 3 sends + info.timings.request_sent_count = 3 + info.timings.token_received_sum = 1.2 # mean 0.4 over 3 receives + info.timings.token_received_count = 3 + info.timings.request_end = 1.0 + info.timings.resolve_end = 1.0 + + stats = GenerativeRequestStats( + request_id="avgrtt", + info=info, + input_metrics=UsageMetrics(text_tokens=5), + output_metrics=UsageMetrics(text_tokens=10), + ) + + assert stats.avg_round_trip_time_ms == pytest.approx(300.0, abs=0.1) + + @pytest.mark.smoke + def test_round_trip_metrics_none_without_sends(self): + """ + Round-trip metrics are None when no send timestamps were recorded + (e.g. the HTTP backend), keeping them websocket-only. 
+ + ## WRITTEN BY AI ## + """ + info = RequestInfo(request_id="rtt-none", status="completed") + info.timings.request_start = 0.0 + info.timings.last_token_iteration = 0.5 + info.timings.request_end = 1.0 + info.timings.resolve_end = 1.0 + + stats = GenerativeRequestStats( + request_id="rtt-none", + info=info, + input_metrics=UsageMetrics(text_tokens=5), + output_metrics=UsageMetrics(text_tokens=10), + ) + + assert stats.time_to_last_round_trip_ms is None + assert stats.avg_round_trip_time_ms is None + @pytest.mark.sanity @pytest.mark.asyncio @async_timeout(0.2) # ensure no accidental indefinite waits if expanded later