Coverage for packages / dqm-ml-core / src / dqm_ml_core / metrics / representativeness.py: 77%
264 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
1"""Representativeness metric processor for evaluating distribution fit.
3This module contains the RepresentativenessProcessor class that evaluates
4how well a dataset represents a target statistical distribution using
5various statistical tests.
6"""
import json
import logging
from typing import Any

import numpy as np
import pandas as pd
import pyarrow as pa
from scipy import stats

# COMPATIBILITY : from typing import Any, override # When support of 3.10 and 3.11 will be removed
from typing_extensions import override

from dqm_ml_core.api.data_processor import DatametricProcessor
21logger = logging.getLogger(__name__)
class RepresentativenessProcessor(DatametricProcessor):
    """
    Evaluates how well the dataset represents a target statistical distribution.

    This processor performs on-samples discretisation statistical tests to compare
    the observed distribution of numerical columns against a theoretical target
    distribution (Normal or Uniform).

    Supported Metrics:
    - Chi-square: Goodness-of-fit test for categorical/binned data.
    - Kolmogorov-Smirnov (KS): Non-parametric test for continuous distributions
      (approximated via random sampling).
    - Shannon Entropy: Measures the information diversity of the binned data.
    - GRTE (Geometric Representativeness Trajectory Error): Measures the
      exponential gap between observed and theoretical entropy.

    The processor uses a streaming architecture:
    - Batch level: computes partial calculus (counts, histograms, KS samples).
    - Dataset level: aggregates histograms and performs final statistical tests.
    """

    SUPPORTED_METRICS = {
        "chi-square",
        "grte",
        "shannon-entropy",
        "kolmogorov-smirnov",
    }
    SUPPORTED_DISTS = {"normal", "uniform"}

    # Configuration constants - can be overridden in config
    DEFAULT_ALPHA = 0.05  # Significance level for statistical tests
    DEFAULT_SHANNON_ENTROPY_THRESHOLD = 2.0  # Threshold for high/low diversity interpretation
    DEFAULT_GRTE_THRESHOLD = 0.5  # Threshold for high/low representativeness interpretation
    DEFAULT_KS_SAMPLE_SIZE = 500  # Maximum sample size for KS test
    DEFAULT_KS_MIN_SAMPLE_SIZE = 50  # Minimum sample size for KS test
    DEFAULT_KS_SAMPLE_DIVISOR = 20  # Divisor for calculating sample size per batch
    DEFAULT_EPSILON = 1e-9  # Small value to avoid division by zero
    DEFAULT_INTERPRETATION_THRESHOLDS = {
        "follows_distribution": "follows_distribution",
        "does_not_follow_distribution": "does_not_follow_distribution",
        "high_diversity": "high_diversity",
        "low_diversity": "low_diversity",
        "high_representativeness": "high_representativeness",
        "low_representativeness": "low_representativeness",
    }

    def __init__(
        self,
        name: str = "representativeness",
        config: dict[str, Any] | None = None,
    ) -> None:
        """
        Initialize the representativeness processor.

        Args:
            name: Name of the processor.
            config: Configuration dictionary containing:
                - input_columns: List of columns to analyze.
                - metrics: List of metrics to compute (default: all supported).
                - bins: Number of bins for histograms (default: 10).
                - distribution: Target distribution ("normal" or "uniform").
                - alpha: Significance level (default: 0.05).
                - distribution_params: Dictionary of params (e.g., mean, std, min, max).

        Raises:
            ValueError: If the configuration is invalid (missing input columns,
                unsupported metric or distribution, or out-of-range bins /
                alpha / epsilon values).
        """
        super().__init__(name, config)
        self.name = name

        cfg = self.config
        self.metrics: list[str] = list(
            cfg.get(
                "metrics",
                ["chi-square", "grte", "kolmogorov-smirnov", "shannon-entropy"],
            )
        )

        self.bins: int = int(cfg.get("bins", 10))
        self.distribution: str = str(cfg.get("distribution", "normal")).lower()

        # Load configurable constants from config or use defaults
        self.alpha: float = float(cfg.get("alpha", self.DEFAULT_ALPHA))
        self.shannon_entropy_threshold: float = float(
            cfg.get(
                "shannon_entropy_threshold",
                self.DEFAULT_SHANNON_ENTROPY_THRESHOLD,
            )
        )
        self.grte_threshold: float = float(cfg.get("grte_threshold", self.DEFAULT_GRTE_THRESHOLD))
        self.ks_sample_size: int = int(cfg.get("ks_sample_size", self.DEFAULT_KS_SAMPLE_SIZE))
        self.ks_min_sample_size: int = int(cfg.get("ks_min_sample_size", self.DEFAULT_KS_MIN_SAMPLE_SIZE))
        self.ks_sample_divisor: int = int(cfg.get("ks_sample_divisor", self.DEFAULT_KS_SAMPLE_DIVISOR))
        self.epsilon: float = float(cfg.get("epsilon", self.DEFAULT_EPSILON))

        # Load interpretation thresholds from config or use defaults
        self.interpretation_thresholds: dict[str, str] = cfg.get(
            "interpretation_thresholds", self.DEFAULT_INTERPRETATION_THRESHOLDS
        )

        # distribution_params can be None or a dict; normalize to a plain dict
        dist_params_raw = cfg.get("distribution_params")
        self.dist_params: dict[str, Any] = {}
        if dist_params_raw is not None:
            self.dist_params = dict(dist_params_raw)

        # check config: avoid redundancy checks with pipeline (see datasetpipeline)
        if not self.input_columns:
            raise ValueError(f"[{self.name}] 'input_columns' must be provided")
        if any(m not in self.SUPPORTED_METRICS for m in self.metrics):
            raise ValueError(f"[{self.name}] unsupported metric; supported: {self.SUPPORTED_METRICS}")
        if self.distribution not in self.SUPPORTED_DISTS:
            raise ValueError(f"[{self.name}] 'distribution' must be in {self.SUPPORTED_DISTS}")
        if self.bins < 2:
            raise ValueError(f"[{self.name}] 'bins' must be >= 2")
        if self.alpha <= 0 or self.alpha >= 1:
            raise ValueError(f"[{self.name}] 'alpha' must be between 0 and 1")
        if self.epsilon <= 0:
            raise ValueError(f"[{self.name}] 'epsilon' must be positive")

        # Per-column bin edges are frozen on the first batch seen (streaming requirement:
        # all later batches must be histogrammed against identical edges).
        self._bin_edges: dict[str, np.ndarray] = {}
        self._initialized: bool = False

    @override
    def generated_metrics(self) -> list[str]:
        """
        Return the list of metric columns that will be generated.

        Returns:
            List of output metric column names, following the fixed
            "{column}_{metric}_{property}" naming convention.
        """
        # TODO : manage output metrics names with configuration
        # for now we follow a fixed naming convention
        metrics = []
        for col in self.input_columns:
            if "chi-square" in self.metrics:
                metrics.append(f"{col}_chi-square_p_value")
                metrics.append(f"{col}_chi-square_statistic")
                metrics.append(f"{col}_chi-square_interpretation")
            if "kolmogorov-smirnov" in self.metrics:
                metrics.append(f"{col}_kolmogorov-smirnov_p_value")
                metrics.append(f"{col}_kolmogorov-smirnov_statistic")
                metrics.append(f"{col}_kolmogorov-smirnov_interpretation")
            if "shannon-entropy" in self.metrics:
                metrics.append(f"{col}_shannon-entropy_entropy")
                metrics.append(f"{col}_shannon-entropy_interpretation")
            if "grte" in self.metrics:
                metrics.append(f"{col}_grte_grte_value")
                metrics.append(f"{col}_grte_interpretation")

        return metrics

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Compute partial histogram statistics per batch for streaming aggregation.

        Args:
            features: Dictionary of column arrays from this batch.

        Returns:
            Dictionary containing:
            - {col}_count: Total valid numeric samples.
            - {col}_hist: Histogram counts.
            - {col}_ks_sample: Random subset of data for KS test.
        """
        batch_metrics = {}

        for col in self.input_columns:
            if col not in features:
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue

            arr = features[col]
            # convert to numeric, handle mixed types and NaN
            try:
                np_col = np.asarray(arr.to_numpy(zero_copy_only=False))
            except Exception:
                np_col = pd.Series(arr.to_pylist()).to_numpy(copy=True)

            values = pd.to_numeric(pd.Series(np_col), errors="coerce").dropna()

            if values.empty:
                logger.warning(f"[{self.name}] column '{col}' has no valid numeric values in this batch")
                continue

            # Work on a plain ndarray: after dropna() the Series index has gaps,
            # so positional indexing must NOT go through Series.__getitem__.
            values_np = values.to_numpy()

            if not self._initialized or col not in self._bin_edges:
                self._initialize_bin_edges(values_np, col)

            edges = self._bin_edges[col]

            logger.debug(f"[{self.name}] edges shape: {edges.shape}, values shape: {values_np.shape}")

            hist_counts = np.histogram(values_np, bins=edges)[0].astype(np.int64)

            logger.debug(f"[{self.name}] hist_counts shape: {hist_counts.shape}, expected: {self.bins}")

            # store as Arrow arrays for aggregation
            batch_metrics[f"{col}_count"] = pa.array([len(values)], type=pa.int64())
            batch_metrics[f"{col}_hist"] = pa.FixedSizeListArray.from_arrays(
                hist_counts, list_size=hist_counts.shape[0]
            )

            # sampling for KS test approximation
            # TODO: KS needs all the data in memory to compute exactly; this metric
            # should eventually rely on prior computation of mean, std, min, max.
            if "kolmogorov-smirnov" in self.metrics or "chi-square" in self.metrics:
                sample_per_batch = min(
                    self.ks_sample_size,
                    max(
                        self.ks_min_sample_size,
                        len(values_np) // self.ks_sample_divisor,
                    ),
                )
                if len(values_np) > sample_per_batch:
                    # Random sampling without replacement.
                    # BUGFIX: index the ndarray positionally instead of the
                    # label-indexed Series (which raised KeyError after dropna()).
                    sample_indices = np.random.choice(len(values_np), sample_per_batch, replace=False)
                    sample = values_np[sample_indices]
                else:
                    sample = values_np

                batch_metrics[f"{col}_ks_sample"] = pa.array(sample.tolist(), type=pa.float64())

        if not self._initialized and batch_metrics:
            self._initialized = True

        return batch_metrics

    def _initialize_bin_edges(self, sample_data: np.ndarray, col: str) -> None:
        """
        Initialize bin edges for a column based on sample data and target distribution.

        Args:
            sample_data: Data array used to infer parameters if not provided in config.
            col: Name of the column.
        """
        if self.distribution == "normal":
            mean = float(self.dist_params.get("mean", np.mean(sample_data)))
            std = float(self.dist_params.get("std", np.std(sample_data, ddof=0)))
            std = std if std > 0.0 else self.epsilon
            edges = self._bin_edges_normal(mean, std, self.bins, sample_data)
        else:
            mn = float(self.dist_params.get("min", np.min(sample_data)))
            mx = float(self.dist_params.get("max", np.max(sample_data)))
            if mx <= mn:
                mx = mn + self.epsilon
            edges = self._bin_edges_uniform(mn, mx, self.bins, sample_data)

        self._bin_edges[col] = edges

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
        """
        Compute final dataset-level metrics by aggregating batch histograms.

        Args:
            batch_metrics: Dictionary of batch-level metrics collected during processing.

        Returns:
            Dictionary containing final scores and interpretations for all selected
            metrics, keyed as "{column}_{metric}_{property}" (matching
            generated_metrics()), plus a JSON-encoded "_metadata" entry.
        """
        if not batch_metrics:
            return {"_metadata": {"error": "No batch metrics provided"}}

        out: dict[str, Any] = {}
        total_samples = 0

        for col in self.input_columns:
            count_key = f"{col}_count"
            hist_key = f"{col}_hist"

            if count_key not in batch_metrics or hist_key not in batch_metrics:
                logger.warning(f"[{self.name}] no batch metrics for column '{col}'")
                continue

            # TODO : maybe we need a try ? as in batch or not as na already removed
            # N/A handling shall be documented, and logs added
            hist_batch_arrays = np.asarray(batch_metrics[hist_key].to_numpy(zero_copy_only=False))
            if hist_batch_arrays.shape[0] == 0:
                logger.warning(f"[{self.name}] no histogram batch for '{col}'")
                continue

            # aggregate counts across all batches
            total_count = int(np.sum(batch_metrics[count_key].to_numpy()))

            obs_counts = self._aggregate_histograms(hist_batch_arrays)
            if obs_counts is None:
                logger.warning(f"[{self.name}] no valid histogram for column '{col}'")
                continue

            if total_count <= 0 or obs_counts.sum() <= 0:
                logger.warning(f"[{self.name}] no valid data for column '{col}'")
                continue

            total_samples += total_count

            # distribution parameters and bin edges
            if col not in self._bin_edges:
                logger.warning(f"[{self.name}] no bin edges for column '{col}' - skipping")
                continue

            edges = self._bin_edges[col]

            # Theoretical bin frequencies, aligned on official DQM-ML v1:
            # draw `total_count` random values from the target distribution
            # and histogram them with the same edges as the observed data.
            exp_probs = self._expected_histogram(col, edges, total_count, batch_metrics)
            exp_counts = total_count * exp_probs

            col_res: dict[str, Any] = {}

            # chi-square: goodness-of-fit at significance level self.alpha
            if "chi-square" in self.metrics:
                col_res["chi-square"] = self._chi_square_metric(obs_counts, exp_counts)

            # Kolmogorov-Smirnov test using sampled data
            if "kolmogorov-smirnov" in self.metrics:
                sample_key = f"{col}_ks_sample"
                if sample_key in batch_metrics:
                    sample_arrays = batch_metrics[sample_key].to_numpy()
                    ks_samples = sample_arrays if sample_arrays.ndim == 1 else sample_arrays.flatten()
                    col_res["kolmogorov-smirnov"] = self._ks_metric(ks_samples)
                else:
                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_sample_data_found",
                    }

            # Shannon entropy - aligned on dqm-ml v1 (using theoretical frequencies)
            if "shannon-entropy" in self.metrics:
                col_res["shannon-entropy"] = self._shannon_metric(exp_probs)

            # GRTE (gap between observed and theoretical entropies) - aligned on dqm-ml v1
            if "grte" in self.metrics:
                col_res["grte"] = self._grte_metric(obs_counts, exp_probs)

            # Flatten the per-column result tree into flat output keys.
            # BUGFIX: keys now follow "{col}_{metric}_{prop}" to match the contract
            # declared by generated_metrics() (previously "{metric}_{col}_{prop}").
            for metric_name, value in col_res.items():
                if isinstance(value, dict):
                    for prop, content in value.items():
                        out[f"{col}_{metric_name}_{prop}"] = content
                else:
                    out[f"{col}_{metric_name}"] = value

        # TODO make optional export of metadata
        meta_data = {
            "bins": self.bins,
            "distribution": self.distribution,
            "metrics_computed": self.metrics,
            "total_samples": total_samples,
            "columns_analyzed": [c for c in self.input_columns if f"{c}_count" in batch_metrics],
            "ks_sampling_enabled": "kolmogorov-smirnov" in self.metrics,
            "note": "KS test uses random sampling approximation for scalability",
        }
        out["_metadata"] = json.dumps(meta_data)
        return out

    def _aggregate_histograms(self, hist_batch_arrays: np.ndarray) -> np.ndarray | None:
        """Sum per-batch histogram arrays into one observed-count vector (float)."""
        hist_arrays = None
        for batch_hist in hist_batch_arrays:
            hist_arrays = batch_hist if hist_arrays is None else hist_arrays + batch_hist

        if hist_arrays is None:
            return None

        logger.debug(f"[{self.name}] hist_arrays shape: {hist_arrays.shape}, expected bins: {self.bins}")

        if hist_arrays.ndim == 1:
            logger.debug("Single histogram")
            return hist_arrays.astype(float)
        if hist_arrays.shape[1] == self.bins:
            # Multiple histograms from different batches: sum along batch axis
            logger.debug("Sum along batch dimension (axis=0)")
            return np.sum(hist_arrays, axis=0).astype(float)
        logger.warning(f"[{self.name}] Unexpected histogram shape {hist_arrays.shape}, flattening")
        return hist_arrays.flatten().astype(float)

    def _expected_histogram(
        self,
        col: str,
        edges: np.ndarray,
        total_count: int,
        batch_metrics: dict[str, pa.Array],
    ) -> np.ndarray:
        """Simulate the target distribution and bin it with the column's edges."""
        if self.distribution == "normal":
            logger.debug("Generate normal distribution")
            sample_key = f"{col}_ks_sample"
            if sample_key in batch_metrics:
                # Use the SAME parameters as those used to generate the bins
                logger.debug("Use sampled mean and std")
                sample_arrays = batch_metrics[sample_key].to_numpy()
                if sample_arrays.ndim > 1:
                    sample_arrays = sample_arrays.flatten()
                mean = float(self.dist_params.get("mean", np.mean(sample_arrays)))
                std = float(self.dist_params.get("std", np.std(sample_arrays, ddof=0)))
                std = std if std > 0.0 else self.epsilon
            else:
                logger.debug("Fallback: use default or configured mean and std")
                mean = float(self.dist_params.get("mean", 0.0))
                std = float(self.dist_params.get("std", 1.0))
            logger.debug(f"mean={mean}")
            # BUGFIX: previously logged the mean under the "std" label
            logger.debug(f"std={std}")
            expected_values = np.random.normal(mean, std, total_count)
        else:  # uniform
            logger.debug("Generate uniform distribution")
            mn = float(self.dist_params.get("min", edges[0]))
            mx = float(self.dist_params.get("max", edges[-1]))
            logger.debug(f"min={mn}")
            logger.debug(f"max={mx}")
            expected_values = np.random.uniform(mn, mx, total_count)
        return np.histogram(expected_values, bins=edges)[0].astype(np.float64)

    def _chi_square_metric(self, obs_counts: np.ndarray, exp_counts: np.ndarray) -> dict[str, Any]:
        """Chi-square goodness-of-fit on aggregated bin counts (alpha = self.alpha)."""
        mask = exp_counts > 0
        if mask.sum() < 2:
            return {
                "p_value": float("nan"),
                "statistic": float("nan"),
                "interpretation": "insufficient_bins",
            }

        obs_sum = obs_counts[mask].sum()
        exp_sum = exp_counts[mask].sum()
        if exp_sum <= 0:
            return {
                "p_value": float("nan"),
                "statistic": float("nan"),
                "interpretation": "no_expected_counts",
            }

        # scipy requires observed and expected totals to match: rescale expected
        exp_counts_normalized = exp_counts[mask] * (obs_sum / exp_sum)

        logger.debug("Expected frequencies:")
        logger.debug(exp_counts_normalized)
        logger.debug("Observed frequencies: ")
        logger.debug(obs_counts[mask])

        try:
            chi = stats.chisquare(
                f_obs=obs_counts[mask],
                f_exp=exp_counts_normalized,
            )
        except ValueError as e:
            # Fallback: report the failure instead of crashing the whole compute
            return {
                "p_value": float("nan"),
                "statistic": float("nan"),
                "interpretation": f"chi_square_failed: {e!s}",
                "note": "using observed counts only due to statistical constraints",
            }

        logger.debug(f"Chi P value: {chi.pvalue}")
        return {
            "p_value": float(chi.pvalue),
            "statistic": float(chi.statistic),
            "interpretation": self.interpretation_thresholds.get(
                "follows_distribution" if chi.pvalue >= self.alpha else "does_not_follow_distribution",
                "follows_distribution",
            ),
        }

    def _ks_metric(self, ks_samples: np.ndarray) -> dict[str, Any]:
        """Kolmogorov-Smirnov test on aggregated random samples (approximation)."""
        if len(ks_samples) == 0:
            return {
                "p_value": float("nan"),
                "statistic": float("nan"),
                "interpretation": "no_samples_available",
            }

        if self.distribution == "normal":
            mean = float(self.dist_params.get("mean", np.mean(ks_samples)))
            std = float(self.dist_params.get("std", np.std(ks_samples, ddof=0)))
            std = std if std > 0.0 else self.epsilon
            ks = stats.kstest(ks_samples, stats.norm.cdf, args=(mean, std))
        else:  # uniform
            mn = float(self.dist_params.get("min", np.min(ks_samples)))
            mx = float(self.dist_params.get("max", np.max(ks_samples)))
            if mx <= mn:
                mx = mn + self.epsilon
            ks = stats.kstest(
                ks_samples,
                stats.uniform.cdf,
                args=(mn, mx - mn),
            )

        return {
            "p_value": float(ks.pvalue),
            "statistic": float(ks.statistic),
            "interpretation": self.interpretation_thresholds.get(
                "follows_distribution" if ks.pvalue >= self.alpha else "does_not_follow_distribution",
                "follows_distribution",
            ),
            "sample_size": len(ks_samples),
            "note": "approximated_from_random_samples",
        }

    def _shannon_metric(self, exp_probs: np.ndarray) -> dict[str, Any]:
        """Shannon entropy of the theoretical bin frequencies (dqm-ml v1 alignment)."""
        p_exp = exp_probs / exp_probs.sum()
        h_exp = float(stats.entropy(p_exp))
        return {
            "entropy": h_exp,
            "interpretation": self.interpretation_thresholds.get(
                "high_diversity" if h_exp > self.shannon_entropy_threshold else "low_diversity",
                "high_diversity",
            ),
        }

    def _grte_metric(self, obs_counts: np.ndarray, exp_probs: np.ndarray) -> dict[str, Any]:
        """GRTE: exp(-2 * |H_theoretical - H_observed|), in (0, 1], 1 = perfect match."""
        p_obs = obs_counts / obs_counts.sum()
        p_exp = exp_probs / exp_probs.sum()
        h_obs = float(stats.entropy(p_obs))
        h_exp = float(stats.entropy(p_exp))
        grte = float(np.exp(-2.0 * abs(h_exp - h_obs)))
        return {
            "grte_value": grte,
            "interpretation": self.interpretation_thresholds.get(
                "high_representativeness" if grte > self.grte_threshold else "low_representativeness",
                "high_representativeness",
            ),
        }

    def reset(self) -> None:
        """Reset processor state for new processing run."""
        self._bin_edges = {}
        self._initialized = False

    # utils methods for bin edge calculation

    def _bin_edges_normal(self, mean: float, std: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate bin edges based on the Percent Point Function (PPF) of a Normal distribution.

        This ensures bins represent equal probability mass under the theoretical distribution.
        The first and last bins are extended to -inf and +inf respectively.
        """
        # logic from dqm-ml v1 : equal-mass quantile edges via stats.norm.ppf
        inner = [stats.norm.ppf(i / bins, mean, std) for i in range(1, bins)]
        return np.array([-np.inf, *inner, np.inf])

    def _bin_edges_uniform(self, mn: float, mx: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate linearly spaced bin edges for a Uniform distribution.

        The range is determined by the minimum/maximum of both the configured
        parameters and the actual observed data.
        """
        lo = min(mn, float(np.min(data)))
        hi = max(mx, float(np.max(data)))
        if hi <= lo:
            hi = lo + self.epsilon
        return np.linspace(lo, hi, bins + 1)