Skip to content

dqm_ml_core

DQM ML Core package for data quality metrics processing.

This package provides core components for computing data quality metrics on datasets using a streaming architecture. It includes base classes for metric processors and implementations for common metrics like completeness and representativeness.

Main components:

- DatametricProcessor: Base class for all data quality metrics
- CompletenessProcessor: Computes data completeness scores
- RepresentativenessProcessor: Evaluates distribution representativeness
- MetricRunner: Orchestrator for running metrics on DataFrames
- PluginLoadedRegistry: Registry for dynamically loaded metric plugins

__all__ = ['CompletenessProcessor', 'DatametricProcessor', 'MetricRunner', 'PluginLoadedRegistry', 'RepresentativenessProcessor'] module-attribute

CompletenessProcessor

Bases: DatametricProcessor

Data completeness processor that evaluates the completeness of tabular data.

This processor calculates completeness scores (ratio of non-null to total values) for specified columns and provides overall dataset completeness metrics.

The processor operates at multiple levels:

- Batch level: Aggregated counts for streaming processing
- Dataset level: Final completeness scores and statistics

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/completeness.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class CompletenessProcessor(DatametricProcessor):
    """
    Data completeness processor that evaluates the completeness of tabular data.

    This processor calculates completeness scores (ratio of non-null to total values)
    for specified columns and provides overall dataset completeness metrics.

    The processor operates at multiple levels:
    - Batch level: Aggregated counts for streaming processing
    - Dataset level: Final completeness scores and statistics
    """

    def __init__(self, name: str = "completeness", config: dict[str, Any] | None = None) -> None:
        """
        Initialize the completeness processor.

        Args:
            name: Name of the processor
            config: Configuration dictionary containing:
                - input_columns: List of columns to analyze for completeness
                - output_metrics: Dictionary mapping metric names to output column names
                - include_per_column: Whether to include per-column completeness scores
                - include_overall: Whether to include overall completeness score
                - include_metadata: Whether to attach a JSON "_metadata" entry to results

        Raises:
            ValueError: If both 'include_per_column' and 'include_overall' are False,
                since the processor would then produce no output at all.
        """
        super().__init__(name, config)

        cfg = self.config or {}

        # Configuration for what metrics to compute - for each column and across all columns
        self.include_per_column: bool = bool(cfg.get("include_per_column", True))
        self.include_overall: bool = bool(cfg.get("include_overall", True))
        self.include_metadata: bool = bool(cfg.get("include_metadata", False))

        # Output column mappings (logical metric name -> output column name)
        self.output_metrics = cfg.get("output_metrics", {})

        # Validation: at least one kind of output must be requested
        if not self.include_per_column and not self.include_overall:
            raise ValueError(f"[{self.name}] At least one of 'include_per_column' or 'include_overall' must be True")

    @override
    def generated_metrics(self) -> list[str]:
        """
        Return the list of metric columns that will be generated.

        Returns:
            List of output metric column names
        """
        # TODO : manage output metrics names with configuration
        metrics = []

        if self.include_overall:
            overall_key = self.output_metrics.get("overall_completeness", "completeness_overall")
            metrics.append(overall_key)

        if self.include_per_column:
            # NOTE(review): this relies on self.input_columns, while compute_features
            # falls back to all batch columns when input_columns is empty -- in that
            # case the per-column metric names cannot be predicted here.
            for col in self.input_columns:
                col_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
                metrics.append(col_key)

        return metrics

    @override
    def compute_features(
        self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array] | None = None
    ) -> dict[str, pa.Array]:
        """
        Extract the needed columns from the batch for completeness analysis.

        This method simply passes through the columns we need to analyze,
        as completeness calculation is done at batch and dataset levels.

        Args:
            batch: Input batch of data
            prev_features: Previous features (not used in this processor)

        Returns:
            Dictionary containing the columns to analyze
        """
        features = {}

        # Fall back to all batch columns when no explicit selection is configured
        columns_to_analyze = self.input_columns if self.input_columns else batch.column_names

        for col in columns_to_analyze:
            if col not in batch.schema.names:
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue

            # Simply pass through the column data for batch-level processing
            features[col] = batch.column(col)

        return features

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Compute batch-level completeness counts for streaming aggregation.

        This counts total and non-null values per column in this batch,
        which will be aggregated across all batches for final dataset completeness.

        Args:
            features: Dictionary of column arrays from this batch

        Returns:
            Dictionary of batch-level completeness counts: two single-element
            int64 arrays per column, "<col>_total_count" and "<col>_complete_count"
        """
        batch_metrics = {}

        for col, col_array in features.items():
            # Count total samples in this batch
            total_count = len(col_array)

            # Count non-null (complete) samples in this batch
            # pa.compute.is_valid() returns True for non-null values
            is_valid_mask = pa.compute.is_valid(col_array)
            # pa.compute.sum() yields null (as_py() -> None) for an empty array;
            # coerce to 0 so downstream numpy aggregation never sees a null count
            complete_count = pa.compute.sum(is_valid_mask).as_py() or 0

            # store counts for aggregation across batches
            batch_metrics[f"{col}_total_count"] = pa.array([total_count], type=pa.int64())
            batch_metrics[f"{col}_complete_count"] = pa.array([complete_count], type=pa.int64())

        return batch_metrics

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
        """
        Compute final dataset-level completeness metrics.

        This aggregates the batch-level counts to compute final completeness scores
        for each column and overall dataset completeness.

        Args:
            batch_metrics: Dictionary of batch-level metrics to aggregate

        Returns:
            Dictionary of final completeness metrics
        """
        if not batch_metrics:
            # NOTE(review): the error payload is a plain dict here, while the
            # success path JSON-encodes "_metadata" -- confirm consumers accept both
            return {"_metadata": {"error": "No batch metrics provided"}}

        results: dict[str, Any] = {}

        # Recover the analyzed column names from the "<col>_total_count" keys.
        # removesuffix (rather than str.replace) avoids corrupting column names
        # that themselves contain the substring "_total_count".
        columns_analyzed = []
        for key in batch_metrics:
            if key.endswith("_total_count"):
                columns_analyzed.append(key.removesuffix("_total_count"))

        if not columns_analyzed:
            logger.warning(f"[{self.name}] No columns found in batch metrics")
            return {"_metadata": {"error": "No columns found in batch metrics"}}

        per_column_completeness: dict[str, float] = {}
        total_samples = 0

        # Calculate completeness for each column
        for col in columns_analyzed:
            total_key = f"{col}_total_count"
            complete_key = f"{col}_complete_count"

            if total_key not in batch_metrics or complete_key not in batch_metrics:
                logger.warning(f"[{self.name}] Missing batch metrics for column '{col}'")
                continue

            # Aggregate counts across all batches
            col_total = int(np.sum(batch_metrics[total_key].to_numpy()))
            col_complete = int(np.sum(batch_metrics[complete_key].to_numpy()))

            # Completeness = ratio of non-null to total values (0.0 for empty columns)
            per_column_completeness[col] = col_complete / col_total if col_total > 0 else 0.0

            total_samples += col_total

        # Generate output metrics based on configuration
        if self.include_per_column:
            for col, score in per_column_completeness.items():
                output_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
                results[output_key] = score

        if self.include_overall:
            # Overall completeness is the unweighted average of per-column scores
            if per_column_completeness:
                overall_completeness = sum(per_column_completeness.values()) / len(per_column_completeness)
            else:
                overall_completeness = 0.0

            output_key = self.output_metrics.get("overall_completeness", "completeness_overall")
            results[output_key] = overall_completeness

        # Attach metadata only when requested (it is otherwise discarded)
        if self.include_metadata:
            metadata = {
                "columns_analyzed": columns_analyzed,
                "total_samples_per_column": total_samples // len(columns_analyzed) if columns_analyzed else 0,
                "per_column_scores": per_column_completeness,
                "overall_score": sum(per_column_completeness.values()) / len(per_column_completeness)
                if per_column_completeness
                else 0.0,
            }
            results["_metadata"] = json.dumps(metadata)

        return results

    def reset(self) -> None:
        """Reset processor state for new processing run."""
        # Intentionally a no-op: this processor keeps no mutable state between runs.

include_metadata: bool = bool(cfg.get('include_metadata', False)) instance-attribute

include_overall: bool = bool(cfg.get('include_overall', True)) instance-attribute

include_per_column: bool = bool(cfg.get('include_per_column', True)) instance-attribute

output_metrics = cfg.get('output_metrics', {}) instance-attribute

__init__(name: str = 'completeness', config: dict[str, Any] | None = None) -> None

Initialize the completeness processor.

Parameters:

Name Type Description Default
name str

Name of the processor

'completeness'
config dict[str, Any] | None

Configuration dictionary containing:

- input_columns: List of columns to analyze for completeness
- output_metrics: Dictionary mapping metric names to output column names
- include_per_column: Whether to include per-column completeness scores
- include_overall: Whether to include overall completeness score

None
Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/completeness.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(self, name: str = "completeness", config: dict[str, Any] | None = None) -> None:
    """
    Initialize the completeness processor.

    Args:
        name: Name of the processor
        config: Configuration dictionary containing:
            - input_columns: List of columns to analyze for completeness
            - output_metrics: Dictionary mapping metric names to output column names
            - include_per_column: Whether to include per-column completeness scores
            - include_overall: Whether to include overall completeness score

    Raises:
        ValueError: If both 'include_per_column' and 'include_overall' are False.
    """
    super().__init__(name, config)

    cfg = self.config or {}

    # Configuration for what metrics to compute - for each column and across all columns
    self.include_per_column: bool = bool(cfg.get("include_per_column", True))
    self.include_overall: bool = bool(cfg.get("include_overall", True))
    self.include_metadata: bool = bool(cfg.get("include_metadata", False))

    # Output column mappings (logical metric name -> output column name)
    self.output_metrics = cfg.get("output_metrics", {})

    # Validation: at least one kind of output must be requested
    if not self.include_per_column and not self.include_overall:
        raise ValueError(f"[{self.name}] At least one of 'include_per_column' or 'include_overall' must be True")

compute(batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]

Compute final dataset-level completeness metrics.

This aggregates the batch-level counts to compute final completeness scores for each column and overall dataset completeness.

Parameters:

Name Type Description Default
batch_metrics dict[str, Array] | None

Dictionary of batch-level metrics to aggregate

None

Returns:

Type Description
dict[str, Any]

Dictionary of final completeness metrics

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/completeness.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@override
def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
    """
    Compute final dataset-level completeness metrics.

    This aggregates the batch-level counts to compute final completeness scores
    for each column and overall dataset completeness.

    Args:
        batch_metrics: Dictionary of batch-level metrics to aggregate

    Returns:
        Dictionary of final completeness metrics
    """
    if not batch_metrics:
        # NOTE(review): the error payload is a plain dict here, while the
        # success path JSON-encodes "_metadata" -- confirm consumers accept both
        return {"_metadata": {"error": "No batch metrics provided"}}

    results: dict[str, Any] = {}

    # Recover the analyzed column names from the "<col>_total_count" keys.
    # removesuffix (rather than str.replace) avoids corrupting column names
    # that themselves contain the substring "_total_count".
    columns_analyzed = []
    for key in batch_metrics:
        if key.endswith("_total_count"):
            columns_analyzed.append(key.removesuffix("_total_count"))

    if not columns_analyzed:
        logger.warning(f"[{self.name}] No columns found in batch metrics")
        return {"_metadata": {"error": "No columns found in batch metrics"}}

    per_column_completeness: dict[str, float] = {}
    total_samples = 0

    # Calculate completeness for each column
    for col in columns_analyzed:
        total_key = f"{col}_total_count"
        complete_key = f"{col}_complete_count"

        if total_key not in batch_metrics or complete_key not in batch_metrics:
            logger.warning(f"[{self.name}] Missing batch metrics for column '{col}'")
            continue

        # Aggregate counts across all batches
        col_total = int(np.sum(batch_metrics[total_key].to_numpy()))
        col_complete = int(np.sum(batch_metrics[complete_key].to_numpy()))

        # Completeness = ratio of non-null to total values (0.0 for empty columns)
        per_column_completeness[col] = col_complete / col_total if col_total > 0 else 0.0

        total_samples += col_total

    # Generate output metrics based on configuration
    if self.include_per_column:
        for col, score in per_column_completeness.items():
            output_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
            results[output_key] = score

    if self.include_overall:
        # Overall completeness is the unweighted average of per-column scores
        if per_column_completeness:
            overall_completeness = sum(per_column_completeness.values()) / len(per_column_completeness)
        else:
            overall_completeness = 0.0

        output_key = self.output_metrics.get("overall_completeness", "completeness_overall")
        results[output_key] = overall_completeness

    # Attach metadata only when requested (it is otherwise discarded)
    if self.include_metadata:
        metadata = {
            "columns_analyzed": columns_analyzed,
            "total_samples_per_column": total_samples // len(columns_analyzed) if columns_analyzed else 0,
            "per_column_scores": per_column_completeness,
            "overall_score": sum(per_column_completeness.values()) / len(per_column_completeness)
            if per_column_completeness
            else 0.0,
        }
        results["_metadata"] = json.dumps(metadata)

    return results

compute_batch_metric(features: dict[str, pa.Array]) -> dict[str, pa.Array]

Compute batch-level completeness counts for streaming aggregation.

This counts total and non-null values per column in this batch, which will be aggregated across all batches for final dataset completeness.

Parameters:

Name Type Description Default
features dict[str, Array]

Dictionary of column arrays from this batch

required

Returns:

Type Description
dict[str, Array]

Dictionary of batch-level completeness counts

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/completeness.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@override
def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
    """
    Compute batch-level completeness counts for streaming aggregation.

    This counts total and non-null values per column in this batch,
    which will be aggregated across all batches for final dataset completeness.

    Args:
        features: Dictionary of column arrays from this batch

    Returns:
        Dictionary of batch-level completeness counts: two single-element
        int64 arrays per column, "<col>_total_count" and "<col>_complete_count"
    """
    batch_metrics = {}

    for col, col_array in features.items():
        # Count total samples in this batch
        total_count = len(col_array)

        # Count non-null (complete) samples in this batch
        # pa.compute.is_valid() returns True for non-null values
        is_valid_mask = pa.compute.is_valid(col_array)
        # pa.compute.sum() yields null (as_py() -> None) for an empty array;
        # coerce to 0 so downstream numpy aggregation never sees a null count
        complete_count = pa.compute.sum(is_valid_mask).as_py() or 0

        # store counts for aggregation across batches
        batch_metrics[f"{col}_total_count"] = pa.array([total_count], type=pa.int64())
        batch_metrics[f"{col}_complete_count"] = pa.array([complete_count], type=pa.int64())

    return batch_metrics

compute_features(batch: pa.RecordBatch, prev_features: dict[str, pa.Array] | None = None) -> dict[str, pa.Array]

Extract the needed columns from the batch for completeness analysis.

This method simply passes through the columns we need to analyze, as completeness calculation is done at batch and dataset levels.

Parameters:

Name Type Description Default
batch RecordBatch

Input batch of data

required
prev_features dict[str, Array] | None

Previous features (not used in this processor)

None

Returns:

Type Description
dict[str, Array]

Dictionary containing the columns to analyze

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/completeness.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@override
def compute_features(
    self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array] | None = None
) -> dict[str, pa.Array]:
    """
    Extract the needed columns from the batch for completeness analysis.

    No transformation happens here: the selected columns are forwarded as-is,
    since the actual completeness computation happens at the batch and dataset
    levels.

    Args:
        batch: Input batch of data
        prev_features: Previous features (not used in this processor)

    Returns:
        Dictionary containing the columns to analyze
    """
    # Fall back to every batch column when no explicit selection is configured.
    selected = self.input_columns or batch.column_names

    extracted: dict[str, pa.Array] = {}
    for col_name in selected:
        if col_name in batch.schema.names:
            # Forward the raw column untouched for batch-level processing.
            extracted[col_name] = batch.column(col_name)
        else:
            logger.warning(f"[{self.name}] column '{col_name}' not found in batch")

    return extracted

generated_metrics() -> list[str]

Return the list of metric columns that will be generated.

Returns:

Type Description
list[str]

List of output metric column names

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/completeness.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@override
def generated_metrics(self) -> list[str]:
    """
    Return the list of metric columns that will be generated.

    Returns:
        List of output metric column names
    """
    # TODO : manage output metrics names with configuration
    metrics = []

    if self.include_overall:
        overall_key = self.output_metrics.get("overall_completeness", "completeness_overall")
        metrics.append(overall_key)

    if self.include_per_column:
        for col in self.input_columns:
            col_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
            metrics.append(col_key)

    return metrics

reset() -> None

Reset processor state for new processing run.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/completeness.py
233
234
def reset(self) -> None:
    """Reset processor state for new processing run."""
    # Intentionally a no-op: this processor keeps no mutable state between runs.

DatametricProcessor

Base class for all Data Quality metrics and feature extractors.

The processor follows a streaming lifecycle designed to handle large datasets without loading them entirely into memory:

  1. Feature Extraction (compute_features): Transformation of raw data into relevant features (e.g., image -> luminosity).
  2. Batch Aggregation (compute_batch_metric): Compression of features into intermediate statistics (e.g., count, partial sum, histogram).
  3. Global Computation (compute): Final aggregation of all batch-level statistics into dataset-level scores.
Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class DatametricProcessor:
    """
    Base class for all Data Quality metrics and feature extractors.

    The processor follows a streaming lifecycle designed to handle large datasets
    without loading them entirely into memory:

    1. Feature Extraction (`compute_features`): Transformation of raw data into
       relevant features (e.g., image -> luminosity).
    2. Batch Aggregation (`compute_batch_metric`): Compression of features into
       intermediate statistics (e.g., count, partial sum, histogram).
    3. Global Computation (`compute`): Final aggregation of all batch-level
       statistics into dataset-level scores.
    """

    def __init__(self, name: str, config: dict[str, Any] | None):
        """
        Initialize the dataset processor.

        Args:
            name: Unique name of the processor instance.
            config: Configuration dictionary (optional).

        Raises:
            ValueError: If 'input_columns' is present but not a list, or
                'output_columns' is present but not a dict.
        """

        self.name = name
        self.config = config or {}

        # Validate input_columns if present
        if "input_columns" in self.config:
            if not isinstance(self.config["input_columns"], list):
                raise ValueError(
                    f"Metric {name}: 'input_columns' must be a list, got {type(self.config['input_columns'])}"
                )
            self.input_columns = self.config["input_columns"]
        else:
            self.input_columns = []

        # Validate output_columns if present
        if "output_columns" in self.config:
            if not isinstance(self.config["output_columns"], dict):
                raise ValueError(
                    f"Metric {name}: 'output_columns' must be a dict, got {type(self.config['output_columns'])}"
                )
            self.outputs_columns = self.config["output_columns"]
        else:
            self.outputs_columns = {}

    def needed_columns(self) -> list[str]:
        """
        Return the list of raw input columns required for feature extraction.

        Returns:
            A list of column names.
        """
        return getattr(self, "input_columns", [])

    def generated_features(self) -> list[str]:
        """
        Return the list of columns generated by this processor during feature extraction.

        Returns:
            A list of feature names.
        """
        # NOTE(review): __init__ stores the configured mapping under
        # `outputs_columns`, so this lookup only finds values when a subclass
        # sets `output_features` itself -- confirm the attribute name is intended.
        outputs = getattr(self, "output_features", {})
        return list(outputs.values())

    def generated_metrics(self) -> list[str]:
        """
        Return the names of the final metrics produced by this processor.

        Returns:
            A list of metric names.
        """
        # Subclasses typically define `output_metrics`; default is no metrics.
        outputs = getattr(self, "output_metrics", {})
        return list(outputs.values())

    def compute_features(self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Transform a raw data batch into features.

        The default implementation forwards the required input columns untouched,
        skipping any feature already produced by a preceding processor.

        Args:
            batch: The input pyarrow RecordBatch.
            prev_features: Features already computed by preceding processors.

        Returns:
            A dictionary mapping feature names to pyarrow Arrays.
        """
        features = {}

        for col in self.needed_columns():
            if col in prev_features:
                # feature already computed no need to add it again
                continue

            if col not in batch.schema.names:
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue
            features[col] = batch.column(col)

        return features

    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Aggregate features into intermediate statistics for the current batch.

        This method is critical for scalability. It should return a compact
        representation of the data (e.g., partial sums) that can be
        efficiently combined later.

        Args:
            features: Dictionary of feature arrays computed on the batch.

        Returns:
            A dictionary of aggregated statistics.
        """
        # Base-class default: no intermediate statistics; subclasses override.
        return {}

    def compute(self, batch_metrics: dict[str, pa.Array]) -> dict[str, Any]:
        """
        Perform the final dataset-level metric calculation.

        Args:
            batch_metrics: The aggregated intermediate statistics from all batches.

        Returns:
            A dictionary containing the final metrics.
        """
        # Base-class default: no metrics; subclasses override.
        return {}

    def compute_delta(self, source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]:
        """
        Compare metrics between two different dataselection.

        Args:
            source: Final metrics from the source dataselection.
            target: Final metrics from the target dataselection.

        Returns:
            A dictionary containing distance or difference scores.
        """
        # Base-class default: no comparison; subclasses override.
        return {}

config = config or {} instance-attribute

input_columns = self.config['input_columns'] instance-attribute

name = name instance-attribute

outputs_columns = self.config['output_columns'] instance-attribute

__init__(name: str, config: dict[str, Any] | None)

Initialize the dataset processor.

Parameters:

Name Type Description Default
name str

Unique name of the processor instance.

required
config dict[str, Any] | None

Configuration dictionary (optional).

required
Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(self, name: str, config: dict[str, Any] | None):
    """
    Initialize the dataset processor.

    Args:
        name: Unique name of the processor instance.
        config: Configuration dictionary (optional).

    Raises:
        ValueError: If 'input_columns' is present but not a list, or
            'output_columns' is present but not a dict.
    """

    self.name = name
    self.config = config or {}

    # Validate input_columns if present
    if "input_columns" in self.config:
        if not isinstance(self.config["input_columns"], list):
            raise ValueError(
                f"Metric {name}: 'input_columns' must be a list, got {type(self.config['input_columns'])}"
            )
        self.input_columns = self.config["input_columns"]
    else:
        self.input_columns = []

    # Validate output_columns if present
    if "output_columns" in self.config:
        if not isinstance(self.config["output_columns"], dict):
            raise ValueError(
                f"Metric {name}: 'output_columns' must be a dict, got {type(self.config['output_columns'])}"
            )
        self.outputs_columns = self.config["output_columns"]
    else:
        self.outputs_columns = {}

compute(batch_metrics: dict[str, pa.Array]) -> dict[str, Any]

Perform the final dataset-level metric calculation.

Parameters:

Name Type Description Default
batch_metrics dict[str, Array]

The aggregated intermediate statistics from all batches.

required

Returns:

Type Description
dict[str, Any]

A dictionary containing the final metrics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
135
136
137
138
139
140
141
142
143
144
145
def compute(self, batch_metrics: dict[str, pa.Array]) -> dict[str, Any]:
    """
    Perform the final dataset-level metric calculation.

    Base-class default: produces no metrics; subclasses override this to
    aggregate their batch statistics into final scores.

    Args:
        batch_metrics: The aggregated intermediate statistics from all batches.

    Returns:
        A dictionary containing the final metrics.
    """
    return {}

compute_batch_metric(features: dict[str, pa.Array]) -> dict[str, pa.Array]

Aggregate features into intermediate statistics for the current batch.

This method is critical for scalability. It should return a compact representation of the data (e.g., partial sums) that can be efficiently combined later.

Parameters:

Name Type Description Default
features dict[str, Array]

Dictionary of feature arrays computed on the batch.

required

Returns:

Type Description
dict[str, Array]

A dictionary of aggregated statistics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
    """
    Aggregate features into intermediate statistics for the current batch.

    This method is critical for scalability. It should return a compact
    representation of the data (e.g., partial sums) that can be
    efficiently combined later.

    Base-class default: produces no statistics; subclasses override.

    Args:
        features: Dictionary of feature arrays computed on the batch.

    Returns:
        A dictionary of aggregated statistics.
    """
    return {}

compute_delta(source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]

Compare metrics between two different dataselection.

Parameters:

Name Type Description Default
source dict[str, Any]

Final metrics from the source dataselection.

required
target dict[str, Any]

Final metrics from the target dataselection.

required

Returns:

Type Description
dict[str, Any]

A dictionary containing distance or difference scores.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
147
148
149
150
151
152
153
154
155
156
157
158
def compute_delta(self, source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]:
    """
    Compare metrics between two different dataselections.

    Base implementation is a no-op hook: subclasses override it to compute
    distances between the source and target metric dictionaries.

    Args:
        source: Final metrics from the source dataselection.
        target: Final metrics from the target dataselection.

    Returns:
        A dictionary containing distance or difference scores (empty by default).
    """
    delta: dict[str, Any] = {}
    return delta

compute_features(batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]

Transform a raw data batch into features.

Parameters:

Name Type Description Default
batch RecordBatch

The input pyarrow RecordBatch.

required
prev_features dict[str, Array]

Features already computed by preceding processors.

required

Returns:

Type Description
dict[str, Array]

A dictionary mapping feature names to pyarrow Arrays.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def compute_features(self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]:
    """
    Extract the raw columns this processor needs from a data batch.

    Columns already present in ``prev_features`` are skipped so each feature
    is materialized only once per batch; columns missing from the batch
    schema are logged and ignored.

    Args:
        batch: The input pyarrow RecordBatch.
        prev_features: Features already computed by preceding processors.

    Returns:
        A dictionary mapping feature names to pyarrow Arrays.
    """
    selected: dict[str, pa.Array] = {}
    available = batch.schema.names

    for column in self.needed_columns():
        if column in prev_features:
            # Feature already computed; no need to add it again.
            continue
        if column not in available:
            logger.warning(f"[{self.name}] column '{column}' not found in batch")
            continue
        selected[column] = batch.column(column)

    return selected

generated_features() -> list[str]

Return the list of columns generated by this processor during feature extraction.

Returns:

Type Description
list[str]

A list of feature names.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
72
73
74
75
76
77
78
79
80
81
def generated_features(self) -> list[str]:
    """
    Return the list of columns generated by this processor during
    feature extraction.

    Returns:
        A list of feature names (empty when the processor declares no
        ``output_features`` mapping).
    """
    return list(getattr(self, "output_features", {}).values())

generated_metrics() -> list[str]

Return the names of the final metrics produced by this processor.

Returns:

Type Description
list[str]

A list of metric names.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
83
84
85
86
87
88
89
90
91
92
def generated_metrics(self) -> list[str]:
    """
    Return the names of the final metrics produced by this processor.

    Returns:
        A list of metric names (empty when the processor declares no
        ``output_metrics`` mapping).
    """
    return list(getattr(self, "output_metrics", {}).values())

needed_columns() -> list[str]

Return the list of raw input columns required for feature extraction.

Returns:

Type Description
list[str]

A list of column names.

Source code in packages/dqm-ml-core/src/dqm_ml_core/api/data_processor.py
63
64
65
66
67
68
69
70
def needed_columns(self) -> list[str]:
    """
    Return the list of raw input columns required for feature extraction.

    Returns:
        A list of column names (empty when no ``input_columns`` is declared).
    """
    default: list[str] = []
    return getattr(self, "input_columns", default)

MetricRunner

Orchestrator for executing metric processors on in-memory Pandas DataFrames.

This class provides a high-level API for users who want to compute metrics directly on DataFrames without using the full YAML-driven pipeline.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class MetricRunner:
    """
    Orchestrator for executing metric processors on in-memory Pandas DataFrames.

    This class provides a high-level API for users who want to compute metrics
    directly on DataFrames without using the full YAML-driven pipeline.
    """

    def __init__(self, config: dict[str, Any] | None = None) -> None:
        """
        Initialize the runner.

        Args:
            config: Optional configuration for metric default behaviors.
        """
        self.config = config if config else {}

    def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
        """
        Execute the provided metric processors on a DataFrame.

        The DataFrame is converted to a single pyarrow RecordBatch, so the
        whole frame must fit in memory.

        Args:
            df: The input Pandas DataFrame.
            metrics_processors: List of initialized DatametricProcessor instances.

        Returns:
            A dictionary containing the aggregated dataset-level metrics.
        """
        if df.empty or not metrics_processors:
            logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
            return {}

        batch = pa.RecordBatch.from_pandas(df)

        # Feature extraction and batch-level aggregation; later processors
        # see the features produced by earlier ones.
        features: dict[str, Any] = {}
        per_batch: dict[str, Any] = {}
        for processor in metrics_processors:
            logger.debug(f"Processing metric {processor.__class__.__name__}")
            features |= processor.compute_features(batch, prev_features=features)
            per_batch |= processor.compute_batch_metric(features)

        # With a single batch, merging batch metrics is a plain copy.
        aggregated: dict[str, Any] = dict(per_batch)

        # Dataset-level finalization.
        results: dict[str, Any] = {}
        for processor in metrics_processors:
            logger.debug(f"Computing final score for {processor.__class__.__name__}")
            results |= processor.compute(batch_metrics=aggregated)

        return results

config = config or {} instance-attribute

__init__(config: dict[str, Any] | None = None) -> None

Initialize the runner.

Parameters:

Name Type Description Default
config dict[str, Any] | None

Optional configuration for metric default behaviors.

None
Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
26
27
28
29
30
31
32
33
def __init__(self, config: dict[str, Any] | None = None) -> None:
    """
    Initialize the runner.

    Args:
        config: Optional configuration for metric default behaviors.
    """
    self.config = config if config else {}

run(df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]

Execute the provided metric processors on a DataFrame.

Parameters:

Name Type Description Default
df DataFrame

The input Pandas DataFrame.

required
metrics_processors list[DatametricProcessor]

List of initialized DatametricProcessor instances.

required

Returns:

Type Description
dict[str, Any]

A dictionary containing the aggregated dataset-level metrics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/metric_runner.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def run(self, df: DataFrame, metrics_processors: list[DatametricProcessor]) -> dict[str, Any]:
    """
    Execute the provided metric processors on a DataFrame.

    The DataFrame is converted to a single pyarrow RecordBatch, so the
    whole frame must fit in memory.

    Args:
        df: The input Pandas DataFrame.
        metrics_processors: List of initialized DatametricProcessor instances.

    Returns:
        A dictionary containing the aggregated dataset-level metrics.
    """
    if df.empty or not metrics_processors:
        logger.warning("Empty DataFrame or no metrics provided to MetricRunner")
        return {}

    batch = pa.RecordBatch.from_pandas(df)

    # Feature extraction and batch-level aggregation; later processors
    # see the features produced by earlier ones.
    features: dict[str, Any] = {}
    per_batch: dict[str, Any] = {}
    for processor in metrics_processors:
        logger.debug(f"Processing metric {processor.__class__.__name__}")
        features |= processor.compute_features(batch, prev_features=features)
        per_batch |= processor.compute_batch_metric(features)

    # With a single batch, merging batch metrics is a plain copy.
    aggregated: dict[str, Any] = dict(per_batch)

    # Dataset-level finalization.
    results: dict[str, Any] = {}
    for processor in metrics_processors:
        logger.debug(f"Computing final score for {processor.__class__.__name__}")
        results |= processor.compute(batch_metrics=aggregated)

    return results

PluginLoadedRegistry

Singleton registry that provides lazy access to all registered DQM components.

Components include: - Metrics (DatametricProcessor) - DataLoaders - OutputWriters

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class PluginLoadedRegistry:
    """
    Singleton registry that provides lazy access to all registered DQM components.

    Components include:
    - Metrics (DatametricProcessor)
    - DataLoaders
    - OutputWriters
    """

    # Caches populated lazily on first access; ``None`` means "not loaded yet".
    _metrics_registry: dict[str, type[DatametricProcessor]] | None = None
    _dataloaders_registry: dict[str, Any] | None = None
    _outputwriter_registry: dict[str, Any] | None = None

    @classmethod
    def get_metrics_registry(cls) -> dict[str, type[DatametricProcessor]]:
        """Return the registry of available metric processors.

        Returns:
            A dictionary mapping metric processor names to their classes.
        """
        # Compare against None, not truthiness: an empty registry is a valid
        # cached result and must not trigger a plugin reload on every call.
        if cls._metrics_registry is None:
            cls._metrics_registry = load_registered_plugins("dqm_ml.metrics", DatametricProcessor)

        return cls._metrics_registry

    @classmethod
    def get_dataloaders_registry(cls) -> dict[str, Any]:
        """Return the registry of available data loaders.

        Returns:
            A dictionary mapping data loader names to their classes.
        """
        # None-check (not truthiness) so an empty registry is cached, too.
        if cls._dataloaders_registry is None:
            cls._dataloaders_registry = load_registered_plugins("dqm_ml.dataloaders", None)  # TODO add base class
        return cls._dataloaders_registry

    @classmethod
    def get_outputwriter_registry(cls) -> dict[str, Any]:
        """Return the registry of available output writers.

        Returns:
            A dictionary mapping output writer names to their classes.
        """
        # None-check (not truthiness) so an empty registry is cached, too.
        if cls._outputwriter_registry is None:
            cls._outputwriter_registry = load_registered_plugins("dqm_ml.outputwriter", None)  # TODO add base class

        return cls._outputwriter_registry

get_dataloaders_registry() -> dict[str, Any] classmethod

Return the registry of available data loaders.

Returns:

Type Description
dict[str, Any]

A dictionary mapping data loader names to their classes.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
79
80
81
82
83
84
85
86
87
88
@classmethod
def get_dataloaders_registry(cls) -> dict[str, Any]:
    """Return the registry of available data loaders.

    Returns:
        A dictionary mapping data loader names to their classes.
    """
    # Compare against None, not truthiness: an empty (but loaded) registry
    # must not trigger a plugin reload on every call.
    if cls._dataloaders_registry is None:
        cls._dataloaders_registry = load_registered_plugins("dqm_ml.dataloaders", None)  # TODO add base class
    return cls._dataloaders_registry

get_metrics_registry() -> dict[str, type[DatametricProcessor]] classmethod

Return the registry of available metric processors.

Returns:

Type Description
dict[str, type[DatametricProcessor]]

A dictionary mapping metric processor names to their classes.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
67
68
69
70
71
72
73
74
75
76
77
@classmethod
def get_metrics_registry(cls) -> dict[str, type[DatametricProcessor]]:
    """Return the registry of available metric processors.

    Returns:
        A dictionary mapping metric processor names to their classes.
    """
    # Compare against None, not truthiness: an empty (but loaded) registry
    # must not trigger a plugin reload on every call.
    if cls._metrics_registry is None:
        cls._metrics_registry = load_registered_plugins("dqm_ml.metrics", DatametricProcessor)

    return cls._metrics_registry

get_outputwriter_registry() -> dict[str, Any] classmethod

Return the registry of available output writers.

Returns:

Type Description
dict[str, Any]

A dictionary mapping output writer names to their classes.

Source code in packages/dqm-ml-core/src/dqm_ml_core/utils/registry.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@classmethod
def get_outputwriter_registry(cls) -> dict[str, Any]:
    """Return the registry of available output writers.

    Returns:
        A dictionary mapping output writer names to their classes.
    """
    # Compare against None, not truthiness: an empty (but loaded) registry
    # must not trigger a plugin reload on every call.
    if cls._outputwriter_registry is None:
        cls._outputwriter_registry = load_registered_plugins("dqm_ml.outputwriter", None)  # TODO add base class

    return cls._outputwriter_registry

RepresentativenessProcessor

Bases: DatametricProcessor

Evaluates how well the dataset represents a target statistical distribution.

This processor discretises samples and performs statistical tests to compare the observed distribution of numerical columns against a theoretical target distribution (Normal or Uniform).

Supported Metrics
  • Chi-square: Goodness-of-fit test for categorical/binned data.
  • Kolmogorov-Smirnov (KS): Non-parametric test for continuous distributions (approximated via sampling).
  • Shannon Entropy: Measures the information diversity of the binned data.
  • GRTE (Geometric Representativeness Trajectory Error): Measures the exponential gap between observed and theoretical entropy.

The processor uses a streaming architecture: - Batch level: Computes partial statistics. - Dataset level: Aggregates histograms and performs final statistical tests.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
class RepresentativenessProcessor(DatametricProcessor):
    """
    Evaluates how well the dataset represents a target statistical distribution.

    This processor performs on samples discretisation statistical tests to compare the observed
    distribution of numerical columns against a theoretical target distribution
    (Normal or Uniform).

    Supported Metrics:
      - Chi-square: Goodness-of-fit test for categorical/binned data.
      - Kolmogorov-Smirnov (KS): Non-parametric test for continuous distributions (approximated via sampling).
      - Shannon Entropy: Measures the information diversity of the binned data.
      - GRTE (Geometric Representativeness Trajectory Error): Measures the exponential gap
        between observed and theoretical entropy.

    The processor uses a streaming architecture:
    - Batch level: Computes partial calculus.
    - Dataset level: Aggregates histograms and performs final statistical tests.
    """

    SUPPORTED_METRICS = {
        "chi-square",
        "grte",
        "shannon-entropy",
        "kolmogorov-smirnov",
    }
    SUPPORTED_DISTS = {"normal", "uniform"}

    # Configuration constants - can be overridden in config
    DEFAULT_ALPHA = 0.05  # Significance level for statistical tests
    DEFAULT_SHANNON_ENTROPY_THRESHOLD = 2.0  # Threshold for high/low diversity interpretation
    DEFAULT_GRTE_THRESHOLD = 0.5  # Threshold for high/low representativeness interpretation
    DEFAULT_KS_SAMPLE_SIZE = 500  # Maximum sample size for KS test
    DEFAULT_KS_MIN_SAMPLE_SIZE = 50  # Minimum sample size for KS test
    DEFAULT_KS_SAMPLE_DIVISOR = 20  # Divisor for calculating sample size per batch
    DEFAULT_EPSILON = 1e-9  # Small value to avoid division by zero
    DEFAULT_INTERPRETATION_THRESHOLDS = {
        "follows_distribution": "follows_distribution",
        "does_not_follow_distribution": "does_not_follow_distribution",
        "high_diversity": "high_diversity",
        "low_diversity": "low_diversity",
        "high_representativeness": "high_representativeness",
        "low_representativeness": "low_representativeness",
    }

    def __init__(
        self,
        name: str = "representativeness",
        config: dict[str, Any] | None = None,
    ) -> None:
        """
        Initialize the representativeness processor.

        Args:
            name: Name of the processor.
            config: Configuration dictionary containing:
                - input_columns: List of columns to analyze.
                - metrics: List of metrics to compute (default: all supported).
                - bins: Number of bins for histograms (default: 10).
                - distribution: Target distribution ("normal" or "uniform").
                - alpha: Significance level (default: 0.05).
                - distribution_params: Dictionary of params (e.g., mean, std, min, max).

        Raises:
            ValueError: If the configuration is inconsistent (no input columns,
                unsupported metric or distribution, bins < 2, alpha outside
                (0, 1), or non-positive epsilon).
        """
        super().__init__(name, config)
        self.name = name

        cfg = self.config
        self.metrics: list[str] = list(
            cfg.get(
                "metrics",
                ["chi-square", "grte", "kolmogorov-smirnov", "shannon-entropy"],
            )
        )

        self.bins: int = int(cfg.get("bins", 10))
        self.distribution: str = str(cfg.get("distribution", "normal")).lower()

        # Load configurable constants from config or use defaults
        self.alpha: float = float(cfg.get("alpha", self.DEFAULT_ALPHA))
        self.shannon_entropy_threshold: float = float(
            cfg.get(
                "shannon_entropy_threshold",
                self.DEFAULT_SHANNON_ENTROPY_THRESHOLD,
            )
        )
        self.grte_threshold: float = float(cfg.get("grte_threshold", self.DEFAULT_GRTE_THRESHOLD))
        self.ks_sample_size: int = int(cfg.get("ks_sample_size", self.DEFAULT_KS_SAMPLE_SIZE))
        self.ks_min_sample_size: int = int(cfg.get("ks_min_sample_size", self.DEFAULT_KS_MIN_SAMPLE_SIZE))
        self.ks_sample_divisor: int = int(cfg.get("ks_sample_divisor", self.DEFAULT_KS_SAMPLE_DIVISOR))
        self.epsilon: float = float(cfg.get("epsilon", self.DEFAULT_EPSILON))

        # Load interpretation thresholds from config or use defaults.
        # Copy into a fresh dict so per-instance mutation can never leak into
        # the shared class-level DEFAULT_INTERPRETATION_THRESHOLDS mapping.
        self.interpretation_thresholds: dict[str, str] = dict(
            cfg.get("interpretation_thresholds", self.DEFAULT_INTERPRETATION_THRESHOLDS)
        )

        # Handle distribution_params properly - it can be None or a dict
        dist_params_raw = cfg.get("distribution_params")

        self.dist_params: dict[str, Any] = {}
        if dist_params_raw is not None:
            self.dist_params = dict(dist_params_raw)

        # check config: avoid redundancy checks with pipeline (see datasetpipeline )
        if not self.input_columns:
            raise ValueError(f"[{self.name}] 'input_columns' must be provided")
        if any(m not in self.SUPPORTED_METRICS for m in self.metrics):
            raise ValueError(f"[{self.name}] unsupported metric; supported: {self.SUPPORTED_METRICS}")
        if self.distribution not in self.SUPPORTED_DISTS:
            raise ValueError(f"[{self.name}] 'distribution' must be in {self.SUPPORTED_DISTS}")
        if self.bins < 2:
            raise ValueError(f"[{self.name}] 'bins' must be >= 2")
        if self.alpha <= 0 or self.alpha >= 1:
            raise ValueError(f"[{self.name}] 'alpha' must be between 0 and 1")
        if self.epsilon <= 0:
            raise ValueError(f"[{self.name}] 'epsilon' must be positive")

        # Per-column bin edges, lazily initialized on the first batch.
        self._bin_edges: dict[str, np.ndarray] = {}
        self._initialized: bool = False

    @override
    def generated_metrics(self) -> list[str]:
        """
        Return the list of metric columns that will be generated.

        Returns:
            List of output metric column names.
        """
        # TODO : manage output metrics names with configuration
        # for now we follow a fixed naming convention
        suffixes = {
            "chi-square": ("p_value", "statistic", "interpretation"),
            "kolmogorov-smirnov": ("p_value", "statistic", "interpretation"),
            "shannon-entropy": ("entropy", "interpretation"),
            "grte": ("grte_value", "interpretation"),
        }
        names: list[str] = []
        for column in self.input_columns:
            for metric, parts in suffixes.items():
                if metric in self.metrics:
                    names.extend(f"{column}_{metric}_{part}" for part in parts)
        return names

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Compute partial histogram statistics per batch for streaming aggregation.

        Args:
            features: Dictionary of column arrays from this batch.

        Returns:
            Dictionary containing:
                - {col}_count: Total valid numeric samples.
                - {col}_hist: Histogram counts.
                - {col}_ks_sample: Random subset of data for KS test.
        """
        batch_metrics = {}

        for col in self.input_columns:
            if col not in features:
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue

            arr = features[col]
            # convert to numeric, handle mixed types and NaN
            try:
                np_col = np.asarray(arr.to_numpy(zero_copy_only=False))
            except Exception:
                np_col = pd.Series(arr.to_pylist()).to_numpy(copy=True)

            values = pd.to_numeric(pd.Series(np_col), errors="coerce").dropna()

            if values.empty:
                logger.warning(f"[{self.name}] column '{col}' has no valid numeric values in this batch")
                continue

            if not self._initialized or col not in self._bin_edges:
                self._initialize_bin_edges(values.to_numpy(), col)

            edges = self._bin_edges[col]

            # Debug
            logger.debug(f"[{self.name}] edges shape: {edges.shape}, values shape: {values.shape}")

            hist_counts = np.histogram(values, bins=edges)[0].astype(np.int64)

            # debug: check histogram
            logger.debug(f"[{self.name}] hist_counts shape: {hist_counts.shape}, expected: {self.bins}")

            # store as Arrow arrays for aggregation
            batch_metrics[f"{col}_count"] = pa.array([len(values)], type=pa.int64())
            batch_metrics[f"{col}_hist"] = pa.FixedSizeListArray.from_arrays(
                hist_counts, list_size=hist_counts.shape[0]
            )

            # sampling for KS test approximation
            # TODO: KS need to have all the data in memory to compute,
            # this metrics need to rely on other metrics prior computation of mean, std, min, max computation
            if "kolmogorov-smirnov" in self.metrics or "chi-square" in self.metrics:
                sample_per_batch = min(
                    self.ks_sample_size,
                    max(
                        self.ks_min_sample_size,
                        len(values) // self.ks_sample_divisor,
                    ),
                )
                if len(values) > sample_per_batch:
                    # Random sampling without replacement.
                    # BUGFIX: ``values`` is a Series whose index has gaps after
                    # dropna(), so label-based ``values[sample_indices]`` could
                    # raise KeyError; use .iloc for positional selection.
                    sample_indices = np.random.choice(len(values), sample_per_batch, replace=False)
                    sample = values.iloc[sample_indices]
                else:
                    sample = values

                batch_metrics[f"{col}_ks_sample"] = pa.array(sample.tolist(), type=pa.float64())

        if not self._initialized and batch_metrics:
            self._initialized = True

        return batch_metrics

    def _initialize_bin_edges(self, sample_data: np.ndarray, col: str) -> None:
        """
        Initialize bin edges for a column based on sample data and target distribution.

        Config-provided distribution parameters take precedence; otherwise the
        parameters are inferred from ``sample_data``.

        Args:
            sample_data: Data array used to infer parameters if not provided in config.
            col: Name of the column.
        """
        if self.distribution == "normal":
            mu = float(self.dist_params.get("mean", np.mean(sample_data)))
            sigma = float(self.dist_params.get("std", np.std(sample_data, ddof=0)))
            if sigma <= 0.0:
                # Degenerate spread: avoid zero-width bins.
                sigma = self.epsilon
            self._bin_edges[col] = self._bin_edges_normal(mu, sigma, self.bins, sample_data)
        else:
            lo = float(self.dist_params.get("min", np.min(sample_data)))
            hi = float(self.dist_params.get("max", np.max(sample_data)))
            if hi <= lo:
                # Degenerate range: widen minimally so the bins are valid.
                hi = lo + self.epsilon
            self._bin_edges[col] = self._bin_edges_uniform(lo, hi, self.bins, sample_data)

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
        """
        Compute final dataset-level metrics by aggregating batch histograms.

        Args:
            batch_metrics: Dictionary of batch-level metrics collected during processing.

        Returns:
            Dictionary containing final scores and interpretations for all selected metrics.
        """
        if not batch_metrics:
            return {"_metadata": {"error": "No batch metrics provided"}}

        out: dict[str, Any] = {}
        total_samples = 0

        for col in self.input_columns:
            count_key = f"{col}_count"
            hist_key = f"{col}_hist"

            if count_key not in batch_metrics or hist_key not in batch_metrics:
                logger.warning(f"[{self.name}] no batch metrics for column '{col}'")
                continue

            # TODO : maybe we need a try ? as in batch or not as na already removed
            # N/A handling shall be documented, and logs added
            hist_batch_arrays = np.asarray(batch_metrics[hist_key].to_numpy(zero_copy_only=False))
            if hist_batch_arrays.shape[0] == 0:
                logger.warning(f"[{self.name}] no histograme batch for '{col}'")
                continue

            # aggregate counts and histograms across all batches
            total_count = int(np.sum(batch_metrics[count_key].to_numpy()))

            hist_arrays = None

            for batch_hist in hist_batch_arrays:
                hist_arrays = batch_hist if hist_arrays is None else hist_arrays + batch_hist

            # Debug: vérifier les dimensions
            if hist_arrays is None:
                logger.warning(f"[{self.name}] no valid histogram for column '{col}'")
                continue

            logger.debug(f"[{self.name}] hist_arrays shape: {hist_arrays.shape}, expected bins: {self.bins}")

            # sum histogram counts across batches
            if hist_arrays.ndim == 1:
                logger.debug("Single histogram")
                obs_counts = hist_arrays.astype(float)
            else:
                # Ensure we're summing along the right axis
                logger.debug("Multiple histograms from different batches")
                if hist_arrays.shape[1] == self.bins:
                    logger.debug("Sum along batch dimension (axis=0)")
                    obs_counts = np.sum(hist_arrays, axis=0).astype(float)
                else:
                    logger.debug("Flatten and create a single histogram")
                    logger.warning(f"[{self.name}] Unexpected histogram shape {hist_arrays.shape}, flattening")
                    obs_counts = hist_arrays.flatten().astype(float)

            if total_count <= 0 or obs_counts.sum() <= 0:
                logger.warning(f"[{self.name}] no valid data for column '{col}'")
                continue

            total_samples += total_count

            #  distribution parameters and bin edges
            if col not in self._bin_edges:
                logger.warning(f"[{self.name}] no bin edges for column '{col}' - skipping")
                continue

            edges = self._bin_edges[col]

            # theoretical probabilities - Aligné sur DQM-ML officiel
            if self.distribution == "normal":
                logger.debug("Generate normal distribution")
                # Utilise les MÊMES paramètres que ceux utilisés pour générer les bins

                sample_key = f"{col}_ks_sample"
                if sample_key in batch_metrics:
                    logger.debug("Use sampled meand and std")
                    sample_arrays = batch_metrics[sample_key].to_numpy()
                    if sample_arrays.ndim > 1:
                        sample_arrays = sample_arrays.flatten()
                    mean = float(self.dist_params.get("mean", np.mean(sample_arrays)))
                    std = float(self.dist_params.get("std", np.std(sample_arrays, ddof=0)))
                    std = std if std > 0.0 else self.epsilon
                else:
                    logger.debug("Fallback: use default or configured mean and std")
                    mean = float(self.dist_params.get("mean", 0.0))
                    std = float(self.dist_params.get("std", 1.0))
                logger.debug(f"mean={mean}")
                logger.debug(f"std={mean}")
                # génère des valeurs aléatoires et compte les fréquences (comme l'officiel)
                expected_values = np.random.normal(mean, std, total_count)
                exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)
            else:  # uniform
                logger.debug("Generate uniform distribution")
                mn = float(self.dist_params.get("min", edges[0]))
                mx = float(self.dist_params.get("max", edges[-1]))
                logger.debug(f"min={mn}")
                logger.debug(f"max={mx}")
                # Génère des valeurs aléatoires et compte les fréquences (comme l'officiel)
                expected_values = np.random.uniform(mn, mx, total_count)
                exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)

            exp_counts = total_count * exp_probs

            col_res: dict[str, Any] = {}

            # chi-square: here we compute the chi-square with a alpha value of 0.05
            if "chi-square" in self.metrics:
                # Ensure observed and expected counts have the same sum

                mask = exp_counts > 0
                if mask.sum() >= 2:
                    logger.debug("Normalize expected counts to match observed sum")
                    obs_sum = obs_counts[mask].sum()
                    exp_sum = exp_counts[mask].sum()

                    if exp_sum > 0:
                        # Scale expected counts to match observed sum
                        exp_counts_normalized = exp_counts[mask] * (obs_sum / exp_sum)

                        logger.debug("Expected frequencies:")
                        logger.debug(exp_counts_normalized)
                        logger.debug("Observed frequencies: ")
                        logger.debug(obs_counts[mask])

                        try:
                            chi = stats.chisquare(
                                f_obs=obs_counts[mask],
                                f_exp=exp_counts_normalized,
                            )
                            logger.debug(f"Chi P value: {chi.pvalue}")
                            col_res["chi-square"] = {
                                "p_value": float(chi.pvalue),
                                "statistic": float(chi.statistic),
                                "interpretation": self.interpretation_thresholds.get(
                                    "follows_distribution"
                                    if chi.pvalue >= self.alpha
                                    else "does_not_follow_distribution",
                                    "follows_distribution",
                                ),
                            }
                        except ValueError as e:
                            # Fallback: use only observed counts if chi-square fails
                            col_res["chi-square"] = {
                                "p_value": float("nan"),
                                "statistic": float("nan"),
                                "interpretation": f"chi_square_failed: {e!s}",
                                "note": "using observed counts only due to statistical constraints",
                            }
                    else:
                        col_res["chi-square"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": "no_expected_counts",
                        }
                else:
                    col_res["chi-square"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "insufficient_bins",
                    }

            # Kolmogorov-Smirnov test using sampled data
            if "kolmogorov-smirnov" in self.metrics:
                sample_key = f"{col}_ks_sample"
                if sample_key in batch_metrics:
                    sample_arrays = batch_metrics[sample_key].to_numpy()
                    ks_samples = sample_arrays if sample_arrays.ndim == 1 else sample_arrays.flatten()

                    if len(ks_samples) > 0:
                        # Perform KS test on aggregated samples
                        if self.distribution == "normal":
                            mean = float(self.dist_params.get("mean", np.mean(ks_samples)))
                            std = float(self.dist_params.get("std", np.std(ks_samples, ddof=0)))
                            std = std if std > 0.0 else self.epsilon
                            ks = stats.kstest(ks_samples, stats.norm.cdf, args=(mean, std))
                        else:  # uniform
                            mn = float(self.dist_params.get("min", np.min(ks_samples)))
                            mx = float(self.dist_params.get("max", np.max(ks_samples)))
                            if mx <= mn:
                                mx = mn + self.epsilon
                            ks = stats.kstest(
                                ks_samples,
                                stats.uniform.cdf,
                                args=(mn, mx - mn),
                            )

                        col_res["kolmogorov-smirnov"] = {
                            "p_value": float(ks.pvalue),
                            "statistic": float(ks.statistic),
                            "interpretation": self.interpretation_thresholds.get(
                                "follows_distribution" if ks.pvalue >= self.alpha else "does_not_follow_distribution",
                                "follows_distribution",
                            ),
                            "sample_size": len(ks_samples),
                            "note": "approximated_from_random_samples",
                        }
                    else:
                        col_res["kolmogorov-smirnov"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": "no_samples_available",
                        }
                else:
                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_sample_data_found",
                    }

            # Shannon entropy - aligned on dqm-ml v1 (using theoretical frequencies)
            if "shannon-entropy" in self.metrics:
                # Use theoretical frequencies
                p_exp = exp_probs / exp_probs.sum()
                h_exp = float(stats.entropy(p_exp))
                col_res["shannon-entropy"] = {
                    "entropy": h_exp,
                    "interpretation": self.interpretation_thresholds.get(
                        "high_diversity" if h_exp > self.shannon_entropy_threshold else "low_diversity",
                        "high_diversity",
                    ),
                }

            # GRTE (gap between observed and theoretical entropies) - aligned on dqm-ml v1
            if "grte" in self.metrics:
                # Use observed and theoretical frequencies
                p_obs = obs_counts / obs_counts.sum()
                p_exp = exp_probs / exp_probs.sum()
                h_obs = float(stats.entropy(p_obs))
                h_exp = float(stats.entropy(p_exp))
                grte = float(np.exp(-2.0 * abs(h_exp - h_obs)))
                col_res["grte"] = {
                    "grte_value": grte,
                    "interpretation": self.interpretation_thresholds.get(
                        "high_representativeness" if grte > self.grte_threshold else "low_representativeness",
                        "high_representativeness",
                    ),
                }

            # Stupid way to flatten tree of keys
            # TODO : refactor the implementation,for optimization
            if col_res:
                for key, value in col_res.items():
                    if isinstance(value, dict):
                        for prop, content in value.items():
                            out[key + "_" + col + "_" + prop] = content
                    else:
                        out[key + "_" + col] = value
        # TODO make optional export of metadata
        meta_data = {
            "bins": self.bins,
            "distribution": self.distribution,
            "metrics_computed": self.metrics,
            "total_samples": total_samples,
            "columns_analyzed": [c for c in self.input_columns if f"{c}_count" in batch_metrics],
            "ks_sampling_enabled": "kolmogorov-smirnov" in self.metrics,
            "note": "KS test uses random sampling approximation for scalability",
        }

        import json

        out["_metadata"] = json.dumps(meta_data)
        return out

    def reset(self) -> None:
        """Reset processor state so a new processing run starts fresh."""
        # Forget per-column bin edges and force re-initialization on the
        # next batch processed.
        self._initialized = False
        self._bin_edges = {}
    # utils methods for bin edge calculation

    def _bin_edges_normal(self, mean: float, std: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate bin edges based on the Percent Point Function (PPF) of a Normal distribution.

        This ensures bins represent equal probability mass under the theoretical distribution.
        The first and last bins are extended to -inf and +inf respectively.
        """
        # logic from dqm-ml v1 : use stats.norm.ppf with linspace(1/bins, 1, bins)
        interval = []
        for i in range(1, bins):
            val = stats.norm.ppf(i / bins, mean, std)
            interval.append(val)
        interval.insert(0, -np.inf)
        interval.append(np.inf)
        return np.array(interval)

    def _bin_edges_uniform(self, mn: float, mx: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate linearly spaced bin edges for a Uniform distribution.

        The range is determined by the minimum/maximum of both the configured
        parameters and the actual observed data.
        """
        lo = min(mn, float(np.min(data)))
        hi = max(mx, float(np.max(data)))
        if hi <= lo:
            hi = lo + self.epsilon
        return np.linspace(lo, hi, bins + 1)

DEFAULT_ALPHA = 0.05 class-attribute instance-attribute

DEFAULT_EPSILON = 1e-09 class-attribute instance-attribute

DEFAULT_GRTE_THRESHOLD = 0.5 class-attribute instance-attribute

DEFAULT_INTERPRETATION_THRESHOLDS = {'follows_distribution': 'follows_distribution', 'does_not_follow_distribution': 'does_not_follow_distribution', 'high_diversity': 'high_diversity', 'low_diversity': 'low_diversity', 'high_representativeness': 'high_representativeness', 'low_representativeness': 'low_representativeness'} class-attribute instance-attribute

DEFAULT_KS_MIN_SAMPLE_SIZE = 50 class-attribute instance-attribute

DEFAULT_KS_SAMPLE_DIVISOR = 20 class-attribute instance-attribute

DEFAULT_KS_SAMPLE_SIZE = 500 class-attribute instance-attribute

DEFAULT_SHANNON_ENTROPY_THRESHOLD = 2.0 class-attribute instance-attribute

SUPPORTED_DISTS = {'normal', 'uniform'} class-attribute instance-attribute

SUPPORTED_METRICS = {'chi-square', 'grte', 'shannon-entropy', 'kolmogorov-smirnov'} class-attribute instance-attribute

alpha: float = float(cfg.get('alpha', self.DEFAULT_ALPHA)) instance-attribute

bins: int = int(cfg.get('bins', 10)) instance-attribute

dist_params: dict[str, Any] = {} instance-attribute

distribution: str = str(cfg.get('distribution', 'normal')).lower() instance-attribute

epsilon: float = float(cfg.get('epsilon', self.DEFAULT_EPSILON)) instance-attribute

grte_threshold: float = float(cfg.get('grte_threshold', self.DEFAULT_GRTE_THRESHOLD)) instance-attribute

interpretation_thresholds: dict[str, str] = cfg.get('interpretation_thresholds', self.DEFAULT_INTERPRETATION_THRESHOLDS) instance-attribute

ks_min_sample_size: int = int(cfg.get('ks_min_sample_size', self.DEFAULT_KS_MIN_SAMPLE_SIZE)) instance-attribute

ks_sample_divisor: int = int(cfg.get('ks_sample_divisor', self.DEFAULT_KS_SAMPLE_DIVISOR)) instance-attribute

ks_sample_size: int = int(cfg.get('ks_sample_size', self.DEFAULT_KS_SAMPLE_SIZE)) instance-attribute

metrics: list[str] = list(cfg.get('metrics', ['chi-square', 'grte', 'kolmogorov-smirnov', 'shannon-entropy'])) instance-attribute

name = name instance-attribute

shannon_entropy_threshold: float = float(cfg.get('shannon_entropy_threshold', self.DEFAULT_SHANNON_ENTROPY_THRESHOLD)) instance-attribute

__init__(name: str = 'representativeness', config: dict[str, Any] | None = None) -> None

Initialize the representativeness processor.

Parameters:

Name Type Description Default
name str

Name of the processor.

'representativeness'
config dict[str, Any] | None

Configuration dictionary containing: - input_columns: List of columns to analyze. - metrics: List of metrics to compute (default: all supported). - bins: Number of bins for histograms (default: 10). - distribution: Target distribution ("normal" or "uniform"). - alpha: Significance level (default: 0.05). - distribution_params: Dictionary of params (e.g., mean, std, min, max).

None
Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def __init__(
    self,
    name: str = "representativeness",
    config: dict[str, Any] | None = None,
) -> None:
    """
    Initialize the representativeness processor.

    Args:
        name: Name of the processor.
        config: Configuration dictionary containing:
            - input_columns: List of columns to analyze.
            - metrics: List of metrics to compute (default: all supported).
            - bins: Number of bins for histograms (default: 10).
            - distribution: Target distribution ("normal" or "uniform").
            - alpha: Significance level (default: 0.05).
            - distribution_params: Dictionary of params (e.g., mean, std, min, max).

    Raises:
        ValueError: If any configuration value fails validation.
    """
    super().__init__(name, config)
    self.name = name

    conf = self.config

    # Which metrics to compute; every supported metric by default.
    self.metrics: list[str] = list(
        conf.get(
            "metrics",
            ["chi-square", "grte", "kolmogorov-smirnov", "shannon-entropy"],
        )
    )

    self.bins: int = int(conf.get("bins", 10))
    self.distribution: str = str(conf.get("distribution", "normal")).lower()

    # Tunable constants: read from config, falling back to class defaults.
    self.alpha: float = float(conf.get("alpha", self.DEFAULT_ALPHA))
    self.shannon_entropy_threshold: float = float(
        conf.get(
            "shannon_entropy_threshold",
            self.DEFAULT_SHANNON_ENTROPY_THRESHOLD,
        )
    )
    self.grte_threshold: float = float(conf.get("grte_threshold", self.DEFAULT_GRTE_THRESHOLD))
    self.ks_sample_size: int = int(conf.get("ks_sample_size", self.DEFAULT_KS_SAMPLE_SIZE))
    self.ks_min_sample_size: int = int(conf.get("ks_min_sample_size", self.DEFAULT_KS_MIN_SAMPLE_SIZE))
    self.ks_sample_divisor: int = int(conf.get("ks_sample_divisor", self.DEFAULT_KS_SAMPLE_DIVISOR))
    self.epsilon: float = float(conf.get("epsilon", self.DEFAULT_EPSILON))

    # Mapping from internal interpretation keys to user-facing labels.
    self.interpretation_thresholds: dict[str, str] = conf.get(
        "interpretation_thresholds", self.DEFAULT_INTERPRETATION_THRESHOLDS
    )

    # distribution_params may be absent or explicitly None; copy it so later
    # mutation of the config dict cannot leak into the processor.
    raw_params = conf.get("distribution_params")
    self.dist_params: dict[str, Any] = {} if raw_params is None else dict(raw_params)

    # Validate configuration early (avoids redundant checks in the pipeline;
    # see datasetpipeline).
    if not self.input_columns:
        raise ValueError(f"[{self.name}] 'input_columns' must be provided")
    if any(m not in self.SUPPORTED_METRICS for m in self.metrics):
        raise ValueError(f"[{self.name}] unsupported metric; supported: {self.SUPPORTED_METRICS}")
    if self.distribution not in self.SUPPORTED_DISTS:
        raise ValueError(f"[{self.name}] 'distribution' must be in {self.SUPPORTED_DISTS}")
    if self.bins < 2:
        raise ValueError(f"[{self.name}] 'bins' must be >= 2")
    if self.alpha <= 0 or self.alpha >= 1:
        raise ValueError(f"[{self.name}] 'alpha' must be between 0 and 1")
    if self.epsilon <= 0:
        raise ValueError(f"[{self.name}] 'epsilon' must be positive")

    # Streaming state: per-column bin edges, lazily initialized on first batch.
    self._bin_edges: dict[str, np.ndarray] = {}
    self._initialized: bool = False

compute(batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]

Compute final dataset-level metrics by aggregating batch histograms.

Parameters:

Name Type Description Default
batch_metrics dict[str, Array] | None

Dictionary of batch-level metrics collected during processing.

None

Returns:

Type Description
dict[str, Any]

Dictionary containing final scores and interpretations for all selected metrics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
@override
def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
    """
    Compute final dataset-level metrics by aggregating batch histograms.

    For each configured column the per-batch histogram counts are summed,
    a theoretical distribution is sampled with the same size as the observed
    data, and the selected statistical metrics (chi-square, Kolmogorov-Smirnov,
    Shannon entropy, GRTE) are evaluated against it.

    Args:
        batch_metrics: Dictionary of batch-level metrics collected during processing.

    Returns:
        Dictionary containing final scores and interpretations for all selected
        metrics, flattened as "{metric}_{column}_{property}" keys, plus a
        "_metadata" entry (JSON string) describing the run.
    """
    import json

    if not batch_metrics:
        return {"_metadata": {"error": "No batch metrics provided"}}

    out: dict[str, Any] = {}
    total_samples = 0

    for col in self.input_columns:
        count_key = f"{col}_count"
        hist_key = f"{col}_hist"

        if count_key not in batch_metrics or hist_key not in batch_metrics:
            logger.warning(f"[{self.name}] no batch metrics for column '{col}'")
            continue

        # TODO : maybe we need a try ? as in batch or not as na already removed
        # N/A handling shall be documented, and logs added
        hist_batch_arrays = np.asarray(batch_metrics[hist_key].to_numpy(zero_copy_only=False))
        if hist_batch_arrays.shape[0] == 0:
            logger.warning(f"[{self.name}] no histograme batch for '{col}'")
            continue

        # Aggregate counts and histograms across all batches.
        total_count = int(np.sum(batch_metrics[count_key].to_numpy()))

        hist_arrays = None

        for batch_hist in hist_batch_arrays:
            hist_arrays = batch_hist if hist_arrays is None else hist_arrays + batch_hist

        # Debug: check dimensions
        if hist_arrays is None:
            logger.warning(f"[{self.name}] no valid histogram for column '{col}'")
            continue

        logger.debug(f"[{self.name}] hist_arrays shape: {hist_arrays.shape}, expected bins: {self.bins}")

        # Sum histogram counts across batches.
        if hist_arrays.ndim == 1:
            logger.debug("Single histogram")
            obs_counts = hist_arrays.astype(float)
        else:
            # Ensure we're summing along the right axis.
            logger.debug("Multiple histograms from different batches")
            if hist_arrays.shape[1] == self.bins:
                logger.debug("Sum along batch dimension (axis=0)")
                obs_counts = np.sum(hist_arrays, axis=0).astype(float)
            else:
                logger.debug("Flatten and create a single histogram")
                logger.warning(f"[{self.name}] Unexpected histogram shape {hist_arrays.shape}, flattening")
                obs_counts = hist_arrays.flatten().astype(float)

        if total_count <= 0 or obs_counts.sum() <= 0:
            logger.warning(f"[{self.name}] no valid data for column '{col}'")
            continue

        total_samples += total_count

        # Distribution parameters and bin edges must have been initialized
        # during batch processing.
        if col not in self._bin_edges:
            logger.warning(f"[{self.name}] no bin edges for column '{col}' - skipping")
            continue

        edges = self._bin_edges[col]

        # Theoretical frequencies - aligned with the official DQM-ML implementation.
        # NOTE: exp_probs holds histogram COUNTS of sampled values (not
        # normalized probabilities); it is normalized where needed below.
        if self.distribution == "normal":
            logger.debug("Generate normal distribution")
            # Use the SAME parameters as those used to generate the bins.

            sample_key = f"{col}_ks_sample"
            if sample_key in batch_metrics:
                logger.debug("Use sampled meand and std")
                sample_arrays = batch_metrics[sample_key].to_numpy()
                if sample_arrays.ndim > 1:
                    sample_arrays = sample_arrays.flatten()
                mean = float(self.dist_params.get("mean", np.mean(sample_arrays)))
                std = float(self.dist_params.get("std", np.std(sample_arrays, ddof=0)))
                std = std if std > 0.0 else self.epsilon
            else:
                logger.debug("Fallback: use default or configured mean and std")
                mean = float(self.dist_params.get("mean", 0.0))
                std = float(self.dist_params.get("std", 1.0))
            logger.debug(f"mean={mean}")
            # BUGFIX: previously logged the mean under the "std" label.
            logger.debug(f"std={std}")
            # Draw random values and count frequencies (as the official implementation does).
            expected_values = np.random.normal(mean, std, total_count)
            exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)
        else:  # uniform
            logger.debug("Generate uniform distribution")
            mn = float(self.dist_params.get("min", edges[0]))
            mx = float(self.dist_params.get("max", edges[-1]))
            logger.debug(f"min={mn}")
            logger.debug(f"max={mx}")
            # Draw random values and count frequencies (as the official implementation does).
            expected_values = np.random.uniform(mn, mx, total_count)
            exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)

        # NOTE(review): exp_probs already sums to (at most) total_count, so this
        # multiplication uniformly inflates exp_counts; the scaling cancels out
        # in the chi-square normalization below and does not affect the mask.
        exp_counts = total_count * exp_probs

        col_res: dict[str, Any] = {}

        # Chi-square goodness-of-fit test at significance level self.alpha.
        if "chi-square" in self.metrics:
            # Ensure observed and expected counts have the same sum
            # (scipy's chisquare requires matching totals within tolerance).

            mask = exp_counts > 0
            if mask.sum() >= 2:
                logger.debug("Normalize expected counts to match observed sum")
                obs_sum = obs_counts[mask].sum()
                exp_sum = exp_counts[mask].sum()

                if exp_sum > 0:
                    # Scale expected counts to match observed sum
                    exp_counts_normalized = exp_counts[mask] * (obs_sum / exp_sum)

                    logger.debug("Expected frequencies:")
                    logger.debug(exp_counts_normalized)
                    logger.debug("Observed frequencies: ")
                    logger.debug(obs_counts[mask])

                    try:
                        chi = stats.chisquare(
                            f_obs=obs_counts[mask],
                            f_exp=exp_counts_normalized,
                        )
                        logger.debug(f"Chi P value: {chi.pvalue}")
                        col_res["chi-square"] = {
                            "p_value": float(chi.pvalue),
                            "statistic": float(chi.statistic),
                            "interpretation": self.interpretation_thresholds.get(
                                "follows_distribution"
                                if chi.pvalue >= self.alpha
                                else "does_not_follow_distribution",
                                "follows_distribution",
                            ),
                        }
                    except ValueError as e:
                        # Fallback: report failure if chi-square rejects the inputs
                        # (e.g. mismatched sums or invalid frequencies).
                        col_res["chi-square"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": f"chi_square_failed: {e!s}",
                            "note": "using observed counts only due to statistical constraints",
                        }
                else:
                    col_res["chi-square"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_expected_counts",
                    }
            else:
                col_res["chi-square"] = {
                    "p_value": float("nan"),
                    "statistic": float("nan"),
                    "interpretation": "insufficient_bins",
                }

        # Kolmogorov-Smirnov test using sampled data.
        if "kolmogorov-smirnov" in self.metrics:
            sample_key = f"{col}_ks_sample"
            if sample_key in batch_metrics:
                sample_arrays = batch_metrics[sample_key].to_numpy()
                ks_samples = sample_arrays if sample_arrays.ndim == 1 else sample_arrays.flatten()

                if len(ks_samples) > 0:
                    # Perform KS test on aggregated samples against the
                    # configured (or sample-estimated) theoretical CDF.
                    if self.distribution == "normal":
                        mean = float(self.dist_params.get("mean", np.mean(ks_samples)))
                        std = float(self.dist_params.get("std", np.std(ks_samples, ddof=0)))
                        std = std if std > 0.0 else self.epsilon
                        ks = stats.kstest(ks_samples, stats.norm.cdf, args=(mean, std))
                    else:  # uniform
                        mn = float(self.dist_params.get("min", np.min(ks_samples)))
                        mx = float(self.dist_params.get("max", np.max(ks_samples)))
                        if mx <= mn:
                            mx = mn + self.epsilon
                        # scipy's uniform takes (loc, scale) = (min, max - min).
                        ks = stats.kstest(
                            ks_samples,
                            stats.uniform.cdf,
                            args=(mn, mx - mn),
                        )

                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float(ks.pvalue),
                        "statistic": float(ks.statistic),
                        "interpretation": self.interpretation_thresholds.get(
                            "follows_distribution" if ks.pvalue >= self.alpha else "does_not_follow_distribution",
                            "follows_distribution",
                        ),
                        "sample_size": len(ks_samples),
                        "note": "approximated_from_random_samples",
                    }
                else:
                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_samples_available",
                    }
            else:
                col_res["kolmogorov-smirnov"] = {
                    "p_value": float("nan"),
                    "statistic": float("nan"),
                    "interpretation": "no_sample_data_found",
                }

        # Shannon entropy - aligned on dqm-ml v1 (using theoretical frequencies).
        if "shannon-entropy" in self.metrics:
            # Normalize theoretical counts into a probability vector.
            p_exp = exp_probs / exp_probs.sum()
            h_exp = float(stats.entropy(p_exp))
            col_res["shannon-entropy"] = {
                "entropy": h_exp,
                "interpretation": self.interpretation_thresholds.get(
                    "high_diversity" if h_exp > self.shannon_entropy_threshold else "low_diversity",
                    "high_diversity",
                ),
            }

        # GRTE (gap between observed and theoretical entropies) - aligned on dqm-ml v1.
        if "grte" in self.metrics:
            # Compare observed vs theoretical entropy; GRTE = exp(-2*|dH|) in (0, 1].
            p_obs = obs_counts / obs_counts.sum()
            p_exp = exp_probs / exp_probs.sum()
            h_obs = float(stats.entropy(p_obs))
            h_exp = float(stats.entropy(p_exp))
            grte = float(np.exp(-2.0 * abs(h_exp - h_obs)))
            col_res["grte"] = {
                "grte_value": grte,
                "interpretation": self.interpretation_thresholds.get(
                    "high_representativeness" if grte > self.grte_threshold else "low_representativeness",
                    "high_representativeness",
                ),
            }

        # Flatten the nested per-column results into "{metric}_{col}_{prop}" keys.
        # TODO : refactor the implementation,for optimization
        if col_res:
            for key, value in col_res.items():
                if isinstance(value, dict):
                    for prop, content in value.items():
                        out[key + "_" + col + "_" + prop] = content
                else:
                    out[key + "_" + col] = value
    # TODO make optional export of metadata
    meta_data = {
        "bins": self.bins,
        "distribution": self.distribution,
        "metrics_computed": self.metrics,
        "total_samples": total_samples,
        "columns_analyzed": [c for c in self.input_columns if f"{c}_count" in batch_metrics],
        "ks_sampling_enabled": "kolmogorov-smirnov" in self.metrics,
        "note": "KS test uses random sampling approximation for scalability",
    }

    out["_metadata"] = json.dumps(meta_data)
    return out

