Tip

This tutorial is available as a Jupyter notebook, which you can open in Colab.

Synthetic NLP#

In this tutorial, we will demonstrate how to create a synthetic dataset, write it into a streaming format, and use the Streaming Dataset class to load it.

Tutorial Goals and Concepts Covered#

The goal of this tutorial is to showcase how to prepare a dataset and use Streaming data loading to iterate over and fetch samples. It consists of a few steps:

  1. Generate a synthetic dataset

  2. Prepare the dataset for streaming

  3. Stream the dataset to the local machine

  4. Iterate through the dataset and fetch samples

Let’s get started!

Setup#

Let’s start by making sure the right packages are installed and imported. We need to install the mosaicml-streaming package, which installs the dependencies needed to run this tutorial.

[ ]:
%pip install mosaicml-streaming
# To install from source instead of the latest release, comment out the command above and uncomment the following one.
# %pip install git+https://github.com/mosaicml/streaming.git

# (Optional) To upload a streaming dataset to an AWS S3 bucket
%pip install awscli
[ ]:
import os
import shutil
from typing import Any, Dict, List, Tuple

import numpy as np
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

We’ll be using Streaming’s MDSWriter, which writes the dataset in the Streaming MDS format, and its Dataset class to load the streaming dataset.

[ ]:
import streaming as ms

Global settings#

For this tutorial, let’s define some global settings at the start.

[ ]:
# the location of the "remote" streaming dataset (`sds`).
# Upload `out_root` to your cloud storage provider of choice.
out_root = "./sds"
out_train = "./sds/train"
out_val = "./sds/val"

# the location to download the streaming dataset during training
local = './local'
local_train = './local/train'
local_val = './local/val'

# toggle shuffling in dataloader
shuffle_train = True
shuffle_val = False

# training batch size
batch_size = 512
[ ]:
# upload location for the dataset splits (change this if you want to upload to a different location, for example, AWS S3 bucket location)
upload_location = None

if upload_location is None:
    upload_train_location = None
    upload_val_location = None
else:
    upload_train_location = os.path.join(upload_location, 'train')
    upload_val_location = os.path.join(upload_location, 'val')
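
For example, to upload and stream from S3, you could set upload_location to an s3:// URI; the bucket name below is only a placeholder and not part of the original tutorial:

[ ]:
# Hypothetical example: point the upload location at your own S3 bucket.
# upload_location = 's3://my-bucket/synthetic-nlp-sds'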

Create a Synthetic NLP dataset#

In this tutorial, we will be creating a synthetic number-saying dataset, i.e., converting numbers from digits to words; for example, the number 123 would be spelled as “one hundred twenty three”. The numbers are generated randomly and range from approximately negative 99 million to positive 99 million.

Let’s define a few utility functions to generate this synthetic number-saying dataset.

[ ]:
# Word representation of a number
ones = ('zero one two three four five six seven eight nine ten eleven twelve thirteen fourteen ' +
        'fifteen sixteen seventeen eighteen nineteen').split()

tens = 'twenty thirty forty fifty sixty seventy eighty ninety'.split()


def say(i: int) -> List[str]:
    """Get the word form of a number.

    Args:
        i (int): The number.

    Returns:
        List[str]: The number in word form.
    """
    if i < 0:
        return ['negative'] + say(-i)
    elif i <= 19:
        return [ones[i]]
    elif i < 100:
        return [tens[i // 10 - 2]] + ([ones[i % 10]] if i % 10 else [])
    elif i < 1_000:
        return [ones[i // 100], 'hundred'] + (say(i % 100) if i % 100 else [])
    elif i < 1_000_000:
        return say(i // 1_000) + ['thousand'] + (say(i % 1_000) if i % 1_000 else [])
    elif i < 1_000_000_000:
        return say(i // 1_000_000) + ['million'] + (say(i % 1_000_000) if i % 1_000_000 else [])
    else:
        assert False


def get_random_number() -> int:
    """Pick a random number the way humans would.

    Picked numbers are positively skewed, exponentially distributed (good for curriculum learning).

    Returns:
        int: The number.
    """
    sign = (np.random.random() < 0.8) * 2 - 1
    mag = 10**np.random.uniform(1, 4) - 10
    return sign * int(mag**2)


def get_numbers(num_train: int, num_val: int) -> Tuple[List[int], List[int]]:
    """Get two non-overlapping splits of unique random numbers.

    Because the distribution is exponential, we are unlikely to run out of numbers.

    Args:
        num_train (int): Number of training samples.
        num_val (int): Number of validation samples.

    Returns:
        Tuple[List[int], List[int]]: The two generated splits.
    """
    total = num_train + num_val
    numbers = set()
    bar = tqdm(total=total, leave=False)
    while len(numbers) < total:
        was = len(numbers)
        numbers.add(get_random_number())
        bar.update(len(numbers) - was)
    numbers = list(numbers)
    np.random.shuffle(numbers)
    return numbers[:num_train], numbers[num_train:]
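
As a quick sanity check of the helpers above, we can spell out a few numbers by hand (this snippet is illustrative only and not part of the original notebook):

[ ]:
# Spot-check the word form of a few numbers.
print(' '.join(say(123)))        # one hundred twenty three
print(' '.join(say(-45)))        # negative forty five
print(' '.join(say(1_000_000)))  # one million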

Define methods to generate the train and validation samples, where each sample is a dictionary with attributes {'number': <integer number>, 'words': <word representation of the integer number as a string>}.

[ ]:
def generate_samples(numbers: List[int]) -> List[Dict[str, Any]]:
    """Generate samples from a list of numbers.

    Args:
        numbers (List[int]): The numbers.

    Returns:
        List[Dict[str, Any]]: The corresponding samples.
    """
    samples = []
    for num in numbers:
        words = ' '.join(say(num))
        sample = {'number': num, 'words': words}
        samples.append(sample)
    return samples


def get_dataset(num_train: int, num_val: int) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Generate a number-saying dataset of the given size.

    Args:
        num_train (int): Number of training samples.
        num_val (int): Number of validation samples.

    Returns:
        Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: The two generated splits.
    """
    train_nums, val_nums = get_numbers(num_train, num_val)
    train_samples = generate_samples(train_nums)
    val_samples = generate_samples(val_nums)
    return train_samples, val_samples

Create non-overlapping train and val splits of unique random numbers.

[ ]:
# Number of training and validation samples
num_train_samples = 10_000  # 10k samples
num_val_samples = 2_000     # 2k samples

# Create the samples.
print(f'Generating synthetic dataset ({num_train_samples} train, {num_val_samples} val)...')
train_samples, val_samples = get_dataset(num_train_samples, num_val_samples)
splits = [
    ('train', train_samples),
    ('val', val_samples)
]

Let’s visualize the first train and val samples.

[ ]:
print(f'Train sample: {train_samples[0]}')
print(f'Val sample: {val_samples[0]}')

Convert the dataset to MosaicML Streaming#

We are going to use the MDSWriter to convert the raw synthetic NLP dataset into the MDS (.mds) file format.

For more information on the Streaming MDSWriter class, check out the API reference.

[ ]:
# Mapping of sample keyword with their data type
columns = {
    'number': 'int',
    'words': 'str',
}

# Compression algorithm to use for dataset
compression = 'zstd:12'

# Hashing algorithm to use for dataset
hashes = ['sha1', 'xxh3_64']

# shard size limit, in bytes
size_limit = 1 << 16  # Override to a small number for more shards.

print(f'Saving dataset (to {out_root})...')
for split, samples in splits:
    print(f'* {split}')
    dirname = os.path.join(out_root, split)
    with ms.MDSWriter(dirname, columns, compression, hashes, size_limit) as out:
        for sample in tqdm(samples, leave=False):
            out.write(sample)

Now that we’ve written the datasets to out_root, you can upload them to a cloud storage provider, and we are ready to stream them.

[ ]:
remote_train = upload_train_location or out_train  # replace this with your URL for cloud streaming
remote_val = upload_val_location or out_val

(Optional) Upload the streaming dataset to an AWS S3 bucket of your choice. Uncomment the line below if you have set upload_location to an S3 bucket path.

[ ]:
# !aws s3 cp $out_root $upload_location --recursive

Loading the Data#

We use Streaming’s Dataset class to load and deserialize the data. Let’s verify the samples loaded by Streaming’s Dataset class against the raw samples, checking both content validity and deterministic sample ordering.

For more information on the Streaming Dataset parent class check out the API reference.

[ ]:
# Load the samples back.
print('Walking the dataset:')
for split, samples in splits:
    print(f'verifying samples for {split}')
    dataset = ms.Dataset(remote=upload_location or out_root, local=local, split=split, shuffle=False)
    for old, new in tqdm(zip(samples, dataset), total=len(samples), leave=False):
        assert old == new

We can also visualize a single sample by indexing into a Streaming Dataset.

[ ]:
train_dataset = ms.Dataset(remote=remote_train, local=local_train, shuffle=False, batch_size=batch_size)

# Fetch the sample at index 10 and print it to the console
print(train_dataset[10])

Below are some dataset properties that are highly useful for debugging and model training. For more information on the Streaming Dataset parameters, check out the API reference.

[ ]:
# Get the total number of samples
print(f'Total number of samples: {train_dataset.index.total_samples}')

# Get the number of shard files
print(f'Total number of shards: {len(train_dataset.shards)}')

# Get the number of samples inside each shard file.
# The number of samples in each shard can vary based on the size of each sample.
print(f'Number of samples inside each shard: {train_dataset.index.samples_per_shard}')

We can now instantiate our streaming datasets and wrap them in standard PyTorch dataloaders for training!

[ ]:
train_dataset = ms.Dataset(remote=remote_train, local=local_train, shuffle=shuffle_train, batch_size=batch_size)
val_dataset = ms.Dataset(remote=remote_val, local=local_val, shuffle=shuffle_val, batch_size=batch_size)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)
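
As a quick check, we can pull one batch from the train dataloader. With PyTorch’s default collate function, the integer column is stacked into a tensor and the string column becomes a list of strings (a minimal sketch; the exact shapes depend on your batch_size and shuffle settings):

[ ]:
# Fetch a single batch and inspect its structure.
batch = next(iter(train_dataloader))
print(batch['number'].shape)  # e.g. torch.Size([512]) for a full batch
print(batch['words'][:2])     # first two word-form strings in the batch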

The Streaming Dataset class supports a batch_size parameter. When it is provided by the user, worker indices are constructed so that there is at most one incomplete batch at the end of each epoch, which gives better workload distribution across workers and fewer (or no) dropped samples. Let’s look at the example below with drop_last=True in the PyTorch DataLoader, which drops the last non-divisible batch, for demonstration purposes:

[ ]:
# Instantiate a streaming Dataset class without a `batch_size` parameter
dataset_without_bs = ms.Dataset(remote=remote_train, local=local_train, shuffle=shuffle_train)
dataloader_ds_wo_bs = DataLoader(dataset_without_bs, batch_size=batch_size, num_workers=8, drop_last=True)

# Instantiate a streaming Dataset class with a `batch_size` parameter
dataset_with_bs = ms.Dataset(remote=remote_train, local=local_train, shuffle=shuffle_train, batch_size=batch_size)
dataloader_ds_with_bs = DataLoader(dataset_with_bs, batch_size=batch_size, num_workers=8, drop_last=True)

Visualize the number of samples processed by the dataloader when batch_size was not provided during instantiation of the Streaming Dataset class.

[ ]:
total_samples = 0
for idx, batch in enumerate(dataloader_ds_wo_bs):
    total_samples += len(batch["number"])
print(f'Total number of samples processed by the dataloader is {total_samples} out of {num_train_samples}')

Visualize the number of samples processed by the dataloader when batch_size was provided during instantiation of the Streaming Dataset class. We will see that the number of samples processed is higher than when batch_size is absent.

[ ]:
total_samples = 0
for idx, batch in enumerate(dataloader_ds_with_bs):
    total_samples += len(batch["number"])
print(f'Total number of samples processed by the dataloader is {total_samples} out of {num_train_samples}')
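
To see why the two counts differ, here is a rough back-of-the-envelope calculation, assuming that without batch_size the 10,000 samples are split roughly evenly across the 8 dataloader workers, and that with batch_size the partition leaves at most one incomplete batch in total (the counts you observe above may differ slightly):

[ ]:
# Rough estimate only: assumes an even per-worker split without `batch_size`.
workers = 8
per_worker = num_train_samples // workers  # ~1250 samples per worker
# With drop_last=True, each worker drops its final partial batch.
kept_without_bs = workers * (per_worker // batch_size) * batch_size
print(f'Estimated samples kept without batch_size: {kept_without_bs}')

# With `batch_size`, at most one incomplete batch remains per epoch, so
# drop_last=True discards at most one partial batch in total.
kept_with_bs = (num_train_samples // batch_size) * batch_size
print(f'Estimated samples kept with batch_size: {kept_with_bs}')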

Cleanup#

That’s it. No need to hang on to the files created by the tutorial…

[ ]:
shutil.rmtree(out_root, ignore_errors=True)
shutil.rmtree(local, ignore_errors=True)

What next?#

You’ve now seen an in-depth look at how to prepare and use streaming datasets with PyTorch.

To continue learning about Streaming, please explore our examples!

Come get involved with MosaicML!#

We’d love for you to get involved with the MosaicML community in any of these ways:

Star Streaming on GitHub#

Help make others aware of our work by starring Streaming on GitHub.

Join the MosaicML Slack#

Head on over to the MosaicML Slack to join other ML efficiency enthusiasts. Come for the paper discussions, stay for the memes!

Contribute to Streaming#

Is there a bug you noticed or a feature you’d like? File an issue or make a pull request!