Skip to content

dqm_ml_job.dataloaders.parquet

Parquet data loader for reading Parquet files.

This module contains the ParquetDataLoader and ParquetDataSelection classes for loading and iterating over Parquet file data.

logger = logging.getLogger(__name__) module-attribute

ParquetDataLoader

Data loader for Parquet files that generates one or more DataSelections.

This loader can read from a single Parquet file or a directory of Parquet files, optionally splitting the data by a column value to create multiple selections.

Attributes:

Name Type Description
type str

The loader type identifier ("parquet").

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class ParquetDataLoader:
    """Data loader for Parquet files that generates one or more DataSelections.

    Reads either a single Parquet file or a directory of Parquet files and,
    when ``split_by`` is configured, produces one selection per distinct
    column value instead of a single selection over the whole dataset.

    Attributes:
        type: The loader type identifier ("parquet").
    """

    type: str = "parquet"

    def __init__(self, name: str, config: dict[str, Any] | None = None):
        """Initialize the Parquet data loader.

        Args:
            name: Unique name for this loader instance.
            config: Configuration dictionary containing:
                - path: Path to Parquet file or directory (required)
                - batch_size: Rows per batch (default: 100000)
                - threads: Number of threads (default: 4)
                - split_by: Column name to split selections by
                - split_values: Specific values to split on
                - filter: Dictionary of column filters

        Raises:
            ValueError: If required config keys are missing.
        """
        if config is None or "path" not in config:
            raise ValueError(f"Configuration for dataloader '{name}' must contain 'path'")

        self.name = name
        self.config = config
        self.path = config["path"]
        self.batch_size = config.get("batch_size", 100_000)
        self.threads = config.get("threads", 4)
        self.split_by = config.get("split_by")
        self.split_values = config.get("split_values")
        self.filters_dict = config.get("filter")

    def get_selections(self) -> list[DataSelection]:
        """Create one or more ParquetDataSelection instances based on configuration.

        Returns:
            A list of DataSelection instances: one per unique split value when
            split_by is configured, otherwise a single selection covering the
            entire dataset.
        """
        if not self.split_by:
            whole = ParquetDataSelection(
                name=self.name,
                path=self.path,
                batch_size=self.batch_size,
                threads=self.threads,
                filters_dict=self.filters_dict,
            )
            return [whole]

        split_values = self.split_values
        if split_values is None:
            # No explicit values configured: scan the split column once and
            # use every distinct non-null value found in the dataset.
            logger.info(f"Discovering unique values for split_by='{self.split_by}' in {self.path}")
            table = pq.read_table(self.path, columns=[self.split_by])
            split_values = [str(v) for v in pc.unique(table.column(0)).to_pylist() if v is not None]

        result: list[DataSelection] = []
        for value in split_values:
            # Each split value gets its own selection whose filters combine
            # the configured filters with equality on the split column.
            combined = dict(self.filters_dict or {})
            combined[self.split_by] = value
            result.append(
                ParquetDataSelection(
                    name=f"{self.name}_{value}",
                    path=self.path,
                    batch_size=self.batch_size,
                    threads=self.threads,
                    filters_dict=combined,
                )
            )
        return result

batch_size = config.get('batch_size', 100000) instance-attribute

config = config instance-attribute

filters_dict = config.get('filter', None) instance-attribute

name = name instance-attribute

path = config['path'] instance-attribute

split_by = config.get('split_by') instance-attribute

split_values = config.get('split_values') instance-attribute

threads = config.get('threads', 4) instance-attribute

type: str = 'parquet' class-attribute instance-attribute

__init__(name: str, config: dict[str, Any] | None = None)

Initialize the Parquet data loader.

Parameters:

Name Type Description Default
name str

Unique name for this loader instance.

required
config dict[str, Any] | None

Configuration dictionary containing: - path: Path to Parquet file or directory (required) - batch_size: Rows per batch (default: 100000) - threads: Number of threads (default: 4) - split_by: Column name to split selections by - split_values: Specific values to split on - filter: Dictionary of column filters

None

Raises:

Type Description
ValueError

If required config keys are missing.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def __init__(self, name: str, config: dict[str, Any] | None = None):
    """Initialize the Parquet data loader.

    Args:
        name: Unique name for this loader instance.
        config: Configuration dictionary containing:
            - path: Path to Parquet file or directory (required)
            - batch_size: Rows per batch (default: 100000)
            - threads: Number of threads (default: 4)
            - split_by: Column name to split selections by
            - split_values: Specific values to split on
            - filter: Dictionary of column filters

    Raises:
        ValueError: If required config keys are missing.
    """
    if not config or "path" not in config:
        raise ValueError(f"Configuration for dataloader '{name}' must contain 'path'")

    self.name = name
    self.config = config
    self.path = config["path"]
    self.batch_size = config.get("batch_size", 100_000)
    self.threads = config.get("threads", 4)
    self.split_by = config.get("split_by")
    self.split_values = config.get("split_values")
    self.filters_dict = config.get("filter", None)

