Skip to content

dqm_ml_core.api.data_processor

Base data metric processor class.

This module contains the DatametricProcessor base class that all metric processors must inherit from. It provides the streaming architecture for processing large datasets.

logger = logging.getLogger(__name__) module-attribute

DatametricProcessor

Base class for all Data Quality metrics and feature extractors.

The processor follows a streaming lifecycle designed to handle large datasets without loading them entirely into memory:

  1. Feature Extraction (compute_features): Transformation of raw data into relevant features (e.g., image -> luminosity).
  2. Batch Aggregation (compute_batch_metric): Compression of features into intermediate statistics (e.g., count, partial sum, histogram).
  3. Global Computation (compute): Final aggregation of all batch-level statistics into dataset-level scores.
Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class DatametricProcessor:
    """
    Base class for all Data Quality metrics and feature extractors.

    The processor follows a streaming lifecycle designed to handle large datasets
    without loading them entirely into memory:

    1. Feature Extraction (`compute_features`): Transformation of raw data into
       relevant features (e.g., image -> luminosity).
    2. Batch Aggregation (`compute_batch_metric`): Compression of features into
       intermediate statistics (e.g., count, partial sum, histogram).
    3. Global Computation (`compute`): Final aggregation of all batch-level
       statistics into dataset-level scores.
    """

    def __init__(self, name: str, config: dict[str, Any] | None = None):
        """
        Initialize the dataset processor.

        Args:
            name: Unique name of the processor instance.
            config: Optional configuration dictionary. Recognized keys are
                ``input_columns`` (a list of raw column names) and
                ``output_columns`` (a dict of output-column mappings).

        Raises:
            ValueError: If ``input_columns`` is present but not a list, or
                ``output_columns`` is present but not a dict.
        """
        self.name = name
        self.config = config or {}

        # Validate input_columns if present; default to "no raw columns needed".
        if "input_columns" in self.config:
            if not isinstance(self.config["input_columns"], list):
                raise ValueError(
                    f"Metric {name} config key 'input_columns' must be a list, "
                    f"got {type(self.config['input_columns'])}"
                )
            self.input_columns = self.config["input_columns"]
        else:
            self.input_columns = []

        # Validate output_columns if present; default to "no outputs declared".
        # NOTE(review): this is stored as `outputs_columns` but never read by
        # the base class -- `generated_features`/`generated_metrics` look up
        # `output_features`/`output_metrics` instead. Kept unchanged for
        # subclass compatibility; confirm which attribute subclasses rely on.
        if "output_columns" in self.config:
            if not isinstance(self.config["output_columns"], dict):
                raise ValueError(
                    f"Metric {name} config key 'output_columns' must be a dict, "
                    f"got {type(self.config['output_columns'])}"
                )
            self.outputs_columns = self.config["output_columns"]
        else:
            self.outputs_columns = {}

    def needed_columns(self) -> list[str]:
        """
        Return the list of raw input columns required for feature extraction.

        Returns:
            A list of column names (empty when none are configured).
        """
        return getattr(self, "input_columns", [])

    def generated_features(self) -> list[str]:
        """
        Return the list of columns generated by this processor during feature extraction.

        Returns:
            A list of feature names (empty unless a subclass defines an
            ``output_features`` mapping).
        """
        outputs = getattr(self, "output_features", {})
        return list(outputs.values())

    def generated_metrics(self) -> list[str]:
        """
        Return the names of the final metrics produced by this processor.

        Returns:
            A list of metric names (empty unless a subclass defines an
            ``output_metrics`` mapping).
        """
        outputs = getattr(self, "output_metrics", {})
        return list(outputs.values())

    def compute_features(self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Transform a raw data batch into features.

        The base implementation passes through the required raw columns,
        skipping any column already produced by a preceding processor and
        logging a warning for columns missing from the batch.

        Args:
            batch: The input pyarrow RecordBatch.
            prev_features: Features already computed by preceding processors.

        Returns:
            A dictionary mapping feature names to pyarrow Arrays.
        """
        features = {}

        for col in self.needed_columns():
            if col in prev_features:
                # Feature already computed upstream; no need to add it again.
                continue

            if col not in batch.schema.names:
                # Missing columns are skipped (best effort) rather than fatal.
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue
            features[col] = batch.column(col)

        return features

    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Aggregate features into intermediate statistics for the current batch.

        This method is critical for scalability. It should return a compact
        representation of the data (e.g., partial sums) that can be
        efficiently combined later. The base implementation aggregates nothing.

        Args:
            features: Dictionary of feature arrays computed on the batch.

        Returns:
            A dictionary of aggregated statistics (empty by default).
        """
        return {}

    def compute(self, batch_metrics: dict[str, pa.Array]) -> dict[str, Any]:
        """
        Perform the final dataset-level metric calculation.

        The base implementation produces no metrics; subclasses override it to
        combine the per-batch statistics into final scores.

        Args:
            batch_metrics: The aggregated intermediate statistics from all batches.

        Returns:
            A dictionary containing the final metrics (empty by default).
        """
        return {}

    def compute_delta(self, source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]:
        """
        Compare metrics between two different data selections.

        The base implementation reports no differences; subclasses override it
        to produce distance or difference scores.

        Args:
            source: Final metrics from the source data selection.
            target: Final metrics from the target data selection.

        Returns:
            A dictionary containing distance or difference scores (empty by default).
        """
        return {}

config = config or {} instance-attribute

input_columns = self.config['input_columns'] instance-attribute

name = name instance-attribute

outputs_columns = self.config['output_columns'] instance-attribute

__init__(name: str, config: dict[str, Any] | None)

Initialize the dataset processor.

Parameters:

Name Type Description Default
name str

Unique name of the processor instance.

required
config dict[str, Any] | None

Configuration dictionary (optional).

required
Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(self, name: str, config: dict[str, Any] | None = None):
    """
    Initialize the dataset processor.

    Args:
        name: Unique name of the processor instance.
        config: Optional configuration dictionary. Recognized keys are
            ``input_columns`` (a list of raw column names) and
            ``output_columns`` (a dict of output-column mappings).

    Raises:
        ValueError: If ``input_columns`` is present but not a list, or
            ``output_columns`` is present but not a dict.
    """
    self.name = name
    self.config = config or {}

    # Validate input_columns if present; default to "no raw columns needed".
    if "input_columns" in self.config:
        if not isinstance(self.config["input_columns"], list):
            raise ValueError(
                f"Metric {name} config key 'input_columns' must be a list, "
                f"got {type(self.config['input_columns'])}"
            )
        self.input_columns = self.config["input_columns"]
    else:
        self.input_columns = []

    # Validate output_columns if present; default to "no outputs declared".
    # NOTE(review): stored as `outputs_columns` (plural "outputs") -- confirm
    # which attribute name the rest of the codebase reads.
    if "output_columns" in self.config:
        if not isinstance(self.config["output_columns"], dict):
            raise ValueError(
                f"Metric {name} config key 'output_columns' must be a dict, "
                f"got {type(self.config['output_columns'])}"
            )
        self.outputs_columns = self.config["output_columns"]
    else:
        self.outputs_columns = {}

compute(batch_metrics: dict[str, pa.Array]) -> dict[str, Any]

Perform the final dataset-level metric calculation.

Parameters:

Name Type Description Default
batch_metrics dict[str, Array]

The aggregated intermediate statistics from all batches.

required

Returns:

Type Description
dict[str, Any]

A dictionary containing the final metrics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
135
136
137
138
139
140
141
142
143
144
145
def compute(self, batch_metrics: dict[str, pa.Array]) -> dict[str, Any]:
    """
    Perform the final dataset-level metric calculation.

    The base implementation produces no metrics; subclasses override it to
    fold the accumulated per-batch statistics into dataset-level scores.

    Args:
        batch_metrics: The aggregated intermediate statistics from all batches.

    Returns:
        A dictionary containing the final metrics (empty by default).
    """
    return {}

compute_batch_metric(features: dict[str, pa.Array]) -> dict[str, pa.Array]

Aggregate features into intermediate statistics for the current batch.

This method is critical for scalability. It should return a compact representation of the data (e.g., partial sums) that can be efficiently combined later.

Parameters:

Name Type Description Default
features dict[str, Array]

Dictionary of feature arrays computed on the batch.

required

Returns:

Type Description
dict[str, Array]

A dictionary of aggregated statistics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
    """
    Aggregate features into intermediate statistics for the current batch.

    This hook is central to scalability: overrides should emit a compact
    summary of the batch (partial sums, counts, histograms, ...) that can be
    merged cheaply across batches later. The base implementation aggregates
    nothing.

    Args:
        features: Dictionary of feature arrays computed on the batch.

    Returns:
        A dictionary of aggregated statistics (empty by default).
    """
    return {}

compute_delta(source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]

Compare metrics between two different data selections.

Parameters:

Name Type Description Default
source dict[str, Any]

Final metrics from the source data selection.

required
target dict[str, Any]

Final metrics from the target data selection.

required

Returns:

Type Description
dict[str, Any]

A dictionary containing distance or difference scores.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
147
148
149
150
151
152
153
154
155
156
157
158
def compute_delta(self, source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]:
    """
    Compare final metrics computed on two different data selections.

    The base implementation reports no differences; subclasses override it
    to produce distance or difference scores between the two selections.

    Args:
        source: Final metrics from the source data selection.
        target: Final metrics from the target data selection.

    Returns:
        A dictionary of distance or difference scores (empty by default).
    """
    return {}

compute_features(batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]

Transform a raw data batch into features.

Parameters:

Name Type Description Default
batch RecordBatch

The input pyarrow RecordBatch.

required
prev_features dict[str, Array]

Features already computed by preceding processors.

required

Returns:

Type Description
dict[str, Array]

A dictionary mapping feature names to pyarrow Arrays.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def compute_features(self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]:
    """
    Transform a raw data batch into features.

    The base implementation passes through the required raw columns: a column
    already produced by a preceding processor is skipped, and a column absent
    from the batch is skipped with a warning.

    Args:
        batch: The input pyarrow RecordBatch.
        prev_features: Features already computed by preceding processors.

    Returns:
        A dictionary mapping feature names to pyarrow Arrays.
    """
    available = batch.schema.names
    extracted = {}

    for column in self.needed_columns():
        # Already materialized by an upstream processor -- do not duplicate.
        if column in prev_features:
            continue

        if column not in available:
            # Best-effort behavior: warn and move on instead of failing.
            logger.warning(f"[{self.name}] column '{column}' not found in batch")
            continue

        extracted[column] = batch.column(column)

    return extracted

generated_features() -> list[str]

Return the list of columns generated by this processor during feature extraction.

Returns:

Type Description
list[str]

A list of feature names.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
72
73
74
75
76
77
78
79
80
81
def generated_features(self) -> list[str]:
    """
    Return the list of columns generated by this processor during feature extraction.

    Returns:
        A list of feature names; empty when the processor declares no
        ``output_features`` mapping.
    """
    return list(getattr(self, "output_features", {}).values())

generated_metrics() -> list[str]

Return the names of the final metrics produced by this processor.

Returns:

Type Description
list[str]

A list of metric names.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
83
84
85
86
87
88
89
90
91
92
def generated_metrics(self) -> list[str]:
    """
    Return the names of the final metrics produced by this processor.

    Returns:
        A list of metric names; empty when the processor declares no
        ``output_metrics`` mapping.
    """
    return list(getattr(self, "output_metrics", {}).values())

needed_columns() -> list[str]

Return the list of raw input columns required for feature extraction.

Returns:

Type Description
list[str]

A list of column names.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
63
64
65
66
67
68
69
70
def needed_columns(self) -> list[str]:
    """
    Return the list of raw input columns required for feature extraction.

    Returns:
        A list of column names; empty when none are configured.
    """
    declared = getattr(self, "input_columns", [])
    return declared