Coverage for packages / dqm-ml-core / src / dqm_ml_core / utils / metric_runner.py: 100%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 10:11 +0000

1"""Metric runner utility for executing metrics on DataFrames. 

2 

3This module contains the MetricRunner class that provides a high-level 

4API for running metric processors directly on Pandas DataFrames. 

5""" 

6 

7import logging 

8from typing import Any 

9 

10from pandas import DataFrame 

11import pyarrow as pa 

12 

13from dqm_ml_core.api.data_processor import DatametricProcessor 

14 

15logger = logging.getLogger(__name__) 

16 

17 

class MetricRunner:
    """
    Orchestrator for executing metric processors on in-memory Pandas DataFrames.

    This class provides a high-level API for users who want to compute metrics
    directly on DataFrames without using the full YAML-driven pipeline. The
    whole DataFrame is converted to a single Arrow record batch and pushed
    through every processor in the order given.
    """

    def __init__(self, config: dict[str, Any] | None = None) -> None:
        """
        Initialize the runner.

        Args:
            config: Optional configuration for metric default behaviors.
                ``None`` (or any falsy value) is replaced by an empty dict.
                The value is only stored on the instance; ``run`` does not
                read it.
        """
        self.config = config or {}

    def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
        """
        Execute the provided metric processors on a DataFrame.

        Processing happens in three steps:

        1. For each processor, in list order, ``compute_features`` is called
           with the batch and the features accumulated so far, then
           ``compute_batch_metric`` is called with the updated features.
        2. The batch-level metrics are copied as-is, since there is only one
           batch to merge.
        3. For each processor, in list order, ``compute`` is called with the
           batch-level metrics and its result is merged into the output.

        Results are merged with dict union, so when two processors return the
        same key the later processor's value wins.

        Args:
            df: The input Pandas DataFrame.
            metrics_processors: List of initialized DatametricProcessor instances.

        Returns:
            A dictionary containing the aggregated dataset-level metrics.
            An empty dictionary is returned (and a warning logged) when the
            DataFrame is empty or no processors are given.
        """
        if df.empty or not metrics_processors:
            logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
            return {}

        batch = pa.RecordBatch.from_pandas(df)
        batch_features: dict[str, Any] = {}
        batch_metrics: dict[str, Any] = {}

        # Compute features and batch-level metrics. Features accumulate across
        # processors, so each processor sees what earlier ones produced.
        for metric in metrics_processors:
            # Lazy %-style arguments: the message is only formatted when the
            # DEBUG level is actually enabled.
            logger.debug("Processing metric %s", metric.__class__.__name__)
            batch_features |= metric.compute_features(batch, prev_features=batch_features)
            batch_metrics |= metric.compute_batch_metric(batch_features)

        # Merge batch metrics (trivial here as there's only one batch): a
        # shallow copy is all that is needed.
        metrics_array: dict[str, Any] = dict(batch_metrics)

        # Compute dataset-level metrics
        dataset_metrics: dict[str, Any] = {}
        for metric in metrics_processors:
            logger.debug("Computing final score for %s", metric.__class__.__name__)
            dataset_metrics |= metric.compute(batch_metrics=metrics_array)

        return dataset_metrics

71 

72 return dataset_metrics