Skip to content

dqm_ml_job.dataloaders

Data loaders module for DQM ML Job.

This module contains classes for loading data from various sources and protocols. It provides the DataLoader and DataSelection protocols along with concrete implementations for different file formats.

Classes:

Name Description
DataLoader

Protocol for data loader factories.

DataSelection

Protocol for data subsets.

ParquetDataLoader

Loader for Parquet files.

PandasDataLoader

Loader for CSV files using Pandas.

__all__ = ['DataLoader', 'DataSelection', 'PandasDataLoader', 'ParquetDataLoader', 'dqml_dataloaders_registry'] module-attribute

dqml_dataloaders_registry = {'parquet': ParquetDataLoader, 'csv': PandasDataLoader} module-attribute

DataLoader

Bases: Protocol

Protocol for Data Loader factories.

A DataLoader is responsible for scanning a source (disk, DB, S3) and discovering available DataSelections based on its configuration.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/proto.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@runtime_checkable
class DataLoader(Protocol):
    """
    Protocol for Data Loader factories.

    A DataLoader is responsible for scanning a source (disk, DB, S3) and
    discovering available DataSelections based on its configuration.

    The protocol is ``runtime_checkable``, so conformance is structural:
    any object exposing ``get_selections()`` passes an ``isinstance`` check.
    """

    def get_selections(self) -> list[DataSelection]:
        """
        Discover and return the list of available selections for this loader.

        Returns:
            A list of initialized DataSelection instances, one per
            discoverable subset of the configured source.
        """

get_selections() -> list[DataSelection]

Discover and return the list of available selections for this loader.

Returns:

Type Description
list[DataSelection]

A list of initialized DataSelection instances.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/proto.py
52
53
54
55
56
57
58
def get_selections(self) -> list[DataSelection]:
    """
    Discover and return the list of available selections for this loader.

    Implementations scan their configured source and build one selection
    per discoverable subset of data.

    Returns:
        A list of initialized DataSelection instances.
    """

DataSelection

Bases: Protocol

Protocol for a specific subset of data discovered by a DataLoader.

A DataSelection represents a concrete set of samples (e.g., a specific folder, a filtered view of a database, or a single file) and provides an iterator over data batches.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/proto.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@runtime_checkable
class DataSelection(Protocol):
    """
    Protocol for a specific subset of data discovered by a DataLoader.

    A DataSelection represents a concrete set of samples (e.g., a specific folder,
    a filtered view of a database, or a single file) and provides an iterator
    over data batches.
    """

    # Identifier for this selection; loaders typically derive it from their
    # own name (optionally suffixed with a split value).
    name: str

    def bootstrap(self, columns_list: list[str]) -> None:
        """
        Perform initial setup for the selection before iteration starts.

        Args:
            columns_list: List of column names that must be loaded for this selection.
        """

    def get_nb_batches(self) -> int:
        """
        Return the estimated number of batches in this selection.

        Used primarily for progress bar estimation.

        Returns:
            Estimated batch count; implementations may return an approximation.
        """

    def __iter__(self) -> Any:
        """
        Iterate over the selection, yielding pyarrow.RecordBatch objects.
        """

name: str instance-attribute

__iter__() -> Any

Iterate over the selection, yielding pyarrow.RecordBatch objects.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/proto.py
37
38
39
40
def __iter__(self) -> Any:
    """
    Iterate over the selection, yielding pyarrow.RecordBatch objects.

    Yields:
        pyarrow.RecordBatch: One batch of the selection's rows.
    """

bootstrap(columns_list: list[str]) -> None

Perform initial setup for the selection before iteration starts.

Parameters:

Name Type Description Default
columns_list list[str]

List of column names that must be loaded for this selection.

required
Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/proto.py
22
23
24
25
26
27
28
def bootstrap(self, columns_list: list[str]) -> None:
    """
    Perform initial setup for the selection before iteration starts.

    Must be called before the first iteration over the selection.

    Args:
        columns_list: List of column names that must be loaded for this selection.
    """

get_nb_batches() -> int

Return the estimated number of batches in this selection.

Used primarily for progress bar estimation.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/proto.py
30
31
32
33
34
35
def get_nb_batches(self) -> int:
    """
    Return the estimated number of batches in this selection.

    Used primarily for progress bar estimation.

    Returns:
        Estimated batch count; implementations may return an approximation.
    """

PandasDataLoader

Data loader for CSV files using Pandas.

This loader reads CSV files and provides DataSelections for processing by the DQM pipeline.

Attributes:

Name Type Description
type str

