Coverage for packages / dqm-ml-core / src / dqm_ml_core / api / data_processor.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 10:11 +0000

"""Base data metric processor class.

This module defines ``DatametricProcessor``, the base class that all metric
processors and feature extractors inherit from. It provides a three-stage
streaming lifecycle (feature extraction, per-batch aggregation, dataset-level
computation) so that large datasets can be processed batch by batch instead
of being loaded into memory at once.
"""

import logging
from typing import Any

import pyarrow as pa

# Module-level logger; DatametricProcessor.compute_features uses it to warn
# about requested columns that are missing from a batch.
logger = logging.getLogger(__name__)



class DatametricProcessor:
    """
    Base class for all Data Quality metrics and feature extractors.

    The processor follows a streaming lifecycle designed to handle large datasets
    without loading them entirely into memory:

    1. Feature Extraction (`compute_features`): Transformation of raw data into
       relevant features (e.g., image -> luminosity).
    2. Batch Aggregation (`compute_batch_metric`): Compression of features into
       intermediate statistics (e.g., count, partial sum, histogram).
    3. Global Computation (`compute`): Final aggregation of all batch-level
       statistics into dataset-level scores.

    In this base class, `compute_batch_metric`, `compute` and `compute_delta`
    return empty dictionaries. `compute_features` only forwards the configured
    input columns. Concrete processors override the stages they need.
    """

    def __init__(self, name: str, config: dict[str, Any] | None = None):
        """
        Initialize the dataset processor.

        Args:
            name: Unique name of the processor instance.
            config: Configuration dictionary (optional; ``None`` is treated as
                an empty configuration). If present, ``input_columns`` must be
                a list and ``output_columns`` must be a dict.

        Raises:
            ValueError: If ``input_columns`` is present but not a list, or
                ``output_columns`` is present but not a dict.
        """
        self.name = name
        self.config = config or {}

        # Validate input_columns if present.
        if "input_columns" in self.config:
            input_columns = self.config["input_columns"]
            if not isinstance(input_columns, list):
                raise ValueError(
                    f"Metric {name} configuration 'input_columns' must be a list, "
                    f"got {type(input_columns).__name__}"
                )
            self.input_columns = input_columns
        else:
            self.input_columns = []

        # Validate output_columns if present.
        if "output_columns" in self.config:
            output_columns = self.config["output_columns"]
            if not isinstance(output_columns, dict):
                raise ValueError(
                    f"Metric {name} configuration 'output_columns' must be a dict, "
                    f"got {type(output_columns).__name__}"
                )
            # Attribute name ("outputs_columns") kept as-is for backward compatibility.
            self.outputs_columns = output_columns
        else:
            self.outputs_columns = {}

    def needed_columns(self) -> list[str]:
        """
        Return the list of raw input columns required for feature extraction.

        Falls back to an empty list when ``input_columns`` was never set
        (e.g. a subclass that did not call ``super().__init__``).

        Returns:
            A list of column names.
        """
        return getattr(self, "input_columns", [])

    def generated_features(self) -> list[str]:
        """
        Return the list of columns generated by this processor during feature extraction.

        The names are the values of an optional ``output_features`` mapping on
        the instance. This base class never sets that attribute, so by default
        the result is an empty list.

        Returns:
            A list of feature names.
        """
        # NOTE(review): __init__ stores the configured mapping as
        # `outputs_columns`, not `output_features`. Confirm that subclasses are
        # expected to populate `output_features` themselves.
        outputs = getattr(self, "output_features", {})
        return list(outputs.values())

    def generated_metrics(self) -> list[str]:
        """
        Return the names of the final metrics produced by this processor.

        The names are the values of an optional ``output_metrics`` mapping on
        the instance. This base class never sets that attribute, so by default
        the result is an empty list.

        Returns:
            A list of metric names.
        """
        outputs = getattr(self, "output_metrics", {})
        return list(outputs.values())

    def compute_features(self, batch: pa.RecordBatch, prev_features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Transform a raw data batch into features.

        The base implementation forwards each column from `needed_columns()`
        as-is. Columns already present in `prev_features` are skipped. Columns
        missing from the batch are skipped with a warning.

        Args:
            batch: The input pyarrow RecordBatch.
            prev_features: Features already computed by preceding processors.

        Returns:
            A dictionary mapping feature names to pyarrow Arrays.
        """
        features: dict[str, pa.Array] = {}
        # Build the schema name set once instead of scanning the list per column.
        available = set(batch.schema.names)

        for col in self.needed_columns():
            if col in prev_features:
                # Feature already computed by an earlier processor; no need to add it again.
                continue

            if col not in available:
                logger.warning("[%s] column '%s' not found in batch", self.name, col)
                continue

            features[col] = batch.column(col)

        return features

    def compute_batch_metric(self, features: dict[str, pa.Array]) -> dict[str, pa.Array]:
        """
        Aggregate features into intermediate statistics for the current batch.

        This method is critical for scalability. It should return a compact
        representation of the data (e.g., partial sums) that can be
        efficiently combined later. The base implementation returns an
        empty dictionary.

        Args:
            features: Dictionary of feature arrays computed on the batch.

        Returns:
            A dictionary of aggregated statistics.
        """
        return {}

    def compute(self, batch_metrics: dict[str, pa.Array]) -> dict[str, Any]:
        """
        Perform the final dataset-level metric calculation.

        The base implementation returns an empty dictionary.

        Args:
            batch_metrics: The aggregated intermediate statistics from all batches.

        Returns:
            A dictionary containing the final metrics.
        """
        return {}

    def compute_delta(self, source: dict[str, Any], target: dict[str, Any]) -> dict[str, Any]:
        """
        Compare metrics between two different data selections.

        The base implementation returns an empty dictionary.

        Args:
            source: Final metrics from the source data selection.
            target: Final metrics from the target data selection.

        Returns:
            A dictionary containing distance or difference scores.
        """
        return {}