Coverage for packages / dqm-ml-core / src / dqm_ml_core / metrics / completeness.py: 85%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
1"""Completeness metric processor for evaluating data completeness.
3This module contains the CompletenessProcessor class that evaluates
4the completeness of tabular data by computing non-null value ratios.
5"""
7import json
8import logging
9from typing import Any
11import numpy as np
12import pyarrow as pa
14# COMPATIBILITY : from typing import Any, override # When support of 3.10 and 3.11 will be removed
15from typing_extensions import override
17from dqm_ml_core.api.data_processor import DatametricProcessor
19logger = logging.getLogger(__name__)
class CompletenessProcessor(DatametricProcessor):
    """
    Data completeness processor that evaluates the completeness of tabular data.

    This processor calculates completeness scores (ratio of non-null to total values)
    for specified columns and provides overall dataset completeness metrics.

    The processor operates at multiple levels:
    - Batch level: Aggregated counts for streaming processing
    - Dataset level: Final completeness scores and statistics
    """

    def __init__(self, name: str = "completeness", config: dict[str, Any] | None = None) -> None:
        """
        Initialize the completeness processor.

        Args:
            name: Name of the processor
            config: Configuration dictionary containing:
                - input_columns: List of columns to analyze for completeness
                - output_metrics: Dictionary mapping metric names to output column names
                - include_per_column: Whether to include per-column completeness scores
                - include_overall: Whether to include overall completeness score
                - include_metadata: Whether to attach a JSON metadata summary under "_metadata"

        Raises:
            ValueError: If both 'include_per_column' and 'include_overall' are False.
        """
        super().__init__(name, config)

        cfg = self.config or {}

        # Configuration for which metrics to compute — per column and across all columns
        self.include_per_column: bool = bool(cfg.get("include_per_column", True))
        self.include_overall: bool = bool(cfg.get("include_overall", True))
        self.include_metadata: bool = bool(cfg.get("include_metadata", False))

        # Output column mappings
        self.output_metrics = cfg.get("output_metrics", {})

        # Validation: the processor must produce at least one kind of output
        if not self.include_per_column and not self.include_overall:
            raise ValueError(f"[{self.name}] At least one of 'include_per_column' or 'include_overall' must be True")

    @override
    def generated_metrics(self) -> list[str]:
        """
        Return the list of metric columns that will be generated.

        Returns:
            List of output metric column names
        """
        # TODO : manage output metrics names with configuration
        metrics = []

        if self.include_overall:
            overall_key = self.output_metrics.get("overall_completeness", "completeness_overall")
            metrics.append(overall_key)

        if self.include_per_column:
            for col in self.input_columns:
                col_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
                metrics.append(col_key)

        return metrics

    @override
    def compute_features(
        self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array] | None = None
    ) -> dict[str, pa.Array]:
        """
        Extract the needed columns from the batch for completeness analysis.

        This method simply passes through the columns we need to analyze,
        as completeness calculation is done at batch and dataset levels.

        Args:
            batch: Input batch of data
            prev_features: Previous features (not used in this processor)

        Returns:
            Dictionary containing the columns to analyze
        """
        features: dict[str, pa.Array] = {}

        # Fall back to every batch column when no input columns were configured
        columns_to_analyze = self.input_columns if self.input_columns else batch.column_names

        for col in columns_to_analyze:
            if col not in batch.schema.names:
                # Lazy %-args keep formatting cost off the happy path
                logger.warning("[%s] column '%s' not found in batch", self.name, col)
                continue

            # Simply pass through the column data for batch-level processing
            features[col] = batch.column(col)

        return features

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Compute batch-level completeness counts for streaming aggregation.

        This counts total and non-null values per column in this batch,
        which will be aggregated across all batches for final dataset completeness.

        Args:
            features: Dictionary of column arrays from this batch

        Returns:
            Dictionary of batch-level completeness counts
        """
        batch_metrics: dict[str, pa.Array] = {}

        for col, col_array in features.items():
            # Count total samples in this batch
            total_count = len(col_array)

            # Count non-null (complete) samples in this batch
            # pa.compute.is_valid() returns True for non-null values
            is_valid_mask = pa.compute.is_valid(col_array)
            raw_sum = pa.compute.sum(is_valid_mask).as_py()
            # pa.compute.sum() yields null (None) for an empty batch; treat it as
            # zero complete values so the int64 count arrays never carry nulls
            complete_count = raw_sum if raw_sum is not None else 0

            # Store counts for aggregation across batches
            batch_metrics[f"{col}_total_count"] = pa.array([total_count], type=pa.int64())
            batch_metrics[f"{col}_complete_count"] = pa.array([complete_count], type=pa.int64())

        return batch_metrics

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
        """
        Compute final dataset-level completeness metrics.

        This aggregates the batch-level counts to compute final completeness scores
        for each column and overall dataset completeness.

        Args:
            batch_metrics: Dictionary of batch-level metrics to aggregate

        Returns:
            Dictionary of final completeness metrics
        """
        if not batch_metrics:
            return {"_metadata": {"error": "No batch metrics provided"}}

        results: dict[str, Any] = {}

        # Determine columns from batch metrics. removesuffix (not replace) so a
        # column whose name contains "_total_count" mid-string is not corrupted.
        columns_analyzed = []
        for key in batch_metrics:
            if key.endswith("_total_count"):
                columns_analyzed.append(key.removesuffix("_total_count"))

        if not columns_analyzed:
            logger.warning("[%s] No columns found in batch metrics", self.name)
            return {"_metadata": {"error": "No columns found in batch metrics"}}

        per_column_completeness: dict[str, float] = {}
        total_samples = 0

        # Calculate completeness for each column
        for col in columns_analyzed:
            total_key = f"{col}_total_count"
            complete_key = f"{col}_complete_count"

            if total_key not in batch_metrics or complete_key not in batch_metrics:
                logger.warning("[%s] Missing batch metrics for column '%s'", self.name, col)
                continue

            # Aggregate counts across all batches
            col_total = int(np.sum(batch_metrics[total_key].to_numpy()))
            col_complete = int(np.sum(batch_metrics[complete_key].to_numpy()))

            # Completeness score for this column; 0.0 when the column is empty
            per_column_completeness[col] = col_complete / col_total if col_total > 0 else 0.0

            total_samples += col_total

        # Overall score: unweighted mean of per-column scores, computed once and
        # shared by the result column and the metadata summary
        overall_completeness = (
            sum(per_column_completeness.values()) / len(per_column_completeness)
            if per_column_completeness
            else 0.0
        )

        # Generate output metrics based on configuration
        if self.include_per_column:
            for col, score in per_column_completeness.items():
                output_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
                results[output_key] = score

        if self.include_overall:
            output_key = self.output_metrics.get("overall_completeness", "completeness_overall")
            results[output_key] = overall_completeness

        if self.include_metadata:
            # Serialized summary for downstream reporting; built only when requested
            metadata = {
                "columns_analyzed": columns_analyzed,
                "total_samples_per_column": total_samples // len(columns_analyzed) if columns_analyzed else 0,
                "per_column_scores": per_column_completeness,
                "overall_score": overall_completeness,
            }
            results["_metadata"] = json.dumps(metadata)

        return results

    def reset(self) -> None:
        """Reset processor state for new processing run."""
        # No persistent state to reset for completeness processor