Coverage for packages / dqm-ml-images / src / dqm_ml_images / visual_features.py: 69%
162 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
"""Visual feature extraction processor for image quality assessment.

This module contains the VisualFeaturesProcessor class that extracts
visual quality features from images including luminosity, contrast,
blur, and entropy.
"""
import io
import logging
from pathlib import Path
from typing import Any

import numpy as np
from PIL import Image
import pyarrow as pa
from scipy import signal

# COMPATIBILITY : from typing import Any, override # When support of 3.10 and 3.11 will be removed
from typing_extensions import override

from dqm_ml_core import DatametricProcessor

logger = logging.getLogger(__name__)
class VisualFeaturesProcessor(DatametricProcessor):
    """
    Computes basic image quality features per sample.

    Features:
        - Luminosity: Mean intensity of the image. By default, it is the average gray level
          mapped to the [0, 1] range.
        - Contrast: RMS contrast, calculated as the standard deviation of the gray level
          intensities, mapped to the [0, 1] range.
        - Blur: Measured as the variance of the Laplacian of the image. A higher value
          indicates more edges and higher sharpness.
        - Entropy: Shannon entropy of the image's grayscale histogram. Measures the
          information content or complexity.

    This processor operates purely at the feature extraction level (per-sample).
    """

    # Default mapping of metric name -> output column name; merged with any
    # config-provided mapping in __init__ so all four keys always exist.
    DEFAULT_OUTPUTS = {
        "luminosity": "m_luminosity",
        "contrast": "m_contrast",
        "blur": "m_blur_level",
        "entropy": "m_entropy",
    }

    def __init__(self, name: str = "visual_metric", config: dict[str, Any] | None = None) -> None:
        """
        Initialize the visual features processor.

        Args:
            name: Unique name of the processor instance.
            config: Configuration dictionary containing:
                - input_columns: List containing the name of the image column (bytes or path).
                - output_features: Mapping of feature names to output column names.
                - grayscale: Whether to convert images to grayscale (default: True).
                - normalize: Whether to normalize pixel values to [0, 1] (default: True).
                - entropy_bins: Number of bins for entropy calculation (default: 256).
                - clip_percentiles: Tuple of (low, high) percentiles for intensity clipping.
                - laplacian_kernel: Size of the Laplacian kernel ('3x3' or '5x5').
                - dataset_root_path: Root directory for relative image paths.

        Raises:
            ValueError: If 'output_features' is set but is not a dict.
        """
        super().__init__(name, config)

        # Local view of config for convenience
        cfg = self.config or {}

        # handle relative paths in parquet to a dataset located at dataset_root_path
        self.dataset_root_path = str(cfg.get("dataset_root_path", "undefined"))

        # input_columns / output_features may already be set by the base class
        # from config; only fill defaults when absent or empty.
        if not hasattr(self, "input_columns") or not self.input_columns:
            self.input_columns = ["image_bytes"]

        if not hasattr(self, "output_features") or not self.output_features:
            # Use config-provided mapping if present, otherwise defaults
            cfg_outputs = cfg.get("output_features") if isinstance(cfg.get("output_features"), dict) else None
            self.output_features: Any = (
                cfg_outputs.copy() if isinstance(cfg_outputs, dict) else self.DEFAULT_OUTPUTS.copy()
            )

        # params
        self.grayscale: bool = bool(cfg.get("grayscale", True))
        self.normalize: bool = bool(cfg.get("normalize", True))
        self.entropy_bins: int = int(cfg.get("entropy_bins", 256))

        # Read the value once instead of calling cfg.get twice.
        clip = cfg.get("clip_percentiles")
        self.clip_percentiles: tuple[float, float] | None = tuple(clip) if clip is not None else None  # type: ignore[assignment]

        self.laplacian_kernel: str = str(cfg.get("laplacian_kernel", "3x3"))

        # check if the transformation is defined in the processor
        if not isinstance(self.output_features, dict):
            raise ValueError(f"[{self.name}] 'output_features' must be a dict of metric->column_name")
        for k in ("luminosity", "contrast", "blur", "entropy"):
            if k not in self.output_features:
                self.output_features[k] = self.DEFAULT_OUTPUTS[k]

    @override
    def compute_features(
        self,
        batch: pa.RecordBatch,
        prev_features: dict[str, pa.Array] | None = None,
    ) -> dict[str, pa.Array]:
        """Compute per-sample image features.

        Args:
            batch: Input batch of data containing image column.
            prev_features: Previously computed features (not used in this processor).

        Returns:
            Dictionary mapping feature names to their computed values.
            Empty dict if no input column is configured or found in the batch.
        """
        if not self.input_columns:
            logger.warning(f"[{self.name}] no input_columns configured")
            return {}

        image_column = self.input_columns[0]
        if image_column not in batch.schema.names:
            logger.warning(f"[{self.name}] column '{image_column}' not found in batch")
            return {}

        col = batch.column(image_column)
        values = col.to_pylist()
        # Decode every sample to a grayscale array; a failed sample becomes
        # None so downstream features emit NaN instead of aborting the batch.
        gray_images: list[Any] = []
        for idx, v in enumerate(values):
            try:
                gray = self._to_gray_np(v)
                if self.clip_percentiles is not None:
                    p_lo, p_hi = self.clip_percentiles
                    lo = np.percentile(gray, p_lo)
                    hi = np.percentile(gray, p_hi)
                    if hi > lo:
                        gray = np.clip(gray, lo, hi)
                        if self.normalize:
                            # Re-map to [0, 1] after clipping
                            gray = (gray - lo) / max(1e-12, (hi - lo))
                gray_images.append(gray)
            except Exception as e:
                logger.exception(f"[{self.name}] failed to process sample {idx}: {e}")
                gray_images.append(None)

        # Compute each feature type with dedicated functions
        features = {}
        features[self.output_features["luminosity"]] = self._compute_luminosity_feature(gray_images)
        features[self.output_features["contrast"]] = self._compute_contrast_feature(gray_images)
        features[self.output_features["blur"]] = self._compute_blur_feature(gray_images)
        features[self.output_features["entropy"]] = self._compute_entropy_feature(gray_images)
        return features

    @override
    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """No-op aggregation: metrics are image-level only.

        Returns:
            Empty dictionary as this processor computes features only.
        """
        return {}

    @override
    def compute(self, batch_metrics: dict[str, pa.Array] | None = None) -> dict[str, pa.Array]:
        """No dataset-level aggregation required for this processor.

        Returns:
            Empty dictionary as features are computed at batch level.
        """
        return {}

    def reset(self) -> None:
        """Reset processor state for new processing run (stateless: nothing to do)."""

    # TODO : Check if it can be vectorized, parallelized

    def _per_image(self, gray_images: list[np.ndarray | None], fn: Any) -> pa.Array:
        """Apply `fn` to each decoded image; emit NaN for failed (None) samples.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).
            fn: Callable mapping one grayscale array to a float.

        Returns:
            PyArrow float32 array, one value per input sample.
        """
        return pa.array(
            [fn(g) if g is not None else float("nan") for g in gray_images],
            type=pa.float32(),
        )

    def _compute_luminosity_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute luminosity (mean gray level) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of luminosity values.
        """
        # If not self.normalize, arrays are uint8 [0,255] and must be scaled;
        # if self.normalize, they are already min-max mapped to [0,1].
        return self._per_image(
            gray_images,
            lambda g: float(np.mean(g if self.normalize else g / 255.0)),
        )

    def _compute_contrast_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute contrast (RMS contrast = std of gray) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of contrast values.
        """
        return self._per_image(
            gray_images,
            lambda g: float(np.std(g if self.normalize else g / 255.0)),
        )

    def _compute_blur_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute blur (variance of Laplacian) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of blur values.
        """
        return self._per_image(gray_images, lambda g: float(self._variance_of_laplacian(g)))

    def _compute_entropy_feature(self, gray_images: list[np.ndarray | None]) -> pa.Array:
        """Compute entropy (Shannon entropy) for each image.

        Args:
            gray_images: List of grayscale image arrays (or None for failed images).

        Returns:
            PyArrow array of entropy values.
        """
        return self._per_image(gray_images, lambda g: float(self._entropy(g)))

    # --- helpers --------------------------------------------------------------

    def _to_gray_np(self, x: Any) -> np.ndarray:
        """Convert various input types to a 2D grayscale numpy array.

        If `self.normalize` is True, returns float32 in [0,1]. Otherwise returns uint8 [0,255].

        Args:
            x: Input data (PIL Image, bytes, string path, or numpy array).

        Returns:
            2D numpy array in grayscale.

        Raises:
            ValueError: If input type is unsupported or path does not exist.
        """
        if isinstance(x, np.ndarray):
            arr = x
            if arr.ndim == 2:  # already gray
                gray = arr
            elif arr.ndim == 3 and arr.shape[2] in (3, 4):
                # manual luminance conversion to be independent of PIL for ndarray
                rgb = arr[..., :3].astype(np.float32)
                gray = 0.2126 * rgb[..., 0] + 0.7152 * rgb[..., 1] + 0.0722 * rgb[..., 2]
            else:
                raise ValueError(f"Unsupported ndarray shape {arr.shape}")
            return self._to_float01(gray) if self.normalize else gray.astype(np.uint8)

        if isinstance(x, Image.Image):
            # Caller owns this image; do not close it.
            return self._pil_to_gray(x)

        if isinstance(x, (bytes, bytearray)):
            # Context manager closes the underlying buffer/file handle (fixes leak).
            with Image.open(io.BytesIO(x)) as img:
                return self._pil_to_gray(img)

        if isinstance(x, str):
            img_path = Path(self.dataset_root_path) / x if self.dataset_root_path != "undefined" else Path(x)
            if not img_path.is_file():
                raise ValueError(f"Path does not exist: {img_path}")
            with Image.open(img_path) as img:
                return self._pil_to_gray(img)

        raise ValueError(f"Unsupported type for image input: {type(x)}")

    def _pil_to_gray(self, img: Image.Image) -> np.ndarray:
        """Convert a PIL image to a 2D grayscale numpy array.

        Args:
            img: Opened PIL image (any mode).

        Returns:
            float32 array in [0,1] if `self.normalize`, else uint8 [0,255].
        """
        if self.grayscale and img.mode != "L":
            img = img.convert("L")
        elif not self.grayscale and img.mode not in ("RGB", "L"):
            img = img.convert("RGB")

        gray_np = np.array(img)
        if gray_np.ndim == 3:  # RGB -> gray
            gray_np = 0.2126 * gray_np[..., 0] + 0.7152 * gray_np[..., 1] + 0.0722 * gray_np[..., 2]

        if self.normalize:
            # Min-max normalization as required by existing tests
            return self._to_float01(gray_np)
        return gray_np.astype(np.uint8)

    @staticmethod
    def _to_float01(arr: np.ndarray) -> np.ndarray:
        """Normalize array to [0, 1] range using min-max scaling.

        Args:
            arr: Input numpy array.

        Returns:
            Normalized array with float32 values in [0, 1]. A constant image
            (vmax == vmin) maps to all zeros.
        """
        arr = arr.astype(np.float32)
        vmin, vmax = float(arr.min()), float(arr.max())
        arr = (arr - vmin) / (vmax - vmin) if vmax > vmin else np.zeros_like(arr, dtype=np.float32)
        return arr

    def _variance_of_laplacian(self, gray: np.ndarray) -> float:
        """Variance of Laplacian as a blur metric.

        Args:
            gray: Grayscale image array.

        Returns:
            Variance of Laplacian (higher values indicate more edges/sharpness).
        """
        g = gray.astype(np.float32)
        if self.laplacian_kernel == "5x5":
            k = np.array(
                [
                    [0, 0, -1, 0, 0],
                    [0, -1, -2, -1, 0],
                    [-1, -2, 16, -2, -1],
                    [0, -1, -2, -1, 0],
                    [0, 0, -1, 0, 0],
                ],
                dtype=np.float32,
            )
        else:
            k = np.array([[0, 1, 0], [1, -4, 1], [0, 1, 0]], dtype=np.float32)

        # Use scipy for optimized convolution
        lap = signal.convolve2d(g, k, mode="same")
        return float(np.var(lap))

    def _entropy(self, gray: np.ndarray) -> float:
        """Shannon entropy of the gray histogram (natural log).

        Args:
            gray: Grayscale image array.

        Returns:
            Shannon entropy value. Returns NaN if histogram sum is zero.
        """
        g = gray
        if self.normalize:
            # histogram on [0,1]
            hist, _ = np.histogram(g, bins=self.entropy_bins, range=(0.0, 1.0))
        else:
            # uint8 range
            hist, _ = np.histogram(g, bins=min(256, self.entropy_bins), range=(0, 255))
        p = hist.astype(np.float64)
        s = p.sum()
        if s <= 0:
            return float("nan")
        p /= s
        # avoid log(0)
        p = p[p > 0]
        return float(-(p * np.log(p)).sum())