Skip to content

dqm_ml_core.utils

Utility modules for DQM ML Core.

This package contains utility classes and functions used across the DQM ML Core package, including:

- `MetricRunner`: orchestrator for executing metrics on DataFrames.
- `PluginLoadedRegistry`: registry for dynamically loaded plugins.

__all__ = ['MetricRunner', 'PluginLoadedRegistry'] module-attribute

MetricRunner

Orchestrator for executing metric processors on in-memory Pandas DataFrames.

This class provides a high-level API for users who want to compute metrics directly on DataFrames without using the full YAML-driven pipeline.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class MetricRunner:
    """
    Run metric processors directly on an in-memory Pandas DataFrame.

    A lightweight alternative to the YAML-driven pipeline: the whole DataFrame
    is treated as one batch and handed to each processor in turn.
    """

    def __init__(self, config: dict[str, Any] | None = None) -> None:
        """
        Set up the runner with an optional configuration mapping.

        Args:
            config: Optional configuration for metric default behaviors. Any
                falsy value (``None``, ``{}``) results in a fresh empty dict.
        """
        # NOTE(review): ``self.config`` is not read anywhere in this class;
        # confirm whether callers or subclasses consume it.
        if config:
            self.config = config
        else:
            self.config = {}

    def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
        """
        Run every metric processor over ``df`` and return dataset-level results.

        ``df`` is converted once to a ``pyarrow.RecordBatch``. The processors
        then run in list order. Each one extracts features, receiving the
        features accumulated so far through ``prev_features``. It then computes
        batch metrics from all accumulated features.

        Afterwards, each processor's ``compute`` receives the collected batch
        metrics. The returned dictionaries are merged, with later processors
        overriding duplicate keys.

        Args:
            df: The input Pandas DataFrame.
            metrics_processors: List of initialized DatametricProcessor instances.

        Returns:
            A dictionary of aggregated dataset-level metrics. It is an empty
            dict, after a warning is logged, when ``df`` is empty or no
            processors are supplied.
        """
        if df.empty or not metrics_processors:
            logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
            return {}

        # NOTE(review): pyarrow's default ``preserve_index=None`` stores a
        # non-RangeIndex as an extra column; confirm processors tolerate that.
        record_batch = pa.RecordBatch.from_pandas(df)
        features: dict[str, Any] = {}
        per_batch: dict[str, Any] = {}

        # Stage 1: feature extraction, then batch metrics, one processor at a time.
        for metric in metrics_processors:
            logger.debug(f"Processing metric {metric.__class__.__name__}")
            features |= metric.compute_features(record_batch, prev_features=features)
            per_batch |= metric.compute_batch_metric(features)

        # Stage 2: with a single batch, "merging" reduces to a shallow copy.
        merged: dict[str, Any] = dict(per_batch)

        # Stage 3: reduce the merged batch metrics to dataset-level metrics.
        results: dict[str, Any] = {}
        for metric in metrics_processors:
            logger.debug(f"Computing final score for {metric.__class__.__name__}")
            results |= metric.compute(batch_metrics=merged)
        return results

config = config or {} instance-attribute

__init__(config: dict[str, Any] | None = None) -> None

Initialize the runner.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `dict[str, Any] \| None` | Optional configuration for metric default behaviors. | `None` |
Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
26
27
28
29
30
31
32
33
def __init__(self, config: dict[str, Any] | None = None) -> None:
    """
    Initialize the runner.

    Args:
        config: Optional configuration for metric default behaviors.
    """
    self.config = config or {}

run(df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]

Execute the provided metric processors on a DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | The input Pandas DataFrame. | required |
| `metrics_processors` | `list[DatametricProcessor]` | List of initialized DatametricProcessor instances. | required |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | A dictionary containing the aggregated dataset-level metrics. |

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
    """
    Run every metric processor over ``df`` and return dataset-level results.

    ``df`` is converted once to a ``pyarrow.RecordBatch``. The processors then
    run in list order. Each one extracts features, receiving the features
    accumulated so far through ``prev_features``. It then computes batch
    metrics from all accumulated features.

    Afterwards, each processor's ``compute`` receives the collected batch
    metrics. The returned dictionaries are merged, with later processors
    overriding duplicate keys.

    Args:
        df: The input Pandas DataFrame.
        metrics_processors: List of initialized DatametricProcessor instances.

    Returns:
        A dictionary of aggregated dataset-level metrics. It is an empty dict,
        after a warning is logged, when ``df`` is empty or no processors are
        supplied.
    """
    if df.empty or not metrics_processors:
        logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
        return {}

    # NOTE(review): pyarrow's default ``preserve_index=None`` stores a
    # non-RangeIndex as an extra column; confirm processors tolerate that.
    record_batch = pa.RecordBatch.from_pandas(df)
    features: dict[str, Any] = {}
    per_batch: dict[str, Any] = {}

    # Stage 1: feature extraction, then batch metrics, one processor at a time.
    for metric in metrics_processors:
        logger.debug(f"Processing metric {metric.__class__.__name__}")
        features |= metric.compute_features(record_batch, prev_features=features)
        per_batch |= metric.compute_batch_metric(features)

    # Stage 2: with a single batch, "merging" reduces to a shallow copy.
    merged: dict[str, Any] = dict(per_batch)

    # Stage 3: reduce the merged batch metrics to dataset-level metrics.
    results: dict[str, Any] = {}
    for metric in metrics_processors:
        logger.debug(f"Computing final score for {metric.__class__.__name__}")
        results |= metric.compute(batch_metrics=merged)
    return results

