Coverage for packages / dqm-ml-job / src / dqm_ml_job / cli.py: 74%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 10:11 +0000

1"""Command-line interface for DQM job execution. 

2 

3This module provides CLI functions for parsing arguments and running 

4data quality assessment jobs from YAML configuration files. 

5""" 

6 

7import argparse 

8import logging 

9from pathlib import Path 

10from typing import Any 

11 

12import yaml 

13 

14from dqm_ml_core import PluginLoadedRegistry 

15from dqm_ml_core.api.data_processor import DatametricProcessor 

16from dqm_ml_job.dataloaders import DataLoader 

17from dqm_ml_job.job import DatasetJob 

18from dqm_ml_job.outputwriter import OutputWriter 

19 

20logger = logging.getLogger(__name__) 

21 

22 

23def parse_args(arg_list: list[str] | None) -> Any: 

24 """ 

25 Parse command line arguments for the DQM job. 

26 

27 Args: 

28 arg_list: List of arguments (default: sys.argv[1:]). 

29 

30 Returns: 

31 The parsed Namespace object. 

32 """ 

33 parser = argparse.ArgumentParser( 

34 prog="dqm-ml", description="DQM-ML Job client", epilog="for more informations see README" 

35 ) 

36 

37 parser.add_argument( 

38 "-p", "--process-config", type=str, nargs="+", required=True, help="configuration files to execute" 

39 ) 

40 

41 parser.add_argument("--save-config", type=str, help="Path to save the resolved configuration") 

42 

43 # TODO add parameters to pass directly files / directory as inputs for loaders 

44 args = parser.parse_args(arg_list) 

45 

46 return args 

47 

48 

49# TODO get parameters, logs, ... 

def execute(arg_list: list[str] | None = None) -> None:
    """
    Main CLI entry point for executing DQM jobs from YAML configurations.

    Loads every configuration file given on the command line, merges them into
    one dictionary (later files override earlier keys), optionally saves the
    merged result, then runs the job described under the 'config' key.

    Args:
        arg_list: List of command line arguments (default: sys.argv[1:]).
    """
    args = parse_args(arg_list)
    config: dict[str, Any] = {}

    for config_file in args.process_config:
        logger.debug("Executing job from config file: %s", config_file)

        with Path(config_file).open() as stream:
            try:
                config_content = yaml.safe_load(stream)
                # safe_load returns None for an empty file; dict.update(None)
                # would raise TypeError, so skip empty documents instead.
                if config_content:
                    config.update(config_content)
            except yaml.YAMLError as exc:
                # Fixed message typo: "part" -> "parse".
                logger.error("Failed to parse job configuration: %s", config_file)
                print(exc)
                return

    # if we succeed to load all config files, run the job

    # Optionally save the resolved configuration
    if args.save_config:
        logger.debug("Saving resolved configuration to: %s", args.save_config)
        with Path(args.save_config).open("w") as stream:
            yaml.safe_dump(config, stream)

    if "config" in config:
        run(config["config"])
    elif "pipeline_config" in config:
        logger.warning("'pipeline_config' is deprecated, please use 'config' instead.")
        run(config["pipeline_config"])
    else:
        logger.error("No 'config' found in configuration.")

86 

87 

88def _init_components(config_dict: dict[str, Any], registry: dict[str, Any], component_name: str) -> dict[str, Any]: 

89 """ 

90 Initialize job components (loaders, metrics, writers) from their respective registries. 

91 

92 Args: 

93 config_dict: Dictionary of component configurations from YAML. 

94 registry: The registry containing the component classes. 

95 component_name: The name of the component type (for error messages). 

96 

97 Returns: 

98 A dictionary of initialized component instances. 

99 """ 

100 components = {} 

101 for key, comp_config in config_dict.items(): 

102 if "type" not in comp_config: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true

103 raise ValueError(f"Configuration for {component_name} '{key}' must contain 'type'") 

104 comp_type = comp_config["type"] 

105 if comp_type not in registry: 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 raise ValueError(f"{component_name.capitalize()} '{key}' has invalid type '{comp_type}'") 

107 components[key] = registry[comp_type](name=key, config=comp_config) 

108 return components 

109 

110 

def run(config: dict[str, Any]) -> None:
    """
    Execute a job from a validated configuration dictionary.

    The config must contain:
        - dataloaders: Map of configurations for data sources.
        - metrics_processor: Map of configurations for quality metrics.
        - outputs: Map of configurations for results storage
          (accepted keys: 'features', 'metrics', 'delta_metrics').

    Optional keys:
        - progress_bar: Show a progress bar during the job (default: True).
        - compute_delta: Deprecated; triggers a warning only.

    Raises:
        ValueError: If the configuration is missing or malformed.
    """
    dataloaders_registry = PluginLoadedRegistry.get_dataloaders_registry()
    metrics_registry = PluginLoadedRegistry.get_metrics_registry()
    outputs_registry = PluginLoadedRegistry.get_outputwriter_registry()

    if not config:
        raise ValueError("Job requires a configuration dictionary.")

    # Load data loaders
    if "dataloaders" not in config or not isinstance(config["dataloaders"], dict):
        raise ValueError("'dataloaders' must be provided as a dictionary")

    dataloaders: dict[str, DataLoader] = _init_components(config["dataloaders"], dataloaders_registry, "dataloader")

    # Load metrics
    if "metrics_processor" not in config or not isinstance(config["metrics_processor"], dict):
        raise ValueError("'metrics_processor' must be provided as a dictionary")

    metrics: dict[str, DatametricProcessor] = _init_components(config["metrics_processor"], metrics_registry, "metric")

    if "compute_delta" in config:
        # Fixed missing opening quote in the deprecation message.
        logger.warning("'compute_delta' is deprecated and will be removed in future versions.")

    # Load output writers
    if "outputs" not in config or not isinstance(config["outputs"], dict):
        raise ValueError("'outputs' must be provided as a dictionary")

    metrics_output: OutputWriter | None = None
    features_output: OutputWriter | None = None
    delta_output: OutputWriter | None = None

    for key, output_config in config["outputs"].items():
        # .get avoids a raw KeyError when 'type' is absent, so the ValueError
        # below ("must have a valid 'type'") is raised in that case too.
        if output_config.get("type") not in outputs_registry:
            raise ValueError(f"Output '{key}' must have a valid 'type' in {list(outputs_registry.keys())}")
        writer = outputs_registry[output_config["type"]](name=key, config=output_config)
        if key == "metrics":
            metrics_output = writer
        elif key == "delta_metrics":
            delta_output = writer
        elif key == "features":
            features_output = writer
        else:
            # Message now lists every accepted key; 'delta_metrics' was
            # accepted by the code above but missing from the old message.
            raise ValueError(
                f"Unsupported output key '{key}'. Only 'features', 'metrics' and 'delta_metrics' are allowed."
            )

    progress_bar = config.get("progress_bar", True)

    job = DatasetJob(
        dataloaders=dataloaders, metrics=metrics, features_output=features_output, progress_bar=progress_bar
    )

    dataselection_metrics_list, delta_metrics_table = job.run()

    # If we have computed metrics to store
    if metrics_output:
        metrics_output.write_metrics_dict(dataselection_metrics_list)

    # If we have to compute delta metrics
    if delta_output and delta_metrics_table:
        delta_output.write_table("delta", delta_metrics_table)

178 

179 

# Allow running this module directly (e.g. `python -m dqm_ml_job.cli`).
if __name__ == "__main__":
    execute()