Skip to content

dqm_ml_core.metrics.representativeness

Representativeness metric processor for evaluating distribution fit.

This module contains the RepresentativenessProcessor class that evaluates how well a dataset represents a target statistical distribution using various statistical tests.

logger = logging.getLogger(__name__) module-attribute

RepresentativenessProcessor

Bases: DatametricProcessor

Evaluates how well the dataset represents a target statistical distribution.

This processor discretises samples and performs statistical tests to compare the observed distribution of numerical columns against a theoretical target distribution (Normal or Uniform).

Supported Metrics
  • Chi-square: Goodness-of-fit test for categorical/binned data.
  • Kolmogorov-Smirnov (KS): Non-parametric test for continuous distributions (approximated via sampling).
  • Shannon Entropy: Measures the information diversity of the binned data.
  • GRTE (Geometric Representativeness Trajectory Error): Measures the exponential gap between observed and theoretical entropy.

The processor uses a streaming architecture: - Batch level: Computes partial statistics (counts, histograms, samples). - Dataset level: Aggregates histograms and performs the final statistical tests.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
class RepresentativenessProcessor(DatametricProcessor):
    """
    Evaluates how well the dataset represents a target statistical distribution.

    This processor discretises samples and performs statistical tests to compare
    the observed distribution of numerical columns against a theoretical target
    distribution (Normal or Uniform).

    Supported Metrics:
      - Chi-square: Goodness-of-fit test for categorical/binned data.
      - Kolmogorov-Smirnov (KS): Non-parametric test for continuous distributions (approximated via sampling).
      - Shannon Entropy: Measures the information diversity of the binned data.
      - GRTE (Geometric Representativeness Trajectory Error): Measures the exponential gap
        between observed and theoretical entropy.

    The processor uses a streaming architecture:
    - Batch level: Computes partial statistics (counts, histograms, samples).
    - Dataset level: Aggregates histograms and performs final statistical tests.
    """

    SUPPORTED_METRICS = {
        "chi-square",
        "grte",
        "shannon-entropy",
        "kolmogorov-smirnov",
    }
    SUPPORTED_DISTS = {"normal", "uniform"}

    # Configuration constants - can be overridden in config
    DEFAULT_ALPHA = 0.05  # Significance level for statistical tests
    DEFAULT_SHANNON_ENTROPY_THRESHOLD = 2.0  # Threshold for high/low diversity interpretation
    DEFAULT_GRTE_THRESHOLD = 0.5  # Threshold for high/low representativeness interpretation
    DEFAULT_KS_SAMPLE_SIZE = 500  # Maximum sample size for KS test
    DEFAULT_KS_MIN_SAMPLE_SIZE = 50  # Minimum sample size for KS test
    DEFAULT_KS_SAMPLE_DIVISOR = 20  # Divisor for calculating sample size per batch
    DEFAULT_EPSILON = 1e-9  # Small value to avoid division by zero
    DEFAULT_INTERPRETATION_THRESHOLDS = {
        "follows_distribution": "follows_distribution",
        "does_not_follow_distribution": "does_not_follow_distribution",
        "high_diversity": "high_diversity",
        "low_diversity": "low_diversity",
        "high_representativeness": "high_representativeness",
        "low_representativeness": "low_representativeness",
    }

    def __init__(
        self,
        name: str = "representativeness",
        config: dict[str, Any] | None = None,
    ) -> None:
        """
        Initialize the representativeness processor.

        Args:
            name: Name of the processor.
            config: Configuration dictionary containing:
                - input_columns: List of columns to analyze.
                - metrics: List of metrics to compute (default: all supported).
                - bins: Number of bins for histograms (default: 10).
                - distribution: Target distribution ("normal" or "uniform").
                - alpha: Significance level (default: 0.05).
                - distribution_params: Dictionary of params (e.g., mean, std, min, max).

        Raises:
            ValueError: If the configuration is inconsistent (missing columns,
                unsupported metric/distribution, or out-of-range numeric options).
        """
        super().__init__(name, config)
        self.name = name

        cfg = self.config
        self.metrics: list[str] = list(
            cfg.get(
                "metrics",
                ["chi-square", "grte", "kolmogorov-smirnov", "shannon-entropy"],
            )
        )

        self.bins: int = int(cfg.get("bins", 10))
        self.distribution: str = str(cfg.get("distribution", "normal")).lower()

        # Load configurable constants from config or use defaults
        self.alpha: float = float(cfg.get("alpha", self.DEFAULT_ALPHA))
        self.shannon_entropy_threshold: float = float(
            cfg.get(
                "shannon_entropy_threshold",
                self.DEFAULT_SHANNON_ENTROPY_THRESHOLD,
            )
        )
        self.grte_threshold: float = float(cfg.get("grte_threshold", self.DEFAULT_GRTE_THRESHOLD))
        self.ks_sample_size: int = int(cfg.get("ks_sample_size", self.DEFAULT_KS_SAMPLE_SIZE))
        self.ks_min_sample_size: int = int(cfg.get("ks_min_sample_size", self.DEFAULT_KS_MIN_SAMPLE_SIZE))
        self.ks_sample_divisor: int = int(cfg.get("ks_sample_divisor", self.DEFAULT_KS_SAMPLE_DIVISOR))
        self.epsilon: float = float(cfg.get("epsilon", self.DEFAULT_EPSILON))

        # Load interpretation thresholds from config or use defaults
        self.interpretation_thresholds: dict[str, str] = cfg.get(
            "interpretation_thresholds", self.DEFAULT_INTERPRETATION_THRESHOLDS
        )

        # Handle distribution_params properly - it can be None or a dict
        dist_params_raw = cfg.get("distribution_params")

        self.dist_params: dict[str, Any] = {}
        if dist_params_raw is not None:
            self.dist_params = dict(dist_params_raw)

        # check config: avoid redundancy checks with pipeline (see datasetpipeline)
        if not self.input_columns:
            raise ValueError(f"[{self.name}] 'input_columns' must be provided")
        if any(m not in self.SUPPORTED_METRICS for m in self.metrics):
            raise ValueError(f"[{self.name}] unsupported metric; supported: {self.SUPPORTED_METRICS}")
        if self.distribution not in self.SUPPORTED_DISTS:
            raise ValueError(f"[{self.name}] 'distribution' must be in {self.SUPPORTED_DISTS}")
        if self.bins < 2:
            raise ValueError(f"[{self.name}] 'bins' must be >= 2")
        if self.alpha <= 0 or self.alpha >= 1:
            raise ValueError(f"[{self.name}] 'alpha' must be between 0 and 1")
        if self.epsilon <= 0:
            raise ValueError(f"[{self.name}] 'epsilon' must be positive")

        # Per-column bin edges, lazily initialized from the first batch seen.
        self._bin_edges: dict[str, np.ndarray] = {}
        self._initialized: bool = False

    @override
    def generated_metrics(self) -> list[str]:
        """
        Return the list of metric columns that will be generated.

        Returns:
            List of output metric column names
        """
        # TODO : manage output metrics names with configuration
        # for now we follow a fixed naming convention
        metrics = []
        for col in self.input_columns:
            if "chi-square" in self.metrics:
                metrics.append(f"{col}_chi-square_p_value")
                metrics.append(f"{col}_chi-square_statistic")
                metrics.append(f"{col}_chi-square_interpretation")
            if "kolmogorov-smirnov" in self.metrics:
                metrics.append(f"{col}_kolmogorov-smirnov_p_value")
                metrics.append(f"{col}_kolmogorov-smirnov_statistic")
                metrics.append(f"{col}_kolmogorov-smirnov_interpretation")
            if "shannon-entropy" in self.metrics:
                metrics.append(f"{col}_shannon-entropy_entropy")
                metrics.append(f"{col}_shannon-entropy_interpretation")
            if "grte" in self.metrics:
                metrics.append(f"{col}_grte_grte_value")
                metrics.append(f"{col}_grte_interpretation")

        return metrics

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Compute partial histogram statistics per batch for streaming aggregation.

        Args:
            features: Dictionary of column arrays from this batch.

        Returns:
            Dictionary containing:
                - {col}_count: Total valid numeric samples.
                - {col}_hist: Histogram counts.
                - {col}_ks_sample: Random subset of data for KS test.
        """
        batch_metrics = {}

        for col in self.input_columns:
            if col not in features:
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue

            arr = features[col]
            # convert to numeric, handle mixed types and NaN
            try:
                np_col = np.asarray(arr.to_numpy(zero_copy_only=False))
            except Exception:
                # Fallback for Arrow types that cannot be converted directly
                np_col = pd.Series(arr.to_pylist()).to_numpy(copy=True)

            values = pd.to_numeric(pd.Series(np_col), errors="coerce").dropna()

            if values.empty:
                logger.warning(f"[{self.name}] column '{col}' has no valid numeric values in this batch")
                continue

            if not self._initialized or col not in self._bin_edges:
                self._initialize_bin_edges(values.to_numpy(), col)

            edges = self._bin_edges[col]

            # Debug
            logger.debug(f"[{self.name}] edges shape: {edges.shape}, values shape: {values.shape}")

            hist_counts = np.histogram(values, bins=edges)[0].astype(np.int64)

            # debug: check histogram
            logger.debug(f"[{self.name}] hist_counts shape: {hist_counts.shape}, expected: {self.bins}")

            # store as Arrow arrays for aggregation
            batch_metrics[f"{col}_count"] = pa.array([len(values)], type=pa.int64())
            batch_metrics[f"{col}_hist"] = pa.FixedSizeListArray.from_arrays(
                hist_counts, list_size=hist_counts.shape[0]
            )

            # sampling for KS test approximation
            # TODO: KS need to have all the data in memory to compute,
            # this metrics need to rely on other metrics prior computation of mean, std, min, max computation
            if "kolmogorov-smirnov" in self.metrics or "chi-square" in self.metrics:
                sample_per_batch = min(
                    self.ks_sample_size,
                    max(
                        self.ks_min_sample_size,
                        len(values) // self.ks_sample_divisor,
                    ),
                )
                if len(values) > sample_per_batch:
                    # Random sampling without replacement.
                    # BUGFIX: use positional indexing (.iloc) — after dropna() the
                    # Series index has gaps, so label-based values[...] could raise
                    # KeyError on indices that were dropped.
                    sample_indices = np.random.choice(len(values), sample_per_batch, replace=False)
                    sample = values.iloc[sample_indices]
                else:
                    sample = values

                batch_metrics[f"{col}_ks_sample"] = pa.array(sample.tolist(), type=pa.float64())

        if not self._initialized and batch_metrics:
            self._initialized = True

        return batch_metrics

    def _initialize_bin_edges(self, sample_data: np.ndarray, col: str) -> None:
        """
        Initialize bin edges for a column based on sample data and target distribution.

        Args:
            sample_data: Data array used to infer parameters if not provided in config.
            col: Name of the column.
        """
        if self.distribution == "normal":
            mean = float(self.dist_params.get("mean", np.mean(sample_data)))
            std = float(self.dist_params.get("std", np.std(sample_data, ddof=0)))
            # Guard against degenerate (constant) data: a zero std would break ppf
            std = std if std > 0.0 else self.epsilon
            edges = self._bin_edges_normal(mean, std, self.bins, sample_data)
        else:
            mn = float(self.dist_params.get("min", np.min(sample_data)))
            mx = float(self.dist_params.get("max", np.max(sample_data)))
            if mx <= mn:
                mx = mn + self.epsilon
            edges = self._bin_edges_uniform(mn, mx, self.bins, sample_data)

        self._bin_edges[col] = edges

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
        """
        Compute final dataset-level metrics by aggregating batch histograms.

        Args:
            batch_metrics: Dictionary of batch-level metrics collected during processing.

        Returns:
            Dictionary containing final scores and interpretations for all selected metrics.
        """
        if not batch_metrics:
            return {"_metadata": {"error": "No batch metrics provided"}}

        out: dict[str, Any] = {}
        total_samples = 0

        for col in self.input_columns:
            count_key = f"{col}_count"
            hist_key = f"{col}_hist"

            if count_key not in batch_metrics or hist_key not in batch_metrics:
                logger.warning(f"[{self.name}] no batch metrics for column '{col}'")
                continue

            # TODO : maybe we need a try ? as in batch or not as na already removed
            # N/A handling shall be documented, and logs added
            hist_batch_arrays = np.asarray(batch_metrics[hist_key].to_numpy(zero_copy_only=False))
            if hist_batch_arrays.shape[0] == 0:
                logger.warning(f"[{self.name}] no histograme batch for '{col}'")
                continue

            # aggregate counts and histograms across all batches
            total_count = int(np.sum(batch_metrics[count_key].to_numpy()))

            hist_arrays = None

            for batch_hist in hist_batch_arrays:
                hist_arrays = batch_hist if hist_arrays is None else hist_arrays + batch_hist

            # Debug: check dimensions
            if hist_arrays is None:
                logger.warning(f"[{self.name}] no valid histogram for column '{col}'")
                continue

            logger.debug(f"[{self.name}] hist_arrays shape: {hist_arrays.shape}, expected bins: {self.bins}")

            # sum histogram counts across batches
            if hist_arrays.ndim == 1:
                logger.debug("Single histogram")
                obs_counts = hist_arrays.astype(float)
            else:
                # Ensure we're summing along the right axis
                logger.debug("Multiple histograms from different batches")
                if hist_arrays.shape[1] == self.bins:
                    logger.debug("Sum along batch dimension (axis=0)")
                    obs_counts = np.sum(hist_arrays, axis=0).astype(float)
                else:
                    logger.debug("Flatten and create a single histogram")
                    logger.warning(f"[{self.name}] Unexpected histogram shape {hist_arrays.shape}, flattening")
                    obs_counts = hist_arrays.flatten().astype(float)

            if total_count <= 0 or obs_counts.sum() <= 0:
                logger.warning(f"[{self.name}] no valid data for column '{col}'")
                continue

            total_samples += total_count

            #  distribution parameters and bin edges
            if col not in self._bin_edges:
                logger.warning(f"[{self.name}] no bin edges for column '{col}' - skipping")
                continue

            edges = self._bin_edges[col]

            # theoretical probabilities - aligned with official DQM-ML
            if self.distribution == "normal":
                logger.debug("Generate normal distribution")
                # Use the SAME parameters as those used to generate the bins

                sample_key = f"{col}_ks_sample"
                if sample_key in batch_metrics:
                    logger.debug("Use sampled meand and std")
                    sample_arrays = batch_metrics[sample_key].to_numpy()
                    if sample_arrays.ndim > 1:
                        sample_arrays = sample_arrays.flatten()
                    mean = float(self.dist_params.get("mean", np.mean(sample_arrays)))
                    std = float(self.dist_params.get("std", np.std(sample_arrays, ddof=0)))
                    std = std if std > 0.0 else self.epsilon
                else:
                    logger.debug("Fallback: use default or configured mean and std")
                    mean = float(self.dist_params.get("mean", 0.0))
                    std = float(self.dist_params.get("std", 1.0))
                logger.debug(f"mean={mean}")
                # BUGFIX: was logging the mean under the "std" label
                logger.debug(f"std={std}")
                # generate random values and count frequencies (like the official impl)
                expected_values = np.random.normal(mean, std, total_count)
                # NOTE: despite the name, exp_probs holds histogram COUNTS of the
                # simulated samples; consumers below normalize before use.
                exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)
            else:  # uniform
                logger.debug("Generate uniform distribution")
                mn = float(self.dist_params.get("min", edges[0]))
                mx = float(self.dist_params.get("max", edges[-1]))
                logger.debug(f"min={mn}")
                logger.debug(f"max={mx}")
                # generate random values and count frequencies (like the official impl)
                expected_values = np.random.uniform(mn, mx, total_count)
                exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)

            exp_counts = total_count * exp_probs

            col_res: dict[str, Any] = {}

            # chi-square: here we compute the chi-square with a alpha value of 0.05
            if "chi-square" in self.metrics:
                # Ensure observed and expected counts have the same sum

                mask = exp_counts > 0
                if mask.sum() >= 2:
                    logger.debug("Normalize expected counts to match observed sum")
                    obs_sum = obs_counts[mask].sum()
                    exp_sum = exp_counts[mask].sum()

                    if exp_sum > 0:
                        # Scale expected counts to match observed sum
                        # (scipy.stats.chisquare requires matching totals)
                        exp_counts_normalized = exp_counts[mask] * (obs_sum / exp_sum)

                        logger.debug("Expected frequencies:")
                        logger.debug(exp_counts_normalized)
                        logger.debug("Observed frequencies: ")
                        logger.debug(obs_counts[mask])

                        try:
                            chi = stats.chisquare(
                                f_obs=obs_counts[mask],
                                f_exp=exp_counts_normalized,
                            )
                            logger.debug(f"Chi P value: {chi.pvalue}")
                            col_res["chi-square"] = {
                                "p_value": float(chi.pvalue),
                                "statistic": float(chi.statistic),
                                "interpretation": self.interpretation_thresholds.get(
                                    "follows_distribution"
                                    if chi.pvalue >= self.alpha
                                    else "does_not_follow_distribution",
                                    "follows_distribution",
                                ),
                            }
                        except ValueError as e:
                            # Fallback: use only observed counts if chi-square fails
                            col_res["chi-square"] = {
                                "p_value": float("nan"),
                                "statistic": float("nan"),
                                "interpretation": f"chi_square_failed: {e!s}",
                                "note": "using observed counts only due to statistical constraints",
                            }
                    else:
                        col_res["chi-square"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": "no_expected_counts",
                        }
                else:
                    col_res["chi-square"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "insufficient_bins",
                    }

            # Kolmogorov-Smirnov test using sampled data
            if "kolmogorov-smirnov" in self.metrics:
                sample_key = f"{col}_ks_sample"
                if sample_key in batch_metrics:
                    sample_arrays = batch_metrics[sample_key].to_numpy()
                    ks_samples = sample_arrays if sample_arrays.ndim == 1 else sample_arrays.flatten()

                    if len(ks_samples) > 0:
                        # Perform KS test on aggregated samples
                        if self.distribution == "normal":
                            mean = float(self.dist_params.get("mean", np.mean(ks_samples)))
                            std = float(self.dist_params.get("std", np.std(ks_samples, ddof=0)))
                            std = std if std > 0.0 else self.epsilon
                            ks = stats.kstest(ks_samples, stats.norm.cdf, args=(mean, std))
                        else:  # uniform
                            mn = float(self.dist_params.get("min", np.min(ks_samples)))
                            mx = float(self.dist_params.get("max", np.max(ks_samples)))
                            if mx <= mn:
                                mx = mn + self.epsilon
                            # scipy's uniform is parameterized as (loc, scale)
                            ks = stats.kstest(
                                ks_samples,
                                stats.uniform.cdf,
                                args=(mn, mx - mn),
                            )

                        col_res["kolmogorov-smirnov"] = {
                            "p_value": float(ks.pvalue),
                            "statistic": float(ks.statistic),
                            "interpretation": self.interpretation_thresholds.get(
                                "follows_distribution" if ks.pvalue >= self.alpha else "does_not_follow_distribution",
                                "follows_distribution",
                            ),
                            "sample_size": len(ks_samples),
                            "note": "approximated_from_random_samples",
                        }
                    else:
                        col_res["kolmogorov-smirnov"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": "no_samples_available",
                        }
                else:
                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_sample_data_found",
                    }

            # Shannon entropy - aligned on dqm-ml v1 (using theoretical frequencies)
            if "shannon-entropy" in self.metrics:
                # Use theoretical frequencies
                p_exp = exp_probs / exp_probs.sum()
                h_exp = float(stats.entropy(p_exp))
                col_res["shannon-entropy"] = {
                    "entropy": h_exp,
                    "interpretation": self.interpretation_thresholds.get(
                        "high_diversity" if h_exp > self.shannon_entropy_threshold else "low_diversity",
                        "high_diversity",
                    ),
                }

            # GRTE (gap between observed and theoretical entropies) - aligned on dqm-ml v1
            if "grte" in self.metrics:
                # Use observed and theoretical frequencies
                p_obs = obs_counts / obs_counts.sum()
                p_exp = exp_probs / exp_probs.sum()
                h_obs = float(stats.entropy(p_obs))
                h_exp = float(stats.entropy(p_exp))
                grte = float(np.exp(-2.0 * abs(h_exp - h_obs)))
                col_res["grte"] = {
                    "grte_value": grte,
                    "interpretation": self.interpretation_thresholds.get(
                        "high_representativeness" if grte > self.grte_threshold else "low_representativeness",
                        "high_representativeness",
                    ),
                }

            # Flatten the nested result tree into "{metric}_{col}_{prop}" keys
            # TODO : refactor the implementation,for optimization
            if col_res:
                for key, value in col_res.items():
                    if isinstance(value, dict):
                        for prop, content in value.items():
                            out[key + "_" + col + "_" + prop] = content
                    else:
                        out[key + "_" + col] = value
        # TODO make optional export of metadata
        meta_data = {
            "bins": self.bins,
            "distribution": self.distribution,
            "metrics_computed": self.metrics,
            "total_samples": total_samples,
            "columns_analyzed": [c for c in self.input_columns if f"{c}_count" in batch_metrics],
            "ks_sampling_enabled": "kolmogorov-smirnov" in self.metrics,
            "note": "KS test uses random sampling approximation for scalability",
        }

        import json

        out["_metadata"] = json.dumps(meta_data)
        return out

    def reset(self) -> None:
        """Reset processor state for new processing run."""
        self._bin_edges = {}
        self._initialized = False

    # utils methods for bin edge calculation

    def _bin_edges_normal(self, mean: float, std: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate bin edges based on the Percent Point Function (PPF) of a Normal distribution.

        This ensures bins represent equal probability mass under the theoretical distribution.
        The first and last bins are extended to -inf and +inf respectively.
        """
        # logic from dqm-ml v1 : use stats.norm.ppf with linspace(1/bins, 1, bins)
        interval = []
        for i in range(1, bins):
            val = stats.norm.ppf(i / bins, mean, std)
            interval.append(val)
        interval.insert(0, -np.inf)
        interval.append(np.inf)
        return np.array(interval)

    def _bin_edges_uniform(self, mn: float, mx: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate linearly spaced bin edges for a Uniform distribution.

        The range is determined by the minimum/maximum of both the configured
        parameters and the actual observed data.
        """
        lo = min(mn, float(np.min(data)))
        hi = max(mx, float(np.max(data)))
        if hi <= lo:
            hi = lo + self.epsilon
        return np.linspace(lo, hi, bins + 1)

DEFAULT_ALPHA = 0.05 class-attribute instance-attribute

DEFAULT_EPSILON = 1e-09 class-attribute instance-attribute

DEFAULT_GRTE_THRESHOLD = 0.5 class-attribute instance-attribute

DEFAULT_INTERPRETATION_THRESHOLDS = {'follows_distribution': 'follows_distribution', 'does_not_follow_distribution': 'does_not_follow_distribution', 'high_diversity': 'high_diversity', 'low_diversity': 'low_diversity', 'high_representativeness': 'high_representativeness', 'low_representativeness': 'low_representativeness'} class-attribute instance-attribute

DEFAULT_KS_MIN_SAMPLE_SIZE = 50 class-attribute instance-attribute

DEFAULT_KS_SAMPLE_DIVISOR = 20 class-attribute instance-attribute

DEFAULT_KS_SAMPLE_SIZE = 500 class-attribute instance-attribute

DEFAULT_SHANNON_ENTROPY_THRESHOLD = 2.0 class-attribute instance-attribute

SUPPORTED_DISTS = {'normal', 'uniform'} class-attribute instance-attribute

SUPPORTED_METRICS = {'chi-square', 'grte', 'shannon-entropy', 'kolmogorov-smirnov'} class-attribute instance-attribute

alpha: float = float(cfg.get('alpha', self.DEFAULT_ALPHA)) instance-attribute

bins: int = int(cfg.get('bins', 10)) instance-attribute

dist_params: dict[str, Any] = {} instance-attribute

distribution: str = str(cfg.get('distribution', 'normal')).lower() instance-attribute

epsilon: float = float(cfg.get('epsilon', self.DEFAULT_EPSILON)) instance-attribute

grte_threshold: float = float(cfg.get('grte_threshold', self.DEFAULT_GRTE_THRESHOLD)) instance-attribute

interpretation_thresholds: dict[str, str] = cfg.get('interpretation_thresholds', self.DEFAULT_INTERPRETATION_THRESHOLDS) instance-attribute

ks_min_sample_size: int = int(cfg.get('ks_min_sample_size', self.DEFAULT_KS_MIN_SAMPLE_SIZE)) instance-attribute

ks_sample_divisor: int = int(cfg.get('ks_sample_divisor', self.DEFAULT_KS_SAMPLE_DIVISOR)) instance-attribute

ks_sample_size: int = int(cfg.get('ks_sample_size', self.DEFAULT_KS_SAMPLE_SIZE)) instance-attribute

metrics: list[str] = list(cfg.get('metrics', ['chi-square', 'grte', 'kolmogorov-smirnov', 'shannon-entropy'])) instance-attribute

name = name instance-attribute

shannon_entropy_threshold: float = float(cfg.get('shannon_entropy_threshold', self.DEFAULT_SHANNON_ENTROPY_THRESHOLD)) instance-attribute

__init__(name: str = 'representativeness', config: dict[str, Any] | None = None) -> None

Initialize the representativeness processor.

Parameters:

Name Type Description Default
name str

Name of the processor.

'representativeness'
config dict[str, Any] | None

Configuration dictionary containing: - input_columns: List of columns to analyze. - metrics: List of metrics to compute (default: all supported). - bins: Number of bins for histograms (default: 10). - distribution: Target distribution ("normal" or "uniform"). - alpha: Significance level (default: 0.05). - distribution_params: Dictionary of params (e.g., mean, std, min, max).

None
Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def __init__(
    self,
    name: str = "representativeness",
    config: dict[str, Any] | None = None,
) -> None:
    """
    Initialize the representativeness processor.

    Args:
        name: Name of the processor.
        config: Configuration dictionary containing:
            - input_columns: List of columns to analyze.
            - metrics: List of metrics to compute (default: all supported).
            - bins: Number of bins for histograms (default: 10).
            - distribution: Target distribution ("normal" or "uniform").
            - alpha: Significance level (default: 0.05).
            - distribution_params: Dictionary of params (e.g., mean, std, min, max).

    Raises:
        ValueError: If the configuration is invalid (missing input columns,
            unsupported metric or distribution, or out-of-range numeric options).
    """
    super().__init__(name, config)
    self.name = name

    cfg = self.config
    self.metrics: list[str] = list(
        cfg.get(
            "metrics",
            ["chi-square", "grte", "kolmogorov-smirnov", "shannon-entropy"],
        )
    )

    self.bins: int = int(cfg.get("bins", 10))
    self.distribution: str = str(cfg.get("distribution", "normal")).lower()

    # Load configurable constants from config or use defaults
    self.alpha: float = float(cfg.get("alpha", self.DEFAULT_ALPHA))
    self.shannon_entropy_threshold: float = float(
        cfg.get(
            "shannon_entropy_threshold",
            self.DEFAULT_SHANNON_ENTROPY_THRESHOLD,
        )
    )
    self.grte_threshold: float = float(cfg.get("grte_threshold", self.DEFAULT_GRTE_THRESHOLD))
    self.ks_sample_size: int = int(cfg.get("ks_sample_size", self.DEFAULT_KS_SAMPLE_SIZE))
    self.ks_min_sample_size: int = int(cfg.get("ks_min_sample_size", self.DEFAULT_KS_MIN_SAMPLE_SIZE))
    self.ks_sample_divisor: int = int(cfg.get("ks_sample_divisor", self.DEFAULT_KS_SAMPLE_DIVISOR))
    self.epsilon: float = float(cfg.get("epsilon", self.DEFAULT_EPSILON))

    # Copy the mapping so mutating this instance's thresholds can never
    # corrupt the shared class-level DEFAULT_INTERPRETATION_THRESHOLDS dict
    # (the previous code stored a reference to the class attribute).
    self.interpretation_thresholds: dict[str, str] = dict(
        cfg.get("interpretation_thresholds", self.DEFAULT_INTERPRETATION_THRESHOLDS)
    )

    # Handle distribution_params properly - it can be None or a dict
    dist_params_raw = cfg.get("distribution_params")
    self.dist_params: dict[str, Any] = {} if dist_params_raw is None else dict(dist_params_raw)

    # check config: avoid redundancy checks with pipeline (see datasetpipeline)
    if not self.input_columns:
        raise ValueError(f"[{self.name}] 'input_columns' must be provided")
    if any(m not in self.SUPPORTED_METRICS for m in self.metrics):
        raise ValueError(f"[{self.name}] unsupported metric; supported: {self.SUPPORTED_METRICS}")
    if self.distribution not in self.SUPPORTED_DISTS:
        raise ValueError(f"[{self.name}] 'distribution' must be in {self.SUPPORTED_DISTS}")
    if self.bins < 2:
        raise ValueError(f"[{self.name}] 'bins' must be >= 2")
    if self.alpha <= 0 or self.alpha >= 1:
        raise ValueError(f"[{self.name}] 'alpha' must be between 0 and 1")
    if self.epsilon <= 0:
        raise ValueError(f"[{self.name}] 'epsilon' must be positive")
    # Guard against ZeroDivisionError in compute_batch_metric's KS sampling,
    # where len(values) // self.ks_sample_divisor is evaluated.
    if self.ks_sample_divisor <= 0:
        raise ValueError(f"[{self.name}] 'ks_sample_divisor' must be positive")

    # Per-column histogram bin edges, derived lazily from the first batch seen.
    self._bin_edges: dict[str, np.ndarray] = {}
    self._initialized: bool = False

compute(batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]

Compute final dataset-level metrics by aggregating batch histograms.

Parameters:

Name Type Description Default
batch_metrics dict[str, Array] | None

Dictionary of batch-level metrics collected during processing.

None

Returns:

Type Description
dict[str, Any]

Dictionary containing final scores and interpretations for all selected metrics.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
@override
def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
    """
    Compute final dataset-level metrics by aggregating batch histograms.

    Aggregates per-batch counts/histograms produced by ``compute_batch_metric``,
    builds a Monte-Carlo reference histogram for the target distribution, and
    runs the selected statistical tests per column.

    Args:
        batch_metrics: Dictionary of batch-level metrics collected during processing.

    Returns:
        Dictionary containing final scores and interpretations for all selected
        metrics, plus a JSON-encoded "_metadata" entry.
    """
    import json  # stdlib; function-local so module-level imports stay untouched

    if not batch_metrics:
        return {"_metadata": {"error": "No batch metrics provided"}}

    out: dict[str, Any] = {}
    total_samples = 0

    for col in self.input_columns:
        count_key = f"{col}_count"
        hist_key = f"{col}_hist"

        if count_key not in batch_metrics or hist_key not in batch_metrics:
            logger.warning(f"[{self.name}] no batch metrics for column '{col}'")
            continue

        # TODO : maybe we need a try ? as in batch or not as na already removed
        # N/A handling shall be documented, and logs added
        hist_batch_arrays = np.asarray(batch_metrics[hist_key].to_numpy(zero_copy_only=False))
        if hist_batch_arrays.shape[0] == 0:
            logger.warning(f"[{self.name}] no histogram batch for '{col}'")
            continue

        # aggregate counts and histograms across all batches
        total_count = int(np.sum(batch_metrics[count_key].to_numpy()))

        hist_arrays = None

        for batch_hist in hist_batch_arrays:
            hist_arrays = batch_hist if hist_arrays is None else hist_arrays + batch_hist

        # Debug: verify the aggregated histogram dimensions
        if hist_arrays is None:
            logger.warning(f"[{self.name}] no valid histogram for column '{col}'")
            continue

        logger.debug(f"[{self.name}] hist_arrays shape: {hist_arrays.shape}, expected bins: {self.bins}")

        # sum histogram counts across batches
        if hist_arrays.ndim == 1:
            logger.debug("Single histogram")
            obs_counts = hist_arrays.astype(float)
        else:
            # Ensure we're summing along the right axis
            logger.debug("Multiple histograms from different batches")
            if hist_arrays.shape[1] == self.bins:
                logger.debug("Sum along batch dimension (axis=0)")
                obs_counts = np.sum(hist_arrays, axis=0).astype(float)
            else:
                logger.debug("Flatten and create a single histogram")
                logger.warning(f"[{self.name}] Unexpected histogram shape {hist_arrays.shape}, flattening")
                obs_counts = hist_arrays.flatten().astype(float)

        if total_count <= 0 or obs_counts.sum() <= 0:
            logger.warning(f"[{self.name}] no valid data for column '{col}'")
            continue

        total_samples += total_count

        #  distribution parameters and bin edges
        if col not in self._bin_edges:
            logger.warning(f"[{self.name}] no bin edges for column '{col}' - skipping")
            continue

        edges = self._bin_edges[col]

        # Theoretical reference histogram - aligned with the official DQM-ML:
        # draw random values from the target distribution and bin them.
        # NOTE: the result is non-deterministic (np.random is not seeded here).
        if self.distribution == "normal":
            logger.debug("Generate normal distribution")
            # Use the SAME parameters as those used to generate the bins

            sample_key = f"{col}_ks_sample"
            if sample_key in batch_metrics:
                logger.debug("Use sampled mean and std")
                sample_arrays = batch_metrics[sample_key].to_numpy()
                if sample_arrays.ndim > 1:
                    sample_arrays = sample_arrays.flatten()
                mean = float(self.dist_params.get("mean", np.mean(sample_arrays)))
                std = float(self.dist_params.get("std", np.std(sample_arrays, ddof=0)))
                std = std if std > 0.0 else self.epsilon
            else:
                logger.debug("Fallback: use default or configured mean and std")
                mean = float(self.dist_params.get("mean", 0.0))
                std = float(self.dist_params.get("std", 1.0))
            logger.debug(f"mean={mean}")
            # BUG FIX: previously logged the mean under the "std=" label
            logger.debug(f"std={std}")
            # draw random values and count bin frequencies (as the official impl does)
            expected_values = np.random.normal(mean, std, total_count)
            exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)
        else:  # uniform
            logger.debug("Generate uniform distribution")
            mn = float(self.dist_params.get("min", edges[0]))
            mx = float(self.dist_params.get("max", edges[-1]))
            logger.debug(f"min={mn}")
            logger.debug(f"max={mx}")
            # draw random values and count bin frequencies (as the official impl does)
            expected_values = np.random.uniform(mn, mx, total_count)
            exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)

        # NOTE: exp_probs holds Monte-Carlo bin *counts* (not probabilities);
        # every consumer below normalizes it, so this scaling cancels out.
        exp_counts = total_count * exp_probs

        col_res: dict[str, Any] = {}

        # chi-square: here we compute the chi-square with a alpha value of 0.05
        if "chi-square" in self.metrics:
            # Ensure observed and expected counts have the same sum

            mask = exp_counts > 0
            if mask.sum() >= 2:
                logger.debug("Normalize expected counts to match observed sum")
                obs_sum = obs_counts[mask].sum()
                exp_sum = exp_counts[mask].sum()

                if exp_sum > 0:
                    # Scale expected counts to match observed sum
                    exp_counts_normalized = exp_counts[mask] * (obs_sum / exp_sum)

                    logger.debug("Expected frequencies:")
                    logger.debug(exp_counts_normalized)
                    logger.debug("Observed frequencies: ")
                    logger.debug(obs_counts[mask])

                    try:
                        chi = stats.chisquare(
                            f_obs=obs_counts[mask],
                            f_exp=exp_counts_normalized,
                        )
                        logger.debug(f"Chi P value: {chi.pvalue}")
                        col_res["chi-square"] = {
                            "p_value": float(chi.pvalue),
                            "statistic": float(chi.statistic),
                            "interpretation": self.interpretation_thresholds.get(
                                "follows_distribution"
                                if chi.pvalue >= self.alpha
                                else "does_not_follow_distribution",
                                "follows_distribution",
                            ),
                        }
                    except ValueError as e:
                        # Fallback: use only observed counts if chi-square fails
                        col_res["chi-square"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": f"chi_square_failed: {e!s}",
                            "note": "using observed counts only due to statistical constraints",
                        }
                else:
                    col_res["chi-square"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_expected_counts",
                    }
            else:
                col_res["chi-square"] = {
                    "p_value": float("nan"),
                    "statistic": float("nan"),
                    "interpretation": "insufficient_bins",
                }

        # Kolmogorov-Smirnov test using sampled data
        if "kolmogorov-smirnov" in self.metrics:
            sample_key = f"{col}_ks_sample"
            if sample_key in batch_metrics:
                sample_arrays = batch_metrics[sample_key].to_numpy()
                ks_samples = sample_arrays if sample_arrays.ndim == 1 else sample_arrays.flatten()

                if len(ks_samples) > 0:
                    # Perform KS test on aggregated samples
                    if self.distribution == "normal":
                        mean = float(self.dist_params.get("mean", np.mean(ks_samples)))
                        std = float(self.dist_params.get("std", np.std(ks_samples, ddof=0)))
                        std = std if std > 0.0 else self.epsilon
                        ks = stats.kstest(ks_samples, stats.norm.cdf, args=(mean, std))
                    else:  # uniform
                        mn = float(self.dist_params.get("min", np.min(ks_samples)))
                        mx = float(self.dist_params.get("max", np.max(ks_samples)))
                        if mx <= mn:
                            mx = mn + self.epsilon
                        ks = stats.kstest(
                            ks_samples,
                            stats.uniform.cdf,
                            args=(mn, mx - mn),
                        )

                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float(ks.pvalue),
                        "statistic": float(ks.statistic),
                        "interpretation": self.interpretation_thresholds.get(
                            "follows_distribution" if ks.pvalue >= self.alpha else "does_not_follow_distribution",
                            "follows_distribution",
                        ),
                        "sample_size": len(ks_samples),
                        "note": "approximated_from_random_samples",
                    }
                else:
                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_samples_available",
                    }
            else:
                col_res["kolmogorov-smirnov"] = {
                    "p_value": float("nan"),
                    "statistic": float("nan"),
                    "interpretation": "no_sample_data_found",
                }

        # Shannon entropy - aligned on dqm-ml v1 (using theoretical frequencies)
        if "shannon-entropy" in self.metrics:
            # Use theoretical frequencies
            p_exp = exp_probs / exp_probs.sum()
            h_exp = float(stats.entropy(p_exp))
            col_res["shannon-entropy"] = {
                "entropy": h_exp,
                "interpretation": self.interpretation_thresholds.get(
                    "high_diversity" if h_exp > self.shannon_entropy_threshold else "low_diversity",
                    "high_diversity",
                ),
            }

        # GRTE (gap between observed and theoretical entropies) - aligned on dqm-ml v1
        if "grte" in self.metrics:
            # Use observed and theoretical frequencies
            p_obs = obs_counts / obs_counts.sum()
            p_exp = exp_probs / exp_probs.sum()
            h_obs = float(stats.entropy(p_obs))
            h_exp = float(stats.entropy(p_exp))
            grte = float(np.exp(-2.0 * abs(h_exp - h_obs)))
            col_res["grte"] = {
                "grte_value": grte,
                "interpretation": self.interpretation_thresholds.get(
                    "high_representativeness" if grte > self.grte_threshold else "low_representativeness",
                    "high_representativeness",
                ),
            }

        # Flatten the nested {metric: {prop: value}} tree into flat output keys
        # TODO : refactor the implementation,for optimization
        if col_res:
            for key, value in col_res.items():
                if isinstance(value, dict):
                    for prop, content in value.items():
                        out[key + "_" + col + "_" + prop] = content
                else:
                    out[key + "_" + col] = value
    # TODO make optional export of metadata
    meta_data = {
        "bins": self.bins,
        "distribution": self.distribution,
        "metrics_computed": self.metrics,
        "total_samples": total_samples,
        "columns_analyzed": [c for c in self.input_columns if f"{c}_count" in batch_metrics],
        "ks_sampling_enabled": "kolmogorov-smirnov" in self.metrics,
        "note": "KS test uses random sampling approximation for scalability",
    }

    out["_metadata"] = json.dumps(meta_data)
    return out

compute_batch_metric(features: dict[str, pa.Array]) -> dict[str, pa.Array]

Compute partial histogram statistics per batch for streaming aggregation.

Parameters:

Name Type Description Default
features dict[str, Array]

Dictionary of column arrays from this batch.

required

Returns:

Type Description
dict[str, Array]

Dictionary containing: - {col}_count: Total valid numeric samples. - {col}_hist: Histogram counts. - {col}_ks_sample: Random subset of data for KS test.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@override
def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
    """
    Compute partial histogram statistics per batch for streaming aggregation.

    Args:
        features: Dictionary of column arrays from this batch.

    Returns:
        Dictionary containing:
            - {col}_count: Total valid numeric samples.
            - {col}_hist: Histogram counts.
            - {col}_ks_sample: Random subset of data for KS test.
    """
    batch_metrics = {}

    for col in self.input_columns:
        if col not in features:
            logger.warning(f"[{self.name}] column '{col}' not found in batch")
            continue

        arr = features[col]
        # convert to numeric, handle mixed types and NaN
        try:
            np_col = np.asarray(arr.to_numpy(zero_copy_only=False))
        except Exception:
            np_col = pd.Series(arr.to_pylist()).to_numpy(copy=True)

        values = pd.to_numeric(pd.Series(np_col), errors="coerce").dropna()

        if values.empty:
            logger.warning(f"[{self.name}] column '{col}' has no valid numeric values in this batch")
            continue

        if not self._initialized or col not in self._bin_edges:
            self._initialize_bin_edges(values.to_numpy(), col)

        edges = self._bin_edges[col]

        # Debug
        logger.debug(f"[{self.name}] edges shape: {edges.shape}, values shape: {values.shape}")

        hist_counts = np.histogram(values, bins=edges)[0].astype(np.int64)

        # debug: check histogram
        logger.debug(f"[{self.name}] hist_counts shape: {hist_counts.shape}, expected: {self.bins}")

        # store as Arrow arrays for aggregation
        batch_metrics[f"{col}_count"] = pa.array([len(values)], type=pa.int64())
        batch_metrics[f"{col}_hist"] = pa.FixedSizeListArray.from_arrays(
            hist_counts, list_size=hist_counts.shape[0]
        )

        # sampling for KS test approximation
        # TODO: KS need to have all the data in memory to compute,
        # this metrics need to rely on other metrics prior computation of mean, std, min, max computation
        if "kolmogorov-smirnov" in self.metrics or "chi-square" in self.metrics:
            sample_per_batch = min(
                self.ks_sample_size,
                max(
                    self.ks_min_sample_size,
                    len(values) // self.ks_sample_divisor,
                ),
            )
            if len(values) > sample_per_batch:
                # Random sampling without replacement.
                # BUG FIX: use positional .iloc indexing — after dropna() the
                # Series index has gaps, so label-based values[sample_indices]
                # could raise KeyError or pick wrong rows when NaNs were dropped.
                sample_indices = np.random.choice(len(values), sample_per_batch, replace=False)
                sample = values.iloc[sample_indices]
            else:
                sample = values

            batch_metrics[f"{col}_ks_sample"] = pa.array(sample.tolist(), type=pa.float64())

    if not self._initialized and batch_metrics:
        self._initialized = True

    return batch_metrics

generated_metrics() -> list[str]

Return the list of metric columns that will be generated.

Returns:

Type Description
list[str]

List of output metric column names

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@override
def generated_metrics(self) -> list[str]:
    """
    Return the list of metric columns that will be generated.

    Returns:
        List of output metric column names
    """
    # TODO : manage output metrics names with configuration
    # for now we follow a fixed naming convention
    metrics = []
    for col in self.input_columns:
        if "chi-square" in self.metrics:
            metrics.append(f"{col}_chi-square_p_value")
            metrics.append(f"{col}_chi-square_statistic")
            metrics.append(f"{col}_chi-square_interpretation")
        if "kolmogorov-smirnov" in self.metrics:
            metrics.append(f"{col}_kolmogorov-smirnov_p_value")
            metrics.append(f"{col}_kolmogorov-smirnov_statistic")
            metrics.append(f"{col}_kolmogorov-smirnov_interpretation")
        if "shannon-entropy" in self.metrics:
            metrics.append(f"{col}_shannon-entropy_entropy")
            metrics.append(f"{col}_shannon-entropy_interpretation")
        if "grte" in self.metrics:
            metrics.append(f"{col}_grte_grte_value")
            metrics.append(f"{col}_grte_interpretation")

    return metrics

reset() -> None

Reset processor state for new processing run.

Source code in packages/dqm-ml-core/src/dqm_ml_core/metrics/representativeness.py
544
545
546
547
def reset(self) -> None:
    """Reset processor state for a new processing run.

    Drops any per-column bin edges learned from previous batches and
    re-arms the lazy-initialization flag so the next batch re-derives them.
    """
    self._initialized = False
    self._bin_edges = {}