The loader type identifier ("csv").

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/pandas.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class PandasDataLoader:
    """CSV-backed data loader built on Pandas.

    Scans a single CSV file and exposes it to the DQM pipeline as exactly
    one DataSelection.

    Attributes:
        type: The loader type identifier ("csv").
    """

    type: str = "csv"

    def __init__(self, name: str, config: dict[str, Any] | None = None):
        """Set up the loader from its configuration.

        Args:
            name: Unique name for this loader instance.
            config: Configuration dictionary containing:
                - path: Path to CSV file (required)

        Raises:
            ValueError: If required config keys are missing.
        """
        # Guard clause: a usable config must exist and point at a file.
        if config is None or "path" not in config:
            raise ValueError(f"Configuration for dataloader '{name}' must contain 'path'")
        self.name = name
        self.path = config["path"]

    def get_selections(self) -> list[DataSelection]:
        """Expose the CSV file as a single selection.

        Returns:
            A list containing a single PandasDataSelection instance.
        """
        only_selection = PandasDataSelection(name=self.name, path=self.path)
        return [only_selection]

name = name instance-attribute

path = config['path'] instance-attribute

type: str = 'csv' class-attribute instance-attribute

__init__(name: str, config: dict[str, Any] | None = None)

Initialize the Pandas data loader.

Parameters:

Name Type Description Default
name str

Unique name for this loader instance.

required
config dict[str, Any] | None

Configuration dictionary containing: - path: Path to CSV file (required)

None

Raises:

Type Description
ValueError

If required config keys are missing.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/pandas.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def __init__(self, name: str, config: dict[str, Any] | None = None):
    """Initialize the Pandas data loader.

    Args:
        name: Unique name for this loader instance.
        config: Configuration dictionary containing:
            - path: Path to CSV file (required)

    Raises:
        ValueError: If required config keys are missing.
    """
    # Reject missing/empty configs and configs without a source path.
    if config is None or "path" not in config:
        raise ValueError(f"Configuration for dataloader '{name}' must contain 'path'")
    self.name = name
    self.path = config["path"]

get_selections() -> list[DataSelection]

Create a PandasDataSelection for the CSV file.

Returns:

Type Description
list[DataSelection]

A list containing a single PandasDataSelection instance.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/pandas.py
 94
 95
 96
 97
 98
 99
100
def get_selections(self) -> list[DataSelection]:
    """Expose the CSV file as a single selection.

    Returns:
        A list containing a single PandasDataSelection instance.
    """
    # One CSV file maps to exactly one selection.
    only_selection = PandasDataSelection(name=self.name, path=self.path)
    return [only_selection]

ParquetDataLoader

Data loader for Parquet files that generates one or more DataSelections.

This loader can read from a single Parquet file or a directory of Parquet files, optionally splitting the data by a column value to create multiple selections.

Attributes:

Name Type Description
type str

The loader type identifier ("parquet").

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class ParquetDataLoader:
    """Data loader for Parquet files that generates one or more DataSelections.

    This loader can read from a single Parquet file or a directory of Parquet
    files, optionally splitting the data by a column value to create multiple
    selections.

    Attributes:
        type: The loader type identifier ("parquet").
    """

    type: str = "parquet"

    def __init__(self, name: str, config: dict[str, Any] | None = None):
        """Initialize the Parquet data loader.

        Args:
            name: Unique name for this loader instance.
            config: Configuration dictionary containing:
                - path: Path to Parquet file or directory (required)
                - batch_size: Rows per batch (default: 100000)
                - threads: Number of threads (default: 4)
                - split_by: Column name to split selections by
                - split_values: Specific values to split on
                - filter: Dictionary of column filters

        Raises:
            ValueError: If required config keys are missing.
        """
        # A usable config must exist and point at a file or directory.
        if config is None or "path" not in config:
            raise ValueError(f"Configuration for dataloader '{name}' must contain 'path'")

        self.name = name
        self.config = config
        self.path = config["path"]
        self.batch_size = config.get("batch_size", 100_000)
        self.threads = config.get("threads", 4)
        self.split_by = config.get("split_by")
        self.split_values = config.get("split_values")
        self.filters_dict = config.get("filter", None)

    def _make_selection(self, name: str, filters: dict[str, Any] | None) -> DataSelection:
        """Build one ParquetDataSelection carrying this loader's I/O settings."""
        return ParquetDataSelection(
            name=name,
            path=self.path,
            batch_size=self.batch_size,
            threads=self.threads,
            filters_dict=filters,
        )

    def get_selections(self) -> list[DataSelection]:
        """Create one or more ParquetDataSelection instances based on configuration.

        Returns:
            A list of DataSelection instances. If split_by is configured,
            returns one selection per unique value. Otherwise, returns a
            single selection for the entire dataset.
        """
        # No split column configured: the whole dataset is one selection.
        if not self.split_by:
            return [self._make_selection(self.name, self.filters_dict)]

        split_values = self.split_values
        if split_values is None:
            # split_values not given: scan the split column for its distinct,
            # non-null values. NOTE(review): values are stringified here —
            # presumably the downstream filter compares as strings; confirm
            # this holds for non-string split columns.
            logger.info(f"Discovering unique values for split_by='{self.split_by}' in {self.path}")
            table = pq.read_table(self.path, columns=[self.split_by])
            split_values = [str(v) for v in pc.unique(table.column(0)).to_pylist() if v is not None]

        result: list[DataSelection] = []
        for value in split_values:
            # Each split value gets its own selection, filtered to that value
            # on top of any filters already configured for the loader.
            per_value_filters = dict(self.filters_dict or {})
            per_value_filters[self.split_by] = value
            result.append(self._make_selection(f"{self.name}_{value}", per_value_filters))
        return result

