Tip

This tutorial is available as a Jupyter notebook.


Parallel dataset conversion#

If your dataset is huge, running a single-process dataset conversion script can be very time-consuming. You can use multiprocessing with MDSWriter to convert your dataset in parallel. There are a few ways to convert your raw data into MDS format in parallel:

  1. Download the raw data in parallel and convert it to MDS format sequentially.

  2. Group the raw data and convert each group to MDS format in parallel, writing to separate sub-directories. Then, merge all the index.json files from these sub-directories to get a unified MDS dataset.

Let’s look at an example for each option.

1. Fetch raw data in parallel and write sequentially#

For a dataset with large files (such as images or videos), it is useful to download those files in parallel using multiple processes and then call MDSWriter to write the data into MDS format.

Setup#

Let’s start by installing the mosaicml-streaming package, and importing necessary dependencies.

[ ]:
%pip install mosaicml-streaming
[ ]:
import os
from multiprocessing import Pool

from streaming import MDSWriter, StreamingDataset

Global settings#

Initialize global variables:

[ ]:
out_root = './data'
# This could be a list of URLs that need to be downloaded
dataset = [i for i in range(25)]
columns = {'number': 'int'}

Download data from remote URLs. Here, we just return a number for demonstration purposes.

[ ]:
def get_data(number):
    print(f'\nWorker PID: {os.getpid()}\tnumber: {number}', flush=True, end='')
    # Add code here to download the data from the URL.
    return {'number': number}
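
As an illustration of what the real download step might look like, here is a minimal sketch, assuming each dataset entry is an HTTP(S) URL and using a hypothetical local download directory and a 'content' bytes column (neither is part of this tutorial's toy dataset):

[ ]:
import urllib.request

def get_data_from_url(url):
    # Hypothetical variant of get_data: fetch one remote file and return its bytes.
    download_dir = './downloads'  # assumed local scratch directory
    os.makedirs(download_dir, exist_ok=True)
    local_path = os.path.join(download_dir, os.path.basename(url))
    urllib.request.urlretrieve(url, local_path)  # download the raw file
    with open(local_path, 'rb') as f:
        # The column spec would then be {'content': 'bytes'} instead of {'number': 'int'}.
        return {'content': f.read()}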

An initialization function for each worker process, which prints the worker PID.

[ ]:
# Initialize the worker process
def init_worker():
    # Get the pid for the current worker process
    pid = os.getpid()
    print(f'\nInitialize Worker PID: {pid}', flush=True, end='')

Convert to MDS format#

Initialize 4 worker processes to download the data in parallel. Once each sample is ready, it is written to MDS format using the write method of streaming.MDSWriter.

[ ]:
# clean up root directory
%rm -rf $out_root

with Pool(initializer=init_worker, processes=4) as pool:
    with MDSWriter(out=out_root, columns=columns) as out:
        for sample in pool.imap(get_data, dataset):
            out.write(sample)

Load MDS dataset#

Read the samples back by iterating over a StreamingDataset. Here, we just print each sample's number.

[ ]:
# read the sample
dataset = StreamingDataset(local=out_root,
                           remote=None,
                           shuffle=False,)
for sample in dataset:
    print(sample['number'])
[ ]:
# Clean up
%rm -rf $out_root

2. Group the raw data and convert to MDS format in parallel#

For large raw datasets, or raw datasets with large files, we recommend partitioning the conversion work among multiple MDSWriters, with each partition handled by a separate process in parallel.

Importing dependencies:

[ ]:
import os
import json
from glob import glob
from typing import Iterator, Tuple

from multiprocessing import Pool

from streaming import MDSWriter, StreamingDataset

Global settings#

Initialize the needed global variables:

[ ]:
out_root = './group_data'
num_groups = 4
num_process = 2

This function yields a sub-directory path where MDS shards will be stored, as well as the raw dataset sample range of that directory. For example, the first sub-directory will contain samples 0 to 9, the second sub-directory will contain samples 10 to 19, and so on.

If you are working with large files, you can also yield a single raw dataset file path instead of a sample range.

[ ]:
def each_task(out_root: str, groups: int) -> Iterator[Tuple[str, int, int]]:
    """Get the sub-directory path and the sample range for each sub-directory.

    Args:
        out_root (str): Base output MDS directory.
        groups (int): Number of sub-directories to create.

    Yields:
        Iterator[Tuple[str, int, int]]: Each argument tuple
    """
    for data_group in range(groups):
        sub_out_root = os.path.join(out_root, str(data_group))
        start_sample_idx = data_group * 10
        end_sample_idx = start_sample_idx + 9
        yield sub_out_root, start_sample_idx, end_sample_idx
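
If each task should instead operate on one large raw file, the task generator can yield a file path rather than a sample range. A minimal sketch, assuming a hypothetical list of raw file paths (the file names in the usage comment are illustrative and not part of this tutorial):

[ ]:
from typing import List

def each_file_task(out_root: str, raw_files: List[str]) -> Iterator[Tuple[str, str]]:
    """Yield a (sub-directory, raw file path) pair for each raw file."""
    for data_group, raw_file in enumerate(raw_files):
        sub_out_root = os.path.join(out_root, str(data_group))
        yield sub_out_root, raw_file

# For example: each_file_task(out_root, ['raw/part_0.jsonl', 'raw/part_1.jsonl'])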

This function converts raw dataset samples into MDS format.

[ ]:
def convert_to_mds(args: Tuple[str, int, int]) -> None:
    """Convert a raw dataset partition into MDS format.

    Args:
        args (Tuple[str, int, int]): All arguments, packed into a tuple because
            process pools only pass one argument.
    """
    sub_out_root, start_sample_idx, end_sample_idx = args

    def get_data(start: int, end: int):
        for i in range(start, end + 1):
            yield {'number': i}

    columns = {'number': 'int'}

    with MDSWriter(out=sub_out_root,
                   columns=columns) as out:
        for sample in get_data(start_sample_idx, end_sample_idx):
            out.write(sample)

We partition the raw dataset into 4 sub-groups, and each process converts a sub-group into MDS format. The resulting shards are stored in the respective sub-directories.

[ ]:
# clean up root directory
%rm -rf $out_root

arg_tuples = each_task(out_root, groups=num_groups)

# Convert each group of data in parallel into its own directory of shards.
with Pool(initializer=init_worker, processes=num_process) as pool:
    for _ in pool.imap(convert_to_mds, arg_tuples):
        pass
print('Finished')

Once the dataset has been converted to MDS format, let's look at the directory structure. You will find 4 sub-directories, each containing an index.json file and shard files.

[ ]:
%ll $out_root

Merge meta data#

The last step of the conversion process is to merge all the index.json files from the sub-directories; the content of the shard files remains unchanged. Calling the merge_index utility function writes the information for all the shards to a new index.json file placed in the out_root directory.

[ ]:
from streaming.base.util import merge_index
merge_index(out_root, keep_local=True)
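
As a quick sanity check (not part of the original conversion steps), you can open the merged index.json and count the shard entries it references; this assumes the standard MDS index layout with a top-level shards list:

[ ]:
# Inspect the merged index: it should reference the shards from all 4 sub-directories.
with open(os.path.join(out_root, 'index.json')) as index_file:
    merged_index = json.load(index_file)
print(f"Merged index references {len(merged_index['shards'])} shards")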

Let’s check out the root directory, where you can see one index.json file along with the sub-directories that contain the shard files.

[ ]:
%ll $out_root

Load MDS dataset#

Read the samples using StreamingDataset. Here, we just print each sample's number.

[ ]:
# read the sample
dataset = StreamingDataset(local=out_root,
                           remote=None,
                           shuffle=False)
for sample in dataset:
    print(sample['number'])

Cleanup#

[ ]:
%rm -rf $out_root

What next?#

You’ve now seen an in-depth tutorial on converting a dataset into MDS format using multiple processes. If you are interested in some real-world examples, check out the WebVid and Pile dataset conversion scripts, which convert those datasets into MDS format via multiprocessing.