PluginLoadedRegistry

Singleton registry that provides lazy access to all registered DQM components.

Components include:

- Metrics (`DatametricProcessor`)
- DataLoaders
- OutputWriters

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class PluginLoadedRegistry:
    """
    Singleton registry that provides lazy access to all registered DQM components.

    Components include:
    - Metrics (DatametricProcessor)
    - DataLoaders
    - OutputWriters

    Each registry is built by ``load_registered_plugins`` on first access and
    cached on the class itself. All callers share the same dictionary, and
    later calls return it without reloading, even when it is empty.
    """

    # Lazily populated caches; ``None`` means "not loaded yet".
    _metrics_registry: dict[str, type[DatametricProcessor]] | None = None
    _dataloaders_registry: dict[str, Any] | None = None
    _outputwriter_registry: dict[str, Any] | None = None

    @classmethod
    def get_metrics_registry(cls) -> dict[str, type[DatametricProcessor]]:
        """Return the registry of available metric processors.

        Loaded from the ``"dqm_ml.metrics"`` plugin group on the first call and
        cached on the class for subsequent calls.

        Returns:
            A dictionary mapping metric processor names to their classes.
        """
        # Compare against ``None`` rather than truthiness. An empty registry
        # (no plugins installed) is a valid result and must be cached too;
        # otherwise every call would repeat the plugin lookup.
        if cls._metrics_registry is None:
            cls._metrics_registry = load_registered_plugins("dqm_ml.metrics", DatametricProcessor)

        return cls._metrics_registry

    @classmethod
    def get_dataloaders_registry(cls) -> dict[str, Any]:
        """Return the registry of available data loaders.

        Loaded from the ``"dqm_ml.dataloaders"`` plugin group on the first call
        and cached on the class for subsequent calls.

        Returns:
            A dictionary mapping data loader names to their classes.
        """
        # ``is None`` so an empty result is cached instead of reloaded each call.
        if cls._dataloaders_registry is None:
            cls._dataloaders_registry = load_registered_plugins("dqm_ml.dataloaders", None)  # TODO add base class
        return cls._dataloaders_registry

    @classmethod
    def get_outputwriter_registry(cls) -> dict[str, Any]:
        """Return the registry of available output writers.

        Loaded from the ``"dqm_ml.outputwriter"`` plugin group on the first call
        and cached on the class for subsequent calls.

        Returns:
            A dictionary mapping output writer names to their classes.
        """
        # ``is None`` so an empty result is cached instead of reloaded each call.
        if cls._outputwriter_registry is None:
            cls._outputwriter_registry = load_registered_plugins("dqm_ml.outputwriter", None)  # TODO add base class

        return cls._outputwriter_registry

get_dataloaders_registry() -> dict[str, Any] classmethod

Return the registry of available data loaders.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | A dictionary mapping data loader names to their classes. |

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
79
80
81
82
83
84
85
86
87
88
@classmethod
def get_dataloaders_registry(cls) -> dict[str, Any]:
    """Return the registry of available data loaders.

    Loaded from the ``"dqm_ml.dataloaders"`` plugin group on the first call and
    cached on the class for subsequent calls, even when it is empty.

    Returns:
        A dictionary mapping data loader names to their classes.
    """
    # Compare against ``None`` rather than truthiness. An empty registry is a
    # valid result and must be cached too; otherwise every call would repeat
    # the plugin lookup.
    if cls._dataloaders_registry is None:
        cls._dataloaders_registry = load_registered_plugins("dqm_ml.dataloaders", None)  # TODO add base class
    return cls._dataloaders_registry

get_metrics_registry() -> dict[str, type[DatametricProcessor]] classmethod

Return the registry of available metric processors.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, type[DatametricProcessor]]` | A dictionary mapping metric processor names to their classes. |

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
67
68
69
70
71
72
73
74
75
76
77
@classmethod
def get_metrics_registry(cls) -> dict[str, type[DatametricProcessor]]:
    """Return the registry of available metric processors.

    Loaded from the ``"dqm_ml.metrics"`` plugin group on the first call and
    cached on the class for subsequent calls, even when it is empty.

    Returns:
        A dictionary mapping metric processor names to their classes.
    """
    # Compare against ``None`` rather than truthiness. An empty registry (no
    # plugins installed) is a valid result and must be cached too; otherwise
    # every call would repeat the plugin lookup.
    if cls._metrics_registry is None:
        cls._metrics_registry = load_registered_plugins("dqm_ml.metrics", DatametricProcessor)

    return cls._metrics_registry

get_outputwriter_registry() -> dict[str, Any] classmethod

Return the registry of available output writers.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | A dictionary mapping output writer names to their classes. |

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@classmethod
def get_outputwriter_registry(cls) -> dict[str, Any]:
    """Return the registry of available output writers.

    Loaded from the ``"dqm_ml.outputwriter"`` plugin group on the first call and
    cached on the class for subsequent calls, even when it is empty.

    Returns:
        A dictionary mapping output writer names to their classes.
    """
    # Compare against ``None`` rather than truthiness. An empty registry is a
    # valid result and must be cached too; otherwise every call would repeat
    # the plugin lookup.
    if cls._outputwriter_registry is None:
        cls._outputwriter_registry = load_registered_plugins("dqm_ml.outputwriter", None)  # TODO add base class

    return cls._outputwriter_registry