get_selections() -> list[DataSelection]

Create one or more ParquetDataSelection instances based on configuration.

Returns:

Type Description
list[DataSelection]

A list of DataSelection instances. If split_by is configured,

list[DataSelection]

returns one selection per unique value. Otherwise, returns a

list[DataSelection]

single selection for the entire dataset.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def get_selections(self) -> list[DataSelection]:
    """Create one or more ParquetDataSelection instances based on configuration.

    Returns:
        A list of DataSelection instances: one per unique split value when
        split_by is configured, otherwise a single selection covering the
        entire dataset.
    """
    if not self.split_by:
        whole = ParquetDataSelection(
            name=self.name,
            path=self.path,
            batch_size=self.batch_size,
            threads=self.threads,
            filters_dict=self.filters_dict,
        )
        return [whole]

    split_values = self.split_values
    if split_values is None:
        # No explicit values configured: scan the split column once and
        # use every distinct non-null value found in the dataset.
        logger.info(f"Discovering unique values for split_by='{self.split_by}' in {self.path}")
        table = pq.read_table(self.path, columns=[self.split_by])
        split_values = [str(v) for v in pc.unique(table.column(0)).to_pylist() if v is not None]

    result: list[DataSelection] = []
    for value in split_values:
        # Each split value gets its own selection whose filters combine
        # the configured filters with equality on the split column.
        combined = dict(self.filters_dict or {})
        combined[self.split_by] = value
        result.append(
            ParquetDataSelection(
                name=f"{self.name}_{value}",
                path=self.path,
                batch_size=self.batch_size,
                threads=self.threads,
                filters_dict=combined,
            )
        )
    return result

ParquetDataSelection

Bases: DataSelection

A specific selection of data from a Parquet dataset.

This class represents a filtered subset of a Parquet dataset and provides an iterator over PyArrow RecordBatches.

Attributes:

Name Type Description
name

Name identifier for this selection.

path

Path to the Parquet file or directory.

batch_size

Number of rows per batch.

threads

Number of threads for parallel reading.

filters_dict

Optional dictionary of column filters to apply.

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class ParquetDataSelection(DataSelection):
    """A specific selection of data from a Parquet dataset.

    This class represents a filtered subset of a Parquet dataset
    and provides an iterator over PyArrow RecordBatches.

    Attributes:
        name: Name identifier for this selection.
        path: Path to the Parquet file or directory.
        batch_size: Number of rows per batch.
        threads: Number of threads for parallel reading.
        filters_dict: Optional dictionary of column filters to apply.
    """

    def __init__(
        self,
        name: str,
        path: str,
        batch_size: int = 100_000,
        threads: int = 4,
        filters_dict: dict[str, Any] | None = None,
    ):
        """Initialize a Parquet data selection.

        Args:
            name: Name identifier for this selection.
            path: Path to the Parquet file or directory.
            batch_size: Number of rows per batch (default: 100000).
            threads: Number of threads for parallel reading (default: 4).
            filters_dict: Optional dictionary of column filters to apply.
        """
        self.name = name
        self.path = path
        self.batch_size = batch_size
        self.threads = threads
        self.filters_dict = filters_dict
        # State populated later by bootstrap().
        self.columns_list: list[str] | None = None
        self.dataset: pq.ParquetDataset | None = None
        self.samples_count: int = 0
        # Initialized here (not only in bootstrap) so the attribute always
        # exists even on an un-bootstrapped instance.
        self.filter_expr = None

    @override
    def bootstrap(self, columns_list: list[str]) -> None:
        """Build the filter expression, open the dataset, and count rows.

        Args:
            columns_list: Columns to read; filter columns are appended
                if missing.
        """
        # Copy so appending filter columns does not mutate the caller's list.
        self.columns_list = list(columns_list)
        filter_expr = None
        if self.filters_dict is not None:
            expr = None
            for col, val in self.filters_dict.items():
                # Filtered columns must be read even if the caller did not
                # request them, otherwise batch.filter() in __iter__ cannot
                # evaluate the expression.
                if col not in self.columns_list:
                    self.columns_list.append(col)
                col_expr = pc.equal(pc.field(col), val)
                # AND all per-column equality filters together.
                expr = col_expr if expr is None else pc.and_(expr, col_expr)
            filter_expr = expr
        self.filter_expr = filter_expr
        self.dataset = pq.ParquetDataset(self.path, filters=filter_expr)
        if len(self.dataset.fragments) > 0:
            self.samples_count = sum(p.count_rows() for p in self.dataset.fragments)
        else:
            self.samples_count = 0

    def __len__(self) -> int:
        """Return the number of rows matching this selection (0 before bootstrap)."""
        return int(self.samples_count)

    @override
    def get_nb_batches(self) -> int:
        """Return how many batches of size batch_size cover the selection."""
        # Exact integer ceiling division; the previous float-based formula
        # (int(len/batch_size) + remainder flag) could lose precision for
        # very large row counts.
        return -(-len(self) // self.batch_size)

    @override
    def __iter__(self) -> Any:
        """Yield non-empty PyArrow RecordBatches; no-op if never bootstrapped."""
        if self.dataset is None:
            return
        for file in self.dataset.files:
            parquet_file = pq.ParquetFile(file)
            # NOTE(review): iter_batches' use_threads parameter is a bool;
            # passing the thread *count* here is truthy and behaves like True.
            batch_iterator = parquet_file.iter_batches(
                batch_size=self.batch_size, columns=self.columns_list, use_threads=self.threads
            )
            for batch in batch_iterator:
                if self.filter_expr is not None:
                    # The dataset-level filter is row-group granular; re-apply
                    # the expression per batch for exact row-level filtering.
                    batch = batch.filter(self.filter_expr)
                if len(batch) == 0:
                    continue
                yield batch

    @override
    def __repr__(self) -> str:
        return f"ParquetSelection(name='{self.name}', path='{self.path}', filters={self.filters_dict})"

batch_size = batch_size instance-attribute

columns_list: list[str] | None = None instance-attribute

dataset: pq.ParquetDataset | None = None instance-attribute

filters_dict = filters_dict instance-attribute

name = name instance-attribute

path = path instance-attribute

samples_count: int = 0 instance-attribute

threads = threads instance-attribute

__init__(name: str, path: str, batch_size: int = 100000, threads: int = 4, filters_dict: dict[str, Any] | None = None)

Initialize a Parquet data selection.

Parameters:

Name Type Description Default
name str

Name identifier for this selection.

required
path str

Path to the Parquet file or directory.

required
batch_size int

Number of rows per batch (default: 100000).

100000
threads int

Number of threads for parallel reading (default: 4).

4
filters_dict dict[str, Any] | None

Optional dictionary of column filters to apply.

None
Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    name: str,
    path: str,
    batch_size: int = 100_000,
    threads: int = 4,
    filters_dict: dict[str, Any] | None = None,
):
    """Initialize a Parquet data selection.

    Args:
        name: Name identifier for this selection.
        path: Path to the Parquet file or directory.
        batch_size: Number of rows per batch (default: 100000).
        threads: Number of threads for parallel reading (default: 4).
        filters_dict: Optional dictionary of column filters to apply.
    """
    self.name = name
    self.path = path
    self.batch_size = batch_size
    self.threads = threads
    self.filters_dict = filters_dict
    self.columns_list: list[str] | None = None
    self.dataset: pq.ParquetDataset | None = None
    self.samples_count: int = 0