batch_size = config.get('batch_size', 100000) instance-attribute

config = config instance-attribute

filters_dict = config.get('filter', None) instance-attribute

name = name instance-attribute

path = config['path'] instance-attribute

split_by = config.get('split_by') instance-attribute

split_values = config.get('split_values') instance-attribute

threads = config.get('threads', 4) instance-attribute

type: str = 'parquet' class-attribute instance-attribute

__init__(name: str, config: dict[str, Any] | None = None)

Initialize the Parquet data loader.

Parameters:

Name Type Description Default
name str

Unique name for this loader instance.

required
config dict[str, Any] | None

Configuration dictionary containing: - path: Path to Parquet file or directory (required) - batch_size: Rows per batch (default: 100000) - threads: Number of threads (default: 4) - split_by: Column name to split selections by - split_values: Specific values to split on - filter: Dictionary of column filters

None

Raises:

Type Description
ValueError

If required config keys are missing.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def __init__(self, name: str, config: dict[str, Any] | None = None):
    """Initialize the Parquet data loader.

    Args:
        name: Unique name for this loader instance.
        config: Configuration dictionary containing:
            - path: Path to Parquet file or directory (required)
            - batch_size: Rows per batch (default: 100000)
            - threads: Number of threads (default: 4)
            - split_by: Column name to split selections by
            - split_values: Specific values to split on
            - filter: Dictionary of column filters

    Raises:
        ValueError: If required config keys are missing.
    """
    if not config or "path" not in config:
        raise ValueError(f"Configuration for dataloader '{name}' must contain 'path'")

    self.name = name
    self.config = config
    self.path = config["path"]
    self.batch_size = config.get("batch_size", 100_000)
    self.threads = config.get("threads", 4)
    self.split_by = config.get("split_by")
    self.split_values = config.get("split_values")
    self.filters_dict = config.get("filter", None)

get_selections() -> list[DataSelection]

Create one or more ParquetDataSelection instances based on configuration.

Returns:

Type Description
list[DataSelection]

A list of DataSelection instances. If split_by is configured, returns one selection per unique value. Otherwise, returns a single selection for the entire dataset.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def get_selections(self) -> list[DataSelection]:
    """Create one or more ParquetDataSelection instances based on configuration.

    Returns:
        A list of DataSelection instances. If split_by is configured,
        returns one selection per unique value. Otherwise, returns a
        single selection for the entire dataset.
    """
    # I/O settings shared by every selection this loader produces.
    common = dict(path=self.path, batch_size=self.batch_size, threads=self.threads)

    # No split column configured: the whole dataset is one selection.
    if not self.split_by:
        return [ParquetDataSelection(name=self.name, filters_dict=self.filters_dict, **common)]

    split_values = self.split_values
    if split_values is None:
        # split_values not given: scan the split column for its distinct,
        # non-null values.
        logger.info(f"Discovering unique values for split_by='{self.split_by}' in {self.path}")
        table = pq.read_table(self.path, columns=[self.split_by])
        split_values = [str(v) for v in pc.unique(table.column(0)).to_pylist() if v is not None]

    result: list[DataSelection] = []
    for value in split_values:
        # Each split value gets its own selection, filtered to that value
        # on top of any filters already configured for the loader.
        per_value_filters = dict(self.filters_dict or {})
        per_value_filters[self.split_by] = value
        result.append(
            ParquetDataSelection(
                name=f"{self.name}_{value}",
                filters_dict=per_value_filters,
                **common,
            )
        )
    return result