Coverage for packages / dqm-ml-core / src / dqm_ml_core / api / data_processor.py: 100%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
1"""Base data metric processor class.
3This module contains the DatametricProcessor base class that all
4metric processors must inherit from. It provides the streaming
5architecture for processing large datasets.
6"""
8import logging
9from typing import Any
11import pyarrow as pa
13logger = logging.getLogger(__name__)
class DatametricProcessor:
    """
    Base class for all Data Quality metrics and feature extractors.

    The processor follows a streaming lifecycle designed to handle large datasets
    without loading them entirely into memory:

    1. Feature Extraction (`compute_features`): Transformation of raw data into
       relevant features (e.g., image -> luminosity).
    2. Batch Aggregation (`compute_batch_metric`): Compression of features into
       intermediate statistics (e.g., count, partial sum, histogram).
    3. Global Computation (`compute`): Final aggregation of all batch-level
       statistics into dataset-level scores.

    The base implementations of `compute_batch_metric`, `compute` and
    `compute_delta` return empty dictionaries.
    """

    def __init__(self, name: str, config: dict[str, Any] | None = None):
        """
        Initialize the dataset processor.

        Args:
            name: Unique name of the processor instance.
            config: Configuration dictionary (optional). ``None`` or an empty
                dictionary is treated as "no configuration". Recognised keys:
                ``input_columns`` (must be a list) and ``output_columns``
                (must be a dict).

        Raises:
            ValueError: If ``input_columns`` is present but is not a list, or
                ``output_columns`` is present but is not a dict.
        """
        self.name = name
        self.config = config or {}

        # A missing key falls back to a fresh empty container; a present key
        # (including an explicit None) must have the expected type.
        input_columns = self.config.get("input_columns", [])
        if not isinstance(input_columns, list):
            raise ValueError(
                f"Metric {name} configuration 'input_columns' must be a list, got {type(input_columns)}"
            )
        self.input_columns = input_columns

        output_columns = self.config.get("output_columns", {})
        if not isinstance(output_columns, dict):
            raise ValueError(
                f"Metric {name} configuration 'output_columns' must be a dict, got {type(output_columns)}"
            )
        # NOTE: the attribute is spelled `outputs_columns` (with an extra "s")
        # while the configuration key is `output_columns`; the attribute name
        # is kept unchanged because it is part of the existing interface.
        self.outputs_columns = output_columns

    def needed_columns(self) -> list[str]:
        """
        Return the list of raw input columns required for feature extraction.

        Returns:
            A list of column names (empty if `input_columns` is not set).
        """
        # getattr keeps this safe on instances where `input_columns` was never assigned.
        return getattr(self, "input_columns", [])

    def generated_features(self) -> list[str]:
        """
        Return the list of columns generated by this processor during feature extraction.

        The names are the values of the `output_features` mapping, an attribute
        this base class never assigns; when it is absent the result is empty.

        Returns:
            A list of feature names.
        """
        outputs = getattr(self, "output_features", {})
        return list(outputs.values())

    def generated_metrics(self) -> list[str]:
        """
        Return the names of the final metrics produced by this processor.

        The names are the values of the `output_metrics` mapping, an attribute
        this base class never assigns; when it is absent the result is empty.

        Returns:
            A list of metric names.
        """
        outputs = getattr(self, "output_metrics", {})
        return list(outputs.values())

    def compute_features(self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Transform a raw data batch into features.

        The base implementation forwards each needed column from the batch
        unchanged. Columns already present in `prev_features` are skipped, and
        columns missing from the batch are skipped with a warning.

        Args:
            batch: The input pyarrow RecordBatch.
            prev_features: Features already computed by preceding processors.

        Returns:
            A dictionary mapping feature names to pyarrow Arrays.
        """
        features: dict[str, pa.Array] = {}

        for col in self.needed_columns():
            if col in prev_features:
                # feature already computed no need to add it again
                continue

            if col not in batch.schema.names:
                # Lazy %-style arguments: the message is only formatted if the record is emitted.
                logger.warning("[%s] column '%s' not found in batch", self.name, col)
                continue
            features[col] = batch.column(col)

        return features

    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Aggregate features into intermediate statistics for the current batch.

        This method is critical for scalability. It should return a compact
        representation of the data (e.g., partial sums) that can be
        efficiently combined later.

        Args:
            features: Dictionary of feature arrays computed on the batch.

        Returns:
            A dictionary of aggregated statistics (empty in this base class).
        """
        return {}

    def compute(self, batch_metrics: dict[str, pa.Array]) -> dict[str, Any]:
        """
        Perform the final dataset-level metric calculation.

        Args:
            batch_metrics: The aggregated intermediate statistics from all batches.

        Returns:
            A dictionary containing the final metrics (empty in this base class).
        """
        return {}

    def compute_delta(self, source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]:
        """
        Compare metrics between two different data selections.

        Args:
            source: Final metrics from the source data selection.
            target: Final metrics from the target data selection.

        Returns:
            A dictionary containing distance or difference scores
            (empty in this base class).
        """
        return {}