Coverage for packages/dqm-ml-job/src/dqm_ml_job/cli.py: 74%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 10:11 +0000
"""Command-line interface for DQM job execution.

This module provides CLI functions for parsing arguments and running
data quality assessment jobs from YAML configuration files.
"""
7import argparse
8import logging
9from pathlib import Path
10from typing import Any
12import yaml
14from dqm_ml_core import PluginLoadedRegistry
15from dqm_ml_core.api.data_processor import DatametricProcessor
16from dqm_ml_job.dataloaders import DataLoader
17from dqm_ml_job.job import DatasetJob
18from dqm_ml_job.outputwriter import OutputWriter
20logger = logging.getLogger(__name__)
23def parse_args(arg_list: list[str] | None) -> Any:
24 """
25 Parse command line arguments for the DQM job.
27 Args:
28 arg_list: List of arguments (default: sys.argv[1:]).
30 Returns:
31 The parsed Namespace object.
32 """
33 parser = argparse.ArgumentParser(
34 prog="dqm-ml", description="DQM-ML Job client", epilog="for more informations see README"
35 )
37 parser.add_argument(
38 "-p", "--process-config", type=str, nargs="+", required=True, help="configuration files to execute"
39 )
41 parser.add_argument("--save-config", type=str, help="Path to save the resolved configuration")
43 # TODO add parameters to pass directly files / directory as inputs for loaders
44 args = parser.parse_args(arg_list)
46 return args
# TODO get parameters, logs, ...
def execute(arg_list: list[str] | None = None) -> None:
    """
    Main CLI entry point for executing DQM jobs from YAML configurations.

    Loads and merges every file given via ``--process-config`` (later files
    override earlier keys), optionally dumps the merged result to
    ``--save-config``, then delegates to :func:`run`.

    Args:
        arg_list: List of command line arguments (default: sys.argv[1:]).
    """
    args = parse_args(arg_list)
    config: dict[str, Any] = {}

    for config_file in args.process_config:
        logger.debug("Executing job from config file: %s", config_file)

        with Path(config_file).open() as stream:
            try:
                config_content = yaml.safe_load(stream)
                # safe_load returns None for an empty file; skip instead of
                # crashing on config.update(None).
                if config_content:
                    config.update(config_content)
            except yaml.YAMLError as exc:
                # Fixed typo ("part" -> "parse") and route the exception
                # through the logger instead of a bare print().
                logger.error("Failed to parse job configuration %s: %s", config_file, exc)
                return

    # if we succeed to load all config files, run the job

    # Optionally save the resolved configuration
    if args.save_config:
        logger.debug("Saving resolved configuration to: %s", args.save_config)
        with Path(args.save_config).open("w") as stream:
            yaml.safe_dump(config, stream)

    if "config" in config:
        run(config["config"])
    elif "pipeline_config" in config:
        logger.warning("'pipeline_config' is deprecated, please use 'config' instead.")
        run(config["pipeline_config"])
    else:
        logger.error("No 'config' found in configuration.")
88def _init_components(config_dict: dict[str, Any], registry: dict[str, Any], component_name: str) -> dict[str, Any]:
89 """
90 Initialize job components (loaders, metrics, writers) from their respective registries.
92 Args:
93 config_dict: Dictionary of component configurations from YAML.
94 registry: The registry containing the component classes.
95 component_name: The name of the component type (for error messages).
97 Returns:
98 A dictionary of initialized component instances.
99 """
100 components = {}
101 for key, comp_config in config_dict.items():
102 if "type" not in comp_config: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true
103 raise ValueError(f"Configuration for {component_name} '{key}' must contain 'type'")
104 comp_type = comp_config["type"]
105 if comp_type not in registry: 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true
106 raise ValueError(f"{component_name.capitalize()} '{key}' has invalid type '{comp_type}'")
107 components[key] = registry[comp_type](name=key, config=comp_config)
108 return components
def run(config: dict[str, Any]) -> None:
    """
    Execute a job from a validated configuration dictionary.

    The config must contain:
        - dataloaders: Map of configurations for data sources.
        - metrics_processor: Map of configurations for quality metrics.
        - outputs: Map of configurations for results storage.

    Raises:
        ValueError: If the configuration is missing, a required section is
            absent or not a dictionary, or an output entry is invalid.
    """
    dataloaders_registry = PluginLoadedRegistry.get_dataloaders_registry()
    metrics_registry = PluginLoadedRegistry.get_metrics_registry()
    outputs_registry = PluginLoadedRegistry.get_outputwriter_registry()

    if not config:
        raise ValueError("Job requires a configuration dictionary.")

    # Load data loaders
    if "dataloaders" not in config or not isinstance(config["dataloaders"], dict):
        raise ValueError("'dataloaders' must be provided as a dictionary")

    dataloaders: dict[str, DataLoader] = _init_components(config["dataloaders"], dataloaders_registry, "dataloader")

    # Load metrics
    if "metrics_processor" not in config or not isinstance(config["metrics_processor"], dict):
        raise ValueError("'metrics_processor' must be provided as a dictionary")

    metrics: dict[str, DatametricProcessor] = _init_components(config["metrics_processor"], metrics_registry, "metric")

    if "compute_delta" in config:
        # Fixed missing opening quote around 'compute_delta' in the message.
        logger.warning("'compute_delta' is deprecated and will be removed in future versions.")

    # Load output writers
    if "outputs" not in config or not isinstance(config["outputs"], dict):
        raise ValueError("'outputs' must be provided as a dictionary")

    metrics_output: OutputWriter | None = None
    features_output: OutputWriter | None = None
    delta_output: OutputWriter | None = None

    for key, output_config in config["outputs"].items():
        # .get() turns a missing 'type' key into the documented ValueError
        # below instead of an opaque KeyError.
        if output_config.get("type") not in outputs_registry:
            raise ValueError(f"Output '{key}' must have a valid 'type' in {list(outputs_registry.keys())}")
        writer = outputs_registry[output_config["type"]](name=key, config=output_config)
        if key == "metrics":
            metrics_output = writer
        elif key == "delta_metrics":
            delta_output = writer
        elif key == "features":
            features_output = writer
        else:
            raise ValueError(f"Unsupported output key '{key}'. Only 'features' and 'metrics' are allowed.")

    # Progress bar is on by default unless explicitly disabled in config.
    progress_bar = config.get("progress_bar", True)

    job = DatasetJob(
        dataloaders=dataloaders, metrics=metrics, features_output=features_output, progress_bar=progress_bar
    )

    dataselection_metrics_list, delta_metrics_table = job.run()

    # If we have computed metrics to store
    if metrics_output:
        metrics_output.write_metrics_dict(dataselection_metrics_list)

    # If we have to compute delta metrics
    if delta_output and delta_metrics_table:
        delta_output.write_table("delta", delta_metrics_table)
# Script entry point: run the CLI when this module is executed directly.
if __name__ == "__main__":
    execute()