Skip to content

dqm_ml_core.utils.metric_runner

Metric runner utility for executing metrics on DataFrames.

This module contains the MetricRunner class that provides a high-level API for running metric processors directly on Pandas DataFrames.

logger = logging.getLogger(__name__) module-attribute

MetricRunner

Orchestrator for executing metric processors on in-memory Pandas DataFrames.

This class provides a high-level API for users who want to compute metrics directly on DataFrames without using the full YAML-driven pipeline.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class MetricRunner:
    """
    Orchestrator for executing metric processors on in-memory Pandas DataFrames.

    This class provides a high-level API for users who want to compute metrics
    directly on DataFrames without using the full YAML-driven pipeline.
    """

    def __init__(self, config: dict[str, Any] | None = None) -> None:
        """
        Initialize the runner.

        Args:
            config: Optional configuration for metric default behaviors.
                Falls back to an empty dict when omitted.
        """
        self.config = config or {}

    def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
        """
        Execute the provided metric processors on a DataFrame.

        Args:
            df: The input Pandas DataFrame.
            metrics_processors: List of initialized DatametricProcessor instances.

        Returns:
            A dictionary containing the aggregated dataset-level metrics.
            Empty when the DataFrame is empty or no processors are given.
        """
        if df.empty or not metrics_processors:
            logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
            return {}

        # The whole DataFrame is processed as a single Arrow record batch.
        batch = pa.RecordBatch.from_pandas(df)
        batch_features: dict[str, Any] = {}
        batch_metrics: dict[str, Any] = {}

        # Compute features and batch-level metrics. Features accumulate so
        # later processors can reuse what earlier ones produced via
        # ``prev_features``.
        for metric in metrics_processors:
            # Lazy %-formatting: the message is only built if DEBUG is enabled.
            logger.debug("Processing metric %s", metric.__class__.__name__)
            batch_features |= metric.compute_features(batch, prev_features=batch_features)
            batch_metrics |= metric.compute_batch_metric(batch_features)

        # With a single batch, the merged metrics array is simply a copy of
        # the per-batch metrics (no cross-batch aggregation needed).
        metrics_array: dict[str, Any] = dict(batch_metrics)

        # Compute dataset-level metrics from the merged batch metrics.
        dataset_metrics: dict[str, Any] = {}
        for metric in metrics_processors:
            logger.debug("Computing final score for %s", metric.__class__.__name__)
            dataset_metrics |= metric.compute(batch_metrics=metrics_array)

        return dataset_metrics

config = config or {} instance-attribute

__init__(config: dict[str, Any] | None = None) -> None

Initialize the runner.

Parameters:

Name Type Description Default
config dict[str, Any] | None

Optional configuration for metric default behaviors.

None
Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
26
27
28
29
30
31
32
33
def __init__(self, config: dict[str, Any] | None = None) -> None:
    """
    Set up the runner with an optional configuration mapping.

    Args:
        config: Optional configuration controlling metric default
            behaviors; an empty dict is used when not supplied.
    """
    self.config = config if config else {}

run(df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]

Execute the provided metric processors on a DataFrame.

Parameters:

Name Type Description Default
df DataFrame

The input Pandas DataFrame.

required
metrics_processors list[DatametricProcessor]

List of initialized DatametricProcessor instances.

required

Returns:

Type Description
dict[str, Any]

A dictionary containing the aggregated dataset-level metrics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
    """
    Run every given metric processor against a DataFrame and aggregate results.

    Args:
        df: The input Pandas DataFrame.
        metrics_processors: List of initialized DatametricProcessor instances.

    Returns:
        A dictionary containing the aggregated dataset-level metrics.
    """
    # Guard clause: nothing to do without data or processors.
    if not metrics_processors or df.empty:
        logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
        return {}

    # Convert the DataFrame into one Arrow record batch.
    batch = pa.RecordBatch.from_pandas(df)

    features: dict[str, Any] = {}
    per_batch: dict[str, Any] = {}

    # First pass: build up shared features and per-batch metric values.
    for processor in metrics_processors:
        logger.debug(f"Processing metric {processor.__class__.__name__}")
        features.update(processor.compute_features(batch, prev_features=features))
        per_batch.update(processor.compute_batch_metric(features))

    # Merge the batch metrics (trivial: a single batch is involved).
    merged: dict[str, Any] = {}
    merged.update(per_batch)

    # Second pass: derive the dataset-level scores from the merged metrics.
    results: dict[str, Any] = {}
    for processor in metrics_processors:
        logger.debug(f"Computing final score for {processor.__class__.__name__}")
        results.update(processor.compute(batch_metrics=merged))

    return results