Coverage for packages / dqm-ml-core / src / dqm_ml_core / metrics / completeness.py: 85%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 10:11 +0000

1"""Completeness metric processor for evaluating data completeness. 

2 

3This module contains the CompletenessProcessor class that evaluates 

4the completeness of tabular data by computing non-null value ratios. 

5""" 

6 

7import json 

8import logging 

9from typing import Any 

10 

11import numpy as np 

12import pyarrow as pa 

13 

14# COMPATIBILITY : from typing import Any, override # When support of 3.10 and 3.11 will be removed 

15from typing_extensions import override 

16 

17from dqm_ml_core.api.data_processor import DatametricProcessor 

18 

19logger = logging.getLogger(__name__) 

20 

21 

class CompletenessProcessor(DatametricProcessor):
    """
    Data completeness processor that evaluates the completeness of tabular data.

    This processor calculates completeness scores (ratio of non-null to total values)
    for specified columns and provides overall dataset completeness metrics.

    The processor operates at multiple levels:
    - Batch level: Aggregated counts for streaming processing
    - Dataset level: Final completeness scores and statistics
    """

    def __init__(self, name: str = "completeness", config: dict[str, Any] | None = None) -> None:
        """
        Initialize the completeness processor.

        Args:
            name: Name of the processor
            config: Configuration dictionary containing:
                - input_columns: List of columns to analyze for completeness
                - output_metrics: Dictionary mapping metric names to output column names
                - include_per_column: Whether to include per-column completeness scores
                - include_overall: Whether to include overall completeness score
                - include_metadata: Whether to attach JSON-serialized metadata to results

        Raises:
            ValueError: If both 'include_per_column' and 'include_overall' are disabled.
        """
        super().__init__(name, config)

        cfg = self.config or {}

        # Configuration for what metrics to compute - per column and across all columns
        self.include_per_column: bool = bool(cfg.get("include_per_column", True))
        self.include_overall: bool = bool(cfg.get("include_overall", True))
        self.include_metadata: bool = bool(cfg.get("include_metadata", False))

        # Output column mappings (logical metric name -> output column name)
        self.output_metrics = cfg.get("output_metrics", {})

        # Validation: at least one metric family must be enabled
        if not self.include_per_column and not self.include_overall:
            raise ValueError(f"[{self.name}] At least one of 'include_per_column' or 'include_overall' must be True")

    @override
    def generated_metrics(self) -> list[str]:
        """
        Return the list of metric columns that will be generated.

        Returns:
            List of output metric column names
        """
        # TODO : manage output metrics names with configuration
        metrics = []

        if self.include_overall:
            overall_key = self.output_metrics.get("overall_completeness", "completeness_overall")
            metrics.append(overall_key)

        if self.include_per_column:
            # NOTE(review): self.input_columns is presumably set by the
            # DatametricProcessor base class from config — confirm.
            for col in self.input_columns:
                col_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
                metrics.append(col_key)

        return metrics

    @override
    def compute_features(
        self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array] | None = None
    ) -> dict[str, pa.Array]:
        """
        Extract the needed columns from the batch for completeness analysis.

        This method simply passes through the columns we need to analyze,
        as completeness calculation is done at batch and dataset levels.

        Args:
            batch: Input batch of data
            prev_features: Previous features (not used in this processor)

        Returns:
            Dictionary containing the columns to analyze
        """
        features = {}

        # Fall back to every column in the batch when no explicit list is configured
        columns_to_analyze = self.input_columns if self.input_columns else batch.column_names

        for col in columns_to_analyze:
            if col not in batch.schema.names:
                logger.warning(f"[{self.name}] column '{col}' not found in batch")
                continue

            # Simply pass through the column data for batch-level processing
            features[col] = batch.column(col)

        return features

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Compute batch-level completeness counts for streaming aggregation.

        This counts total and non-null values per column in this batch,
        which will be aggregated across all batches for final dataset completeness.

        Args:
            features: Dictionary of column arrays from this batch

        Returns:
            Dictionary of batch-level completeness counts
        """
        batch_metrics = {}

        for col, col_array in features.items():
            # Count total samples in this batch
            total_count = len(col_array)

            # Count non-null (complete) samples in this batch.
            # Derived from Arrow's null_count rather than summing an is_valid
            # mask: pa.compute.sum() yields a null (None) for an empty array,
            # which would put a null into the int64 count array and break the
            # cross-batch aggregation in compute().
            complete_count = total_count - col_array.null_count

            # store counts for aggregation across batches
            batch_metrics[f"{col}_total_count"] = pa.array([total_count], type=pa.int64())
            batch_metrics[f"{col}_complete_count"] = pa.array([complete_count], type=pa.int64())

        return batch_metrics

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, Any]:
        """
        Compute final dataset-level completeness metrics.

        This aggregates the batch-level counts to compute final completeness scores
        for each column and overall dataset completeness.

        Args:
            batch_metrics: Dictionary of batch-level metrics to aggregate

        Returns:
            Dictionary of final completeness metrics
        """
        if not batch_metrics:
            return {"_metadata": {"error": "No batch metrics provided"}}

        results: dict[str, Any] = {}

        # Determine columns from batch metrics (one "<col>_total_count" key per column)
        columns_analyzed = []
        for key in batch_metrics:
            if key.endswith("_total_count"):
                col = key.replace("_total_count", "")
                columns_analyzed.append(col)

        if not columns_analyzed:
            logger.warning(f"[{self.name}] No columns found in batch metrics")
            return {"_metadata": {"error": "No columns found in batch metrics"}}

        per_column_completeness = {}
        total_samples = 0
        total_complete = 0

        # Calculate completeness for each column
        for col in columns_analyzed:
            total_key = f"{col}_total_count"
            complete_key = f"{col}_complete_count"

            if total_key not in batch_metrics or complete_key not in batch_metrics:
                logger.warning(f"[{self.name}] Missing batch metrics for column '{col}'")
                continue

            # agg counts across all batches
            col_total = int(np.sum(batch_metrics[total_key].to_numpy()))
            col_complete = int(np.sum(batch_metrics[complete_key].to_numpy()))

            # Calculate completeness score for this column (0.0 for an all-empty column)
            completeness_score = col_complete / col_total if col_total > 0 else 0.0

            per_column_completeness[col] = completeness_score

            # Add to overall totals
            total_samples += col_total
            total_complete += col_complete

        # Overall completeness = unweighted average of per-column scores.
        # Computed once and shared between the results entry and the metadata
        # (the original recomputed the same average twice).
        overall_completeness = (
            sum(per_column_completeness.values()) / len(per_column_completeness)
            if per_column_completeness
            else 0.0
        )

        # Generate output metrics based on configuration
        if self.include_per_column:
            for col, score in per_column_completeness.items():
                output_key = self.output_metrics.get(f"completeness_{col}", f"completeness_{col}")
                results[output_key] = score

        if self.include_overall:
            output_key = self.output_metrics.get("overall_completeness", "completeness_overall")
            results[output_key] = overall_completeness

        # Build and attach metadata only when requested (the original built the
        # dict unconditionally and discarded it when include_metadata was False)
        if self.include_metadata:
            metadata = {
                "columns_analyzed": columns_analyzed,
                # NOTE: integer-division average — approximate if columns have
                # differing sample counts
                "total_samples_per_column": total_samples // len(columns_analyzed) if columns_analyzed else 0,
                "per_column_scores": per_column_completeness,
                "overall_score": overall_completeness,
            }
            results["_metadata"] = json.dumps(metadata)

        return results

    def reset(self) -> None:
        """Reset processor state for new processing run."""
        # No persistent state to reset for completeness processor