From 6973d613e86ac9b255e4f877a64768acfac7bd14 Mon Sep 17 00:00:00 2001 From: zhixiangli Date: Fri, 3 Jul 2026 11:20:54 +0000 Subject: [PATCH 1/3] feat(macrobenchmarks): collect and report system metrics Fetch per-pod system metrics (CPU, memory, network) from Cloud Monitoring during macrobenchmarks. Reduce these metrics to the bottleneck pod (maximum peak and mean across pods) and include them in the final summary. - Add `metrics.monitoring` to fetch metrics using Cloud Monitoring API. - Update `metrics.calculate` to perform the reduction. - Update schema and raw store to handle the new metrics. - Update `scrape_metrics.sh` to run the collection. - Add tests for the new functionality. TAG=agy CONV=c564d507-372d-476b-9cb2-a9aab7884941 --- .../macrobenchmarks_schema.json | 9 +- .../macrobenchmarks/metrics/calculate.py | 38 +++++ .../macrobenchmarks/metrics/monitoring.py | 158 ++++++++++++++++++ .../macrobenchmarks/metrics/raw_store.py | 18 ++ .../macrobenchmarks/metrics/requirements.txt | 1 + cloudbuild/macrobenchmarks/metrics/schema.py | 10 ++ .../metrics/tests/test_calculate_summary.py | 44 +++++ .../metrics/tests/test_calculate_system.py | 35 ++++ .../metrics/tests/test_monitoring.py | 99 +++++++++++ .../metrics/tests/test_raw_store_system.py | 19 +++ .../metrics/tests/test_schema_new_columns.py | 27 +++ .../macrobenchmarks/scripts/scrape_metrics.sh | 5 + 12 files changed, 462 insertions(+), 1 deletion(-) create mode 100644 cloudbuild/macrobenchmarks/metrics/monitoring.py create mode 100644 cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py create mode 100644 cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py create mode 100644 cloudbuild/macrobenchmarks/metrics/tests/test_raw_store_system.py create mode 100644 cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py diff --git a/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json b/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json index 5204f9ed..48c15af1 100644 --- a/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json +++ b/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json @@ -63,7 +63,14 @@ {"name": "checkpoint_delete_time_p100", "type": "FLOAT", "description": "Maximum (p100) checkpoint-delete duration in seconds."}, {"name": "num_checkpoint_delete_datapoints", "type": "INTEGER", "description": "Count of distinct checkpoint-delete datapoints."}, {"name": "accelerator_blocked_time", "type": "FLOAT", "description": "Seconds the accelerator was blocked waiting on data loading (bottleneck rank, run-wide)."}, - {"name": "accelerator_blocked_percent", "type": "FLOAT", "description": "Percent of wall-clock time the accelerator was blocked on data loading (bottleneck rank, run-wide)."} + {"name": "accelerator_blocked_percent", "type": "FLOAT", "description": "Percent of wall-clock time the accelerator was blocked on data loading (bottleneck rank, run-wide)."}, + {"name": "cpu_usage_peak_cores", "type": "FLOAT", "description": "Peak CPU usage in cores over the run window; max across worker pods (bottleneck pod)."}, + {"name": "cpu_usage_mean_cores", "type": "FLOAT", "description": "Mean CPU usage in cores over the run window; max of per-pod means across worker pods (bottleneck pod)."}, + {"name": "memory_usage_peak_bytes", "type": "INTEGER", "description": "Peak container memory usage in bytes over the run window; max across worker pods (bottleneck pod)."}, + {"name": "network_received_peak_bytes_per_sec", "type": "FLOAT", "description": "Peak pod network receive rate in bytes/s over the run window; max across worker pods (bottleneck pod)."}, + {"name": "network_received_mean_bytes_per_sec", "type": "FLOAT", "description": "Mean pod network receive rate in bytes/s over the run window; max of per-pod means across worker pods (bottleneck pod)."}, + {"name": "network_sent_peak_bytes_per_sec", "type": "FLOAT", "description": "Peak pod network send rate in bytes/s over the run window; max across worker pods (bottleneck pod)."}, + {"name": "network_sent_mean_bytes_per_sec", "type": "FLOAT", "description": "Mean pod network send rate in bytes/s over the run window; max of per-pod means across worker pods (bottleneck pod)."} ] } } diff --git a/cloudbuild/macrobenchmarks/metrics/calculate.py b/cloudbuild/macrobenchmarks/metrics/calculate.py index bdefdfb3..2314bcfc 100644 --- a/cloudbuild/macrobenchmarks/metrics/calculate.py +++ b/cloudbuild/macrobenchmarks/metrics/calculate.py @@ -168,6 +168,40 @@ def calc_data_loading_metrics(dl_rows: list) -> dict: } +# Maps series to schema columns. Memory has no mean column. +_SYSTEM_SERIES_COLUMNS = { + "cpu": ("cpu_usage_peak_cores", "cpu_usage_mean_cores"), + "memory": ("memory_usage_peak_bytes", None), + "network_received": ( + "network_received_peak_bytes_per_sec", + "network_received_mean_bytes_per_sec", + ), + "network_sent": ( + "network_sent_peak_bytes_per_sec", + "network_sent_mean_bytes_per_sec", + ), +} + + +def calc_system_metrics(system_rows: list) -> dict: + """Reduce per-pod metrics to the bottleneck pod (max peak/mean).""" + out = {} + by_metric = defaultdict(list) + for r in system_rows: + by_metric[r.get("metric")].append(r) + for series, (peak_col, mean_col) in _SYSTEM_SERIES_COLUMNS.items(): + rows = by_metric.get(series, []) + peaks = [r["peak"] for r in rows if r.get("peak") is not None] + if peaks: + val = max(peaks) + out[peak_col] = int(val) if peak_col == "memory_usage_peak_bytes" else val + if mean_col: + means = [r["mean"] for r in rows if r.get("mean") is not None] + if means: + out[mean_col] = max(means) + return out + + def build_summary_row( *, run_id: str, @@ -178,6 +212,7 @@ def build_summary_row( restore_rows: list, delete_rows: list, dl_rows: list, + system_rows: list = None, dimensions: dict = None, ) -> dict: row = { @@ -192,6 +227,7 @@ def build_summary_row( row.update(calc_restore_metrics(restore_rows)) row.update(calc_delete_metrics(delete_rows)) row.update(calc_data_loading_metrics(dl_rows)) + row.update(calc_system_metrics(system_rows or [])) return row @@ -335,6 +371,7 @@ def main(argv=None) -> None: restore_rows = tables.restore_rows delete_rows = tables.delete_rows dl_rows = tables.dl_rows + system_rows = tables.system_rows validate_required_metrics( step_rows=step_rows, @@ -394,6 +431,7 @@ def main(argv=None) -> None: restore_rows=restore_rows, delete_rows=delete_rows, dl_rows=dl_rows, + system_rows=system_rows, dimensions=dimensions, ) diff --git a/cloudbuild/macrobenchmarks/metrics/monitoring.py b/cloudbuild/macrobenchmarks/metrics/monitoring.py new file mode 100644 index 00000000..acfcb715 --- /dev/null +++ b/cloudbuild/macrobenchmarks/metrics/monitoring.py @@ -0,0 +1,158 @@ +"""Fetch per-pod system metrics from Cloud Monitoring. + +Usage: + python3 -m metrics.monitoring --project P --run-id R \ + --start-time RFC3339 --end-time RFC3339 --out-dir DIR +""" + +import argparse +import datetime +import statistics +import traceback +from dataclasses import dataclass + +from metrics import raw_store, schema + + +@dataclass(frozen=True) +class Series: + """One monitoring series mapped to our internal metric name.""" + + name: str # internal series name: cpu | memory | network_received | network_sent + metric_type: str # Cloud Monitoring metric.type + resource_type: str # k8s_container | k8s_pod + aligner: str # per-series aligner name + + +# CPU: cores (RATE), Memory: peak bytes (MAX), Network: bytes/s (RATE). +SERIES = [ + Series( + "cpu", + "kubernetes.io/container/cpu/core_usage_time", + "k8s_container", + "ALIGN_RATE", + ), + Series( + "memory", + "kubernetes.io/container/memory/used_bytes", + "k8s_container", + "ALIGN_MAX", + ), + Series( + "network_received", + "kubernetes.io/pod/network/received_bytes_count", + "k8s_pod", + "ALIGN_RATE", + ), + Series( + "network_sent", + "kubernetes.io/pod/network/sent_bytes_count", + "k8s_pod", + "ALIGN_RATE", + ), +] + + +def _to_epoch(rfc3339: str) -> int: + """Parse RFC3339 to epoch seconds.""" + dt = datetime.datetime.fromisoformat(rfc3339.replace("Z", "+00:00")) + return int(dt.timestamp()) + + +def _point_value(point) -> float: + """Read numeric value from point.""" + v = point.value + if getattr(v, "double_value", 0.0): + return float(v.double_value) + return float(getattr(v, "int64_value", 0)) + + +def reduce_points(values: list) -> tuple: + """Return (peak, mean) of values, or (None, None).""" + if not values: + return None, None + return max(values), statistics.mean(values) + + +def _build_request(project, run_id, series, start_epoch, end_epoch, period): + """Build list_time_series request.""" + filter_ = ( + f'metric.type = "{series.metric_type}" ' + f'AND resource.type = "{series.resource_type}" ' + f'AND resource.labels.pod_name = starts_with("{run_id}-workload-0-")' + ) + return { + "name": f"projects/{project}", + "filter": filter_, + "interval": { + "start_time": {"seconds": int(start_epoch)}, + "end_time": {"seconds": int(end_epoch)}, + }, + "aggregation": { + "alignment_period": {"seconds": period}, + "per_series_aligner": series.aligner, + }, + } + + +def collect(client, *, project, run_id, start_epoch, end_epoch, period=60) -> list: + """Collect SystemMetric rows for all SERIES.""" + rows = [] + for series in SERIES: + request = _build_request( + project, run_id, series, start_epoch, end_epoch, period + ) + for ts in client.list_time_series(request): + pod_name = ts.resource.labels.get("pod_name", "") + values = [_point_value(p) for p in ts.points] + peak, mean = reduce_points(values) + if peak is None: + continue + rows.append( + schema.SystemMetric( + pod_name=pod_name, metric=series.name, peak=peak, mean=mean + ) + ) + return rows + + +def main(argv=None) -> None: + parser = argparse.ArgumentParser( + description="Fetch per-pod system metrics from Cloud Monitoring." + ) + parser.add_argument("--project", required=True) + parser.add_argument("--run-id", required=True) + parser.add_argument("--start-time", required=True, help="RFC3339") + parser.add_argument("--end-time", required=True, help="RFC3339") + parser.add_argument("--out-dir", required=True) + parser.add_argument("--period", type=int, default=60) + args = parser.parse_args(argv) + + # Import here to handle missing library case separately. + try: + from google.cloud import monitoring_v3 + except ImportError as e: + print(f"Warning: google-cloud-monitoring unavailable, columns will be N/A: {e}") + return + + try: + client = monitoring_v3.MetricServiceClient() + rows = collect( + client, + project=args.project, + run_id=args.run_id, + start_epoch=_to_epoch(args.start_time), + end_epoch=_to_epoch(args.end_time), + period=args.period, + ) + raw_store.write_system_metrics(rows, args.out_dir) + print(f"Wrote {len(rows)} system-metric rows to {args.out_dir}") + except Exception as e: # best-effort + print( + f"Warning: system-metrics fetch failed, columns will be N/A: {e}\n" + f"{traceback.format_exc()}" + ) + + +if __name__ == "__main__": + main() diff --git a/cloudbuild/macrobenchmarks/metrics/raw_store.py b/cloudbuild/macrobenchmarks/metrics/raw_store.py index 2af53bbd..f419cf79 100644 --- a/cloudbuild/macrobenchmarks/metrics/raw_store.py +++ b/cloudbuild/macrobenchmarks/metrics/raw_store.py @@ -26,6 +26,7 @@ class RawMetricTables: restore_rows: List[dict] = field(default_factory=list) delete_rows: List[dict] = field(default_factory=list) dl_rows: List[dict] = field(default_factory=list) + system_rows: List[dict] = field(default_factory=list) def write_raw_metrics( @@ -93,6 +94,18 @@ def write_raw_metrics( ) +def write_system_metrics(system_rows, out_dir: str) -> None: + """Write SystemMetric rows to the system-metrics CSV (owned here like the rest).""" + if system_rows: + _write_csv( + os.path.join( + out_dir, schema.SYSTEM_METRICS_DIRECTORY, schema.SYSTEM_METRICS_FILE + ), + schema.SystemMetric, + system_rows, + ) + + def read_raw_metrics( in_dir: str, *, run_type: str = "perf_optimization" ) -> RawMetricTables: @@ -129,6 +142,11 @@ def read_raw_metrics( schema.DATA_LOADING_METRICS_FILE, ) ), + system_rows=_read_csv( + os.path.join( + in_dir, schema.SYSTEM_METRICS_DIRECTORY, schema.SYSTEM_METRICS_FILE + ) + ), ) diff --git a/cloudbuild/macrobenchmarks/metrics/requirements.txt b/cloudbuild/macrobenchmarks/metrics/requirements.txt index c638cbce..91a28a2a 100644 --- a/cloudbuild/macrobenchmarks/metrics/requirements.txt +++ b/cloudbuild/macrobenchmarks/metrics/requirements.txt @@ -1,4 +1,5 @@ google-cloud-logging +google-cloud-monitoring google-cloud-storage numpy pytest diff --git a/cloudbuild/macrobenchmarks/metrics/schema.py b/cloudbuild/macrobenchmarks/metrics/schema.py index da9dc8a0..c6302563 100644 --- a/cloudbuild/macrobenchmarks/metrics/schema.py +++ b/cloudbuild/macrobenchmarks/metrics/schema.py @@ -18,6 +18,8 @@ PER_ACCELERATOR_DIRECTORY = "per_accelerator" CALCULATED_METRICS_DIRECTORY = "calculated_metrics" DATA_LOADING_METRICS_FILE = "data_loading_metrics.csv" +SYSTEM_METRICS_DIRECTORY = "system_metrics" +SYSTEM_METRICS_FILE = "system_metrics.csv" def fieldnames(dataclass_type) -> list: @@ -76,3 +78,11 @@ class DataLoadingMetrics: accelerator_blocked_time: float = None accelerator_blocked_percent: float = None update_timestamp: str = None + + +@dataclass(kw_only=True) +class SystemMetric: + pod_name: str + metric: str + peak: float + mean: float = None diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_summary.py b/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_summary.py index 4263c26c..bdb834ac 100644 --- a/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_summary.py +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_summary.py @@ -258,6 +258,50 @@ def _write_restore_csv(in_dir): w.writerow([0, "gs://b/ckpt", 10.0, 18.0, 0, ""]) +def _write_system_metrics_csv(in_dir): + (in_dir / "system_metrics").mkdir(parents=True) + path = in_dir / "system_metrics" / "system_metrics.csv" + with open(path, "w", newline="") as f: + w = csv.writer(f) + w.writerow(["pod_name", "metric", "peak", "mean"]) + w.writerow(["p0", "cpu", 3.0, 1.0]) + w.writerow(["p1", "cpu", 5.0, 4.0]) + w.writerow(["p0", "memory", 2048.0, ""]) + w.writerow(["p0", "network_received", 10.0, 2.0]) + + +def test_main_emits_system_metric_columns(tmp_path): + # Verify system metrics are reduced to bottleneck pod and typed correctly. + in_dir = tmp_path / "raw" + _write_step_csv(in_dir) + _write_data_loading_csv(in_dir) + _write_system_metrics_csv(in_dir) + out_file = tmp_path / "summary.csv" + calculate.main( + [ + "--run-id", + "r", + "--workload-name", + "hf-pytorch-lightning-cpu", + "--requirements", + "gcsfs==1.0", + "--in-dir", + str(in_dir), + "--out-file", + str(out_file), + "--require-data-loading-metrics", + ] + ) + with open(out_file) as f: + rows = list(csv.DictReader(f)) + assert rows[0]["cpu_usage_peak_cores"] == "5.0" + assert rows[0]["cpu_usage_mean_cores"] == "4.0" # max of per-pod means + assert rows[0]["memory_usage_peak_bytes"] == "2048" # int-typed + assert rows[0]["network_received_peak_bytes_per_sec"] == "10.0" + assert rows[0]["network_received_mean_bytes_per_sec"] == "2.0" + assert rows[0]["network_sent_peak_bytes_per_sec"] == "N/A" + + def test_main_fails_when_required_data_loading_metrics_are_missing(tmp_path): # step metrics present, but no data_loading_metrics.csv -> must fail when # --require-data-loading-metrics is set (the profiler summary is required). diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py b/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py new file mode 100644 index 00000000..e8def084 --- /dev/null +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py @@ -0,0 +1,35 @@ +from metrics import calculate + + +def test_max_across_pods_per_metric(): + rows = [ + {"pod_name": "p0", "metric": "cpu", "peak": 3.0, "mean": 1.0}, + {"pod_name": "p1", "metric": "cpu", "peak": 5.0, "mean": 4.0}, + {"pod_name": "p0", "metric": "memory", "peak": 1024.0, "mean": None}, + {"pod_name": "p1", "metric": "memory", "peak": 2048.0, "mean": None}, + {"pod_name": "p0", "metric": "network_received", "peak": 10.0, "mean": 2.0}, + {"pod_name": "p1", "metric": "network_received", "peak": 8.0, "mean": 3.0}, + {"pod_name": "p0", "metric": "network_sent", "peak": 7.0, "mean": 1.0}, + {"pod_name": "p1", "metric": "network_sent", "peak": 9.0, "mean": 5.0}, + ] + m = calculate.calc_system_metrics(rows) + assert m["cpu_usage_peak_cores"] == 5.0 + assert m["cpu_usage_mean_cores"] == 4.0 # max of per-pod means + assert m["memory_usage_peak_bytes"] == 2048 # int + assert isinstance(m["memory_usage_peak_bytes"], int) + assert m["network_received_peak_bytes_per_sec"] == 10.0 + assert m["network_received_mean_bytes_per_sec"] == 3.0 + assert m["network_sent_peak_bytes_per_sec"] == 9.0 + assert m["network_sent_mean_bytes_per_sec"] == 5.0 + + +def test_empty_rows_yield_no_keys(): + assert calculate.calc_system_metrics([]) == {} + + +def test_missing_series_omits_its_columns(): + rows = [{"pod_name": "p0", "metric": "cpu", "peak": 2.0, "mean": 1.0}] + m = calculate.calc_system_metrics(rows) + assert m["cpu_usage_peak_cores"] == 2.0 + assert "memory_usage_peak_bytes" not in m + assert "network_received_peak_bytes_per_sec" not in m diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py b/cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py new file mode 100644 index 00000000..0040c7f0 --- /dev/null +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py @@ -0,0 +1,99 @@ +from types import SimpleNamespace + +from metrics import monitoring + + +def _point(value): + return SimpleNamespace(value=SimpleNamespace(double_value=value, int64_value=0)) + + +def _series(pod_name, values): + return SimpleNamespace( + resource=SimpleNamespace(labels={"pod_name": pod_name}), + points=[_point(v) for v in values], + ) + + +class _FakeClient: + """Returns a canned series list per metric.type in the request filter.""" + + def __init__(self, by_metric_type): + self.by_metric_type = by_metric_type + self.requests = [] + + def list_time_series(self, request): + self.requests.append(request) + for metric_type, series in self.by_metric_type.items(): + if metric_type in request["filter"]: + return iter(series) + return iter(()) + + +def test_reduce_points(): + assert monitoring.reduce_points([1.0, 3.0, 2.0]) == (3.0, 2.0) + assert monitoring.reduce_points([]) == (None, None) + + +def test_point_value_reads_int64_when_double_zero(): + # Test fallback to int64_value when double_value is 0.0. + point = SimpleNamespace( + value=SimpleNamespace(double_value=0.0, int64_value=1073741824) + ) + assert monitoring._point_value(point) == 1073741824.0 + + +def test_build_request_shape(): + # Verify request shape matches MetricServiceClient expectations. + cpu = monitoring.SERIES[0] + req = monitoring._build_request("proj", "run", cpu, 100, 700, 60) + assert req["name"] == "projects/proj" + assert ( + 'metric.type = "kubernetes.io/container/cpu/core_usage_time"' in req["filter"] + ) + assert 'resource.type = "k8s_container"' in req["filter"] + assert 'starts_with("run-workload-0-")' in req["filter"] + assert req["interval"]["start_time"]["seconds"] == 100 + assert req["interval"]["end_time"]["seconds"] == 700 + assert req["aggregation"]["alignment_period"]["seconds"] == 60 + assert req["aggregation"]["per_series_aligner"] == "ALIGN_RATE" + + +def test_to_epoch_handles_zulu(): + + assert monitoring._to_epoch("1970-01-01T00:01:00Z") == 60 + + +def test_collect_emits_one_row_per_pod_and_series(): + client = _FakeClient( + { + "core_usage_time": [_series("p0", [1.0, 5.0]), _series("p1", [2.0, 2.0])], + "memory/used_bytes": [_series("p0", [1024.0])], + "network/received_bytes_count": [_series("p0", [10.0, 20.0])], + "network/sent_bytes_count": [_series("p0", [4.0, 6.0])], + } + ) + rows = monitoring.collect( + client, project="proj", run_id="run", start_epoch=0, end_epoch=600 + ) + by_key = {(r.pod_name, r.metric): r for r in rows} + assert by_key[("p0", "cpu")].peak == 5.0 + assert by_key[("p0", "cpu")].mean == 3.0 + assert by_key[("p1", "cpu")].peak == 2.0 + assert by_key[("p0", "memory")].peak == 1024.0 + assert by_key[("p0", "network_received")].peak == 20.0 + assert by_key[("p0", "network_sent")].peak == 6.0 + + assert 'starts_with("run-workload-0-")' in client.requests[0]["filter"] + + +def test_collect_writes_via_raw_store(tmp_path): + from metrics import raw_store + + client = _FakeClient({"core_usage_time": [_series("p0", [2.0, 4.0])]}) + rows = monitoring.collect( + client, project="proj", run_id="run", start_epoch=0, end_epoch=600 + ) + raw_store.write_system_metrics(rows, str(tmp_path)) + tables = raw_store.read_raw_metrics(str(tmp_path)) + cpu = [r for r in tables.system_rows if r["metric"] == "cpu"] + assert cpu and cpu[0]["peak"] == 4.0 and cpu[0]["mean"] == 3.0 diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_raw_store_system.py b/cloudbuild/macrobenchmarks/metrics/tests/test_raw_store_system.py new file mode 100644 index 00000000..f6020d69 --- /dev/null +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_raw_store_system.py @@ -0,0 +1,19 @@ +from metrics import raw_store, schema + + +def test_system_metrics_roundtrip(tmp_path): + rows = [ + schema.SystemMetric(pod_name="p0", metric="cpu", peak=3.0, mean=1.5), + schema.SystemMetric(pod_name="p1", metric="memory", peak=1024.0, mean=None), + ] + raw_store.write_system_metrics(rows, str(tmp_path)) + tables = raw_store.read_raw_metrics(str(tmp_path)) + assert tables.system_rows == [ + {"pod_name": "p0", "metric": "cpu", "peak": 3.0, "mean": 1.5}, + {"pod_name": "p1", "metric": "memory", "peak": 1024.0, "mean": None}, + ] + + +def test_absent_system_metrics_read_as_empty(tmp_path): + tables = raw_store.read_raw_metrics(str(tmp_path)) + assert tables.system_rows == [] diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py b/cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py new file mode 100644 index 00000000..d826513d --- /dev/null +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py @@ -0,0 +1,27 @@ +from metrics import summary_schema + +NEW_COLUMNS = [ + ("cpu_usage_peak_cores", "FLOAT"), + ("cpu_usage_mean_cores", "FLOAT"), + ("memory_usage_peak_bytes", "INTEGER"), + ("network_received_peak_bytes_per_sec", "FLOAT"), + ("network_received_mean_bytes_per_sec", "FLOAT"), + ("network_sent_peak_bytes_per_sec", "FLOAT"), + ("network_sent_mean_bytes_per_sec", "FLOAT"), +] + + +def test_new_columns_present_and_typed(): + fields = { + f["name"]: f["type"] + for f in summary_schema.external_table_definition()["schema"]["fields"] + } + for name, bq_type in NEW_COLUMNS: + assert name in fields, f"{name} missing from schema JSON" + assert fields[name] == bq_type, f"{name} should be {bq_type}" + + +def test_new_columns_are_last_and_in_order(): + names = summary_schema.fieldnames() + tail = names[-len(NEW_COLUMNS) :] + assert tail == [name for name, _ in NEW_COLUMNS] diff --git a/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh b/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh index bbeb0b6b..62c818ec 100755 --- a/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh +++ b/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh @@ -55,6 +55,11 @@ for attempt in $(seq 1 5); do sleep 60 continue fi + # Best-effort system metrics. Failure is ignored. + python3 -m metrics.monitoring \ + --project "${PROJECT_ID}" --run-id "$RUN_ID" \ + --start-time "$START_TIME" --end-time "$END_TIME" \ + --out-dir "$RAW_DIR" || true if python3 -m metrics.calculate \ --run-id "$RUN_ID" --workload-name "${_WORKLOAD}" \ --requirements "${_REQUIREMENTS}" --in-dir "$RAW_DIR" --out-file "$SUMMARY" \ From 139a7335c8b450694d5f2a7f5de169ae542e12e8 Mon Sep 17 00:00:00 2001 From: Zhixiang Li Date: Fri, 3 Jul 2026 19:33:18 +0800 Subject: [PATCH 2/3] Update cloudbuild/macrobenchmarks/metrics/monitoring.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- cloudbuild/macrobenchmarks/metrics/monitoring.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild/macrobenchmarks/metrics/monitoring.py b/cloudbuild/macrobenchmarks/metrics/monitoring.py index acfcb715..a7d201b6 100644 --- a/cloudbuild/macrobenchmarks/metrics/monitoring.py +++ b/cloudbuild/macrobenchmarks/metrics/monitoring.py @@ -55,7 +55,7 @@ class Series: def _to_epoch(rfc3339: str) -> int: """Parse RFC3339 to epoch seconds.""" - dt = datetime.datetime.fromisoformat(rfc3339.replace("Z", "+00:00")) + dt = datetime.datetime.fromisoformat(rfc3339.upper().replace("Z", "+00:00")) return int(dt.timestamp()) From 2ebeb3a9c31162123a466e01f768d573b3359b40 Mon Sep 17 00:00:00 2001 From: zhixiangli Date: Mon, 6 Jul 2026 00:52:10 +0800 Subject: [PATCH 3/3] Include GCS read-amplification and bottleneck system metrics --- .../macrobenchmarks-cloudbuild.yaml | 1 + .../macrobenchmarks_schema.json | 13 +- .../macrobenchmarks/metrics/calculate.py | 46 +++- .../macrobenchmarks/metrics/monitoring.py | 202 +++++++++++++++-- cloudbuild/macrobenchmarks/metrics/sizes.py | 55 +++++ .../metrics/tests/test_calculate_system.py | 43 ++++ .../metrics/tests/test_monitoring.py | 214 +++++++++++++++++- .../metrics/tests/test_schema_new_columns.py | 27 --- .../metrics/tests/test_sizes.py | 59 +++++ .../macrobenchmarks/scripts/create_buckets.sh | 16 +- .../macrobenchmarks/scripts/delete_buckets.sh | 5 +- .../macrobenchmarks/scripts/init_variables.sh | 1 + cloudbuild/macrobenchmarks/scripts/lib.sh | 18 +- .../macrobenchmarks/scripts/scrape_metrics.sh | 66 +++--- 14 files changed, 663 insertions(+), 103 deletions(-) create mode 100644 cloudbuild/macrobenchmarks/metrics/sizes.py delete mode 100644 cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py create mode 100644 cloudbuild/macrobenchmarks/metrics/tests/test_sizes.py diff --git a/cloudbuild/macrobenchmarks/macrobenchmarks-cloudbuild.yaml b/cloudbuild/macrobenchmarks/macrobenchmarks-cloudbuild.yaml index 13c10781..1e0752ba 100644 --- a/cloudbuild/macrobenchmarks/macrobenchmarks-cloudbuild.yaml +++ b/cloudbuild/macrobenchmarks/macrobenchmarks-cloudbuild.yaml @@ -95,6 +95,7 @@ steps: - "LOCATION=${LOCATION}" - "_BUCKET_TYPE=${_BUCKET_TYPE}" - "_ZONE=${_ZONE}" + - "_DATASET_PATH=${_DATASET_PATH}" args: ["cloudbuild/macrobenchmarks/scripts/create_buckets.sh"] waitFor: ["init-variables"] diff --git a/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json b/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json index 48c15af1..61559284 100644 --- a/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json +++ b/cloudbuild/macrobenchmarks/macrobenchmarks_schema.json @@ -70,7 +70,18 @@ {"name": "network_received_peak_bytes_per_sec", "type": "FLOAT", "description": "Peak pod network receive rate in bytes/s over the run window; max across worker pods (bottleneck pod)."}, {"name": "network_received_mean_bytes_per_sec", "type": "FLOAT", "description": "Mean pod network receive rate in bytes/s over the run window; max of per-pod means across worker pods (bottleneck pod)."}, {"name": "network_sent_peak_bytes_per_sec", "type": "FLOAT", "description": "Peak pod network send rate in bytes/s over the run window; max across worker pods (bottleneck pod)."}, - {"name": "network_sent_mean_bytes_per_sec", "type": "FLOAT", "description": "Mean pod network send rate in bytes/s over the run window; max of per-pod means across worker pods (bottleneck pod)."} + {"name": "network_sent_mean_bytes_per_sec", "type": "FLOAT", "description": "Mean pod network send rate in bytes/s over the run window; max of per-pod means across worker pods (bottleneck pod)."}, + {"name": "checkpoint_read_bytes", "type": "INTEGER", "description": "Total bytes served (egress) from the per-run checkpoint bucket over the run window; downloaded during restore."}, + {"name": "checkpoint_read_request_count", "type": "INTEGER", "description": "Count of object-read requests against the checkpoint bucket over the run window."}, + {"name": "checkpoint_restored_bytes", "type": "INTEGER", "description": "Size in bytes (du) of the restored checkpoint; denominator of the checkpoint read-amplification ratio."}, + {"name": "checkpoint_read_amplification_ratio", "type": "FLOAT", "description": "checkpoint_read_bytes / checkpoint_restored_bytes. DDP ~= world_size (every rank pulls the full checkpoint); FSDP ~= 1 (each pulls its shard)."}, + {"name": "dataset_read_bytes", "type": "INTEGER", "description": "Total bytes served (egress) from the per-run dataset bucket over the run window."}, + {"name": "dataset_read_request_count", "type": "INTEGER", "description": "Count of object-read requests against the dataset bucket over the run window."}, + {"name": "dataset_size_bytes", "type": "INTEGER", "description": "Size in bytes (du) of the per-run dataset bucket (a full unique copy); denominator of the dataset read-amplification ratio."}, + {"name": "dataset_read_amplification_ratio", "type": "FLOAT", "description": "dataset_read_bytes / dataset_size_bytes. Quantifies duplicate dataset download across ranks."}, + {"name": "memory_usage_mean_bytes", "type": "INTEGER", "description": "Mean container memory usage in bytes over the run window; max of per-pod means across worker pods (bottleneck pod)."}, + {"name": "memory_limit_utilization_peak", "type": "FLOAT", "description": "Peak fraction of the container memory limit over the run window (bottleneck pod); how close restore got to OOM."}, + {"name": "cpu_limit_utilization_peak", "type": "FLOAT", "description": "Peak fraction of the container CPU limit over the run window (bottleneck pod); whether the workload is pegged (compute-bound) vs I/O idle."} ] } } diff --git a/cloudbuild/macrobenchmarks/metrics/calculate.py b/cloudbuild/macrobenchmarks/metrics/calculate.py index 2314bcfc..650af280 100644 --- a/cloudbuild/macrobenchmarks/metrics/calculate.py +++ b/cloudbuild/macrobenchmarks/metrics/calculate.py @@ -168,10 +168,10 @@ def calc_data_loading_metrics(dl_rows: list) -> dict: } -# Maps series to schema columns. Memory has no mean column. +# Maps series to schema columns. `None` mean-column means the series has no mean. _SYSTEM_SERIES_COLUMNS = { "cpu": ("cpu_usage_peak_cores", "cpu_usage_mean_cores"), - "memory": ("memory_usage_peak_bytes", None), + "memory": ("memory_usage_peak_bytes", "memory_usage_mean_bytes"), "network_received": ( "network_received_peak_bytes_per_sec", "network_received_mean_bytes_per_sec", @@ -180,11 +180,38 @@ def calc_data_loading_metrics(dl_rows: list) -> dict: "network_sent_peak_bytes_per_sec", "network_sent_mean_bytes_per_sec", ), + "cpu_limit_utilization": ("cpu_limit_utilization_peak", None), + "memory_limit_utilization": ("memory_limit_utilization_peak", None), + "checkpoint_read_bytes": ("checkpoint_read_bytes", None), + "checkpoint_read_request_count": ("checkpoint_read_request_count", None), + "checkpoint_restored_bytes": ("checkpoint_restored_bytes", None), + "dataset_read_bytes": ("dataset_read_bytes", None), + "dataset_read_request_count": ("dataset_read_request_count", None), + "dataset_size_bytes": ("dataset_size_bytes", None), } +# Columns reported as whole numbers (bytes / counts) rather than floats. +_INT_COLUMNS = { + "memory_usage_peak_bytes", + "memory_usage_mean_bytes", + "checkpoint_read_bytes", + "checkpoint_read_request_count", + "checkpoint_restored_bytes", + "dataset_read_bytes", + "dataset_read_request_count", + "dataset_size_bytes", +} + + +def _amplification(numerator, denominator): + """numerator / denominator, or None when either is absent/zero.""" + if numerator is not None and denominator: + return numerator / denominator + return None + def calc_system_metrics(system_rows: list) -> dict: - """Reduce per-pod metrics to the bottleneck pod (max peak/mean).""" + """Reduce per-pod/per-bucket metrics to the bottleneck value and derive ratios.""" out = {} by_metric = defaultdict(list) for r in system_rows: @@ -194,11 +221,20 @@ def calc_system_metrics(system_rows: list) -> dict: peaks = [r["peak"] for r in rows if r.get("peak") is not None] if peaks: val = max(peaks) - out[peak_col] = int(val) if peak_col == "memory_usage_peak_bytes" else val + out[peak_col] = int(val) if peak_col in _INT_COLUMNS else val if mean_col: means = [r["mean"] for r in rows if r.get("mean") is not None] if means: - out[mean_col] = max(means) + val = max(means) + out[mean_col] = int(val) if mean_col in _INT_COLUMNS else val + ratio = _amplification( + out.get("checkpoint_read_bytes"), out.get("checkpoint_restored_bytes") + ) + if ratio is not None: + out["checkpoint_read_amplification_ratio"] = ratio + ratio = _amplification(out.get("dataset_read_bytes"), out.get("dataset_size_bytes")) + if ratio is not None: + out["dataset_read_amplification_ratio"] = ratio return out diff --git a/cloudbuild/macrobenchmarks/metrics/monitoring.py b/cloudbuild/macrobenchmarks/metrics/monitoring.py index a7d201b6..280ff4c9 100644 --- a/cloudbuild/macrobenchmarks/metrics/monitoring.py +++ b/cloudbuild/macrobenchmarks/metrics/monitoring.py @@ -18,13 +18,16 @@ class Series: """One monitoring series mapped to our internal metric name.""" - name: str # internal series name: cpu | memory | network_received | network_sent + name: str # internal series name metric_type: str # Cloud Monitoring metric.type - resource_type: str # k8s_container | k8s_pod + resource_type: str # k8s_container | k8s_pod | gcs_bucket aligner: str # per-series aligner name + filter_kind: str = "pod" # "pod" (pod_name prefix) | "bucket" (bucket_name) + method: str = None # optional metric.labels.method filter (bucket series) -# CPU: cores (RATE), Memory: peak bytes (MAX), Network: bytes/s (RATE). +# CPU: cores (RATE), Memory: peak bytes (MAX), Network: bytes/s (RATE), +# limit utilizations: fraction of the container limit (MAX). SERIES = [ Series( "cpu", @@ -50,6 +53,38 @@ class Series: "k8s_pod", "ALIGN_RATE", ), + Series( + "cpu_limit_utilization", + "kubernetes.io/container/cpu/limit_utilization", + "k8s_container", + "ALIGN_MAX", + ), + Series( + "memory_limit_utilization", + "kubernetes.io/container/memory/limit_utilization", + "k8s_container", + "ALIGN_MAX", + ), +] + +# Per-bucket totals summed over the window; `name` is prefixed with +# "checkpoint"/"dataset" to form the metric/column name. +GCS_BUCKET_SERIES = [ + Series( + "read_bytes", + "storage.googleapis.com/network/sent_bytes_count", + "gcs_bucket", + "ALIGN_DELTA", + filter_kind="bucket", + ), + Series( + "read_request_count", + "storage.googleapis.com/api/request_count", + "gcs_bucket", + "ALIGN_DELTA", + filter_kind="bucket", + method="ReadObject", + ), ] @@ -74,13 +109,22 @@ def reduce_points(values: list) -> tuple: return max(values), statistics.mean(values) -def _build_request(project, run_id, series, start_epoch, end_epoch, period): - """Build list_time_series request.""" - filter_ = ( - f'metric.type = "{series.metric_type}" ' - f'AND resource.type = "{series.resource_type}" ' - f'AND resource.labels.pod_name = starts_with("{run_id}-workload-0-")' - ) +def _build_request(project, series, target, start_epoch, end_epoch, period): + """Build a list_time_series request; ``target`` is the run id or bucket name.""" + if series.filter_kind == "bucket": + filter_ = ( + f'metric.type = "{series.metric_type}" ' + f'AND resource.type = "{series.resource_type}" ' + f'AND resource.labels.bucket_name = "{target}"' + ) + if series.method: + filter_ += f' AND metric.labels.method = "{series.method}"' + else: + filter_ = ( + f'metric.type = "{series.metric_type}" ' + f'AND resource.type = "{series.resource_type}" ' + f'AND resource.labels.pod_name = starts_with("{target}-workload-0-")' + ) return { "name": f"projects/{project}", "filter": filter_, @@ -96,23 +140,115 @@ def _build_request(project, run_id, series, start_epoch, end_epoch, period): def collect(client, *, project, run_id, start_epoch, end_epoch, period=60) -> list: - """Collect SystemMetric rows for all SERIES.""" + """Collect SystemMetric rows for all per-pod SERIES; a failed series is skipped.""" rows = [] for series in SERIES: - request = _build_request( - project, run_id, series, start_epoch, end_epoch, period - ) - for ts in client.list_time_series(request): - pod_name = ts.resource.labels.get("pod_name", "") - values = [_point_value(p) for p in ts.points] - peak, mean = reduce_points(values) - if peak is None: - continue - rows.append( - schema.SystemMetric( - pod_name=pod_name, metric=series.name, peak=peak, mean=mean + try: + request = _build_request( + project, series, run_id, start_epoch, end_epoch, period + ) + for ts in client.list_time_series(request): + pod_name = ts.resource.labels.get("pod_name", "") + values = [_point_value(p) for p in ts.points] + peak, mean = reduce_points(values) + if peak is None: + continue + rows.append( + schema.SystemMetric( + pod_name=pod_name, metric=series.name, peak=peak, mean=mean + ) ) + except Exception as e: # best-effort: keep the other series + print(f"Warning: system series '{series.name}' failed, its columns N/A: {e}") + return rows + + +def collect_bucket_totals( + client, *, project, bucket, prefix, start_epoch, end_epoch, period=60 +) -> list: + """Sum each GCS bucket series over the window into one SystemMetric row each.""" + rows = [] + for series in GCS_BUCKET_SERIES: + try: + request = _build_request( + project, series, bucket, start_epoch, end_epoch, period + ) + total = 0.0 + found = False + for ts in client.list_time_series(request): + for p in ts.points: + total += _point_value(p) + found = True + if found: + rows.append( + schema.SystemMetric( + pod_name=bucket, + metric=f"{prefix}_{series.name}", + peak=total, + mean=None, + ) + ) + except Exception as e: # best-effort: keep the other series + print( + f"Warning: bucket series '{prefix}_{series.name}' failed, " + f"its column N/A: {e}" + ) + return rows + + +def assemble_rows( + client, + storage_client, + *, + project, + run_id, + checkpoint_bucket, + dataset_bucket, + restore_rows, + start_epoch, + end_epoch, + period=60, +) -> list: + """Pod gauges + per-bucket totals + du sizes, as one row list.""" + rows = collect( + client, + project=project, + run_id=run_id, + start_epoch=start_epoch, + end_epoch=end_epoch, + period=period, + ) + if checkpoint_bucket: + rows += collect_bucket_totals( + client, + project=project, + bucket=checkpoint_bucket, + prefix="checkpoint", + start_epoch=start_epoch, + end_epoch=end_epoch, + period=period, + ) + if dataset_bucket: + rows += collect_bucket_totals( + client, + project=project, + bucket=dataset_bucket, + prefix="dataset", + start_epoch=start_epoch, + end_epoch=end_epoch, + period=period, + ) + if storage_client is not None: + from metrics import sizes + + # Best-effort; must not discard the rows already collected above. + try: + loc = sizes.restored_checkpoint_location(restore_rows or []) + rows += sizes.size_rows( + storage_client, dataset_bucket=dataset_bucket, restored_location=loc ) + except Exception as e: + print(f"Warning: du sizes failed, size columns N/A: {e}") return rows @@ -125,6 +261,8 @@ def main(argv=None) -> None: parser.add_argument("--start-time", required=True, help="RFC3339") parser.add_argument("--end-time", required=True, help="RFC3339") parser.add_argument("--out-dir", required=True) + parser.add_argument("--checkpoint-bucket") + parser.add_argument("--dataset-bucket") parser.add_argument("--period", type=int, default=60) args = parser.parse_args(argv) @@ -137,10 +275,26 @@ def main(argv=None) -> None: try: client = monitoring_v3.MetricServiceClient() - rows = collect( + storage_client = None + try: + from google.cloud import storage + + storage_client = storage.Client(project=args.project) + except Exception as e: # sizes become N/A, run continues + print(f"Warning: storage client unavailable, size columns N/A: {e}") + restore_rows = [] + try: + restore_rows = raw_store.read_raw_metrics(args.out_dir).restore_rows + except Exception as e: + print(f"Warning: could not read restore rows for checkpoint du: {e}") + rows = assemble_rows( client, + storage_client, project=args.project, run_id=args.run_id, + checkpoint_bucket=args.checkpoint_bucket, + dataset_bucket=args.dataset_bucket, + restore_rows=restore_rows, start_epoch=_to_epoch(args.start_time), end_epoch=_to_epoch(args.end_time), period=args.period, diff --git a/cloudbuild/macrobenchmarks/metrics/sizes.py b/cloudbuild/macrobenchmarks/metrics/sizes.py new file mode 100644 index 00000000..86ffcf10 --- /dev/null +++ b/cloudbuild/macrobenchmarks/metrics/sizes.py @@ -0,0 +1,55 @@ +"""GCS ``du`` sizes for the read-amplification denominators (best-effort).""" + +from metrics import schema + + +def gcs_du(storage_client, gs_path: str): + """Sum blob sizes under ``gs_path`` (gs://bucket[/prefix]); None if empty.""" + rest = gs_path[len("gs://") :] if gs_path.startswith("gs://") else gs_path + bucket_name, _, prefix = rest.partition("/") + total = 0 + found = False + for blob in storage_client.list_blobs(bucket_name, prefix=prefix): + total += blob.size or 0 + found = True + return total if found else None + + +def restored_checkpoint_location(restore_rows: list): + """checkpoint_location of the earliest-ending restore (the resume), or None.""" + candidates = [ + r + for r in restore_rows + if r.get("checkpoint_location") and r.get("end_time") is not None + ] + if not candidates: + return None + return min(candidates, key=lambda r: r["end_time"])["checkpoint_location"] + + +def size_rows(storage_client, *, dataset_bucket, restored_location) -> list: + """SystemMetric rows for the dataset-bucket size and restored-checkpoint size.""" + rows = [] + if dataset_bucket: + size = gcs_du(storage_client, f"gs://{dataset_bucket}") + if size is not None: + rows.append( + schema.SystemMetric( + pod_name=dataset_bucket, + metric="dataset_size_bytes", + peak=size, + mean=None, + ) + ) + if restored_location: + size = gcs_du(storage_client, restored_location) + if size is not None: + rows.append( + schema.SystemMetric( + pod_name=restored_location, + metric="checkpoint_restored_bytes", + peak=size, + mean=None, + ) + ) + return rows diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py b/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py index e8def084..866bf6f6 100644 --- a/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_calculate_system.py @@ -33,3 +33,46 @@ def test_missing_series_omits_its_columns(): assert m["cpu_usage_peak_cores"] == 2.0 assert "memory_usage_peak_bytes" not in m assert "network_received_peak_bytes_per_sec" not in m + + +def test_maps_limit_utilization_and_mean_memory(): + rows = [ + {"pod_name": "p0", "metric": "memory", "peak": 4096.0, "mean": 2048.0}, + {"pod_name": "p1", "metric": "memory", "peak": 8192.0, "mean": 1024.0}, + {"pod_name": "p0", "metric": "cpu_limit_utilization", "peak": 0.7, "mean": 0.3}, + {"pod_name": "p1", "metric": "cpu_limit_utilization", "peak": 0.9, "mean": 0.4}, + {"pod_name": "p0", "metric": "memory_limit_utilization", "peak": 0.5, "mean": None}, + ] + m = calculate.calc_system_metrics(rows) + assert m["memory_usage_mean_bytes"] == 2048 # max of per-pod means, int + assert isinstance(m["memory_usage_mean_bytes"], int) + assert m["cpu_limit_utilization_peak"] == 0.9 + assert m["memory_limit_utilization_peak"] == 0.5 + + +def test_checkpoint_and_dataset_amplification_ratios(): + rows = [ + {"pod_name": "ckpt-bkt", "metric": "checkpoint_read_bytes", "peak": 800.0, "mean": None}, + {"pod_name": "gs://ckpt", "metric": "checkpoint_restored_bytes", "peak": 100.0, "mean": None}, + {"pod_name": "ds-bkt", "metric": "dataset_read_bytes", "peak": 3000.0, "mean": None}, + {"pod_name": "ds-bkt", "metric": "dataset_size_bytes", "peak": 1000.0, "mean": None}, + {"pod_name": "ckpt-bkt", "metric": "checkpoint_read_request_count", "peak": 42.0, "mean": None}, + ] + m = calculate.calc_system_metrics(rows) + assert m["checkpoint_read_bytes"] == 800 + assert isinstance(m["checkpoint_read_bytes"], int) + assert m["checkpoint_restored_bytes"] == 100 + assert m["checkpoint_read_amplification_ratio"] == 8.0 + assert m["dataset_read_amplification_ratio"] == 3.0 + assert m["checkpoint_read_request_count"] == 42 + + +def test_amplification_ratio_omitted_when_denominator_missing_or_zero(): + rows = [ + {"pod_name": "b", "metric": "checkpoint_read_bytes", "peak": 500.0, "mean": None}, + {"pod_name": "b", "metric": "dataset_read_bytes", "peak": 500.0, "mean": None}, + {"pod_name": "b", "metric": "dataset_size_bytes", "peak": 0.0, "mean": None}, + ] + m = calculate.calc_system_metrics(rows) + assert "checkpoint_read_amplification_ratio" not in m # no restored_bytes + assert "dataset_read_amplification_ratio" not in m # zero denominator diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py b/cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py index 0040c7f0..52078751 100644 --- a/cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_monitoring.py @@ -15,7 +15,13 @@ def _series(pod_name, values): class _FakeClient: - """Returns a canned series list per metric.type in the request filter.""" + """Returns a canned series list keyed by the full metric.type in the filter. + + Matches the exact ``metric.type = ""`` token rather than a loose + substring so a pod series (kubernetes.io/pod/network/sent_bytes_count) does + not accidentally satisfy a bucket series query + (storage.googleapis.com/network/sent_bytes_count) or vice-versa. + """ def __init__(self, by_metric_type): self.by_metric_type = by_metric_type @@ -24,7 +30,7 @@ def __init__(self, by_metric_type): def list_time_series(self, request): self.requests.append(request) for metric_type, series in self.by_metric_type.items(): - if metric_type in request["filter"]: + if f'metric.type = "{metric_type}"' in request["filter"]: return iter(series) return iter(()) @@ -45,7 +51,7 @@ def test_point_value_reads_int64_when_double_zero(): def test_build_request_shape(): # Verify request shape matches MetricServiceClient expectations. cpu = monitoring.SERIES[0] - req = monitoring._build_request("proj", "run", cpu, 100, 700, 60) + req = monitoring._build_request("proj", cpu, "run", 100, 700, 60) assert req["name"] == "projects/proj" assert ( 'metric.type = "kubernetes.io/container/cpu/core_usage_time"' in req["filter"] @@ -58,6 +64,59 @@ def test_build_request_shape(): assert req["aggregation"]["per_series_aligner"] == "ALIGN_RATE" +def test_build_request_bucket_filter_and_method(): + req_count = [s for s in monitoring.GCS_BUCKET_SERIES if s.method][0] + req = monitoring._build_request("proj", req_count, "my-bucket", 0, 600, 60) + assert 'resource.type = "gcs_bucket"' in req["filter"] + assert 'resource.labels.bucket_name = "my-bucket"' in req["filter"] + assert f'metric.labels.method = "{req_count.method}"' in req["filter"] + assert req["aggregation"]["per_series_aligner"] == "ALIGN_DELTA" + + +def _bucket_series(values): + # A gcs_bucket series carries no pod_name label; only points matter here. + return SimpleNamespace( + resource=SimpleNamespace(labels={}), points=[_point(v) for v in values] + ) + + +def test_collect_bucket_totals_sums_all_points(): + client = _FakeClient( + { + "storage.googleapis.com/network/sent_bytes_count": [ + _bucket_series([100.0, 200.0]), + _bucket_series([50.0]), + ], + "storage.googleapis.com/api/request_count": [_bucket_series([3.0, 4.0])], + } + ) + rows = monitoring.collect_bucket_totals( + client, + project="proj", + bucket="ckpt", + prefix="checkpoint", + start_epoch=0, + end_epoch=600, + ) + by_metric = {r.metric: r for r in rows} + assert by_metric["checkpoint_read_bytes"].peak == 350.0 + assert by_metric["checkpoint_read_bytes"].pod_name == "ckpt" + assert by_metric["checkpoint_read_request_count"].peak == 7.0 + + +def test_collect_bucket_totals_omits_empty_series(): + client = _FakeClient({}) # nothing returned + rows = monitoring.collect_bucket_totals( + client, + project="proj", + bucket="ds", + prefix="dataset", + start_epoch=0, + end_epoch=600, + ) + assert rows == [] + + def test_to_epoch_handles_zulu(): assert monitoring._to_epoch("1970-01-01T00:01:00Z") == 60 @@ -66,10 +125,15 @@ def test_to_epoch_handles_zulu(): def test_collect_emits_one_row_per_pod_and_series(): client = _FakeClient( { - "core_usage_time": [_series("p0", [1.0, 5.0]), _series("p1", [2.0, 2.0])], - "memory/used_bytes": [_series("p0", [1024.0])], - "network/received_bytes_count": [_series("p0", [10.0, 20.0])], - "network/sent_bytes_count": [_series("p0", [4.0, 6.0])], + "kubernetes.io/container/cpu/core_usage_time": [ + _series("p0", [1.0, 5.0]), + _series("p1", [2.0, 2.0]), + ], + "kubernetes.io/container/memory/used_bytes": [_series("p0", [1024.0])], + "kubernetes.io/pod/network/received_bytes_count": [ + _series("p0", [10.0, 20.0]) + ], + "kubernetes.io/pod/network/sent_bytes_count": [_series("p0", [4.0, 6.0])], } ) rows = monitoring.collect( @@ -89,7 +153,9 @@ def test_collect_emits_one_row_per_pod_and_series(): def test_collect_writes_via_raw_store(tmp_path): from metrics import raw_store - client = _FakeClient({"core_usage_time": [_series("p0", [2.0, 4.0])]}) + client = _FakeClient( + {"kubernetes.io/container/cpu/core_usage_time": [_series("p0", [2.0, 4.0])]} + ) rows = monitoring.collect( client, project="proj", run_id="run", start_epoch=0, end_epoch=600 ) @@ -97,3 +163,135 @@ def test_collect_writes_via_raw_store(tmp_path): tables = raw_store.read_raw_metrics(str(tmp_path)) cpu = [r for r in tables.system_rows if r["metric"] == "cpu"] assert cpu and cpu[0]["peak"] == 4.0 and cpu[0]["mean"] == 3.0 + + +def test_assemble_rows_combines_pod_bucket_and_sizes(): + client = _FakeClient( + { + "kubernetes.io/container/cpu/core_usage_time": [ + _series("run-workload-0-a", [2.0, 4.0]) + ], + "storage.googleapis.com/network/sent_bytes_count": [ + _bucket_series([10.0, 20.0]) + ], + "storage.googleapis.com/api/request_count": [_bucket_series([5.0])], + } + ) + + class _FakeStorage: + def list_blobs(self, bucket_name, prefix=""): + data = {"ds": [("f", 1000)], "ckpt": [("checkpoints/s/x", 400)]} + for name, size in data.get(bucket_name, []): + if name.startswith(prefix): + yield SimpleNamespace(size=size) + + restore_rows = [{"checkpoint_location": "gs://ckpt/checkpoints/s", "end_time": 1.0}] + rows = monitoring.assemble_rows( + client, + _FakeStorage(), + project="p", + run_id="run", + checkpoint_bucket="ckpt", + dataset_bucket="ds", + restore_rows=restore_rows, + start_epoch=0, + end_epoch=600, + ) + metrics = {r.metric for r in rows} + assert "cpu" in metrics + assert "checkpoint_read_bytes" in metrics + assert "dataset_read_bytes" in metrics + assert "dataset_size_bytes" in metrics + assert "checkpoint_restored_bytes" in metrics + + +def test_assemble_rows_without_storage_client_skips_sizes(): + client = _FakeClient( + {"kubernetes.io/container/cpu/core_usage_time": [_series("run-workload-0-a", [1.0])]} + ) + rows = monitoring.assemble_rows( + client, + None, + project="p", + run_id="run", + checkpoint_bucket=None, + dataset_bucket=None, + restore_rows=[], + start_epoch=0, + end_epoch=600, + ) + assert {r.metric for r in rows} == {"cpu"} + + +class _RaisingClient: + """Raises for one metric.type, returns a canned series for another.""" + + def __init__(self, raise_on, return_for, series): + self.raise_on = raise_on + self.return_for = return_for + self.series = series + + def list_time_series(self, request): + if f'metric.type = "{self.raise_on}"' in request["filter"]: + raise RuntimeError("boom") + if f'metric.type = "{self.return_for}"' in request["filter"]: + return iter(self.series) + return iter(()) + + +def test_collect_isolates_a_failing_series(): + # A failure querying one metric type drops only that series, not the rest. + client = _RaisingClient( + raise_on="kubernetes.io/container/cpu/core_usage_time", + return_for="kubernetes.io/container/memory/used_bytes", + series=[_series("p0", [1024.0])], + ) + rows = monitoring.collect( + client, project="proj", run_id="run", start_epoch=0, end_epoch=600 + ) + metrics = {r.metric for r in rows} + assert "memory" in metrics + assert "cpu" not in metrics + + +def test_collect_bucket_totals_isolates_a_failing_series(): + client = _RaisingClient( + raise_on="storage.googleapis.com/api/request_count", + return_for="storage.googleapis.com/network/sent_bytes_count", + series=[_bucket_series([10.0, 20.0])], + ) + rows = monitoring.collect_bucket_totals( + client, project="proj", bucket="ckpt", prefix="checkpoint", + start_epoch=0, end_epoch=600, + ) + by_metric = {r.metric: r for r in rows} + assert by_metric["checkpoint_read_bytes"].peak == 30.0 + assert "checkpoint_read_request_count" not in by_metric + + +def test_assemble_rows_isolates_du_failure(): + # A du (list_blobs) failure must not discard the pod metrics already collected. + client = _FakeClient( + { + "kubernetes.io/container/cpu/core_usage_time": [ + _series("run-workload-0-a", [1.0, 2.0]) + ] + } + ) + + class _BoomStorage: + def list_blobs(self, *args, **kwargs): + raise RuntimeError("denied") + + rows = monitoring.assemble_rows( + client, + _BoomStorage(), + project="p", + run_id="run", + checkpoint_bucket=None, + dataset_bucket="ds", + restore_rows=[], + start_epoch=0, + end_epoch=600, + ) + assert {r.metric for r in rows} == {"cpu"} diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py b/cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py deleted file mode 100644 index d826513d..00000000 --- a/cloudbuild/macrobenchmarks/metrics/tests/test_schema_new_columns.py +++ /dev/null @@ -1,27 +0,0 @@ -from metrics import summary_schema - -NEW_COLUMNS = [ - ("cpu_usage_peak_cores", "FLOAT"), - ("cpu_usage_mean_cores", "FLOAT"), - ("memory_usage_peak_bytes", "INTEGER"), - ("network_received_peak_bytes_per_sec", "FLOAT"), - ("network_received_mean_bytes_per_sec", "FLOAT"), - ("network_sent_peak_bytes_per_sec", "FLOAT"), - ("network_sent_mean_bytes_per_sec", "FLOAT"), -] - - -def test_new_columns_present_and_typed(): - fields = { - f["name"]: f["type"] - for f in summary_schema.external_table_definition()["schema"]["fields"] - } - for name, bq_type in NEW_COLUMNS: - assert name in fields, f"{name} missing from schema JSON" - assert fields[name] == bq_type, f"{name} should be {bq_type}" - - -def test_new_columns_are_last_and_in_order(): - names = summary_schema.fieldnames() - tail = names[-len(NEW_COLUMNS) :] - assert tail == [name for name, _ in NEW_COLUMNS] diff --git a/cloudbuild/macrobenchmarks/metrics/tests/test_sizes.py b/cloudbuild/macrobenchmarks/metrics/tests/test_sizes.py new file mode 100644 index 00000000..5bfd2400 --- /dev/null +++ b/cloudbuild/macrobenchmarks/metrics/tests/test_sizes.py @@ -0,0 +1,59 @@ +from types import SimpleNamespace + +from metrics import sizes + + +class _FakeStorage: + """list_blobs(bucket, prefix) over a canned {bucket: [(name, size), ...]}.""" + + def __init__(self, by_bucket): + self.by_bucket = by_bucket + + def list_blobs(self, bucket_name, prefix=""): + for name, size in self.by_bucket.get(bucket_name, []): + if name.startswith(prefix): + yield SimpleNamespace(size=size) + + +def test_gcs_du_sums_under_prefix(): + client = _FakeStorage( + {"b": [("checkpoints/a", 100), ("checkpoints/c", 50), ("other", 7)]} + ) + assert sizes.gcs_du(client, "gs://b/checkpoints") == 150 + assert sizes.gcs_du(client, "gs://b") == 157 + + +def test_gcs_du_none_when_no_match(): + client = _FakeStorage({"b": []}) + assert sizes.gcs_du(client, "gs://b/missing") is None + + +def test_restored_checkpoint_location_picks_earliest_end(): + rows = [ + {"checkpoint_location": "gs://c/step200", "end_time": 200.0}, + {"checkpoint_location": "gs://c/step100", "end_time": 100.0}, + {"checkpoint_location": None, "end_time": 5.0}, + ] + assert sizes.restored_checkpoint_location(rows) == "gs://c/step100" + assert sizes.restored_checkpoint_location([]) is None + + +def test_size_rows_emits_dataset_and_checkpoint(): + client = _FakeStorage( + { + "ds": [("train/0.parquet", 1000)], + "ckpt": [("checkpoints/step100/shard0", 400)], + } + ) + rows = sizes.size_rows( + client, dataset_bucket="ds", restored_location="gs://ckpt/checkpoints/step100" + ) + by_metric = {r.metric: r for r in rows} + assert by_metric["dataset_size_bytes"].peak == 1000 + assert by_metric["checkpoint_restored_bytes"].peak == 400 + + +def test_size_rows_skips_missing_inputs(): + client = _FakeStorage({"ds": []}) # dataset du -> None + rows = sizes.size_rows(client, dataset_bucket="ds", restored_location=None) + assert rows == [] diff --git a/cloudbuild/macrobenchmarks/scripts/create_buckets.sh b/cloudbuild/macrobenchmarks/scripts/create_buckets.sh index 7ff0550d..9df73094 100755 --- a/cloudbuild/macrobenchmarks/scripts/create_buckets.sh +++ b/cloudbuild/macrobenchmarks/scripts/create_buckets.sh @@ -7,13 +7,15 @@ source "$(dirname "$0")/lib.sh" trap 'record_failure create-buckets' ERR skip_if_failed source "${BUILD_VARS_FILE}" -if [[ "${_BUCKET_TYPE}" == "regional" ]]; then - gcloud storage buckets create gs://$CHECKPOINT_BUCKET --project=${PROJECT_ID} --location=$REGION -elif [[ "${_BUCKET_TYPE}" == "zonal" ]]; then - gcloud storage buckets create gs://$CHECKPOINT_BUCKET --project=${PROJECT_ID} --location=$REGION --placement=${_ZONE} --default-storage-class=RAPID --enable-hierarchical-namespace --uniform-bucket-level-access -elif [[ "${_BUCKET_TYPE}" == "hns" ]]; then - gcloud storage buckets create gs://$CHECKPOINT_BUCKET --project=${PROJECT_ID} --location=$REGION --enable-hierarchical-namespace --uniform-bucket-level-access -fi +create_typed_bucket "$CHECKPOINT_BUCKET" + +# Per-run dataset bucket (same config as CHECKPOINT_BUCKET), populated by an +# in-region copy, so its egress is attributable to one run for the dataset +# read-amplification metric. +create_typed_bucket "$DATASET_BUCKET" +SRC_OBJECT_PATH=$(echo "${_DATASET_PATH}" | sed -E 's#^gs://[^/]+/?##') +gcloud storage rsync --recursive "${_DATASET_PATH}" "gs://${DATASET_BUCKET}/${SRC_OBJECT_PATH}" +echo "export RUN_DATASET_PATH=gs://${DATASET_BUCKET}/${SRC_OBJECT_PATH}" >> "${BUILD_VARS_FILE}" if gcloud storage buckets describe gs://$RESULTS_BUCKET --project=${PROJECT_ID} >/dev/null 2>&1; then # Reuse only if co-located with this build's LOCATION. The ingestion pipeline # builds the BigQuery dataset (and external table) in LOCATION; a results diff --git a/cloudbuild/macrobenchmarks/scripts/delete_buckets.sh b/cloudbuild/macrobenchmarks/scripts/delete_buckets.sh index e328b2c6..8a3938be 100755 --- a/cloudbuild/macrobenchmarks/scripts/delete_buckets.sh +++ b/cloudbuild/macrobenchmarks/scripts/delete_buckets.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -# delete-buckets: delete the per-run checkpoint bucket (best-effort). The shared -# results bucket is intentionally left in place. +# delete-buckets: delete the per-run checkpoint and dataset buckets (best-effort). +# The shared results bucket is intentionally left in place. if [[ "${_SKIP_CLEANUP}" == "true" ]]; then echo "Skipping delete-buckets as requested." exit 0 @@ -8,3 +8,4 @@ fi source "$(dirname "$0")/lib.sh" source "${BUILD_VARS_FILE}" gcloud storage rm --recursive --project="${PROJECT_ID}" gs://$CHECKPOINT_BUCKET || true +gcloud storage rm --recursive --project="${PROJECT_ID}" gs://$DATASET_BUCKET || true diff --git a/cloudbuild/macrobenchmarks/scripts/init_variables.sh b/cloudbuild/macrobenchmarks/scripts/init_variables.sh index 87e0028c..509b9003 100755 --- a/cloudbuild/macrobenchmarks/scripts/init_variables.sh +++ b/cloudbuild/macrobenchmarks/scripts/init_variables.sh @@ -105,5 +105,6 @@ echo "export CLUSTER_NAME=${_INFRA_PREFIX}-gke-${SHORT_BUILD_ID}" >> "${BUILD_VA echo "export NETWORK_NAME=${_INFRA_PREFIX}-net-${SHORT_BUILD_ID}" >> "${BUILD_VARS_FILE}" echo "export SUBNET_NAME=${_INFRA_PREFIX}-subnet-${SHORT_BUILD_ID}" >> "${BUILD_VARS_FILE}" echo "export CHECKPOINT_BUCKET=${_INFRA_PREFIX}-macrobench-checkpoint-${SHORT_BUILD_ID}" >> "${BUILD_VARS_FILE}" +echo "export DATASET_BUCKET=${_INFRA_PREFIX}-macrobench-dataset-${SHORT_BUILD_ID}" >> "${BUILD_VARS_FILE}" echo "export RESULTS_BUCKET=${_INFRA_PREFIX}-macrobench-results" >> "${BUILD_VARS_FILE}" echo "export REGION=${REGION}" >> "${BUILD_VARS_FILE}" diff --git a/cloudbuild/macrobenchmarks/scripts/lib.sh b/cloudbuild/macrobenchmarks/scripts/lib.sh index d2138b08..880d661a 100755 --- a/cloudbuild/macrobenchmarks/scripts/lib.sh +++ b/cloudbuild/macrobenchmarks/scripts/lib.sh @@ -30,9 +30,25 @@ skip_if_failed() { fi } +# Create a per-run bucket per _BUCKET_TYPE (regional | zonal-RAPID | hns). +create_typed_bucket() { + local bucket="$1" + case "${_BUCKET_TYPE}" in + regional) + gcloud storage buckets create "gs://$bucket" --project="${PROJECT_ID}" --location="$REGION" ;; + zonal) + gcloud storage buckets create "gs://$bucket" --project="${PROJECT_ID}" --location="$REGION" --placement="${_ZONE}" --default-storage-class=RAPID --enable-hierarchical-namespace --uniform-bucket-level-access ;; + hns) + gcloud storage buckets create "gs://$bucket" --project="${PROJECT_ID}" --location="$REGION" --enable-hierarchical-namespace --uniform-bucket-level-access ;; + *) + echo "ERROR: unknown _BUCKET_TYPE='${_BUCKET_TYPE}' (expected regional|zonal|hns)" >&2 + return 1 ;; + esac +} + shared_workload_helm_args() { SHARED_HELM_ARGS=( - --set gcsfs.datasetPath="${_DATASET_PATH}" + --set gcsfs.datasetPath="${RUN_DATASET_PATH:-${_DATASET_PATH}}" --set workload.modelId="${_MODEL_ID}" --set-string workload.image="${_IMAGE}" --set workload.hfToken="${_HF_TOKEN}" diff --git a/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh b/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh index 62c818ec..7cefde68 100755 --- a/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh +++ b/cloudbuild/macrobenchmarks/scripts/scrape_metrics.sh @@ -29,15 +29,36 @@ if [ -n "${_CHECKPOINT_LOAD_PATH}" ] || [ "${_SEED_CHECKPOINT}" = "true" ]; then MIN_RESTORE_DATAPOINTS=1 RESUME_ARGS=(--resume-run) fi +# Run the calculator over the current raw metrics into the summary file $1. +run_calculate() { + python3 -m metrics.calculate \ + --run-id "$RUN_ID" --workload-name "${_WORKLOAD}" \ + --requirements "${_REQUIREMENTS}" --in-dir "$RAW_DIR" --out-file "$1" \ + --expected-steps "${_STEPS}" \ + --min-write-datapoints "$MIN_WRITE_DATAPOINTS" \ + --min-restore-datapoints "$MIN_RESTORE_DATAPOINTS" \ + "${RESUME_ARGS[@]}" \ + --require-data-loading-metrics \ + --bucket-type "${_BUCKET_TYPE}" --zone "${_ZONE}" --region "$REGION" \ + --machine-type "${_MACHINE_TYPE}" \ + --nodes "${_NODES}" --ranks-per-node "${_RANKS_PER_NODE}" \ + --steps "${_STEPS}" --checkpoint-interval "${_CHECKPOINT_INTERVAL}" \ + --checkpoints-to-keep "${_CKPT_TO_KEEP}" \ + --dataset-path "${_DATASET_PATH}" --model-id "${_MODEL_ID}" \ + --image "${_IMAGE}" \ + --training-strategy "${_TRAINING_STRATEGY}" \ + --simulated-step-compute-seconds "${_SIMULATED_STEP_COMPUTE_SECONDS}" \ + --per-device-batch "${_PER_DEVICE_BATCH}" --grad-accum "${_GRAD_ACCUM}" \ + --dataloader-workers "${_DATALOADER_WORKERS}" +} # Cloud Logging ingestion lags pod termination by seconds-to-minutes, and the # last logs emitted (the final checkpoint write and the profiler summary that # carries the data-loading metric) are the most likely to still be in flight # when the JobSet reports Completed. Settle once, then re-scrape at a fixed 60s # interval until the required metrics validate (or attempts are exhausted). -# calculate -# exits non-zero when metrics are incomplete; running it as an `if` condition -# keeps `set -e`/the ERR trap from aborting the step on a not-yet-complete -# attempt. +# run_calculate exits non-zero when metrics are incomplete; running it as an +# `if` condition keeps `set -e`/the ERR trap from aborting the step on a +# not-yet-complete attempt. sleep 60 SCRAPE_OK=false for attempt in $(seq 1 5); do @@ -55,30 +76,7 @@ for attempt in $(seq 1 5); do sleep 60 continue fi - # Best-effort system metrics. Failure is ignored. - python3 -m metrics.monitoring \ - --project "${PROJECT_ID}" --run-id "$RUN_ID" \ - --start-time "$START_TIME" --end-time "$END_TIME" \ - --out-dir "$RAW_DIR" || true - if python3 -m metrics.calculate \ - --run-id "$RUN_ID" --workload-name "${_WORKLOAD}" \ - --requirements "${_REQUIREMENTS}" --in-dir "$RAW_DIR" --out-file "$SUMMARY" \ - --expected-steps "${_STEPS}" \ - --min-write-datapoints "$MIN_WRITE_DATAPOINTS" \ - --min-restore-datapoints "$MIN_RESTORE_DATAPOINTS" \ - "${RESUME_ARGS[@]}" \ - --require-data-loading-metrics \ - --bucket-type "${_BUCKET_TYPE}" --zone "${_ZONE}" --region "$REGION" \ - --machine-type "${_MACHINE_TYPE}" \ - --nodes "${_NODES}" --ranks-per-node "${_RANKS_PER_NODE}" \ - --steps "${_STEPS}" --checkpoint-interval "${_CHECKPOINT_INTERVAL}" \ - --checkpoints-to-keep "${_CKPT_TO_KEEP}" \ - --dataset-path "${_DATASET_PATH}" --model-id "${_MODEL_ID}" \ - --image "${_IMAGE}" \ - --training-strategy "${_TRAINING_STRATEGY}" \ - --simulated-step-compute-seconds "${_SIMULATED_STEP_COMPUTE_SECONDS}" \ - --per-device-batch "${_PER_DEVICE_BATCH}" --grad-accum "${_GRAD_ACCUM}" \ - --dataloader-workers "${_DATALOADER_WORKERS}"; then + if run_calculate "$SUMMARY"; then SCRAPE_OK=true break fi @@ -93,4 +91,16 @@ if [ "$SCRAPE_OK" != "true" ]; then record_failure scrape-metrics exit 1 fi +# Best-effort system metrics: fetched once after the required metrics validate +# (not per attempt, to avoid re-du'ing the dataset bucket). Settle for GCS +# metric lag, then fold into the summary; a failure here must not lose the +# metrics-complete summary already written above, so it's `|| true`/warn-only. +sleep "${SYSTEM_METRICS_SETTLE_SECONDS:-180}" +python3 -m metrics.monitoring \ + --project "${PROJECT_ID}" --run-id "$RUN_ID" \ + --start-time "$START_TIME" --end-time "$END_TIME" \ + --checkpoint-bucket "$CHECKPOINT_BUCKET" --dataset-bucket "$DATASET_BUCKET" \ + --out-dir "$RAW_DIR" || true +run_calculate "$SUMMARY" \ + || echo "Warning: recompute with system metrics failed; uploading summary without them." gcloud storage cp "$SUMMARY" "gs://$RESULTS_BUCKET/branch=$BRANCH_NAME/$DATE_DIR/$RUN_ID/${TS_DIR}.csv"