🖼️ User Guide#

At a high level, you first convert a raw dataset into streaming format files, then load those files with the streaming.StreamingDataset class for model training.

Streaming provides several dataset writers for converting a raw dataset into a streaming format:

  • streaming.MDSWriter: Writes the dataset into files with the .mds (Mosaic Data Shard) extension. It supports various encoding/decoding formats (str, int, bytes, jpeg, png, pil, pkl, and json) that convert each field from its original type to bytes and back.

  • streaming.CSVWriter: Writes the dataset into files with the .csv (Comma Separated Values) extension. It supports various encoding/decoding formats (str, int, and float) that convert each field from its original type to a string and back.

  • streaming.JSONWriter: Writes the dataset into files with the .json (JavaScript Object Notation) extension. It supports various encoding/decoding formats (str, int, and float).

  • streaming.TSVWriter: Writes the dataset into files with the .tsv (Tab Separated Values) extension. It supports various encoding/decoding formats (str, int, and float) that convert each field from its original type to a string and back.

  • streaming.XSVWriter: Writes the dataset into files with the .xsv (user-defined Separated Values) extension. It supports various encoding/decoding formats (str, int, and float) that convert each field from its original type to a string and back.

For more information about the writers and their parameters, see the API reference doc.
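
All of these writers share the same basic interface: construct the writer with an output location and a columns mapping, then write one sample dict at a time. Below is a minimal sketch using streaming.JSONWriter (the json_out directory and the text/label fields are made up for illustration); streaming.MDSWriter, covered in depth later in this guide, is used the same way.

from streaming import JSONWriter

# Hypothetical two-column dataset: a text field and an integer label.
samples = [{'text': 'hello', 'label': 0}, {'text': 'world', 'label': 1}]
columns = {'text': 'str', 'label': 'int'}

with JSONWriter(out='json_out', columns=columns) as out:
    for sample in samples:
        out.write(sample)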

After the dataset has been converted to one of our streaming formats, instantiate the streaming.StreamingDataset class with the path to the streaming dataset, and pass that dataset object to the PyTorch torch.utils.data.DataLoader class. For more information about streaming.StreamingDataset and its parameters, see the streaming.StreamingDataset API reference doc.

Streaming supports various dataset compression formats (Brotli, Bzip2, Gzip, Snappy, and Zstandard) that reduce download time and cloud egress fees. Additionally, Streaming supports various hashing algorithms (SHA2, SHA3, MD5, xxHash, etc.) that ensure data integrity through cryptographic and non-cryptographic checks.

Let's jump right into an example of how to convert a raw dataset into a streaming format and then load that dataset for model training.

Writing a dataset to streaming format#

This guide shows you how to convert your raw dataset with streaming.MDSWriter; the steps are the same for the other writers.

The streaming.MDSWriter takes the raw dataset and converts it into a sharded .mds format for fast data access.

For this tutorial, let's create a synthetic classification dataset drawn from a normal distribution, where each sample is a tuple of features and a label.

import numpy as np

class RandomClassificationDataset:
    """Classification dataset drawn from a normal distribution.

    Args:
        shape: shape of features (default: (1, 1, 1))
        size: number of samples (default: 100)
        num_classes: number of classes (default: 2)
    """

    def __init__(self, shape=(1, 1, 1), size=100, num_classes=2):
        self.size = size
        self.x = np.random.randn(size, *shape)
        self.y = np.random.randint(0, num_classes, size=(size,))

    def __len__(self):
        return self.size

    def __getitem__(self, index: int):
        return self.x[index], self.y[index]

A few parameters need to be initialized before streaming.MDSWriter gets called. Some are required and others are optional. Let's start with the two required parameters.

  1. Provide the local filesystem directory path, or a remote cloud provider storage path, in which to store the compressed dataset files. If it is a remote path, the output files are automatically uploaded to that remote location.

output_dir = 'test_output_dir'

  2. Provide the columns as Dict[str, str], mapping each feature or label name to a Streaming-supported encoding type.

columns = {'x': 'pkl', 'y': 'pkl'}

The following parameters are optional for streaming.MDSWriter. Let's look at each of them.

  1. Provide the name of a compression algorithm; the default is None. Streaming supports compression algorithm families such as br, gzip, snappy, zstd, and bz2, optionally with a compression level.

compression = 'zstd:7'

  2. Provide the names of any hashing algorithms; the default is None. Streaming supports hashing algorithm families such as sha, blake, md5, and xxHash.

hashes = ['sha1']

  3. Provide a shard size limit, after which a new shard starts.

# A number is interpreted as bytes, e.g., 1024 bytes. A string abbreviation (e.g., "1024b" or "1kb") is also accepted
limit = 1024

Once the parameters are initialized, the last thing we need is a generator that iterates over the data samples.

def each(samples):
    """Generator over each dataset sample.

    Args:
        samples (list): List of samples of (feature, label).

    Yields:
        Sample dicts.
    """
    for x, y in samples:
        yield {
            'x': x,
            'y': y,
        }

It's time to call streaming.MDSWriter with the parameters initialized above and write the samples by iterating over the dataset.

from streaming.base import MDSWriter

dataset = RandomClassificationDataset()
with MDSWriter(out=output_dir, columns=columns, compression=compression, hashes=hashes, size_limit=limit) as out:
    for sample in each(dataset):
        out.write(sample)

Clean up after ourselves.

from shutil import rmtree

rmtree(output_dir)

Once the dataset has been written, the output directory contains two types of files: an index.json file containing the shard metadata, and the shard files themselves. For example,

dirname
├── index.json
├── shard.00000.mds.zstd
└── shard.00001.mds.zstd
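
If you want to peek at the shard metadata before the cleanup step above removes the output directory, a few lines of Python are enough. This is a minimal sketch; it assumes the index.json layout with a top-level shards list, so treat the exact field names as illustrative.

import json
import os

# Read the shard metadata that was written alongside the shard files.
with open(os.path.join(output_dir, 'index.json')) as f:
    index = json.load(f)

# Each entry describes one shard; field names can vary by Streaming version.
for shard in index.get('shards', []):
    print(shard.get('raw_data', {}).get('basename'), shard.get('samples'))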

Loading a streaming dataset#

After writing the dataset in a streaming format in the previous step and uploading it to cloud object storage such as S3, we are ready to start loading the data.

To load the dataset files created in the steps above, create a CustomDataset class that inherits from streaming.StreamingDataset and overrides the __getitem__(idx: int) method to return samples. The streaming.StreamingDataset class requires two parameters: remote, the remote directory (S3 or local filesystem path) where the dataset is stored, and local, the local directory where the dataset is cached during operation.

from typing import Any

from streaming import StreamingDataset

class CustomDataset(StreamingDataset):
    def __init__(self, local, remote, batch_size):
        super().__init__(local=local, remote=remote, batch_size=batch_size)

    def __getitem__(self, idx: int) -> Any:
        obj = super().__getitem__(idx)
        return obj['x'], obj['y']

The next step is to instantiate the CustomDataset class with the local and remote paths, along with the batch size used by the dataloader.

# Local filesystem directory where dataset is cached during operation
local = '/tmp/cache'

# Remote directory (S3 or local filesystem) where dataset is stored
remote = 's3://mybucket/myfolder'

# batch_size should match the per-device batch size used by the dataloader
dataset = CustomDataset(local=local, remote=remote, batch_size=1)

The final step is to pass the dataset to PyTorch torch.utils.data.DataLoader and use this dataloader to train your model.

from torch.utils.data import DataLoader

# Use the same batch size that was passed to CustomDataset above
dataloader = DataLoader(dataset=dataset, batch_size=1)
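
As a quick sanity check before wiring the dataloader into a real training loop, you can iterate a few batches; this is a minimal sketch, and the printed shapes are just an illustration.

# Iterate a few batches to confirm the pipeline works end to end.
for i, (x, y) in enumerate(dataloader):
    print(i, x.shape, y.shape)
    if i == 2:
        break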

You've now seen an in-depth look at how to prepare and use streaming datasets with PyTorch. To continue learning about Streaming, please continue to explore our examples!

Other options#

Please see the API reference page for the complete list of parameters supported by streaming.StreamingDataset.