Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cloudbuild/macrobenchmarks/macrobenchmarks_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@
{"name": "checkpoint_delete_time_p100", "type": "FLOAT", "description": "Maximum (p100) checkpoint-delete duration in seconds."},
{"name": "num_checkpoint_delete_datapoints", "type": "INTEGER", "description": "Count of distinct checkpoint-delete datapoints."},
{"name": "accelerator_blocked_time", "type": "FLOAT", "description": "Seconds the accelerator was blocked waiting on data loading (bottleneck rank, run-wide)."},
{"name": "accelerator_blocked_percent", "type": "FLOAT", "description": "Percent of wall-clock time the accelerator was blocked on data loading (bottleneck rank, run-wide)."}
{"name": "accelerator_blocked_percent", "type": "FLOAT", "description": "Percent of wall-clock time the accelerator was blocked on data loading (bottleneck rank, run-wide)."},
{"name": "cpu_usage_peak_cores", "type": "FLOAT", "description": "Peak CPU usage in cores over the run window; max across worker pods (bottleneck pod)."},
{"name": "cpu_usage_mean_cores", "type": "FLOAT", "description": "Mean CPU usage in cores over the run window; max of per-pod means across worker pods (bottleneck pod)."},
{"name": "memory_usage_peak_bytes", "type": "INTEGER", "description": "Peak container memory usage in bytes over the run window; max across worker pods (bottleneck pod)."},
{"name": "network_received_peak_bytes_per_sec", "type": "FLOAT", "description": "Peak pod network receive rate in bytes/s over the run window; max across worker pods (bottleneck pod)."},
{"name": "network_received_mean_bytes_per_sec", "type": "FLOAT", "description": "Mean pod network receive rate in bytes/s over the run window; max of per-pod means across worker pods (bottleneck pod)."},
{"name": "network_sent_peak_bytes_per_sec", "type": "FLOAT", "description": "Peak pod network send rate in bytes/s over the run window; max across worker pods (bottleneck pod)."},
{"name": "network_sent_mean_bytes_per_sec", "type": "FLOAT", "description": "Mean pod network send rate in bytes/s over the run window; max of per-pod means across worker pods (bottleneck pod)."}
]
}
}
38 changes: 38 additions & 0 deletions cloudbuild/macrobenchmarks/metrics/calculate.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,40 @@ def calc_data_loading_metrics(dl_rows: list) -> dict:
}


# Maps each internal series name to its (peak column, mean column) pair in the
# summary schema. Memory has no mean column, hence the None.
_SYSTEM_SERIES_COLUMNS = {
    "cpu": ("cpu_usage_peak_cores", "cpu_usage_mean_cores"),
    "memory": ("memory_usage_peak_bytes", None),
    "network_received": (
        "network_received_peak_bytes_per_sec",
        "network_received_mean_bytes_per_sec",
    ),
    "network_sent": (
        "network_sent_peak_bytes_per_sec",
        "network_sent_mean_bytes_per_sec",
    ),
}


def _system_metric_value(raw):
    """Coerce one raw ``peak``/``mean`` cell to a finite float.

    Returns None for anything that cannot take part in a numeric ``max()``:
    None, blank or non-numeric text, NaN and +/-inf.
    """
    import math  # local import: only this helper needs it

    try:
        value = float(raw)
    except (TypeError, ValueError):
        return None
    return value if math.isfinite(value) else None


def _max_system_value(rows, key):
    """Return the largest usable ``key`` value across ``rows``, or None."""
    usable = [
        value
        for value in (_system_metric_value(row.get(key)) for row in rows)
        if value is not None
    ]
    return max(usable) if usable else None


def calc_system_metrics(system_rows: list) -> dict:
    """Reduce per-pod metrics to the bottleneck pod (max peak / max mean).

    Args:
        system_rows: Dict-like rows with ``metric``, ``peak`` and ``mean``
            keys. Values may be numbers or their text form; cells that are
            missing, blank or not numeric are ignored rather than compared.

    Returns:
        A dict keyed by summary column name. A column is present only when at
        least one usable value exists for it, so callers can tell "no data"
        apart from zero.
    """
    by_metric = defaultdict(list)
    for row in system_rows:
        by_metric[row.get("metric")].append(row)

    out = {}
    for series, (peak_col, mean_col) in _SYSTEM_SERIES_COLUMNS.items():
        rows = by_metric.get(series, [])
        peak = _max_system_value(rows, "peak")
        if peak is not None:
            # The memory peak is a byte count, so it is emitted as an integer;
            # every other column stays a float.
            out[peak_col] = (
                int(peak) if peak_col == "memory_usage_peak_bytes" else peak
            )
        if mean_col:
            mean = _max_system_value(rows, "mean")
            if mean is not None:
                out[mean_col] = mean
    return out


