Coverage for packages / dqm-ml-job / src / dqm_ml_job / outputwriter / parquet.py: 87%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 10:11 +0000

1"""Parquet output writer for persisting pipeline results. 

2 

3This module contains the ParquetOutputWriter class that writes 

4metrics and features to Parquet files. 

5""" 

6 

7import logging 

8from pathlib import Path 

9from typing import Any 

10 

11import pyarrow as pa 

12import pyarrow.parquet as pq 

13 

# Module-scoped logger, named after this module so handlers can be configured per package.
logger: logging.Logger = logging.getLogger(__name__)

15 

16 

17class ParquetOutputWriter: 

18 """ 

19 Output writer that saves processed features to a Parquet file. 

20 """ 

21 

22 def __init__(self, name: str, config: dict[str, Any] | None = None): 

23 """ 

24 Initialize a ParquetOutputWriter. 

25 

26 Args: 

27 name: Unique name for this output writer. 

28 config: Configuration dictionary with keys: 

29 - path_pattern (str): Output file path format string. 

30 - columns (List[str]): Columns to save. 

31 

32 Raises: 

33 ValueError: If required config keys are missing. 

34 """ 

35 if not config or "path_pattern" not in config: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true

36 raise ValueError(f"Configuration for ParquetOutputWriter '{name}' must contain 'path_pattern'") 

37 if "columns" not in config: 37 ↛ 38line 37 didn't jump to line 38 because the condition on line 37 was never true

38 raise ValueError(f"Configuration for ParquetOutputWriter '{name}' must contain 'columns'") 

39 

40 self.path_pattern = config["path_pattern"] 

41 self.columns = config["columns"] 

42 self.name = name 

43 

44 def write_metrics_dict(self, metrics_dict: dict[str, dict[str, Any]]) -> None: 

45 """Aggregate and write dataset-level metrics for all selections to a Parquet file. 

46 

47 Args: 

48 metrics_dict: Map of selection names to their computed metric dictionaries. 

49 """ 

50 if len(metrics_dict) > 0: 50 ↛ 60line 50 didn't jump to line 60 because the condition on line 50 was always true

51 logger.debug(f"Writing metrics for the {len(metrics_dict)} data selections") 

52 

53 # We get all the selections computed 

54 keys = list(metrics_dict.keys()) 

55 metric_names = list(metrics_dict[keys[0]].keys()) 

56 metrics_table = {"selection": pa.array(keys)} 

57 

58 for metric_name in metric_names: 

59 metrics_table[metric_name] = pa.array([metrics_dict[key][metric_name] for key in keys]) 

60 self.write_table("", metrics_table) 

61 

62 def write_table(self, path_pattern: str, features_array: dict[str, Any], part: int | None = None) -> None: 

63 """ 

64 Write a table of features or metrics to a Parquet file on disk. 

65 

66 Handles directory creation if the target path doesn't exist. 

67 

68 Args: 

69 path_pattern: Identifier for the data detination (used in filename pattern). 

70 features_array: Map of column names to pyarrow Arrays. 

71 part: Optional partition index for chunked output. 

72 """ 

73 

74 for key in self.columns: 

75 if key not in features_array: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true

76 logger.error(f"Missing {key} in features for output") 

77 

78 table = pa.table(features_array) 

79 if part is None: 

80 filename = self.path_pattern.format(path_pattern, "") 

81 else: 

82 filename = self.path_pattern.format(path_pattern, part) 

83 

84 output_dir = Path(filename).parent 

85 if not Path.exists(output_dir): 

86 logger.info(f"Creating output directory: {output_dir}") 

87 Path.mkdir(output_dir, parents=True, exist_ok=True) 

88 

89 pq.write_table(table, filename) 

90 logger.info(f"Wrote output table to {filename}")