Coverage for packages / dqm-ml-core / src / dqm_ml_core / metrics / representativeness.py: 77%

264 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 10:11 +0000

1"""Representativeness metric processor for evaluating distribution fit. 

2 

3This module contains the RepresentativenessProcessor class that evaluates 

4how well a dataset represents a target statistical distribution using 

5various statistical tests. 

6""" 

7 

8import logging 

9from typing import Any 

10 

11import numpy as np 

12import pandas as pd 

13import pyarrow as pa 

14from scipy import stats 

15 

16# COMPATIBILITY : from typing import Any, override # When support of 3.10 and 3.11 will be removed 

17from typing_extensions import override 

18 

19from dqm_ml_core.api.data_processor import DatametricProcessor 

20 

21logger = logging.getLogger(__name__) 

22 

23 

class RepresentativenessProcessor(DatametricProcessor):
    """
    Evaluates how well the dataset represents a target statistical distribution.

    This processor discretises samples and performs statistical tests to compare
    the observed distribution of numerical columns against a theoretical target
    distribution (Normal or Uniform).

    Supported Metrics:
        - Chi-square: Goodness-of-fit test for categorical/binned data.
        - Kolmogorov-Smirnov (KS): Non-parametric test for continuous distributions (approximated via sampling).
        - Shannon Entropy: Measures the information diversity of the binned data.
        - GRTE (Geometric Representativeness Trajectory Error): Measures the exponential gap
          between observed and theoretical entropy.

    The processor uses a streaming architecture:
        - Batch level: Computes partial calculus.
        - Dataset level: Aggregates histograms and performs final statistical tests.
    """

    SUPPORTED_METRICS = {
        "chi-square",
        "grte",
        "shannon-entropy",
        "kolmogorov-smirnov",
    }
    SUPPORTED_DISTS = {"normal", "uniform"}

    # Configuration constants - can be overridden in config
    DEFAULT_ALPHA = 0.05  # Significance level for statistical tests
    DEFAULT_SHANNON_ENTROPY_THRESHOLD = 2.0  # Threshold for high/low diversity interpretation
    DEFAULT_GRTE_THRESHOLD = 0.5  # Threshold for high/low representativeness interpretation
    DEFAULT_KS_SAMPLE_SIZE = 500  # Maximum sample size for KS test
    DEFAULT_KS_MIN_SAMPLE_SIZE = 50  # Minimum sample size for KS test
    DEFAULT_KS_SAMPLE_DIVISOR = 20  # Divisor for calculating sample size per batch
    DEFAULT_EPSILON = 1e-9  # Small value to avoid division by zero
    # Maps internal interpretation keys to the labels emitted in the results;
    # overridable via config "interpretation_thresholds".
    DEFAULT_INTERPRETATION_THRESHOLDS = {
        "follows_distribution": "follows_distribution",
        "does_not_follow_distribution": "does_not_follow_distribution",
        "high_diversity": "high_diversity",
        "low_diversity": "low_diversity",
        "high_representativeness": "high_representativeness",
        "low_representativeness": "low_representativeness",
    }

    def __init__(
        self,
        name: str = "representativeness",
        config: dict[str, Any] | None = None,
    ) -> None:
        """
        Initialize the representativeness processor.

        Args:
            name: Name of the processor.
            config: Configuration dictionary containing:
                - input_columns: List of columns to analyze.
                - metrics: List of metrics to compute (default: all supported).
                - bins: Number of bins for histograms (default: 10).
                - distribution: Target distribution ("normal" or "uniform").
                - alpha: Significance level (default: 0.05).
                - distribution_params: Dictionary of params (e.g., mean, std, min, max).

        Raises:
            ValueError: If input_columns is missing, a metric or distribution is
                unsupported, or bins/alpha/epsilon are out of range.
        """
        super().__init__(name, config)
        self.name = name

        cfg = self.config
        self.metrics: list[str] = list(
            cfg.get(
                "metrics",
                ["chi-square", "grte", "kolmogorov-smirnov", "shannon-entropy"],
            )
        )

        self.bins: int = int(cfg.get("bins", 10))
        self.distribution: str = str(cfg.get("distribution", "normal")).lower()

        # Load configurable constants from config or use defaults
        self.alpha: float = float(cfg.get("alpha", self.DEFAULT_ALPHA))
        self.shannon_entropy_threshold: float = float(
            cfg.get(
                "shannon_entropy_threshold",
                self.DEFAULT_SHANNON_ENTROPY_THRESHOLD,
            )
        )
        self.grte_threshold: float = float(cfg.get("grte_threshold", self.DEFAULT_GRTE_THRESHOLD))
        self.ks_sample_size: int = int(cfg.get("ks_sample_size", self.DEFAULT_KS_SAMPLE_SIZE))
        self.ks_min_sample_size: int = int(cfg.get("ks_min_sample_size", self.DEFAULT_KS_MIN_SAMPLE_SIZE))
        self.ks_sample_divisor: int = int(cfg.get("ks_sample_divisor", self.DEFAULT_KS_SAMPLE_DIVISOR))
        self.epsilon: float = float(cfg.get("epsilon", self.DEFAULT_EPSILON))

        # Load interpretation thresholds from config or use defaults.
        # Copy into a fresh dict so instances never alias (and cannot mutate)
        # the shared class-level DEFAULT_INTERPRETATION_THRESHOLDS mapping.
        self.interpretation_thresholds: dict[str, str] = dict(
            cfg.get("interpretation_thresholds", self.DEFAULT_INTERPRETATION_THRESHOLDS)
        )

        # Handle distribution_params properly - it can be None or a dict
        dist_params_raw = cfg.get("distribution_params")

        self.dist_params: dict[str, Any] = {}
        if dist_params_raw is not None:
            self.dist_params = dict(dist_params_raw)

        # check config: avoid redundancy checks with pipeline (see datasetpipeline)
        if not self.input_columns:
            raise ValueError(f"[{self.name}] 'input_columns' must be provided")
        if any(m not in self.SUPPORTED_METRICS for m in self.metrics):
            raise ValueError(f"[{self.name}] unsupported metric; supported: {self.SUPPORTED_METRICS}")
        if self.distribution not in self.SUPPORTED_DISTS:
            raise ValueError(f"[{self.name}] 'distribution' must be in {self.SUPPORTED_DISTS}")
        if self.bins < 2:
            raise ValueError(f"[{self.name}] 'bins' must be >= 2")
        if self.alpha <= 0 or self.alpha >= 1:
            raise ValueError(f"[{self.name}] 'alpha' must be between 0 and 1")
        if self.epsilon <= 0:
            raise ValueError(f"[{self.name}] 'epsilon' must be positive")

        # Per-column bin edges, lazily initialized from the first batch seen.
        self._bin_edges: dict[str, np.ndarray] = {}
        self._initialized: bool = False

    @override
    def generated_metrics(self) -> list[str]:
        """
        Return the list of metric columns that will be generated.

        Returns:
            List of output metric column names
        """
        # TODO : manage output metrics names with configuration
        # for now we follow a fixed naming convention
        metrics = []
        for col in self.input_columns:
            if "chi-square" in self.metrics:
                metrics.append(f"{col}_chi-square_p_value")
                metrics.append(f"{col}_chi-square_statistic")
                metrics.append(f"{col}_chi-square_interpretation")
            if "kolmogorov-smirnov" in self.metrics:
                metrics.append(f"{col}_kolmogorov-smirnov_p_value")
                metrics.append(f"{col}_kolmogorov-smirnov_statistic")
                metrics.append(f"{col}_kolmogorov-smirnov_interpretation")
            if "shannon-entropy" in self.metrics:
                metrics.append(f"{col}_shannon-entropy_entropy")
                metrics.append(f"{col}_shannon-entropy_interpretation")
            if "grte" in self.metrics:
                metrics.append(f"{col}_grte_grte_value")
                metrics.append(f"{col}_grte_interpretation")

        return metrics

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Compute partial histogram statistics per batch for streaming aggregation.

        Args:
            features: Dictionary of column arrays from this batch.

        Returns:
            Dictionary containing:
                - {col}_count: Total valid numeric samples.
                - {col}_hist: Histogram counts.
                - {col}_ks_sample: Random subset of data for KS test.
        """
        batch_metrics = {}

        for col in self.input_columns:
            if col not in features:
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue

            arr = features[col]
            # convert to numeric, handle mixed types and NaN
            try:
                np_col = np.asarray(arr.to_numpy(zero_copy_only=False))
            except Exception:
                # Arrow -> numpy conversion can fail for some types; fall back
                # to a Python-list round trip through pandas.
                np_col = pd.Series(arr.to_pylist()).to_numpy(copy=True)

            values = pd.to_numeric(pd.Series(np_col), errors="coerce").dropna()

            if values.empty:
                logger.warning(f"[{self.name}] column '{col}' has no valid numeric values in this batch")
                continue

            # Bin edges are derived from the first batch that contains the
            # column (or from configured distribution_params) and reused for
            # every subsequent batch so histograms are comparable.
            if not self._initialized or col not in self._bin_edges:
                self._initialize_bin_edges(values.to_numpy(), col)

            edges = self._bin_edges[col]

            # Debug
            logger.debug(f"[{self.name}] edges shape: {edges.shape}, values shape: {values.shape}")

            hist_counts = np.histogram(values, bins=edges)[0].astype(np.int64)

            # debug: check histogram
            logger.debug(f"[{self.name}] hist_counts shape: {hist_counts.shape}, expected: {self.bins}")

            # store as Arrow arrays for aggregation
            batch_metrics[f"{col}_count"] = pa.array([len(values)], type=pa.int64())
            batch_metrics[f"{col}_hist"] = pa.FixedSizeListArray.from_arrays(
                hist_counts, list_size=hist_counts.shape[0]
            )

            # sampling for KS test approximation
            # TODO: KS need to have all the data in memory to compute,
            # this metrics need to rely on other metrics prior computation of mean, std, min, max computation
            if "kolmogorov-smirnov" in self.metrics or "chi-square" in self.metrics:
                sample_per_batch = min(
                    self.ks_sample_size,
                    max(
                        self.ks_min_sample_size,
                        len(values) // self.ks_sample_divisor,
                    ),
                )
                if len(values) > sample_per_batch:
                    # Random sampling without replacement.
                    # BUGFIX: use positional indexing (.iloc). After dropna()
                    # the Series index may have gaps, and label-based
                    # `values[sample_indices]` would raise KeyError for any
                    # positional index that matches a dropped label.
                    sample_indices = np.random.choice(len(values), sample_per_batch, replace=False)
                    sample = values.iloc[sample_indices]
                else:
                    sample = values

                batch_metrics[f"{col}_ks_sample"] = pa.array(sample.tolist(), type=pa.float64())

        if not self._initialized and batch_metrics:
            self._initialized = True

        return batch_metrics

    def _initialize_bin_edges(self, sample_data: np.ndarray, col: str) -> None:
        """
        Initialize bin edges for a column based on sample data and target distribution.

        Args:
            sample_data: Data array used to infer parameters if not provided in config.
            col: Name of the column.
        """
        if self.distribution == "normal":
            mean = float(self.dist_params.get("mean", np.mean(sample_data)))
            std = float(self.dist_params.get("std", np.std(sample_data, ddof=0)))
            # Guard against a degenerate (constant) column.
            std = std if std > 0.0 else self.epsilon
            edges = self._bin_edges_normal(mean, std, self.bins, sample_data)
        else:
            mn = float(self.dist_params.get("min", np.min(sample_data)))
            mx = float(self.dist_params.get("max", np.max(sample_data)))
            if mx <= mn:
                mx = mn + self.epsilon
            edges = self._bin_edges_uniform(mn, mx, self.bins, sample_data)

        self._bin_edges[col] = edges

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
        """
        Compute final dataset-level metrics by aggregating batch histograms.

        Args:
            batch_metrics: Dictionary of batch-level metrics collected during processing.

        Returns:
            Dictionary containing final scores and interpretations for all selected metrics.
        """
        if not batch_metrics:
            return {"_metadata": {"error": "No batch metrics provided"}}

        out: dict[str, Any] = {}
        total_samples = 0

        for col in self.input_columns:
            count_key = f"{col}_count"
            hist_key = f"{col}_hist"

            if count_key not in batch_metrics or hist_key not in batch_metrics:
                logger.warning(f"[{self.name}] no batch metrics for column '{col}'")
                continue

            # TODO : maybe we need a try ? as in batch or not as na already removed
            # N/A handling shall be documented, and logs added
            hist_batch_arrays = np.asarray(batch_metrics[hist_key].to_numpy(zero_copy_only=False))
            if hist_batch_arrays.shape[0] == 0:
                logger.warning(f"[{self.name}] no histograme batch for '{col}'")
                continue

            # aggregate counts and histograms across all batches
            total_count = int(np.sum(batch_metrics[count_key].to_numpy()))

            hist_arrays = None

            for batch_hist in hist_batch_arrays:
                hist_arrays = batch_hist if hist_arrays is None else hist_arrays + batch_hist

            # Debug: check dimensions
            if hist_arrays is None:
                logger.warning(f"[{self.name}] no valid histogram for column '{col}'")
                continue

            logger.debug(f"[{self.name}] hist_arrays shape: {hist_arrays.shape}, expected bins: {self.bins}")

            # sum histogram counts across batches
            if hist_arrays.ndim == 1:
                logger.debug("Single histogram")
                obs_counts = hist_arrays.astype(float)
            else:
                # Ensure we're summing along the right axis
                logger.debug("Multiple histograms from different batches")
                if hist_arrays.shape[1] == self.bins:
                    logger.debug("Sum along batch dimension (axis=0)")
                    obs_counts = np.sum(hist_arrays, axis=0).astype(float)
                else:
                    logger.debug("Flatten and create a single histogram")
                    logger.warning(f"[{self.name}] Unexpected histogram shape {hist_arrays.shape}, flattening")
                    obs_counts = hist_arrays.flatten().astype(float)

            if total_count <= 0 or obs_counts.sum() <= 0:
                logger.warning(f"[{self.name}] no valid data for column '{col}'")
                continue

            total_samples += total_count

            # distribution parameters and bin edges
            if col not in self._bin_edges:
                logger.warning(f"[{self.name}] no bin edges for column '{col}' - skipping")
                continue

            edges = self._bin_edges[col]

            # theoretical probabilities - aligned with the official DQM-ML implementation
            if self.distribution == "normal":
                logger.debug("Generate normal distribution")
                # Use the SAME parameters as the ones used to generate the bins

                sample_key = f"{col}_ks_sample"
                if sample_key in batch_metrics:
                    logger.debug("Use sampled meand and std")
                    sample_arrays = batch_metrics[sample_key].to_numpy()
                    if sample_arrays.ndim > 1:
                        sample_arrays = sample_arrays.flatten()
                    mean = float(self.dist_params.get("mean", np.mean(sample_arrays)))
                    std = float(self.dist_params.get("std", np.std(sample_arrays, ddof=0)))
                    std = std if std > 0.0 else self.epsilon
                else:
                    logger.debug("Fallback: use default or configured mean and std")
                    mean = float(self.dist_params.get("mean", 0.0))
                    std = float(self.dist_params.get("std", 1.0))
                logger.debug(f"mean={mean}")
                # BUGFIX: was logging `mean` under the "std" label.
                logger.debug(f"std={std}")
                # generate random values and count frequencies (as in the official implementation)
                expected_values = np.random.normal(mean, std, total_count)
                exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)
            else:  # uniform
                logger.debug("Generate uniform distribution")
                mn = float(self.dist_params.get("min", edges[0]))
                mx = float(self.dist_params.get("max", edges[-1]))
                logger.debug(f"min={mn}")
                logger.debug(f"max={mx}")
                # generate random values and count frequencies (as in the official implementation)
                expected_values = np.random.uniform(mn, mx, total_count)
                exp_probs = np.histogram(expected_values, bins=edges)[0].astype(np.float64)

            # NOTE(review): despite its name, exp_probs holds raw bin COUNTS of
            # total_count simulated samples, so exp_counts is scaled by
            # total_count twice. The chi-square branch rescales expected counts
            # to the observed sum, which cancels the extra factor — confirm
            # before reusing exp_counts elsewhere.
            exp_counts = total_count * exp_probs

            col_res: dict[str, Any] = {}

            # chi-square: here we compute the chi-square with a alpha value of 0.05
            if "chi-square" in self.metrics:
                # Ensure observed and expected counts have the same sum

                mask = exp_counts > 0
                if mask.sum() >= 2:
                    logger.debug("Normalize expected counts to match observed sum")
                    obs_sum = obs_counts[mask].sum()
                    exp_sum = exp_counts[mask].sum()

                    if exp_sum > 0:
                        # Scale expected counts to match observed sum
                        # (scipy.stats.chisquare requires equal totals).
                        exp_counts_normalized = exp_counts[mask] * (obs_sum / exp_sum)

                        logger.debug("Expected frequencies:")
                        logger.debug(exp_counts_normalized)
                        logger.debug("Observed frequencies: ")
                        logger.debug(obs_counts[mask])

                        try:
                            chi = stats.chisquare(
                                f_obs=obs_counts[mask],
                                f_exp=exp_counts_normalized,
                            )
                            logger.debug(f"Chi P value: {chi.pvalue}")
                            col_res["chi-square"] = {
                                "p_value": float(chi.pvalue),
                                "statistic": float(chi.statistic),
                                "interpretation": self.interpretation_thresholds.get(
                                    "follows_distribution"
                                    if chi.pvalue >= self.alpha
                                    else "does_not_follow_distribution",
                                    "follows_distribution",
                                ),
                            }
                        except ValueError as e:
                            # Fallback: use only observed counts if chi-square fails
                            col_res["chi-square"] = {
                                "p_value": float("nan"),
                                "statistic": float("nan"),
                                "interpretation": f"chi_square_failed: {e!s}",
                                "note": "using observed counts only due to statistical constraints",
                            }
                    else:
                        col_res["chi-square"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": "no_expected_counts",
                        }
                else:
                    col_res["chi-square"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "insufficient_bins",
                    }

            # Kolmogorov-Smirnov test using sampled data
            if "kolmogorov-smirnov" in self.metrics:
                sample_key = f"{col}_ks_sample"
                if sample_key in batch_metrics:
                    sample_arrays = batch_metrics[sample_key].to_numpy()
                    ks_samples = sample_arrays if sample_arrays.ndim == 1 else sample_arrays.flatten()

                    if len(ks_samples) > 0:
                        # Perform KS test on aggregated samples
                        if self.distribution == "normal":
                            mean = float(self.dist_params.get("mean", np.mean(ks_samples)))
                            std = float(self.dist_params.get("std", np.std(ks_samples, ddof=0)))
                            std = std if std > 0.0 else self.epsilon
                            ks = stats.kstest(ks_samples, stats.norm.cdf, args=(mean, std))
                        else:  # uniform
                            mn = float(self.dist_params.get("min", np.min(ks_samples)))
                            mx = float(self.dist_params.get("max", np.max(ks_samples)))
                            if mx <= mn:
                                mx = mn + self.epsilon
                            # scipy's uniform takes (loc, scale), i.e. (min, width).
                            ks = stats.kstest(
                                ks_samples,
                                stats.uniform.cdf,
                                args=(mn, mx - mn),
                            )

                        col_res["kolmogorov-smirnov"] = {
                            "p_value": float(ks.pvalue),
                            "statistic": float(ks.statistic),
                            "interpretation": self.interpretation_thresholds.get(
                                "follows_distribution" if ks.pvalue >= self.alpha else "does_not_follow_distribution",
                                "follows_distribution",
                            ),
                            "sample_size": len(ks_samples),
                            "note": "approximated_from_random_samples",
                        }
                    else:
                        col_res["kolmogorov-smirnov"] = {
                            "p_value": float("nan"),
                            "statistic": float("nan"),
                            "interpretation": "no_samples_available",
                        }
                else:
                    col_res["kolmogorov-smirnov"] = {
                        "p_value": float("nan"),
                        "statistic": float("nan"),
                        "interpretation": "no_sample_data_found",
                    }

            # Shannon entropy - aligned on dqm-ml v1 (using theoretical frequencies)
            if "shannon-entropy" in self.metrics:
                # Use theoretical frequencies
                p_exp = exp_probs / exp_probs.sum()
                h_exp = float(stats.entropy(p_exp))
                col_res["shannon-entropy"] = {
                    "entropy": h_exp,
                    "interpretation": self.interpretation_thresholds.get(
                        "high_diversity" if h_exp > self.shannon_entropy_threshold else "low_diversity",
                        "high_diversity",
                    ),
                }

            # GRTE (gap between observed and theoretical entropies) - aligned on dqm-ml v1
            if "grte" in self.metrics:
                # Use observed and theoretical frequencies
                p_obs = obs_counts / obs_counts.sum()
                p_exp = exp_probs / exp_probs.sum()
                h_obs = float(stats.entropy(p_obs))
                h_exp = float(stats.entropy(p_exp))
                grte = float(np.exp(-2.0 * abs(h_exp - h_obs)))
                col_res["grte"] = {
                    "grte_value": grte,
                    "interpretation": self.interpretation_thresholds.get(
                        "high_representativeness" if grte > self.grte_threshold else "low_representativeness",
                        "high_representativeness",
                    ),
                }

            # Flatten the nested {metric: {prop: value}} tree into flat output keys.
            # TODO : refactor the implementation, for optimization
            if col_res:
                for key, value in col_res.items():
                    if isinstance(value, dict):
                        for prop, content in value.items():
                            out[key + "_" + col + "_" + prop] = content
                    else:
                        out[key + "_" + col] = value
        # TODO make optional export of metadata
        meta_data = {
            "bins": self.bins,
            "distribution": self.distribution,
            "metrics_computed": self.metrics,
            "total_samples": total_samples,
            "columns_analyzed": [c for c in self.input_columns if f"{c}_count" in batch_metrics],
            "ks_sampling_enabled": "kolmogorov-smirnov" in self.metrics,
            "note": "KS test uses random sampling approximation for scalability",
        }

        import json

        out["_metadata"] = json.dumps(meta_data)
        return out

    def reset(self) -> None:
        """Reset processor state for new processing run."""
        self._bin_edges = {}
        self._initialized = False

    # utils methods for bin edge calculation

    def _bin_edges_normal(self, mean: float, std: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate bin edges based on the Percent Point Function (PPF) of a Normal distribution.

        This ensures bins represent equal probability mass under the theoretical distribution.
        The first and last bins are extended to -inf and +inf respectively.
        """
        # logic from dqm-ml v1 : use stats.norm.ppf with linspace(1/bins, 1, bins)
        interval = []
        for i in range(1, bins):
            val = stats.norm.ppf(i / bins, mean, std)
            interval.append(val)
        interval.insert(0, -np.inf)
        interval.append(np.inf)
        return np.array(interval)

    def _bin_edges_uniform(self, mn: float, mx: float, bins: int, data: np.ndarray) -> np.ndarray:
        """
        Calculate linearly spaced bin edges for a Uniform distribution.

        The range is determined by the minimum/maximum of both the configured
        parameters and the actual observed data.
        """
        lo = min(mn, float(np.min(data)))
        hi = max(mx, float(np.max(data)))
        if hi <= lo:
            hi = lo + self.epsilon
        return np.linspace(lo, hi, bins + 1)