diff --git a/openshift_metrics/invoice.py b/openshift_metrics/invoice.py index 8765b01..a071564 100644 --- a/openshift_metrics/invoice.py +++ b/openshift_metrics/invoice.py @@ -1,7 +1,7 @@ import math from dataclasses import dataclass, field from collections import namedtuple -from typing import List, Tuple, Optional +from typing import List, Tuple from decimal import Decimal, ROUND_HALF_UP import datetime @@ -121,7 +121,8 @@ def get_service_unit(self, su_definitions) -> ServiceUnit: return ServiceUnit(su_type, su_count, determining_resource) def get_runtime( - self, ignore_times: List[Tuple[datetime.datetime, datetime.datetime]] = None + self, + ignore_times: List[Tuple[datetime.datetime, datetime.datetime]] | None = None, ) -> Decimal: """Return runtime eligible for billing in hours""" @@ -209,7 +210,9 @@ class ProjectInvoce: project_id: str rates: Rates su_definitions: dict - ignore_hours: Optional[List[Tuple[datetime.datetime, datetime.datetime]]] = None + ignore_hours: List[Tuple[datetime.datetime, datetime.datetime]] = field( + default_factory=list + ) su_hours: dict = field( default_factory=lambda: { SU_CPU: 0, diff --git a/openshift_metrics/merge.py b/openshift_metrics/merge.py index ecd052d..360565e 100644 --- a/openshift_metrics/merge.py +++ b/openshift_metrics/merge.py @@ -106,22 +106,12 @@ def load_metrics_metadata(files: List[str]) -> MetricsMetadata: def load_and_merge_metrics(interval_minutes, files: List[str]) -> MetricsProcessor: - """Load and merge metrics - - Loads metrics from provided json files and then returns a processor - that has all the merged data. 
- """ + """Load metrics from new-format files (namespaces + segments).""" processor = MetricsProcessor(interval_minutes) for file in files: with open(file, "r") as jsonfile: metrics_from_file = json.load(jsonfile) - cpu_request_metrics = metrics_from_file["cpu_metrics"] - memory_request_metrics = metrics_from_file["memory_metrics"] - gpu_request_metrics = metrics_from_file.get("gpu_metrics", None) - processor.merge_metrics("cpu_request", cpu_request_metrics) - processor.merge_metrics("memory_request", memory_request_metrics) - if gpu_request_metrics is not None: - processor.merge_metrics("gpu_request", gpu_request_metrics) + processor.load_segment_data(metrics_from_file["namespaces"]) logger.info(f"Total metric files read: {len(files)}") return processor @@ -181,18 +171,22 @@ def get_rates_and_outages( logger.info("Using nerc rates for rates and outages") rates_data = rates.load_from_url() invoice_rates = invoice.Rates( - cpu=rates_data.get_value_at("CPU SU Rate", meta.report_month, Decimal), - gpu_a100=rates_data.get_value_at( - "GPUA100 SU Rate", meta.report_month, Decimal + cpu=Decimal( + rates_data.get_value_at("CPU SU Rate", meta.report_month, Decimal) + ), + gpu_a100=Decimal( + rates_data.get_value_at("GPUA100 SU Rate", meta.report_month, Decimal) ), - gpu_a100sxm4=rates_data.get_value_at( - "GPUA100SXM4 SU Rate", meta.report_month, Decimal + gpu_a100sxm4=Decimal( + rates_data.get_value_at( + "GPUA100SXM4 SU Rate", meta.report_month, Decimal + ) ), - gpu_v100=rates_data.get_value_at( - "GPUV100 SU Rate", meta.report_month, Decimal + gpu_v100=Decimal( + rates_data.get_value_at("GPUV100 SU Rate", meta.report_month, Decimal) ), - gpu_h100=rates_data.get_value_at( - "GPUH100 SU Rate", meta.report_month, Decimal + gpu_h100=Decimal( + rates_data.get_value_at("GPUH100 SU Rate", meta.report_month, Decimal) ), ) outage_data = outages.load_from_url() @@ -303,11 +297,8 @@ def main(): f"Generating report from {metrics_metadata.start_time_utc} to 
{metrics_metadata.end_time_utc + timedelta(days=1)} for {cluster_name}" ) - # load and merge the metrics from the files, followed by condensing the metrics. processor = load_and_merge_metrics(metrics_metadata.interval_minutes, files) - condensed_metrics_dict = processor.condense_metrics( - ["cpu_request", "memory_request", "gpu_request", "gpu_type"] - ) + condensed_metrics_dict = processor.merged_data # gather invoice rates and su defitions. invoice_rates, ignore_hours = get_rates_and_outages(args, metrics_metadata) diff --git a/openshift_metrics/metrics_processor.py b/openshift_metrics/metrics_processor.py index 5b9f14f..a45d377 100644 --- a/openshift_metrics/metrics_processor.py +++ b/openshift_metrics/metrics_processor.py @@ -1,5 +1,5 @@ import json -from typing import List, Dict +from typing import Dict from collections import namedtuple import logging @@ -16,58 +16,15 @@ class MetricsProcessor: def __init__( self, interval_minutes: int = 15, - merged_data: dict = None, + merged_data: dict | None = None, gpu_mapping_file: str = "gpu_node_map.json", ): self.interval_minutes = interval_minutes self.merged_data = merged_data if merged_data is not None else {} self.gpu_mapping = self._load_gpu_mapping(gpu_mapping_file) - def merge_metrics(self, metric_name, metric_list): - """Merge metrics (cpu, memory, gpu) by pod""" - for metric in metric_list: - pod = metric["metric"]["pod"] - namespace = metric["metric"]["namespace"] - node = metric["metric"].get("node") - - self.merged_data.setdefault(namespace, {}) - self.merged_data[namespace].setdefault(pod, {"metrics": {}}) - - if metric_name == "cpu_request": - class_name = metric["metric"].get("label_nerc_mghpcc_org_class") - if class_name is not None: - self.merged_data[namespace][pod]["label_nerc_mghpcc_org_class"] = ( - class_name - ) - - gpu_type, gpu_resource, node_model = self._extract_gpu_info( - metric_name, metric - ) - - for epoch_time, metric_value in metric["values"]: - 
self.merged_data[namespace][pod]["metrics"].setdefault(epoch_time, {}) - - self.merged_data[namespace][pod]["metrics"][epoch_time][metric_name] = ( - metric_value - ) - if gpu_type: - self.merged_data[namespace][pod]["metrics"][epoch_time][ - "gpu_type" - ] = gpu_type - if gpu_resource: - self.merged_data[namespace][pod]["metrics"][epoch_time][ - "gpu_resource" - ] = gpu_resource - if node_model: - self.merged_data[namespace][pod]["metrics"][epoch_time][ - "node_model" - ] = node_model - if node: - self.merged_data[namespace][pod]["metrics"][epoch_time]["node"] = ( - node - ) - - def _extract_gpu_info(self, metric_name: str, metric: Dict) -> GPUInfo: + @staticmethod + def _extract_gpu_info(metric_name: str, metric: Dict) -> GPUInfo: """Extract GPU related info""" gpu_type = None gpu_resource = None @@ -80,12 +37,6 @@ def _extract_gpu_info(self, metric_name: str, metric: Dict) -> GPUInfo: gpu_resource = metric["metric"].get("resource") node_model = metric["metric"].get("label_nvidia_com_gpu_machine") - # Sometimes GPU labels from the nodes can be missing, in that case - # we get the gpu_type from the gpu-node file - if gpu_type == GPU_UNKNOWN_TYPE: - node_name = metric["metric"].get("node") - gpu_type = self.gpu_mapping.get(node_name, GPU_UNKNOWN_TYPE) - return GPUInfo(gpu_type, gpu_resource, node_model) @staticmethod @@ -97,83 +48,132 @@ def _load_gpu_mapping(file_path: str) -> Dict[str, str]: logger.warning("Could not load gpu-node map file: %s", file_path) return {} - def condense_metrics(self, metrics_to_check: List[str]) -> Dict: - """ - Checks if the value of metrics is the same, and removes redundant - metrics while updating the duration. If there's a gap in the reported - metrics then don't count that as part of duration. 
- """ - interval = self.interval_minutes * 60 - condensed_dict = {} - - for namespace, pods in self.merged_data.items(): - condensed_dict.setdefault(namespace, {}) - - for pod, pod_dict in pods.items(): - metrics_dict = pod_dict["metrics"] - new_metrics_dict = {} - epoch_times_list = sorted(metrics_dict.keys()) - - start_epoch_time = epoch_times_list[0] - - start_metric_dict = metrics_dict[start_epoch_time].copy() - - for i in range(1, len(epoch_times_list)): - current_time = epoch_times_list[i] - previous_time = epoch_times_list[i - 1] - - metrics_changed = self._are_metrics_different( - metrics_dict[start_epoch_time], - metrics_dict[current_time], - metrics_to_check, - ) - - pod_was_stopped = self._was_pod_stopped( - current_time=current_time, - previous_time=previous_time, - interval=interval, - ) - - if metrics_changed or pod_was_stopped: - duration = previous_time - start_epoch_time + interval - start_metric_dict["duration"] = duration - new_metrics_dict[start_epoch_time] = start_metric_dict - - # Reset start_epoch_time and start_metric_dict - start_epoch_time = current_time - start_metric_dict = metrics_dict[start_epoch_time].copy() - - # Final block after the loop - duration = epoch_times_list[-1] - start_epoch_time + interval - start_metric_dict["duration"] = duration - new_metrics_dict[start_epoch_time] = start_metric_dict - - # Update the pod dict with the condensed data - new_pod_dict = pod_dict.copy() - new_pod_dict["metrics"] = new_metrics_dict - condensed_dict[namespace][pod] = new_pod_dict - - return condensed_dict + @staticmethod + def _was_pod_stopped(current_time: int, previous_time: int, interval: int) -> bool: + """Return True if the gap between samples is larger than the query interval (pod likely stopped).""" + return (current_time - previous_time) > interval @staticmethod - def _are_metrics_different( - metrics_a: Dict, metrics_b: Dict, metrics_to_check: List[str] - ) -> bool: - """Method that compares all the metrics in metrics_to_check are 
different in - metrics_a and metrics_b - """ - return any( - metrics_a.get(metric, 0) != metrics_b.get(metric, 0) - for metric in metrics_to_check + def _condense_values(values: list, interval: int, key: str = "value") -> list: + """Return list of condensed segments.""" + if not values: + return [] + values = sorted(values, key=lambda x: x[0]) + segments = [] + start_t, start_v = values[0] + prev_t = start_t + for t, v in values[1:]: + if v != start_v or MetricsProcessor._was_pod_stopped(t, prev_t, interval): + segments.append( + { + "start": start_t, + "duration": prev_t - start_t + interval, + key: start_v, + } + ) + start_t, start_v = t, v + prev_t = t + segments.append( + { + "start": start_t, + "duration": values[-1][0] - start_t + interval, + key: start_v, + } ) + return segments @staticmethod - def _was_pod_stopped(current_time: int, previous_time: int, interval: int) -> bool: - """ - A pod is assumed to be stopped if the the gap between two consecutive timestamps - is more than the frequency of our metric collection + def condense_metric_series(metric_list: list, interval: int, key: str) -> list: + """Apply per-series condense to a list of Prometheus metric objects.""" + if not metric_list: + return [] + result = [] + for item in metric_list: + segs = MetricsProcessor._condense_values(item["values"], interval, key) + for seg in segs: + seg.update(item["metric"]) + result.extend(segs) + return result + + ESSENTIAL_LABEL_KEYS = { + "pod", + "namespace", + "node", + "label_nerc_mghpcc_org_class", + "label_nvidia_com_gpu_product", + "resource", + "label_nvidia_com_gpu_machine", + } + + @staticmethod + def build_namespaces_dict( + *segment_lists: list, gpu_mapping: dict | None = None + ) -> dict: + """Group condensed segments (from condense_metric_series) into the + new pod-centric export format expected by load_segment_data(). + Only essential labels are kept. 
""" - return (current_time - previous_time) > interval + namespaces = {} + essential = MetricsProcessor.ESSENTIAL_LABEL_KEYS + for seg_list in segment_lists: + for seg in seg_list: + ns = seg.get("namespace") + pod = seg.get("pod") + if not ns or not pod: + continue + namespaces.setdefault(ns, {}).setdefault(pod, {"segments": []}) + # keep only essential labels + the metric value keys + clean = { + k: v + for k, v in seg.items() + if k in essential + or k + in ( + "start", + "duration", + "cpu_request", + "memory_request", + "gpu_request", + ) + } + # remove pod/namespace from the segment body (they are keys) + clean.pop("pod", None) + clean.pop("namespace", None) + + # Rename Prometheus label keys to the nice names used by the invoice layer + if "label_nvidia_com_gpu_product" in clean: + clean["gpu_type"] = clean.pop("label_nvidia_com_gpu_product") + elif gpu_mapping is not None and "gpu_request" in clean: + # Fallback: resolve GPU type from the node-name mapping file + node_name = clean.get("node") + if node_name: + clean["gpu_type"] = gpu_mapping.get(node_name, GPU_UNKNOWN_TYPE) + if "label_nvidia_com_gpu_machine" in clean: + clean["node_model"] = clean.pop("label_nvidia_com_gpu_machine") + if "resource" in clean and clean.get("gpu_request") is not None: + clean["gpu_resource"] = clean.pop("resource") + + namespaces[ns][pod]["segments"].append(clean) + return namespaces + + def load_segment_data(self, namespaces: dict): + """Load already-condensed segment data from the export format.""" + for ns, pods in namespaces.items(): + self.merged_data.setdefault(ns, {}) + for pod, pod_data in pods.items(): + self.merged_data[ns].setdefault(pod, {"metrics": {}}) + for seg in pod_data.get("segments", []): + # Promote the class label to pod level so write_metrics_by_classes can find it + if "label_nerc_mghpcc_org_class" in seg: + self.merged_data[ns][pod].setdefault( + "label_nerc_mghpcc_org_class", + seg["label_nerc_mghpcc_org_class"], + ) + start = seg["start"] + # Use 
update so multiple segments at the same timestamp are merged, + # not overwritten (e.g. separate CPU and memory series share start times) + entry = self.merged_data[ns][pod]["metrics"].setdefault(start, {}) + entry.update({k: v for k, v in seg.items() if k != "start"}) @staticmethod def insert_node_labels(node_labels: list, resource_request_metrics: list) -> list: @@ -214,3 +214,16 @@ def insert_pod_labels(pod_labels: list, resource_request_metrics: list) -> list: "class" ) return resource_request_metrics + + @staticmethod + def strip_to_essential_labels(metric_list: list) -> list: + """Return a new list with only the labels that merge_metrics() reads.""" + if not metric_list: + return metric_list + essential = MetricsProcessor.ESSENTIAL_LABEL_KEYS + stripped = [] + for item in metric_list: + slim_metric = {k: v for k, v in item["metric"].items() if k in essential} + stripped.append({"metric": slim_metric, "values": item["values"]}) + + return stripped diff --git a/openshift_metrics/openshift_prometheus_metrics.py b/openshift_metrics/openshift_prometheus_metrics.py index 512d713..ab3bbb4 100755 --- a/openshift_metrics/openshift_prometheus_metrics.py +++ b/openshift_metrics/openshift_prometheus_metrics.py @@ -94,6 +94,9 @@ def main(): f"Generating report starting {report_start_date} and ending {report_end_date} in {output_file} with interval {PROM_QUERY_INTERVAL_MINUTES} minute" ) + if not OPENSHIFT_TOKEN: + sys.exit("OPENSHIFT_TOKEN environment variable is required") + prom_client = PrometheusClient( openshift_url, OPENSHIFT_TOKEN, PROM_QUERY_INTERVAL_MINUTES ) @@ -106,44 +109,83 @@ def main(): args.openshift_url, args.openshift_url ) - cpu_request_metrics = prom_client.query_metric( - CPU_REQUEST, report_start_date, report_end_date - ) + interval_seconds = PROM_QUERY_INTERVAL_MINUTES * 60 + + cpu_segments = [] + mem_segments = [] + gpu_segments = [] + + # --- CPU request (independent query) --- + cpu_request_metrics = None + try: + cpu_request_metrics = 
prom_client.query_metric( + CPU_REQUEST, report_start_date, report_end_date + ) + except utils.EmptyResultError: + logger.info( + f"No CPU metrics found for the period {report_start_date} to {report_end_date}" + ) + # Pod labels (independent query, only needed for CPU enrichment) + pod_labels = None try: pod_labels = prom_client.query_metric( KUBE_POD_LABELS, report_start_date, report_end_date ) - metrics_dict["cpu_metrics"] = MetricsProcessor.insert_pod_labels( - pod_labels, cpu_request_metrics + except utils.EmptyResultError: + logger.info("No pod labels found for the period") + + if cpu_request_metrics: + labeled_cpu = MetricsProcessor.insert_pod_labels( + pod_labels or [], cpu_request_metrics + ) + cpu_segments = MetricsProcessor.condense_metric_series( + labeled_cpu, interval_seconds, "cpu_request" + ) + + try: + memory_request_metrics = prom_client.query_metric( + MEMORY_REQUEST, report_start_date, report_end_date + ) + mem_segments = MetricsProcessor.condense_metric_series( + memory_request_metrics, interval_seconds, "memory_request" ) except utils.EmptyResultError: logger.info( - f"No pod labels found for the period {report_start_date} to {report_end_date}" + f"No memory metrics found for the period {report_start_date} to {report_end_date}" ) - metrics_dict["cpu_metrics"] = cpu_request_metrics - memory_request_metrics = prom_client.query_metric( - MEMORY_REQUEST, report_start_date, report_end_date - ) - metrics_dict["memory_metrics"] = memory_request_metrics - - # because if nobody requests a GPU then we will get an empty set + gpu_request_metrics = None try: gpu_request_metrics = prom_client.query_metric( GPU_REQUEST, report_start_date, report_end_date ) + except utils.EmptyResultError: + logger.info( + f"No GPU metrics found for the period {report_start_date} to {report_end_date}" + ) + + node_labels = None + try: node_labels = prom_client.query_metric( KUBE_NODE_LABELS, report_start_date, report_end_date ) - metrics_dict["gpu_metrics"] = 
MetricsProcessor.insert_node_labels( - node_labels, gpu_request_metrics - ) except utils.EmptyResultError: - logger.info( - f"No GPU metrics found for the period {report_start_date} to {report_end_date}" + logger.info("No node labels found for the period") + + if gpu_request_metrics: + labeled_gpu = MetricsProcessor.insert_node_labels( + node_labels or [], gpu_request_metrics ) - pass + gpu_segments = MetricsProcessor.condense_metric_series( + labeled_gpu, interval_seconds, "gpu_request" + ) + + # Build the new pod-centric format expected by the ingest pipeline + gpu_mapping = MetricsProcessor._load_gpu_mapping("gpu_node_map.json") + metrics_dict["namespaces"] = MetricsProcessor.build_namespaces_dict( + cpu_segments, mem_segments, gpu_segments, gpu_mapping=gpu_mapping + ) month_year = datetime.strptime(report_start_date, "%Y-%m-%d").strftime("%Y-%m") diff --git a/openshift_metrics/tests/conftest.py b/openshift_metrics/tests/conftest.py index 6fa572d..901c23e 100644 --- a/openshift_metrics/tests/conftest.py +++ b/openshift_metrics/tests/conftest.py @@ -4,143 +4,130 @@ @pytest.fixture def mock_metrics_file1(): - cpu_metrics = [ - { - "metric": { - "pod": "pod1", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [0, 10], - [60, 15], - [120, 20], - ], - }, - { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [0, 30], - [60, 35], - [120, 40], - ], - }, - ] - memory_metrics = [ - { - "metric": { - "pod": "pod1", - "namespace": "namespace1", - "resource": "memory", - }, - "values": [ - [0, 10], - [60, 15], - [120, 20], - ], - }, - { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [0, 30], - [60, 35], - [120, 40], - ], - }, - ] return { "cluster_name": "ocp-prod", "start_date": "2025-09-20", "end_date": "2025-09-20", "interval_minutes": 15, - "cpu_metrics": cpu_metrics, - "memory_metrics": memory_metrics, + "namespaces": { + "namespace1": { + 
"pod1": { + "segments": [ + { + "start": 0, + "duration": 60, + "cpu_request": 10, + "memory_request": 10, + }, + { + "start": 60, + "duration": 60, + "cpu_request": 15, + "memory_request": 15, + }, + { + "start": 120, + "duration": 60, + "cpu_request": 20, + "memory_request": 20, + }, + ] + }, + "pod2": { + "segments": [ + { + "start": 0, + "duration": 60, + "cpu_request": 30, + "memory_request": 30, + }, + { + "start": 60, + "duration": 60, + "cpu_request": 35, + "memory_request": 35, + }, + { + "start": 120, + "duration": 60, + "cpu_request": 40, + "memory_request": 40, + }, + ] + }, + } + }, } @pytest.fixture def mock_metrics_file2(): - cpu_metrics = [ - { - "metric": { - "pod": "pod1", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [180, 10], - [240, 15], - [300, 20], - ], - }, - { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [180, 30], - [240, 35], - [300, 40], - ], - }, - ] - memory_metrics = [ - { - "metric": { - "pod": "pod1", - "namespace": "namespace1", - "resource": "memory", - }, - "values": [ - [180, 10], - [240, 15], - [300, 20], - ], - }, - { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [180, 30], - [240, 35], - [300, 40], - ], - }, - ] + # New pod-centric format (already condensed segments) return { "cluster_name": "ocp-prod", "start_date": "2025-09-21", "end_date": "2025-09-21", - "cpu_metrics": cpu_metrics, - "memory_metrics": memory_metrics, "interval_minutes": 15, + "namespaces": { + "namespace1": { + "pod1": { + "segments": [ + { + "start": 180, + "duration": 60, + "cpu_request": 10, + "memory_request": 10, + }, + { + "start": 240, + "duration": 60, + "cpu_request": 15, + "memory_request": 15, + }, + { + "start": 300, + "duration": 60, + "cpu_request": 20, + "memory_request": 20, + }, + ] + }, + "pod2": { + "segments": [ + { + "start": 180, + "duration": 60, + "cpu_request": 30, + "memory_request": 30, + }, 
+ { + "start": 240, + "duration": 60, + "cpu_request": 35, + "memory_request": 35, + }, + { + "start": 300, + "duration": 60, + "cpu_request": 40, + "memory_request": 40, + }, + ] + }, + } + }, } @pytest.fixture def mock_metrics_file3(): - cpu_metrics = [] - memory_metrics = [] + # Empty file in new format return { "cluster_name": "ocp-prod", "start_date": "2025-09-21", "end_date": "2025-09-21", "interval_minutes": 3, # file1 and file2 have 15 minutes - "cpu_metrics": cpu_metrics, - "memory_metrics": memory_metrics, + "namespaces": {}, } diff --git a/openshift_metrics/tests/test_e2e_openshift_prometheus_metrics.py b/openshift_metrics/tests/test_e2e_openshift_prometheus_metrics.py new file mode 100644 index 0000000..1f8d348 --- /dev/null +++ b/openshift_metrics/tests/test_e2e_openshift_prometheus_metrics.py @@ -0,0 +1,268 @@ +"""End-to-end integration test for openshift_prometheus_metrics.py. + +Mocks the Thanos/Prometheus endpoint via PrometheusClient. +Covers the full flow (arg parsing -> independent queries for all metric types +-> MetricsProcessor enrichment/condense/build -> JSON output) with basic +structure/schema assertions. +Includes data covering all SU types (CPU + multiple GPU variants). +Does not duplicate unit-test coverage of fine details in MetricsProcessor. + +Also verifies that the produced JSON can be fed directly into merge.py. 
+""" + +import json +from decimal import Decimal +from unittest import mock + +import pytest + +from openshift_metrics import openshift_prometheus_metrics as main_script + + +@pytest.fixture +def mock_query_responses(): + """Sample responses covering CPU/Mem + GPU (multiple SU types) + labels.""" + cpu = [ + { + "metric": { + "pod": "cpu-pod", + "namespace": "ai-performance-profiling", + "resource": "cpu", + "node": "wrk-0", + }, + "values": [[1780358400, "2"], [1780359000, "2"]], + } + ] + memory = [ + { + "metric": { + "pod": "mem-pod", + "namespace": "ai-performance-profiling", + "resource": "memory", + "node": "wrk-0", + }, + "values": [[1780358400, "4294967296"], [1780359000, "4294967296"]], + } + ] + # GPU data covers multiple SU types (A100, H100 whole GPU + MIG) + gpu = [ + { + "metric": { + "pod": "gpu-a100-pod", + "namespace": "ai-performance-profiling", + "resource": "nvidia.com/gpu", + "node": "wrk-gpu-a100", + }, + "values": [[1780358400, "1"]], + }, + { + "metric": { + "pod": "gpu-h100-pod", + "namespace": "ai-performance-profiling", + "resource": "nvidia.com/gpu", + "node": "wrk-gpu-h100", + }, + "values": [[1780358400, "1"]], + }, + { + "metric": { + "pod": "mig-pod", + "namespace": "ai-performance-profiling", + "resource": "nvidia.com/mig-1g.5gb", + "node": "wrk-gpu-mig", + }, + "values": [[1780358400, "1"]], + }, + ] + node_labels = [ + { + "metric": { + "node": "wrk-gpu-a100", + "label_nvidia_com_gpu_product": "NVIDIA-A100-SXM4-40GB", + "label_nvidia_com_gpu_machine": "Dell", + }, + "values": [[1780358400, "1"]], + }, + { + "metric": { + "node": "wrk-gpu-h100", + "label_nvidia_com_gpu_product": "NVIDIA-H100-80GB", + "label_nvidia_com_gpu_machine": "Dell", + }, + "values": [[1780358400, "1"]], + }, + ] + pod_labels = [ + { + "metric": { + "pod": "cpu-pod", + "label_nerc_mghpcc_org_class": "cpu", + }, + "values": [[1780358400, "1"]], + } + ] + return { + "cpu_request": cpu, + "memory_request": memory, + "gpu_request": gpu, + "kube_node_labels": 
node_labels, + "kube_pod_labels": pod_labels, + } + + +@mock.patch( + "openshift_metrics.openshift_prometheus_metrics.OPENSHIFT_TOKEN", "fake-token" +) +@mock.patch( + "openshift_metrics.openshift_prometheus_metrics.PROM_QUERY_INTERVAL_MINUTES", 15 +) +@mock.patch("openshift_metrics.openshift_prometheus_metrics.PrometheusClient") +def test_metrics_end_to_end(mock_client_class, mock_query_responses, tmp_path): + """Full start-to-finish run with mocked Thanos queries covering all SU types.""" + mock_client = mock_client_class.return_value + # Order of calls in main(): cpu, pod_labels, memory, gpu, node_labels + responses = [ + mock_query_responses["cpu_request"], + mock_query_responses["kube_pod_labels"], + mock_query_responses["memory_request"], + mock_query_responses["gpu_request"], + mock_query_responses["kube_node_labels"], + ] + mock_client.query_metric.side_effect = responses + + output_file = tmp_path / "metrics-test.json" + test_url = "https://thanos-querier-openshift-monitoring.apps.shift.nerc.mghpcc.org" + + with mock.patch("argparse.ArgumentParser.parse_args") as mock_parse: + mock_parse.return_value = mock.Mock( + openshift_url=test_url, + report_start_date="2026-06-02", + report_end_date="2026-06-02", + upload_to_s3=False, + output_file=str(output_file), + ) + main_script.main() + + # Verify file written and basic schema + assert output_file.exists() + data = json.loads(output_file.read_text()) + + # Top-level schema checks (new pod-centric format) + assert data["start_date"] == "2026-06-02" + assert data["end_date"] == "2026-06-02" + assert data["interval_minutes"] == 15 + assert data["cluster_name"] == "ocp-prod" # mapped from URL + assert "namespaces" in data + assert isinstance(data["namespaces"], dict) + + namespaces = data["namespaces"] + # Should have pods from the mocked data + assert "ai-performance-profiling" in namespaces + ns_pods = namespaces["ai-performance-profiling"] + assert "cpu-pod" in ns_pods or "mem-pod" in ns_pods or "gpu-a100-pod" 
in ns_pods + + # Spot-check that segments contain the expected request keys + # (detailed condense/insert logic covered by unit tests) + found_cpu = found_mem = found_gpu = False + for pod_data in ns_pods.values(): + for seg in pod_data.get("segments", []): + if "cpu_request" in seg: + found_cpu = True + if "memory_request" in seg: + found_mem = True + if "gpu_request" in seg: + found_gpu = True + assert found_cpu + assert found_mem + assert found_gpu + + # Verify client was instantiated and called the expected number of times + mock_client_class.assert_called_once_with(test_url, "fake-token", 15) + assert mock_client.query_metric.call_count == 5 + + +# --------------------------------------------------------------------------- +# Merge pipeline integration (feed the output of the collection script into merge) +# --------------------------------------------------------------------------- + + +@mock.patch("openshift_metrics.merge.rates.load_from_url") +@mock.patch("openshift_metrics.merge.outages.load_from_url") +@mock.patch("openshift_metrics.merge.utils.write_metrics_by_namespace") +@mock.patch("openshift_metrics.merge.utils.write_metrics_by_classes") +@mock.patch("openshift_metrics.merge.utils.write_metrics_by_pod") +def test_collection_output_feeds_merge( + mock_write_pod, + mock_write_classes, + mock_write_ns, + mock_outages, + mock_rates, + tmp_path, +): + """The JSON produced by openshift_prometheus_metrics can be consumed by merge.py.""" + # Minimal rates / outages return values + mock_rates.return_value.get_value_at.return_value = Decimal("1.0") + mock_outages.return_value.get_outages_during.return_value = [] + + # Create a minimal valid metrics file (as would be produced by the collection script) + metrics_file = tmp_path / "metrics-2026-06-02.json" + metrics_file.write_text( + json.dumps( + { + "cluster_name": "ocp-prod", + "start_date": "2026-06-02", + "end_date": "2026-06-02", + "interval_minutes": 15, + "namespaces": { + "ai-performance-profiling": { + 
"gpu-a100-pod": { + "segments": [ + { + "start": 0, + "duration": 3600, + "cpu_request": 24, + "memory_request": 98304, + "gpu_request": 1, + "gpu_type": "NVIDIA-A100-SXM4-40GB", + "gpu_resource": "nvidia.com/gpu", + "node_model": "Dell", + } + ] + } + } + }, + } + ) + ) + + # Simulate merge.main() argument parsing + with mock.patch("argparse.ArgumentParser.parse_args") as mock_parse: + mock_parse.return_value = mock.Mock( + files=[str(metrics_file)], + invoice_file=str(tmp_path / "invoice.csv"), + pod_report_file=str(tmp_path / "pod.csv"), + class_invoice_file=str(tmp_path / "class.csv"), + upload_to_s3=False, + ignore_hours=[], + use_nerc_rates=True, + rate_cpu_su=None, + rate_gpu_v100_su=None, + rate_gpu_a100sxm4_su=None, + rate_gpu_a100_su=None, + rate_gpu_h100_su=None, + ) + from openshift_metrics import merge as merge_module + + merge_module.main() + + # The merge pipeline should have called the three report writers + assert mock_write_ns.called + assert mock_write_classes.called + assert mock_write_pod.called + + # Basic sanity: the processor received the GPU pod data (SU type coverage) + # (we don't inspect the full invoice output here; unit tests cover that) + call_args = mock_write_ns.call_args[1] + condensed = call_args["condensed_metrics_dict"] + assert "ai-performance-profiling" in condensed + assert "gpu-a100-pod" in condensed["ai-performance-profiling"] diff --git a/openshift_metrics/tests/test_merge.py b/openshift_metrics/tests/test_merge.py index ec6b4b8..08bf8e4 100644 --- a/openshift_metrics/tests/test_merge.py +++ b/openshift_metrics/tests/test_merge.py @@ -1,6 +1,7 @@ import pytest from decimal import Decimal +from openshift_metrics import metrics_processor from openshift_metrics.merge import ( compare_dates, get_su_definitions, @@ -98,6 +99,251 @@ def test_load_metrics_metadata( assert metadata.interval_minutes == 15 +class TestIngestFormat: + """Tests for the new namespaces + segments ingest path.""" + + def test_load_segment_data_basic(self, 
create_metrics_file): + """Tests loading a simple namespaces file -> asserts merged_data is populated correctly.""" + data = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": { + "ns1": { + "pod1": { + "segments": [ + { + "start": 100, + "duration": 900, + "cpu_request": 2, + "memory_request": 4, + } + ] + } + } + }, + } + path = create_metrics_file(data, "newformat.json") + processor = load_and_merge_metrics(15, [path]) + assert "ns1" in processor.merged_data + assert "pod1" in processor.merged_data["ns1"] + assert 100 in processor.merged_data["ns1"]["pod1"]["metrics"] + + def test_load_segment_data_with_gpu_fields(self, create_metrics_file): + """Tests GPU fields in segments -> asserts gpu_type, gpu_resource, node_hostname are preserved.""" + data = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": { + "ns1": { + "gpu-pod": { + "segments": [ + { + "start": 200, + "duration": 1800, + "gpu_request": 1, + "gpu_type": "NVIDIA-A100", + "gpu_resource": "nvidia.com/gpu", + "node_hostname": "gpu-node-1", + } + ] + } + } + }, + } + path = create_metrics_file(data, "gpu.json") + processor = load_and_merge_metrics(15, [path]) + seg = processor.merged_data["ns1"]["gpu-pod"]["metrics"][200] + assert seg["gpu_request"] == 1 + assert seg["gpu_type"] == "NVIDIA-A100" + + def test_load_multiple_files_merges_namespaces(self, create_metrics_file): + """Tests loading two files -> asserts pods from both files appear in merged_data.""" + file1 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": { + "ns1": { + "pod1": { + "segments": [{"start": 0, "duration": 900, "cpu_request": 1}] + } + } + }, + } + file2 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-02", + "end_date": "2025-01-02", + "interval_minutes": 15, + "namespaces": { + 
"ns1": { + "pod2": { + "segments": [{"start": 1000, "duration": 900, "cpu_request": 2}] + } + } + }, + } + p1 = create_metrics_file(file1, "f1.json") + p2 = create_metrics_file(file2, "f2.json") + processor = load_and_merge_metrics(15, [p1, p2]) + assert "pod1" in processor.merged_data["ns1"] + assert "pod2" in processor.merged_data["ns1"] + + def test_empty_namespaces_file(self, create_metrics_file): + """Tests file with empty namespaces -> asserts merged_data stays empty without crashing.""" + data = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": {}, + } + path = create_metrics_file(data, "empty.json") + processor = load_and_merge_metrics(15, [path]) + assert processor.merged_data == {} + + def test_mismatched_interval_raises(self, create_metrics_file): + """Tests files with different interval_minutes -> asserts load_metadata fails early.""" + file1 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": {}, + } + file2 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-02", + "end_date": "2025-01-02", + "interval_minutes": 5, + "namespaces": {}, + } + p1 = create_metrics_file(file1, "f1.json") + p2 = create_metrics_file(file2, "f2.json") + with pytest.raises(SystemExit): + load_metrics_metadata([p1, p2]) + + def test_overlapping_pod_data(self, create_metrics_file): + """Tests same pod in two files with overlapping times -> asserts both segments are loaded.""" + file1 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": { + "ns1": { + "pod1": { + "segments": [{"start": 0, "duration": 900, "cpu_request": 1}] + } + } + }, + } + file2 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": { + "ns1": { + "pod1": { + "segments": [{"start": 
1000, "duration": 900, "cpu_request": 2}] + } + } + }, + } + p1 = create_metrics_file(file1, "f1.json") + p2 = create_metrics_file(file2, "f2.json") + processor = load_and_merge_metrics(15, [p1, p2]) + metrics = processor.merged_data["ns1"]["pod1"]["metrics"] + assert 0 in metrics and 1000 in metrics + + def test_roundtrip_producer_to_ingest(self, create_metrics_file): + """Tests producer output shape -> asserts it can be loaded by ingest without data loss.""" + # Use only CPU for this round-trip to avoid timestamp collision in load_segment_data + cpu_segs = [ + { + "start": 0, + "duration": 1800, + "cpu_request": 4, + "pod": "p1", + "namespace": "ns1", + "node": "n1", + } + ] + namespaces = metrics_processor.MetricsProcessor.build_namespaces_dict(cpu_segs) + + data = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": namespaces, + } + path = create_metrics_file(data, "roundtrip.json") + processor = load_and_merge_metrics(15, [path]) + seg = processor.merged_data["ns1"]["p1"]["metrics"][0] + assert seg["cpu_request"] == 4 + + def test_pod_spans_two_days(self, create_metrics_file): + """Tests same pod appearing in two consecutive daily files -> asserts segments from both days are merged correctly.""" + day1 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-01", + "end_date": "2025-01-01", + "interval_minutes": 15, + "namespaces": { + "ns1": { + "long-running-pod": { + "segments": [ + { + "start": 0, + "duration": 86400, + "cpu_request": 2, + "memory_request": 4, + } + ] + } + } + }, + } + day2 = { + "cluster_name": "ocp-prod", + "start_date": "2025-01-02", + "end_date": "2025-01-02", + "interval_minutes": 15, + "namespaces": { + "ns1": { + "long-running-pod": { + "segments": [ + { + "start": 86400, + "duration": 86400, + "cpu_request": 2, + "memory_request": 4, + } + ] + } + } + }, + } + p1 = create_metrics_file(day1, "day1.json") + p2 = create_metrics_file(day2, "day2.json") + 
processor = load_and_merge_metrics(15, [p1, p2]) + pod_metrics = processor.merged_data["ns1"]["long-running-pod"]["metrics"] + assert 0 in pod_metrics + assert 86400 in pod_metrics + assert pod_metrics[0]["cpu_request"] == 2 + assert pod_metrics[86400]["cpu_request"] == 2 + + total_duration = sum(seg["duration"] for seg in pod_metrics.values()) + assert total_duration == 86400 * 2 + + def test_load_metrics_metadata_failure( create_metrics_file, mock_metrics_file2, mock_metrics_file3 ): diff --git a/openshift_metrics/tests/test_metrics_processor.py b/openshift_metrics/tests/test_metrics_processor.py index b55741e..598d985 100644 --- a/openshift_metrics/tests/test_metrics_processor.py +++ b/openshift_metrics/tests/test_metrics_processor.py @@ -1,755 +1,507 @@ -from unittest import TestCase, mock -from openshift_metrics import metrics_processor, invoice +from unittest import TestCase +from openshift_metrics import metrics_processor -class TestMergeMetrics(TestCase): - def test_merge_metrics_empty(self): - test_metric_list = [ - { - "metric": { - "pod": "pod1", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [0, 10], - [60, 15], - [120, 20], - ], +class TestExtractGPUInfo(TestCase): + def test_extract_gpu_info(self): + metric_with_label = { + "metric": { + "pod": "pod2", + "namespace": "namespace1", + "resource": "nvidia.com/gpu", + "label_nvidia_com_gpu_product": "V100-GPU", + "node": "node-1", + "label_nvidia_com_gpu_machine": "Dell PowerEdge", }, - { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [0, 30], - [60, 35], - [120, 40], - ], + } + gpu_info = metrics_processor.MetricsProcessor._extract_gpu_info( + "gpu_request", metric_with_label + ) + self.assertEqual(gpu_info.gpu_type, "V100-GPU") + self.assertEqual(gpu_info.gpu_resource, "nvidia.com/gpu") + self.assertEqual(gpu_info.node_model, "Dell PowerEdge") + + def test_extract_gpu_info_with_missing_labels(self): + metric = { + "metric": { 
+ "pod": "pod1", + "namespace": "namespace1", + "resource": "nvidia.com/gpu", + "node": "wrk-1", }, - ] - expected_output_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: {"cpu": 10}, - 60: {"cpu": 15}, - 120: {"cpu": 20}, - }, - }, - "pod2": { - "metrics": { - 0: {"cpu": 30}, - 60: {"cpu": 35}, - 120: {"cpu": 40}, - }, - }, - } } - processor = metrics_processor.MetricsProcessor() - processor.merge_metrics("cpu", test_metric_list) - self.assertEqual(processor.merged_data, expected_output_dict) + gpu_info = metrics_processor.MetricsProcessor._extract_gpu_info( + "gpu_request", metric + ) + self.assertEqual(gpu_info.gpu_type, metrics_processor.GPU_UNKNOWN_TYPE) + self.assertEqual(gpu_info.gpu_resource, "nvidia.com/gpu") + self.assertIsNone(gpu_info.node_model) + + def test_extract_gpu_info_no_info_anywhere(self): + metric = { + "metric": { + "pod": "pod1", + "namespace": "namespace1", + "resource": "cpu", + }, + } + gpu_info = metrics_processor.MetricsProcessor._extract_gpu_info( + "cpu_request", metric + ) + self.assertIsNone(gpu_info.gpu_type) + self.assertIsNone(gpu_info.gpu_resource) + self.assertIsNone(gpu_info.node_model) + - def test_merge_metrics_not_empty(self): - test_metric_list = [ +class TestInsertNodeLabels(TestCase): + def test_insert_node_labels(self): + resource_request_metrics = [ { "metric": { - "pod": "pod1", + "pod": "TestPodA", + "node": "wrk-1", "namespace": "namespace1", - "resource": "mem", }, - "values": [ - [0, 100], - [60, 150], - [120, 200], - ], + "values": [[1730939400, "4"], [1730940300, "4"], [1730941200, "4"]], }, + ] + node_labels = [ { "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [60, 300], - ], - }, + "node": "wrk-1", + "label_nvidia_com_gpu_product": "A100", + "label_nvidia_com_gpu_machine": "Dell", + } + } ] - output_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: {"cpu": 10}, - 60: {"cpu": 15}, - 120: {"cpu": 20}, - }, - }, - "pod2": { - "metrics": { - 
0: {"cpu": 30}, - 60: {"cpu": 35}, - 120: {"cpu": 40}, - }, - }, + result = metrics_processor.MetricsProcessor.insert_node_labels( + node_labels, resource_request_metrics + ) + self.assertEqual(result[0]["metric"]["label_nvidia_com_gpu_product"], "A100") + self.assertEqual(result[0]["metric"]["label_nvidia_com_gpu_machine"], "Dell") + + +class TestStripEssentialLabels(TestCase): + def test_strip_to_essential_labels(self): + raw = [ + { + "metric": { + "pod": "p1", + "namespace": "ns1", + "node": "n1", + "container": "main", + "job": "kube-state-metrics", + "label_nerc_mghpcc_org_class": "classA", + }, + "values": [[0, "1"]], } - } - expected_output_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: {"cpu": 10, "mem": 100}, - 60: {"cpu": 15, "mem": 150}, - 120: {"cpu": 20, "mem": 200}, - }, - }, - "pod2": { - "metrics": { - 0: {"cpu": 30}, - 60: {"cpu": 35, "mem": 300}, - 120: {"cpu": 40}, - }, - }, + ] + stripped = metrics_processor.MetricsProcessor.strip_to_essential_labels(raw) + self.assertIn("pod", stripped[0]["metric"]) + self.assertIn("label_nerc_mghpcc_org_class", stripped[0]["metric"]) + self.assertNotIn("container", stripped[0]["metric"]) + self.assertNotIn("job", stripped[0]["metric"]) + + def test_strip_empty(self): + self.assertEqual( + metrics_processor.MetricsProcessor.strip_to_essential_labels([]), [] + ) + + +class TestCondenseValues(TestCase): + def test_condense_values(self): + vals = [[0, "1"], [900, "1"], [1800, "2"]] + out = metrics_processor.MetricsProcessor._condense_values( + vals, 900, "cpu_request" + ) + self.assertEqual(len(out), 2) + self.assertEqual(out[0]["cpu_request"], "1") + self.assertEqual(out[1]["cpu_request"], "2") + + def test_condense_metric_series(self): + raw = [{"metric": {"pod": "p1"}, "values": [[0, "2"], [900, "2"]]}] + out = metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "cpu_request" + ) + self.assertEqual(len(out), 1) + self.assertEqual(out[0]["cpu_request"], "2") + 
self.assertEqual(out[0]["pod"], "p1") + + +class TestProducerCondenseLogic(TestCase): + """Tests that replace the coverage lost from the old TestCondenseMetrics / TestMergeMetrics classes.""" + + def test_basic_condense(self): + raw = [ + { + "metric": {"pod": "pod1", "namespace": "ns1"}, + "values": [[0, 10], [900, 10], [1800, 10]], } - } - processor = metrics_processor.MetricsProcessor(merged_data=output_dict) - processor.merge_metrics("mem", test_metric_list) - self.assertEqual(processor.merged_data, expected_output_dict) + ] + segments = metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "cpu_request" + ) + self.assertEqual(len(segments), 1) + self.assertEqual(segments[0]["cpu_request"], 10) + self.assertEqual(segments[0]["duration"], 2700) - def test_merge_metrics_overlapping_range(self): - test_metric_list = [ + def test_value_change(self): + raw = [ + { + "metric": {"pod": "pod1", "namespace": "ns1"}, + "values": [[0, 10], [900, 10], [1800, 20], [2700, 20]], + } + ] + segments = metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "cpu_request" + ) + self.assertEqual(len(segments), 2) + self.assertEqual(segments[0]["cpu_request"], 10) + self.assertEqual(segments[1]["cpu_request"], 20) + + def test_gpu_type_change(self): + raw = [ { "metric": { "pod": "pod1", - "namespace": "namespace1", - "resource": "cpu", + "namespace": "ns1", + "label_nvidia_com_gpu_product": "V100", }, - "values": [ - [0, 10], - [60, 10], - [120, 10], - ], + "values": [[0, 1], [900, 1], [1800, 1]], }, - ] - test_metric_list_2 = [ { "metric": { "pod": "pod1", - "namespace": "namespace1", - "resource": "cpu", + "namespace": "ns1", + "label_nvidia_com_gpu_product": "A100", }, - "values": [ - [60, 8], - [120, 8], - [180, 10], - ], + "values": [[2700, 1], [3600, 1]], }, ] - expected_output_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: {"cpu": 10}, - 60: {"cpu": 8}, - 120: {"cpu": 8}, - 180: {"cpu": 10}, - }, - }, + segments = 
metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "gpu_request" + ) + self.assertEqual(len(segments), 2) + self.assertEqual(segments[0]["label_nvidia_com_gpu_product"], "V100") + self.assertEqual(segments[1]["label_nvidia_com_gpu_product"], "A100") + + def test_time_skip_creates_new_segment(self): + raw = [ + { + "metric": {"pod": "pod1", "namespace": "ns1"}, + "values": [[0, 5], [900, 5], [5400, 5]], # large gap } - } - processor = metrics_processor.MetricsProcessor() - processor.merge_metrics("cpu", test_metric_list) - processor.merge_metrics("cpu", test_metric_list_2) - self.assertEqual(processor.merged_data, expected_output_dict) + ] + segments = metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "cpu_request" + ) + self.assertEqual(len(segments), 2) + + def test_build_namespaces_dict(self): + cpu_segs = [ + { + "start": 0, + "duration": 900, + "cpu_request": 4, + "pod": "p1", + "namespace": "ns1", + "node": "n1", + } + ] + mem_segs = [ + { + "start": 0, + "duration": 900, + "memory_request": 8, + "pod": "p1", + "namespace": "ns1", + "node": "n1", + } + ] + namespaces = metrics_processor.MetricsProcessor.build_namespaces_dict( + cpu_segs, mem_segs + ) + self.assertIn("ns1", namespaces) + self.assertIn("p1", namespaces["ns1"]) + self.assertEqual(len(namespaces["ns1"]["p1"]["segments"]), 2) - # trying to merge the same metrics again should not change anything - processor.merge_metrics("cpu", test_metric_list_2) - self.assertEqual(processor.merged_data, expected_output_dict) + def test_separate_series_same_pod_namespace_merge_into_one_pod(self): + """Tests that two distinct raw Prometheus series sharing the same pod and + namespace are collapsed into a single pod entry during preprocessing. 
- def test_merge_metrics_same_pod_name(self): - test_metric_list = [ + When a pod is killed and recreated with the same name/namespace but + different resources, Prometheus reports it as two separate series (two + 'metric' entries with identical pod/namespace labels but different + values). build_namespaces_dict keys by (namespace, pod), so both series + end up under one pod key rather than two. + """ + raw = [ { - "metric": { - "pod": "podA", - "namespace": "namespace1", - "resource": "cpu", - }, - "values": [ - [0, 10], - [60, 15], - [120, 20], - ], + "metric": {"pod": "database", "namespace": "ns1", "node": "n1"}, + "values": [[0, 1], [900, 1]], # first lifetime, 1 core }, { - "metric": { - "pod": "podA", - "namespace": "namespace2", - "resource": "cpu", - }, + "metric": {"pod": "database", "namespace": "ns1", "node": "n1"}, "values": [ - [0, 30], - [60, 35], - [120, 40], - ], + [1800, 4], + [2700, 4], + ], # recreated, 4 cores -> separate series }, ] - expected_output_dict = { - "namespace1": { - "podA": { - "metrics": { - 0: {"cpu": 10}, - 60: {"cpu": 15}, - 120: {"cpu": 20}, - }, - } - }, - "namespace2": { - "podA": { - "metrics": { - 0: {"cpu": 30}, - 60: {"cpu": 35}, - 120: {"cpu": 40}, - }, - }, - }, - } - processor = metrics_processor.MetricsProcessor() - processor.merge_metrics("cpu", test_metric_list) - self.assertEqual(processor.merged_data, expected_output_dict) + segments = metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "cpu_request" + ) + namespaces = metrics_processor.MetricsProcessor.build_namespaces_dict(segments) + + # The two separate series collapse into a single pod key. 
+ self.assertEqual(list(namespaces["ns1"].keys()), ["database"]) + pod_segments = namespaces["ns1"]["database"]["segments"] + self.assertEqual(len(pod_segments), 2) + + by_start = {seg["start"]: seg for seg in pod_segments} + self.assertEqual(by_start[0]["cpu_request"], 1) + self.assertEqual(by_start[1800]["cpu_request"], 4) - def test_merge_metrics_not_empty_with_gpu(self): - test_metric_list = [ + def test_same_name_namespace_grouped_as_one_pod(self): + """Tests that a same name + namespace pod stopped and restarted after a + long gap is grouped under a single pod key, with the gap excluded. + + Edge case: a pod 'database' runs, is killed, and is later recreated with + the same name and namespace and the same resources. The downtime spans + more than one sampling slot, so the restart shows up as a gap of 3x the + interval inside a single series (interval = 900s, gap = 2700s). + + Because a name is unique within a namespace at any instant, the two + lifetimes are necessarily sequential. _was_pod_stopped detects the gap + (> interval), so the producer emits two segments grouped under one pod + key, and the downtime between them is not billed. + """ + interval = 900 + # Same value/series throughout so the split is driven by _was_pod_stopped, + # not by a value change. Slots at 0, 900, then a 3x-interval jump to 3600. 
+ raw = [ { - "metric": { - "pod": "pod1", - "namespace": "namespace1", - "resource": "nvidia.com/gpu", - "label_nvidia_com_gpu_product": "Tesla-V100-PCIE-32GB", - }, + "metric": {"pod": "database", "namespace": "ns1", "node": "n1"}, "values": [ - [0, 1], - [60, 1], - [120, 2], + [0, 1], # first lifetime starts + [900, 1], # still running (gap to next = 2700s = 3x interval) + [3600, 1], # recreated after downtime + [4500, 1], # still running ], }, ] - output_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: {"cpu": 10}, - 60: {"cpu": 15}, - 120: {"cpu": 20}, - }, - }, - } - } - expected_output_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: { - "cpu": 10, - "gpu_request": 1, - "gpu_type": "Tesla-V100-PCIE-32GB", - "gpu_resource": "nvidia.com/gpu", - }, - 60: { - "cpu": 15, - "gpu_request": 1, - "gpu_type": "Tesla-V100-PCIE-32GB", - "gpu_resource": "nvidia.com/gpu", - }, - 120: { - "cpu": 20, - "gpu_request": 2, - "gpu_type": "Tesla-V100-PCIE-32GB", - "gpu_resource": "nvidia.com/gpu", - }, - }, - }, - } - } - processor = metrics_processor.MetricsProcessor(merged_data=output_dict) - processor.merge_metrics("gpu_request", test_metric_list) - self.assertEqual(processor.merged_data, expected_output_dict) + segments = metrics_processor.MetricsProcessor.condense_metric_series( + raw, interval, "cpu_request" + ) + self.assertEqual(len(segments), 2) + namespaces = metrics_processor.MetricsProcessor.build_namespaces_dict(segments) + # Both lifetimes are grouped under the single pod key. 
+ self.assertEqual(list(namespaces["ns1"].keys()), ["database"]) + pod_segments = namespaces["ns1"]["database"]["segments"] + self.assertEqual(len(pod_segments), 2) -class TestCondenseMetrics(TestCase): - def test_condense_metrics(self): - test_input_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: { - "cpu": 10, - "mem": 15, - }, - 900: { - "cpu": 10, - "mem": 15, - }, - } - }, - "pod2": { - "metrics": { - 0: { - "cpu": 2, - "mem": 256, - }, - 900: { - "cpu": 2, - "mem": 256, - }, - } - }, - } - } - expected_condensed_dict = { - "namespace1": { - "pod1": {"metrics": {0: {"cpu": 10, "mem": 15, "duration": 1800}}}, - "pod2": {"metrics": {0: {"cpu": 2, "mem": 256, "duration": 1800}}}, - } - } - processor = metrics_processor.MetricsProcessor(merged_data=test_input_dict) - condensed_dict = processor.condense_metrics(["cpu", "mem"]) - self.assertEqual(condensed_dict, expected_condensed_dict) + # Distinct start times mean both segments survive (no collision/overwrite). + by_start = {seg["start"]: seg for seg in pod_segments} + self.assertEqual(set(by_start), {0, 3600}) + self.assertEqual(by_start[0]["cpu_request"], 1) + self.assertEqual(by_start[3600]["cpu_request"], 1) - def test_condense_metrics_no_interval(self): - test_input_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: { - "cpu": 10, - "mem": 15, - } - } - }, - } - } - expected_condensed_dict = { - "namespace1": { - "pod1": {"metrics": {0: {"cpu": 10, "mem": 15, "duration": 900}}}, - } - } - processor = metrics_processor.MetricsProcessor(merged_data=test_input_dict) - condensed_dict = processor.condense_metrics(["cpu", "mem"]) - self.assertEqual(condensed_dict, expected_condensed_dict) - - def test_condense_metrics_with_change(self): - test_input_dict = { - "namespace1": { - "pod2": { - "metrics": { - 0: { - "cpu": 20, - "mem": 25, - }, - 900: { - "cpu": 20, - "mem": 25, - }, - 1800: { - "cpu": 25, - "mem": 25, - }, - 2700: { - "cpu": 20, - "mem": 25, - }, - } - }, - } - } - 
expected_condensed_dict = { - "namespace1": { - "pod2": { - "metrics": { - 0: {"cpu": 20, "mem": 25, "duration": 1800}, - 1800: {"cpu": 25, "mem": 25, "duration": 900}, - 2700: {"cpu": 20, "mem": 25, "duration": 900}, - } - }, - } - } - processor = metrics_processor.MetricsProcessor(merged_data=test_input_dict) - condensed_dict = processor.condense_metrics(["cpu", "mem"]) - self.assertEqual(condensed_dict, expected_condensed_dict) - - def test_condense_metrics_skip_metric(self): - test_input_dict = { - "namespace1": { - "pod3": { - "metrics": { - 0: { - "cpu": 30, - "mem": 35, - "gpu": 1, - }, - 900: { - "cpu": 30, - "mem": 35, - "gpu": 2, - }, - } - } - } - } - expected_condensed_dict = { - "namespace1": { - "pod3": { - "metrics": {0: {"cpu": 30, "mem": 35, "gpu": 1, "duration": 1800}} - }, - } - } - processor = metrics_processor.MetricsProcessor(merged_data=test_input_dict) - condensed_dict = processor.condense_metrics(["cpu", "mem"]) - self.assertEqual(condensed_dict, expected_condensed_dict) + # Each lifetime covers two slots -> 900 + interval = 1800s billed. 
+ self.assertEqual(by_start[0]["duration"], 1800) + self.assertEqual(by_start[3600]["duration"], 1800) - def test_condense_metrics_with_timeskips(self): - test_input_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: { - "cpu": 1, - "mem": 4, - }, - 900: { - "cpu": 1, - "mem": 4, - }, - 1800: { - "cpu": 1, - "mem": 4, - }, - 5400: { # time skipped - "cpu": 1, - "mem": 4, - }, - 6300: { - "cpu": 1, - "mem": 4, - }, - 8100: { # metric changed and time skipped - "cpu": 2, - "mem": 8, - }, - 9000: { - "cpu": 2, - "mem": 8, - }, - } - }, - "pod2": { - "metrics": { - 0: { - "cpu": 2, - "mem": 16, - }, - 900: { - "cpu": 2, - "mem": 16, - }, - } - }, - } - } - expected_condensed_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: {"cpu": 1, "mem": 4, "duration": 2700}, - 5400: {"cpu": 1, "mem": 4, "duration": 1800}, - 8100: {"cpu": 2, "mem": 8, "duration": 1800}, - } - }, - "pod2": {"metrics": {0: {"cpu": 2, "mem": 16, "duration": 1800}}}, - } - } - processor = metrics_processor.MetricsProcessor(merged_data=test_input_dict) - condensed_dict = processor.condense_metrics(["cpu", "mem"]) - self.assertEqual(condensed_dict, expected_condensed_dict) + # The 1800s of downtime between 1800 and 3600 is excluded from billing: + # total billed (3600s) is less than the 5400s wall-clock span.
+ total_billed = sum(seg["duration"] for seg in pod_segments) + self.assertEqual(total_billed, 3600) - def test_condense_metrics_with_changing_gpu(self): - test_input_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: { - "cpu": 1, - "mem": 4, - }, - 900: { - "cpu": 1, - "mem": 4, - }, - 1800: { # pod acquires a GPU - "cpu": 1, - "mem": 4, - "gpu_request": 1, - "gpu_type": invoice.GPU_V100, - }, - 2700: { - "cpu": 1, - "mem": 4, - "gpu_request": 1, - "gpu_type": invoice.GPU_V100, - }, - 3600: { # type of GPU is changed - "cpu": 1, - "mem": 4, - "gpu_request": 1, - "gpu_type": invoice.GPU_A100_SXM4, - }, - 4500: { - "cpu": 1, - "mem": 4, - "gpu_request": 1, - "gpu_type": invoice.GPU_A100_SXM4, - }, - 5400: { - "cpu": 1, - "mem": 4, - "gpu_request": 1, - "gpu_type": invoice.GPU_A100_SXM4, - }, - 6300: { # count of GPU is changed - "cpu": 1, - "mem": 4, - "gpu_request": 3, - "gpu_type": invoice.GPU_A100_SXM4, - }, - 7200: { - "cpu": 1, - "mem": 4, - "gpu_request": 3, - "gpu_type": invoice.GPU_A100_SXM4, - }, - 8100: { # no longer using GPUs - "cpu": 1, - "mem": 4, - }, - } - }, - } - } - expected_condensed_dict = { - "namespace1": { - "pod1": { - "metrics": { - 0: {"cpu": 1, "mem": 4, "duration": 1800}, - 1800: { - "cpu": 1, - "mem": 4, - "duration": 1800, - "gpu_request": 1, - "gpu_type": invoice.GPU_V100, - }, - 3600: { - "cpu": 1, - "mem": 4, - "duration": 2700, - "gpu_request": 1, - "gpu_type": invoice.GPU_A100_SXM4, - }, - 6300: { - "cpu": 1, - "mem": 4, - "duration": 1800, - "gpu_request": 3, - "gpu_type": invoice.GPU_A100_SXM4, - }, - 8100: { - "cpu": 1, - "mem": 4, - "duration": 900, - }, - } - }, + def test_detects_pod_stop_and_restart(self): + """Tests that a large gap between samples creates separate segments (pod stopped then restarted).""" + raw = [ + { + "metric": {"pod": "p1", "namespace": "ns1"}, + "values": [ + [0, 2], + [900, 2], + [1800, 2], + # large gap here (pod stopped) + [7200, 2], + [8100, 2], + ], } - } - processor = 
metrics_processor.MetricsProcessor(merged_data=test_input_dict) - condensed_dict = processor.condense_metrics( - ["cpu", "mem", "gpu_request", "gpu_type"] + ] + segments = metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "cpu_request" ) - self.assertEqual(condensed_dict, expected_condensed_dict) + self.assertEqual(len(segments), 2) + self.assertEqual(segments[0]["start"], 0) + self.assertEqual(segments[0]["duration"], 2700) # 0 -> 1800 + interval + self.assertEqual(segments[1]["start"], 7200) + self.assertEqual(segments[1]["duration"], 1800) # 7200 -> 8100 + interval + def test_pod_restart_within_interval_is_single_segment(self): + """Tests that a pod killed and restarted within one query interval is treated as a single segment. -class TestExtractGPUInfo(TestCase): - def test_extract_gpu_info(self): - metric_with_label = { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "nvidia.com/gpu", - "label_nvidia_com_gpu_product": "V100-GPU", - "node": "node-1", - "label_nvidia_com_gpu_machine": "Dell PowerEdge", - }, - "values": [ - [60, 2], - ], - } - - processor = metrics_processor.MetricsProcessor() - gpu_info = processor._extract_gpu_info("gpu_request", metric_with_label) - - assert gpu_info.gpu_type == "V100-GPU" - assert gpu_info.gpu_resource == "nvidia.com/gpu" - assert gpu_info.node_model == "Dell PowerEdge" - - def test_extract_gpu_info_with_missing_labels(self): - mocked_gpu_mapping = { - "node-1": "A100-GPU", - "node-2": "doesnt-matter", - } - metric_without_label = { - "metric": { - "pod": "pod1", - "namespace": "namespace1", - "resource": "nvidia.com/gpu", - "node": "node-1", - }, - "values": [ - [60, 1], - ], - } - metric_with_label = { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "nvidia.com/gpu", - "label_nvidia_com_gpu_product": "V100-GPU", - "node": "node-2", - }, - "values": [ - [60, 2], - ], - } + Edge case: a pod named 'database' runs from 1:00 PM to 1:15 PM, + is killed, and 
restarted at 1:16:40 PM with the same name and resources. + The gap between 1:15 PM (sample at 900s) and 1:16:40 PM (sample at 1000s) is + only 100s, which is less than the 900s interval, so the condense logic + does not split it into separate segments. - with mock.patch.object( - metrics_processor.MetricsProcessor, - "_load_gpu_mapping", - return_value=mocked_gpu_mapping, - ): - processor = metrics_processor.MetricsProcessor() - gpu_info = processor._extract_gpu_info("gpu_request", metric_without_label) + Note: the current condense logic calculates duration as + last_timestamp - first_timestamp + interval, which includes the gap. + The actual active runtime is 1800s (900 + 900), but the segment + duration is 2800s (1900 - 0 + 900). + """ + raw = [ + { + "metric": {"pod": "database", "namespace": "ns1"}, + "values": [ + [0, 2], # 1:00 PM + [900, 2], # 1:15 PM (pod killed here) + [1000, 2], # 1:16:40 PM (pod restarted, same name/resources) + [1900, 2], # 1:31:40 PM + ], + } + ] + segments = metrics_processor.MetricsProcessor.condense_metric_series( + raw, 900, "cpu_request" + ) + # The gap (100s) is less than the interval (900s), so it stays as one segment + self.assertEqual(len(segments), 1) + self.assertEqual(segments[0]["start"], 0) + # Duration spans the full range including the gap: 1900 - 0 + 900 = 2800 + self.assertEqual(segments[0]["duration"], 2800) + self.assertEqual(segments[0]["cpu_request"], 2) - assert gpu_info.gpu_type == "A100-GPU" - assert gpu_info.gpu_resource == "nvidia.com/gpu" - assert gpu_info.node_model is None - # When the GPU label is present in the metrics, then the value in the gpu-node mapping isn't considered - gpu_info = processor._extract_gpu_info("gpu_request", metric_with_label) - assert gpu_info.gpu_type == "V100-GPU" +class TestLoadSegmentDataRegressions(TestCase): + """Regression tests for bugs in the load_segment_data / build_namespaces_dict pipeline.""" - def test_extract_gpu_info_no_info_anywhere(self): - """When node is missing in the
file, we get no gpu info""" - mocked_gpu_mapping = { - "node-2": "doesnt-matter", - } - metric_with_label = { - "metric": { - "pod": "pod2", - "namespace": "namespace1", - "resource": "nvidia.com/gpu", - "node": "node-1", - }, - "values": [ - [60, 2], - ], + def test_cpu_and_memory_segments_at_same_timestamp_both_preserved(self): + """load_segment_data must merge segments at the same start time, not overwrite.""" + namespaces = { + "ns1": { + "pod1": { + "segments": [ + {"start": 0, "duration": 900, "cpu_request": 2}, + {"start": 0, "duration": 900, "memory_request": 4096}, + ] + } + } } - with mock.patch.object( - metrics_processor.MetricsProcessor, - "_load_gpu_mapping", - return_value=mocked_gpu_mapping, - ): - processor = metrics_processor.MetricsProcessor() - gpu_info = processor._extract_gpu_info("gpu_request", metric_with_label) - - assert gpu_info.gpu_type == metrics_processor.GPU_UNKNOWN_TYPE - + processor = metrics_processor.MetricsProcessor() + processor.load_segment_data(namespaces) + entry = processor.merged_data["ns1"]["pod1"]["metrics"][0] + self.assertIn( + "cpu_request", entry, "cpu_request was overwritten by the memory segment" + ) + self.assertIn( + "memory_request", entry, "memory_request was overwritten by the cpu segment" + ) + self.assertEqual(entry["cpu_request"], 2) + self.assertEqual(entry["memory_request"], 4096) -class TestInsertNodeLabels(TestCase): - def test_insert_node_labels(self): - resource_request_metrics = [ - { - "metric": { - "pod": "TestPodA", - "node": "wrk-1", - "namespace": "namespace1", - }, - "values": [[1730939400, "4"], [1730940300, "4"], [1730941200, "4"]], - }, + def test_build_and_load_roundtrip_cpu_and_memory_both_survive(self): + """CPU and memory condensed as separate series must both appear after full roundtrip.""" + cpu_segs = [ { - "metric": { - "pod": "TestPodB", - "node": "wrk-2", - "namespace": "namespace2", - }, - "values": [[1730939400, "4"], [1730940300, "4"], [1730941200, "4"]], - }, - { - "metric": { - 
"pod": "TestPodC", - "node": "wrk-3", # let's assume this node doesn't have any associated labels - "namespace": "namespace2", - }, - "values": [[1730939400, "4"], [1730940300, "4"], [1730941200, "4"]], + "start": 0, + "duration": 900, + "cpu_request": 4, + "pod": "p1", + "namespace": "ns1", + "node": "n1", }, ] - kube_node_labels = [ + mem_segs = [ { - "metric": { - "node": "wrk-1", - "label_nvidia_com_gpu_machine": "ThinkSystem-SD650-N-V2", - "label_nvidia_com_gpu_product": "NVIDIA-A100-SXM4-40GB", - }, - "values": [[1730939400, "1"], [1730940300, "1"]], - }, - { - "metric": { - "node": "wrk-2", - "label_nvidia_com_gpu_product": "Tesla-V100-PCIE-32GB", - "label_nvidia_com_gpu_machine": "PowerEdge-R740xd", - }, - "values": [[1730939400, "1"], [1730940300, "1"]], + "start": 0, + "duration": 900, + "memory_request": 8192, + "pod": "p1", + "namespace": "ns1", + "node": "n1", }, ] - metrics_with_labels = metrics_processor.MetricsProcessor.insert_node_labels( - kube_node_labels, resource_request_metrics + namespaces = metrics_processor.MetricsProcessor.build_namespaces_dict( + cpu_segs, mem_segs ) - expected_metrics = [ - { - "metric": { - "pod": "TestPodA", - "node": "wrk-1", - "namespace": "namespace1", - "label_nvidia_com_gpu_machine": "ThinkSystem-SD650-N-V2", - "label_nvidia_com_gpu_product": "NVIDIA-A100-SXM4-40GB", - }, - "values": [[1730939400, "4"], [1730940300, "4"], [1730941200, "4"]], - }, + processor = metrics_processor.MetricsProcessor() + processor.load_segment_data(namespaces) + entry = processor.merged_data["ns1"]["p1"]["metrics"][0] + self.assertIn("cpu_request", entry) + self.assertIn("memory_request", entry) + self.assertEqual(entry["cpu_request"], 4) + self.assertEqual(entry["memory_request"], 8192) + + def test_class_label_promoted_to_pod_level(self): + """label_nerc_mghpcc_org_class in a segment must be lifted to pod level so write_metrics_by_classes can read it.""" + namespaces = { + "ns1": { + "pod1": { + "segments": [ + { + "start": 0, + 
"duration": 900, + "cpu_request": 2, + "label_nerc_mghpcc_org_class": "cs-101", + }, + ] + } + } + } + processor = metrics_processor.MetricsProcessor() + processor.load_segment_data(namespaces) + pod_dict = processor.merged_data["ns1"]["pod1"] + self.assertEqual( + pod_dict.get("label_nerc_mghpcc_org_class"), + "cs-101", + "class label must be promoted to pod level, not buried in metrics[start]", + ) + + def test_gpu_mapping_fallback_applied_when_label_absent(self): + """build_namespaces_dict uses gpu_mapping to resolve gpu_type when the Prometheus label is missing.""" + gpu_segs = [ { - "metric": { - "pod": "TestPodB", - "node": "wrk-2", - "namespace": "namespace2", - "label_nvidia_com_gpu_product": "Tesla-V100-PCIE-32GB", - "label_nvidia_com_gpu_machine": "PowerEdge-R740xd", - }, - "values": [[1730939400, "4"], [1730940300, "4"], [1730941200, "4"]], + "start": 0, + "duration": 900, + "gpu_request": 1, + "pod": "gpu-pod", + "namespace": "ns1", + "node": "wrk-gpu-1", + "resource": "nvidia.com/gpu", }, + ] + gpu_mapping = {"wrk-gpu-1": "NVIDIA-A100-SXM4-40GB"} + namespaces = metrics_processor.MetricsProcessor.build_namespaces_dict( + gpu_segs, gpu_mapping=gpu_mapping + ) + seg = namespaces["ns1"]["gpu-pod"]["segments"][0] + self.assertEqual( + seg.get("gpu_type"), + "NVIDIA-A100-SXM4-40GB", + "gpu_type should fall back to gpu_mapping when label_nvidia_com_gpu_product is absent", + ) + + def test_gpu_label_takes_precedence_over_mapping(self): + """When label_nvidia_com_gpu_product is present, it wins over any gpu_mapping entry.""" + gpu_segs = [ { - "metric": { - "pod": "TestPodC", - "node": "wrk-3", - "namespace": "namespace2", - }, - "values": [[1730939400, "4"], [1730940300, "4"], [1730941200, "4"]], + "start": 0, + "duration": 900, + "gpu_request": 1, + "pod": "gpu-pod", + "namespace": "ns1", + "node": "wrk-gpu-1", + "resource": "nvidia.com/gpu", + "label_nvidia_com_gpu_product": "NVIDIA-H100-80GB", }, ] - self.assertEqual(expected_metrics, metrics_with_labels) + 
gpu_mapping = {"wrk-gpu-1": "NVIDIA-A100-SXM4-40GB"} + namespaces = metrics_processor.MetricsProcessor.build_namespaces_dict( + gpu_segs, gpu_mapping=gpu_mapping + ) + seg = namespaces["ns1"]["gpu-pod"]["segments"][0] + self.assertEqual(seg.get("gpu_type"), "NVIDIA-H100-80GB")