def build_summary_row(
*,
run_id: str,
Expand All @@ -178,6 +212,7 @@ def build_summary_row(
restore_rows: list,
delete_rows: list,
dl_rows: list,
system_rows: list = None,
dimensions: dict = None,
) -> dict:
row = {
Expand All @@ -192,6 +227,7 @@ def build_summary_row(
row.update(calc_restore_metrics(restore_rows))
row.update(calc_delete_metrics(delete_rows))
row.update(calc_data_loading_metrics(dl_rows))
row.update(calc_system_metrics(system_rows or []))
return row


Expand Down Expand Up @@ -335,6 +371,7 @@ def main(argv=None) -> None:
restore_rows = tables.restore_rows
delete_rows = tables.delete_rows
dl_rows = tables.dl_rows
system_rows = tables.system_rows

validate_required_metrics(
step_rows=step_rows,
Expand Down Expand Up @@ -394,6 +431,7 @@ def main(argv=None) -> None:
restore_rows=restore_rows,
delete_rows=delete_rows,
dl_rows=dl_rows,
system_rows=system_rows,
dimensions=dimensions,
)

Expand Down
158 changes: 158 additions & 0 deletions cloudbuild/macrobenchmarks/metrics/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Fetch per-pod system metrics from Cloud Monitoring.

Usage:
python3 -m metrics.monitoring --project P --run-id R \
--start-time RFC3339 --end-time RFC3339 --out-dir DIR
"""

import argparse
import datetime
import statistics
import traceback
from dataclasses import dataclass

from metrics import raw_store, schema


@dataclass(frozen=True)
class Series:
    """Immutable description of one monitoring series to fetch.

    Attributes:
        name: Internal series name (cpu | memory | network_received |
            network_sent).
        metric_type: Cloud Monitoring ``metric.type`` string.
        resource_type: Monitored resource type (k8s_container | k8s_pod).
        aligner: Name of the per-series aligner applied to this series.
    """

    name: str
    metric_type: str
    resource_type: str
    aligner: str


# Series fetched for every run.
# CPU is reported in cores (RATE), memory as peak bytes (MAX), and both
# network directions in bytes/s (RATE).
SERIES = [
    Series(
        name="cpu",
        metric_type="kubernetes.io/container/cpu/core_usage_time",
        resource_type="k8s_container",
        aligner="ALIGN_RATE",
    ),
    Series(
        name="memory",
        metric_type="kubernetes.io/container/memory/used_bytes",
        resource_type="k8s_container",
        aligner="ALIGN_MAX",
    ),
    Series(
        name="network_received",
        metric_type="kubernetes.io/pod/network/received_bytes_count",
        resource_type="k8s_pod",
        aligner="ALIGN_RATE",
    ),
    Series(
        name="network_sent",
        metric_type="kubernetes.io/pod/network/sent_bytes_count",
        resource_type="k8s_pod",
        aligner="ALIGN_RATE",
    ),
]


def _to_epoch(rfc3339: str) -> int:
    """Convert an RFC3339 timestamp string to whole epoch seconds.

    The text is upper-cased and any ``Z`` suffix is rewritten as ``+00:00``
    before it is handed to ``datetime.fromisoformat``. Fractional seconds are
    truncated by the final ``int()``.
    """
    normalized = rfc3339.upper()
    normalized = normalized.replace("Z", "+00:00")
    parsed = datetime.datetime.fromisoformat(normalized)
    return int(parsed.timestamp())
Comment thread
zhixiangli marked this conversation as resolved.


def _point_value(point) -> float:
    """Extract the numeric value carried by a time-series point.

    A truthy ``double_value`` wins; otherwise ``int64_value`` is used, and a
    value object exposing neither attribute yields 0.0.
    """
    typed_value = point.value
    as_double = getattr(typed_value, "double_value", 0.0)
    if not as_double:
        return float(getattr(typed_value, "int64_value", 0))
    return float(as_double)


def reduce_points(values: list) -> tuple:
    """Summarise ``values`` as ``(peak, mean)``.

    An empty (or otherwise falsy) input produces ``(None, None)``.
    """
    if values:
        return max(values), statistics.mean(values)
    return None, None


def _build_request(project, run_id, series, start_epoch, end_epoch, period):
    """Assemble the request dict passed to ``list_time_series``.

    The filter selects ``series.metric_type`` on ``series.resource_type`` for
    pods whose name starts with ``"<run_id>-workload-0-"``. The interval
    bounds are truncated to whole seconds.
    """
    clauses = (
        f'metric.type = "{series.metric_type}"',
        f'resource.type = "{series.resource_type}"',
        f'resource.labels.pod_name = starts_with("{run_id}-workload-0-")',
    )
    interval = {
        "start_time": {"seconds": int(start_epoch)},
        "end_time": {"seconds": int(end_epoch)},
    }
    aggregation = {
        "alignment_period": {"seconds": period},
        "per_series_aligner": series.aligner,
    }
    return {
        "name": f"projects/{project}",
        "filter": " AND ".join(clauses),
        "interval": interval,
        "aggregation": aggregation,
    }


def collect(client, *, project, run_id, start_epoch, end_epoch, period=60) -> list:
    """Query every entry in SERIES and return one SystemMetric per time series.

    Each returned time series is reduced to its peak and mean. A series with
    no points is dropped, so it produces no row.
    """
    collected = []
    for spec in SERIES:
        query = _build_request(
            project, run_id, spec, start_epoch, end_epoch, period
        )
        for time_series in client.list_time_series(query):
            pod = time_series.resource.labels.get("pod_name", "")
            samples = [_point_value(pt) for pt in time_series.points]
            peak, mean = reduce_points(samples)
            if peak is not None:
                collected.append(
                    schema.SystemMetric(
                        pod_name=pod, metric=spec.name, peak=peak, mean=mean
                    )
                )
    return collected


def main(argv=None) -> None:
    """CLI entry point: fetch system metrics and write them under --out-dir.

    The whole step is best-effort. A missing google-cloud-monitoring package
    or any failure while fetching or writing only prints a warning, and the
    function returns normally instead of raising.
    """
    cli = argparse.ArgumentParser(
        description="Fetch per-pod system metrics from Cloud Monitoring."
    )
    for flag in ("--project", "--run-id"):
        cli.add_argument(flag, required=True)
    for flag in ("--start-time", "--end-time"):
        cli.add_argument(flag, required=True, help="RFC3339")
    cli.add_argument("--out-dir", required=True)
    cli.add_argument("--period", type=int, default=60)
    opts = cli.parse_args(argv)

    # Imported lazily so an absent client library is reported on its own,
    # separately from failures of the fetch itself.
    try:
        from google.cloud import monitoring_v3
    except ImportError as exc:
        print(
            f"Warning: google-cloud-monitoring unavailable, columns will be N/A: {exc}"
        )
        return

    try:
        metric_client = monitoring_v3.MetricServiceClient()
        window_start = _to_epoch(opts.start_time)
        window_end = _to_epoch(opts.end_time)
        metric_rows = collect(
            metric_client,
            project=opts.project,
            run_id=opts.run_id,
            start_epoch=window_start,
            end_epoch=window_end,
            period=opts.period,
        )
        raw_store.write_system_metrics(metric_rows, opts.out_dir)
        print(f"Wrote {len(metric_rows)} system-metric rows to {opts.out_dir}")
    except Exception as exc:  # best-effort
        print(
            f"Warning: system-metrics fetch failed, columns will be N/A: {exc}\n"
            f"{traceback.format_exc()}"
        )


if __name__ == "__main__":
main()
18 changes: 18 additions & 0 deletions cloudbuild/macrobenchmarks/metrics/raw_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class RawMetricTables:
restore_rows: List[dict] = field(default_factory=list)
delete_rows: List[dict] = field(default_factory=list)
dl_rows: List[dict] = field(default_factory=list)
system_rows: List[dict] = field(default_factory=list)


def write_raw_metrics(
Expand Down Expand Up @@ -93,6 +94,18 @@ def write_raw_metrics(
)


def write_system_metrics(system_rows, out_dir: str) -> None:
    """Persist SystemMetric rows to the system-metrics CSV under ``out_dir``.

    Nothing is written when ``system_rows`` is empty or None.
    """
    if not system_rows:
        return
    csv_path = os.path.join(
        out_dir, schema.SYSTEM_METRICS_DIRECTORY, schema.SYSTEM_METRICS_FILE
    )
    _write_csv(csv_path, schema.SystemMetric, system_rows)


def read_raw_metrics(
in_dir: str, *, run_type: str = "perf_optimization"
) -> RawMetricTables:
Expand Down Expand Up @@ -129,6 +142,11 @@ def read_raw_metrics(
schema.DATA_LOADING_METRICS_FILE,
)
),
system_rows=_read_csv(
os.path.join(
in_dir, schema.SYSTEM_METRICS_DIRECTORY, schema.SYSTEM_METRICS_FILE
)
),
)


Expand Down
1 change: 1 addition & 0 deletions cloudbuild/macrobenchmarks/metrics/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
google-cloud-logging
google-cloud-monitoring
google-cloud-storage
numpy
pytest
10 changes: 10 additions & 0 deletions cloudbuild/macrobenchmarks/metrics/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
PER_ACCELERATOR_DIRECTORY = "per_accelerator"
CALCULATED_METRICS_DIRECTORY = "calculated_metrics"
DATA_LOADING_METRICS_FILE = "data_loading_metrics.csv"
SYSTEM_METRICS_DIRECTORY = "system_metrics"
SYSTEM_METRICS_FILE = "system_metrics.csv"


def fieldnames(dataclass_type) -> list:
Expand Down Expand Up @@ -76,3 +78,11 @@ class DataLoadingMetrics:
accelerator_blocked_time: float = None
accelerator_blocked_percent: float = None
update_timestamp: str = None


@dataclass(kw_only=True)
class SystemMetric:
pod_name: str
metric: str
peak: float
mean: float = None
44 changes: 44 additions & 0 deletions cloudbuild/macrobenchmarks/metrics/tests/test_calculate_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,50 @@ def _write_restore_csv(in_dir):
w.writerow([0, "gs://b/ckpt", 10.0, 18.0, 0, ""])


def _write_system_metrics_csv(in_dir):
    """Create system_metrics/system_metrics.csv under ``in_dir`` with fixture rows.

    The fixture has two pods reporting cpu, one memory row with a blank mean,
    and one network_received row. No network_sent row is written.
    """
    metrics_dir = in_dir / "system_metrics"
    metrics_dir.mkdir(parents=True)
    records = [
        ["pod_name", "metric", "peak", "mean"],
        ["p0", "cpu", 3.0, 1.0],
        ["p1", "cpu", 5.0, 4.0],
        ["p0", "memory", 2048.0, ""],
        ["p0", "network_received", 10.0, 2.0],
    ]
    with open(metrics_dir / "system_metrics.csv", "w", newline="") as handle:
        csv.writer(handle).writerows(records)


def test_main_emits_system_metric_columns(tmp_path):
    # Verify system metrics are reduced to bottleneck pod and typed correctly.
    raw_dir = tmp_path / "raw"
    _write_step_csv(raw_dir)
    _write_data_loading_csv(raw_dir)
    _write_system_metrics_csv(raw_dir)
    summary_path = tmp_path / "summary.csv"

    argv = [
        "--run-id",
        "r",
        "--workload-name",
        "hf-pytorch-lightning-cpu",
        "--requirements",
        "gcsfs==1.0",
        "--in-dir",
        str(raw_dir),
        "--out-file",
        str(summary_path),
        "--require-data-loading-metrics",
    ]
    calculate.main(argv)

    with open(summary_path) as f:
        summary = list(csv.DictReader(f))[0]

    expected = {
        "cpu_usage_peak_cores": "5.0",
        "cpu_usage_mean_cores": "4.0",  # max of per-pod means
        "memory_usage_peak_bytes": "2048",  # int-typed
        "network_received_peak_bytes_per_sec": "10.0",
        "network_received_mean_bytes_per_sec": "2.0",
        # No network_sent rows were written, so the column is the N/A marker.
        "network_sent_peak_bytes_per_sec": "N/A",
    }
    for column, value in expected.items():
        assert summary[column] == value


def test_main_fails_when_required_data_loading_metrics_are_missing(tmp_path):
# step metrics present, but no data_loading_metrics.csv -> must fail when
# --require-data-loading-metrics is set (the profiler summary is required).
Expand Down
Loading
Loading