Coverage for packages / dqm-ml-images / src / dqm_ml_images / visual_features.py: 69%

162 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 10:11 +0000

1"""Visual feature extraction processor for image quality assessment. 

2 

3This module contains the VisualFeaturesProcessor class that extracts 

4visual quality features from images including luminosity, contrast, 

5blur, and entropy. 

6""" 

7 

8import io 

9import logging 

10from pathlib import Path 

11from typing import Any 

12 

13import numpy as np 

14from PIL import Image 

15import pyarrow as pa 

16from scipy import signal 

17 

18# COMPATIBILITY : from typing import Any, override # When support of 3.10 and 3.11 will be removed 

19from typing_extensions import override 

20 

21from dqm_ml_core import DatametricProcessor 

22 

23logger = logging.getLogger(__name__) 

24 

25 

class VisualFeaturesProcessor(DatametricProcessor):
    """
    Computes basic image quality features per sample.

    Features:
    - Luminosity: Mean intensity of the image. By default, it is the average gray level
      mapped to the [0, 1] range.
    - Contrast: RMS contrast, calculated as the standard deviation of the gray level
      intensities, mapped to the [0, 1] range.
    - Blur: Measured as the variance of the Laplacian of the image. A higher value
      indicates more edges and higher sharpness.
    - Entropy: Shannon entropy of the image's grayscale histogram. Measures the
      information content or complexity.

    This processor operates purely at the feature extraction level (per-sample).
    """

    # Default mapping of feature name -> output column name.
    DEFAULT_OUTPUTS = {
        "luminosity": "m_luminosity",
        "contrast": "m_contrast",
        "blur": "m_blur_level",
        "entropy": "m_entropy",
    }

    def __init__(self, name: str = "visual_metric", config: dict[str, Any] | None = None) -> None:
        """
        Initialize the visual features processor.

        Args:
            name: Unique name of the processor instance.
            config: Configuration dictionary containing:
                - input_columns: List containing the name of the image column (bytes or path).
                - output_features: Mapping of feature names to output column names.
                - grayscale: Whether to convert images to grayscale (default: True).
                - normalize: Whether to normalize pixel values to [0, 1] (default: True).
                - entropy_bins: Number of bins for entropy calculation (default: 256).
                - clip_percentiles: Tuple of (low, high) percentiles for intensity clipping.
                - laplacian_kernel: Size of the Laplacian kernel ('3x3' or '5x5').
                - dataset_root_path: Root directory for relative image paths.

        Raises:
            ValueError: If 'output_features' resolves to something other than a dict.
        """
        super().__init__(name, config)

        # Local view of config for convenience
        cfg = self.config or {}

        # Handle relative paths in parquet to a dataset located at dataset_root_path.
        # The string "undefined" is used as a sentinel for "no root configured".
        self.dataset_root_path = str(cfg.get("dataset_root_path", "undefined"))

        # Fall back to a conventional default column when the base class did not
        # populate input_columns from the config.
        if not hasattr(self, "input_columns") or not self.input_columns:
            self.input_columns = ["image_bytes"]

        if not hasattr(self, "output_features") or not self.output_features:
            # Use config-provided mapping if present, otherwise defaults
            cfg_outputs = cfg.get("output_features")
            self.output_features: Any = (
                cfg_outputs.copy() if isinstance(cfg_outputs, dict) else self.DEFAULT_OUTPUTS.copy()
            )

        # Parameters
        self.grayscale: bool = bool(cfg.get("grayscale", True))
        self.normalize: bool = bool(cfg.get("normalize", True))
        self.entropy_bins: int = int(cfg.get("entropy_bins", 256))

        # Read once into a local so the None-check narrows the type naturally
        # (replaces the previous double cfg.get() + "# type: ignore" workaround).
        clip = cfg.get("clip_percentiles")
        self.clip_percentiles: tuple[float, float] | None = tuple(clip) if clip is not None else None

        self.laplacian_kernel: str = str(cfg.get("laplacian_kernel", "3x3"))

        # Check that the output mapping is well-formed, then backfill any
        # missing metric keys with the defaults.
        if not isinstance(self.output_features, dict):
            raise ValueError(f"[{self.name}] 'output_features' must be a dict of metric->column_name")
        for k in ("luminosity", "contrast", "blur", "entropy"):
            if k not in self.output_features:
                self.output_features[k] = self.DEFAULT_OUTPUTS[k]

    @override
    def compute_features(
        self,
        batch: pa.RecordBatch,
        prev_features: dict[str, pa.Array] | None = None,
    ) -> dict[str, pa.Array]:
        """Compute per-sample image features.

        Args:
            batch: Input batch of data containing the image column.
            prev_features: Previously computed features (not used in this processor).

        Returns:
            Dictionary mapping output column names to per-sample feature arrays.
            Empty dict when no input column is configured or it is absent from the batch.
        """
        if not self.input_columns:
            logger.warning(f"[{self.name}] no input_columns configured")
            return {}

        image_column = self.input_columns[0]
        if image_column not in batch.schema.names:
            logger.warning(f"[{self.name}] column '{image_column}' not found in batch")
            return {}

        values = batch.column(image_column).to_pylist()

        # Decode every sample to a 2D grayscale array; failed samples become None
        # so downstream features emit NaN instead of aborting the whole batch.
        gray_images: list[Any] = []
        for idx, v in enumerate(values):
            try:
                gray = self._to_gray_np(v)
                if self.clip_percentiles is not None:
                    p_lo, p_hi = self.clip_percentiles
                    lo = np.percentile(gray, p_lo)
                    hi = np.percentile(gray, p_hi)
                    if hi > lo:
                        gray = np.clip(gray, lo, hi)
                        if self.normalize:
                            # Re-map the clipped intensity range back onto [0, 1].
                            gray = (gray - lo) / max(1e-12, (hi - lo))
                gray_images.append(gray)
            except Exception as e:
                logger.exception(f"[{self.name}] failed to process sample {idx}: {e}")
                gray_images.append(None)

        # Compute each feature type with dedicated functions
        return {
            self.output_features["luminosity"]: self._compute_luminosity_feature(gray_images),
            self.output_features["contrast"]: self._compute_contrast_feature(gray_images),
            self.output_features["blur"]: self._compute_blur_feature(gray_images),
            self.output_features["entropy"]: self._compute_entropy_feature(gray_images),
        }

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """No-op aggregation: metrics are image-level only.

        Returns:
            Empty dictionary as this processor computes features only.
        """
        return {}

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, pa.Array]:
        """No dataset-level aggregation required for this processor.

        Returns:
            Empty dictionary as features are computed at batch level.
        """
        return {}

    def reset(self) -> None:
        """Reset processor state for new processing run (stateless: nothing to do)."""

    # TODO : Check if it can be vectorized, parallelized

    def _per_image_feature(self, gray_images: list[np.ndarray | None], fn: Any) -> pa.Array:
        """Apply *fn* to every decoded image; failed samples (None) map to NaN.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).
            fn: Callable taking a grayscale array and returning a float.

        Returns:
            PyArrow float32 array of per-sample values.
        """
        return pa.array(
            [fn(g) if g is not None else float("nan") for g in gray_images],
            type=pa.float32(),
        )

    def _compute_luminosity_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute luminosity (mean gray level) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of luminosity values.
        """
        # When not normalized, arrays are uint8 [0, 255]: divide by 255 so the
        # metric stays in [0, 1]. When normalized, values are already [0, 1] (min-max).
        return self._per_image_feature(
            gray_images,
            lambda g: float(np.mean(g if self.normalize else g / 255.0)),
        )

    def _compute_contrast_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute contrast (RMS contrast = std of gray) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of contrast values.
        """
        return self._per_image_feature(
            gray_images,
            lambda g: float(np.std(g if self.normalize else g / 255.0)),
        )

    def _compute_blur_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute blur (variance of Laplacian) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of blur values.
        """
        return self._per_image_feature(
            gray_images,
            lambda g: float(self._variance_of_laplacian(g)),
        )

    def _compute_entropy_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute entropy (Shannon entropy) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of entropy values.
        """
        return self._per_image_feature(
            gray_images,
            lambda g: float(self._entropy(g)),
        )

    # --- helpers --------------------------------------------------------------

    def _to_gray_np(self, x: Any) -> np.ndarray:
        """Convert various input types to a 2D grayscale numpy array.

        If `self.normalize` is True, returns float32 in [0,1]. Otherwise returns uint8 [0,255].

        Args:
            x: Input data (PIL Image, bytes, string path, or numpy array).

        Returns:
            2D numpy array in grayscale.

        Raises:
            ValueError: If input type is unsupported or path does not exist.
        """
        img: Image.Image | None = None

        if isinstance(x, Image.Image):
            img = x
        elif isinstance(x, (bytes, bytearray)):
            img = Image.open(io.BytesIO(x))
        elif isinstance(x, str):
            img_path = Path(self.dataset_root_path) / x if self.dataset_root_path != "undefined" else Path(x)
            if not img_path.is_file():
                raise ValueError(f"Path does not exist: {img_path}")
            # PIL opens lazily and keeps the file handle alive; load eagerly
            # inside a context manager so the handle is released here.
            with Image.open(img_path) as opened:
                opened.load()
                img = opened
        elif isinstance(x, np.ndarray):
            arr = x
            if arr.ndim == 2:  # already gray
                gray = arr
            elif arr.ndim == 3 and arr.shape[2] in (3, 4):
                # manual luminance conversion to be independent of PIL for ndarray
                rgb = arr[..., :3].astype(np.float32)
                gray = 0.2126 * rgb[..., 0] + 0.7152 * rgb[..., 1] + 0.0722 * rgb[..., 2]
            else:
                raise ValueError(f"Unsupported ndarray shape {arr.shape}")

            return self._to_float01(gray) if self.normalize else gray.astype(np.uint8)
        else:
            raise ValueError(f"Unsupported type for image input: {type(x)}")

        # Use PIL pipeline: convert to the requested mode before extracting pixels.
        if self.grayscale and img.mode != "L":
            img = img.convert("L")
        elif not self.grayscale and img.mode not in ("RGB", "L"):
            img = img.convert("RGB")

        gray_np = np.array(img)
        if gray_np.ndim == 3:  # RGB -> gray (BT.709 luminance weights)
            gray_np = 0.2126 * gray_np[..., 0] + 0.7152 * gray_np[..., 1] + 0.0722 * gray_np[..., 2]

        if self.normalize:
            # Min-max normalization as required by existing tests
            return self._to_float01(gray_np)
        else:
            return gray_np.astype(np.uint8)

    @staticmethod
    def _to_float01(arr: np.ndarray) -> np.ndarray:
        """Normalize array to [0, 1] range using min-max scaling.

        Args:
            arr: Input numpy array.

        Returns:
            Normalized array with float32 values in [0, 1]. A constant image
            (vmax == vmin) maps to all zeros to avoid division by zero.
        """
        arr = arr.astype(np.float32)
        vmin, vmax = float(arr.min()), float(arr.max())
        arr = (arr - vmin) / (vmax - vmin) if vmax > vmin else np.zeros_like(arr, dtype=np.float32)
        return arr

    def _variance_of_laplacian(self, gray: np.ndarray) -> float:
        """Variance of Laplacian as a blur metric.

        Args:
            gray: Grayscale image array.

        Returns:
            Variance of Laplacian (higher values indicate more edges/sharpness).
        """
        g = gray.astype(np.float32)
        if self.laplacian_kernel == "5x5":
            k = np.array(
                [
                    [0, 0, -1, 0, 0],
                    [0, -1, -2, -1, 0],
                    [-1, -2, 16, -2, -1],
                    [0, -1, -2, -1, 0],
                    [0, 0, -1, 0, 0],
                ],
                dtype=np.float32,
            )
        else:
            k = np.array([[0, 1, 0], [1, -4, 1], [0, 1, 0]], dtype=np.float32)

        # Use scipy for optimized convolution (zero-padded borders, same shape)
        lap = signal.convolve2d(g, k, mode="same")
        return float(np.var(lap))

    def _entropy(self, gray: np.ndarray) -> float:
        """Shannon entropy of the gray histogram (natural log).

        Args:
            gray: Grayscale image array.

        Returns:
            Shannon entropy value. Returns NaN if histogram sum is zero.
        """
        g = gray
        if self.normalize:
            # histogram on [0,1]
            hist, _ = np.histogram(g, bins=self.entropy_bins, range=(0.0, 1.0))
        else:
            # uint8 range
            hist, _ = np.histogram(g, bins=min(256, self.entropy_bins), range=(0, 255))
        p = hist.astype(np.float64)
        s = p.sum()
        if s <= 0:
            return float("nan")
        p /= s
        # avoid log(0)
        p = p[p > 0]
        return float(-(p * np.log(p)).sum())