Coverage for packages / dqm-ml-job / src / dqm_ml_job / outputwriter / parquet.py: 87%
38 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
"""Parquet output writer for persisting pipeline results.

This module contains the ParquetOutputWriter class that writes
metrics and features to Parquet files.
"""
7import logging
8from pathlib import Path
9from typing import Any
11import pyarrow as pa
12import pyarrow.parquet as pq
14logger = logging.getLogger(__name__)
17class ParquetOutputWriter:
18 """
19 Output writer that saves processed features to a Parquet file.
20 """
22 def __init__(self, name: str, config: dict[str, Any] | None = None):
23 """
24 Initialize a ParquetOutputWriter.
26 Args:
27 name: Unique name for this output writer.
28 config: Configuration dictionary with keys:
29 - path_pattern (str): Output file path format string.
30 - columns (List[str]): Columns to save.
32 Raises:
33 ValueError: If required config keys are missing.
34 """
35 if not config or "path_pattern" not in config: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true
36 raise ValueError(f"Configuration for ParquetOutputWriter '{name}' must contain 'path_pattern'")
37 if "columns" not in config: 37 ↛ 38line 37 didn't jump to line 38 because the condition on line 37 was never true
38 raise ValueError(f"Configuration for ParquetOutputWriter '{name}' must contain 'columns'")
40 self.path_pattern = config["path_pattern"]
41 self.columns = config["columns"]
42 self.name = name
44 def write_metrics_dict(self, metrics_dict: dict[str, dict[str, Any]]) -> None:
45 """Aggregate and write dataset-level metrics for all selections to a Parquet file.
47 Args:
48 metrics_dict: Map of selection names to their computed metric dictionaries.
49 """
50 if len(metrics_dict) > 0: 50 ↛ 60line 50 didn't jump to line 60 because the condition on line 50 was always true
51 logger.debug(f"Writing metrics for the {len(metrics_dict)} data selections")
53 # We get all the selections computed
54 keys = list(metrics_dict.keys())
55 metric_names = list(metrics_dict[keys[0]].keys())
56 metrics_table = {"selection": pa.array(keys)}
58 for metric_name in metric_names:
59 metrics_table[metric_name] = pa.array([metrics_dict[key][metric_name] for key in keys])
60 self.write_table("", metrics_table)
62 def write_table(self, path_pattern: str, features_array: dict[str, Any], part: int | None = None) -> None:
63 """
64 Write a table of features or metrics to a Parquet file on disk.
66 Handles directory creation if the target path doesn't exist.
68 Args:
69 path_pattern: Identifier for the data detination (used in filename pattern).
70 features_array: Map of column names to pyarrow Arrays.
71 part: Optional partition index for chunked output.
72 """
74 for key in self.columns:
75 if key not in features_array: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true
76 logger.error(f"Missing {key} in features for output")
78 table = pa.table(features_array)
79 if part is None:
80 filename = self.path_pattern.format(path_pattern, "")
81 else:
82 filename = self.path_pattern.format(path_pattern, part)
84 output_dir = Path(filename).parent
85 if not Path.exists(output_dir):
86 logger.info(f"Creating output directory: {output_dir}")
87 Path.mkdir(output_dir, parents=True, exist_ok=True)
89 pq.write_table(table, filename)
90 logger.info(f"Wrote output table to {filename}")