__iter__() -> Any

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@override
def __iter__(self) -> Any:
    """Yield non-empty RecordBatches; no-op if bootstrap() was never called."""
    if self.dataset is None:
        return
    for file_path in self.dataset.files:
        reader = pq.ParquetFile(file_path)
        for batch in reader.iter_batches(
            batch_size=self.batch_size, columns=self.columns_list, use_threads=self.threads
        ):
            if self.filter_expr is not None:
                batch = batch.filter(self.filter_expr)
            # Skip batches the filter emptied out entirely.
            if len(batch):
                yield batch

__len__() -> int

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
80
81
def __len__(self) -> int:
    """Return the number of rows matching this selection (0 before bootstrap())."""
    # samples_count may come from a sum over fragments; coerce to int.
    return int(self.samples_count)

__repr__() -> str

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
103
104
105
@override
def __repr__(self) -> str:
    return f"ParquetSelection(name='{self.name}', path='{self.path}', filters={self.filters_dict})"

bootstrap(columns_list: list[str]) -> None

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@override
def bootstrap(self, columns_list: list[str]) -> None:
    """Build the filter expression, open the dataset, and count rows.

    Args:
        columns_list: Columns to read; filter columns are appended if missing.
    """
    # Copy so appending filter columns below does not mutate the caller's list.
    self.columns_list = list(columns_list)
    filter_expr = None
    if self.filters_dict is not None:
        expr = None
        for col, val in self.filters_dict.items():
            # Filtered columns must be present in the read schema, otherwise
            # the per-batch filter in __iter__ cannot evaluate them.
            if col not in self.columns_list:
                self.columns_list.append(col)
            col_expr = pc.equal(pc.field(col), val)
            # AND all per-column equality filters together.
            expr = col_expr if expr is None else pc.and_(expr, col_expr)
        filter_expr = expr
    self.filter_expr = filter_expr
    self.dataset = pq.ParquetDataset(self.path, filters=filter_expr)
    if len(self.dataset.fragments) > 0:
        self.samples_count = sum(p.count_rows() for p in self.dataset.fragments)
    else:
        self.samples_count = 0

get_nb_batches() -> int

Source code in packages/dqm-ml-job/src/dqm_ml_job/dataloaders/parquet.py
83
84
85
@override
def get_nb_batches(self) -> int:
    return int(len(self) / self.batch_size) + (len(self) % self.batch_size > 0)