Coverage for packages / dqm-ml-core / src / dqm_ml_core / utils / metric_runner.py: 100%
28 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
1"""Metric runner utility for executing metrics on DataFrames.
3This module contains the MetricRunner class that provides a high-level
4API for running metric processors directly on Pandas DataFrames.
5"""
7import logging
8from typing import Any
10from pandas import DataFrame
11import pyarrow as pa
13from dqm_ml_core.api.data_processor import DatametricProcessor
15logger = logging.getLogger(__name__)
class MetricRunner:
    """
    Orchestrator for executing metric processors on in-memory Pandas DataFrames.

    This class provides a high-level API for users who want to compute metrics
    directly on DataFrames without using the full YAML-driven pipeline.
    """

    def __init__(self, config: dict[str, Any] | None = None) -> None:
        """
        Initialize the runner.

        Args:
            config: Optional configuration for metric default behaviors.
                A falsy value (``None`` or an empty dict) is stored as a new
                empty dict.
        """
        self.config = config or {}

    def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
        """
        Execute the provided metric processors on a DataFrame.

        The DataFrame is converted once into a single Arrow record batch.
        Processors are then visited in list order in two passes: the first
        pass accumulates features and batch-level metrics, the second pass
        turns the accumulated batch-level metrics into dataset-level metrics.
        When two processors produce the same key, the later one overwrites
        the earlier one.

        Args:
            df: The input Pandas DataFrame.
            metrics_processors: List of initialized DatametricProcessor instances.

        Returns:
            A dictionary containing the aggregated dataset-level metrics.
            An empty dictionary is returned (and a warning is logged) when
            the DataFrame is empty or no processors are given.
        """
        if df.empty or not metrics_processors:
            logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
            return {}

        batch = pa.RecordBatch.from_pandas(df)
        batch_features: dict[str, Any] = {}
        batch_metrics: dict[str, Any] = {}

        # Compute features and batch-level metrics. Each processor receives
        # the features accumulated so far through ``prev_features``.
        for metric in metrics_processors:
            # Lazy %-style arguments: the message is only formatted when
            # DEBUG logging is actually enabled.
            logger.debug("Processing metric %s", metric.__class__.__name__)
            batch_features |= metric.compute_features(batch, prev_features=batch_features)
            batch_metrics |= metric.compute_batch_metric(batch_features)

        # Merge batch metrics (trivial here as there's only one batch). A
        # shallow copy keeps the dict handed to ``compute`` distinct from the
        # accumulator above.
        metrics_array: dict[str, Any] = dict(batch_metrics)

        # Compute dataset-level metrics
        dataset_metrics: dict[str, Any] = {}
        for metric in metrics_processors:
            logger.debug("Computing final score for %s", metric.__class__.__name__)
            dataset_metrics |= metric.compute(batch_metrics=metrics_array)

        return dataset_metrics