compute_batch_metric(features: dict[str, pa.Array]) -> dict[str, pa.Array]

Compute partial histogram statistics per batch for streaming aggregation.

Parameters:

Name Type Description Default
features dict[str, Array]

Dictionary of column arrays from this batch.

required

Returns:

Type Description
dict[str, Array]

Dictionary containing: - {col}_count: Total valid numeric samples. - {col}_hist: Histogram counts. - {col}_ks_sample: Random subset of data for KS test.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@override
def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
    """
    Compute partial histogram statistics per batch for streaming aggregation.

    Args:
        features: Dictionary of column arrays from this batch.

    Returns:
        Dictionary containing:
            - {col}_count: Total valid numeric samples.
            - {col}_hist: Histogram counts.
            - {col}_ks_sample: Random subset of data for KS test.
    """
    batch_metrics: dict[str, pa.Array] = {}

    for col in self.input_columns:
        if col not in features:
            logger.warning(f"[{self.name}] column '{col}' not found in batch")
            continue

        arr = features[col]
        # Convert to numpy; fall back to a Python-list round trip for Arrow
        # types that cannot be exposed as a numpy buffer.
        try:
            np_col = np.asarray(arr.to_numpy(zero_copy_only=False))
        except Exception:
            np_col = pd.Series(arr.to_pylist()).to_numpy(copy=True)

        # Coerce to numeric, drop NaN/non-convertible entries, then
        # materialize as a plain numpy array. BUGFIX: after dropna() the
        # Series index has gaps, so integer-array indexing on the Series
        # (as in the previous `values[sample_indices]`) is label-based and
        # raises KeyError for dropped labels; numpy indexing is positional.
        values = pd.to_numeric(pd.Series(np_col), errors="coerce").dropna().to_numpy()

        if values.size == 0:
            logger.warning(f"[{self.name}] column '{col}' has no valid numeric values in this batch")
            continue

        # Bin edges are fixed lazily from the first batch that contains the column.
        if not self._initialized or col not in self._bin_edges:
            self._initialize_bin_edges(values, col)

        edges = self._bin_edges[col]

        # Debug
        logger.debug(f"[{self.name}] edges shape: {edges.shape}, values shape: {values.shape}")

        hist_counts = np.histogram(values, bins=edges)[0].astype(np.int64)

        # debug: check histogram
        logger.debug(f"[{self.name}] hist_counts shape: {hist_counts.shape}, expected: {self.bins}")

        # Store partial results as Arrow arrays so batches can be aggregated downstream.
        batch_metrics[f"{col}_count"] = pa.array([values.size], type=pa.int64())
        batch_metrics[f"{col}_hist"] = pa.FixedSizeListArray.from_arrays(
            hist_counts, list_size=hist_counts.shape[0]
        )

        # sampling for KS test approximation
        # TODO: KS need to have all the data in memory to compute,
        # this metrics need to rely on other metrics prior computation of mean, std, min, max computation
        if "kolmogorov-smirnov" in self.metrics or "chi-square" in self.metrics:
            sample_per_batch = min(
                self.ks_sample_size,
                max(
                    self.ks_min_sample_size,
                    values.size // self.ks_sample_divisor,
                ),
            )
            if values.size > sample_per_batch:
                # Random positional sampling without replacement (safe on numpy arrays).
                sample_indices = np.random.choice(values.size, sample_per_batch, replace=False)
                sample = values[sample_indices]
            else:
                sample = values

            batch_metrics[f"{col}_ks_sample"] = pa.array(sample.tolist(), type=pa.float64())

    if not self._initialized and batch_metrics:
        self._initialized = True

    return batch_metrics

generated_metrics() -> list[str]

Return the list of metric columns that will be generated.

Returns:

Type Description
list[str]

List of output metric column names

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@override
def generated_metrics(self) -> list[str]:
    """
    Return the list of metric columns that will be generated.

    Returns:
        List of output metric column names
    """
    # TODO : manage output metrics names with configuration
    # for now we follow a fixed naming convention
    metrics = []
    for col in self.input_columns:
        if "chi-square" in self.metrics:
            metrics.append(f"{col}_chi-square_p_value")
            metrics.append(f"{col}_chi-square_statistic")
            metrics.append(f"{col}_chi-square_interpretation")
        if "kolmogorov-smirnov" in self.metrics:
            metrics.append(f"{col}_kolmogorov-smirnov_p_value")
            metrics.append(f"{col}_kolmogorov-smirnov_statistic")
            metrics.append(f"{col}_kolmogorov-smirnov_interpretation")
        if "shannon-entropy" in self.metrics:
            metrics.append(f"{col}_shannon-entropy_entropy")
            metrics.append(f"{col}_shannon-entropy_interpretation")
        if "grte" in self.metrics:
            metrics.append(f"{col}_grte_grte_value")
            metrics.append(f"{col}_grte_interpretation")

    return metrics

reset() -> None

Reset processor state for new processing run.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
544
545
546
547
def reset(self) -> None:
    """Clear accumulated histogram state so the processor can be reused for a new run."""
    self._initialized = False
    self._bin_edges = {}