Stream

class streaming.Stream(*, remote=None, local=None, split=None, proportion=None, repeat=None, choose=None, download_retry=None, download_timeout=None, validate_hash=None, keep_zip=None)

A dataset, or sub-dataset if mixing, from which we stream/cache samples.

We initialize a StreamingDataset with one or more Streams. Streams may be resampled to achieve different mixtures of samples.

Stream init takes three kinds of arguments:

  • At least one of remote and local must exist. If there is no remote, the data must already be local. If there is no local, we cache to a temp directory.

    • remote

    • local

  • At most one of proportion, repeat, or choose may exist. If one of these is provided, we derive the rest. Note that proportion (relative) and repeat/choose (absolute) are mutually incompatible: you must use one or the other (or neither) consistently across all sub-datasets. If none are provided for any stream and epoch_size is unspecified, then each sample from each stream is seen once per epoch. If none are provided for any stream and epoch_size is specified, then streams are sampled in proportion to their size.

    • proportion

    • repeat

    • choose

  • The remaining arguments are optional knobs for controlling downloading behavior and default to None. If None, they take a default value provided to or by the StreamingDataset init.

    • split

    • download_retry

    • download_timeout

    • validate_hash

    • keep_zip
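The argument rules above can be sketched in plain Python. This is only an illustration of the documented constraints, not the library's own validation code: the helper name check_stream_args and its return labels are hypothetical, and the example paths are placeholders.

```python
from typing import Optional


def check_stream_args(
    remote: Optional[str] = None,
    local: Optional[str] = None,
    proportion: Optional[float] = None,
    repeat: Optional[float] = None,
    choose: Optional[int] = None,
) -> str:
    """Hypothetical helper: check the documented rules and name the weighting mode."""
    # Rule 1: at least one of remote and local must be given.
    if remote is None and local is None:
        raise ValueError('At least one of remote and local must be provided')

    # Rule 2: at most one of proportion, repeat, and choose may be given.
    candidates = (('proportion', proportion), ('repeat', repeat), ('choose', choose))
    given = {name: value for name, value in candidates if value is not None}
    if len(given) > 1:
        raise ValueError(f'At most one weight may be provided, got: {sorted(given)}')

    # Any weight that is provided must be non-negative.
    for name, value in given.items():
        if value < 0:
            raise ValueError(f'{name} must be non-negative, got {value}')

    if 'proportion' in given:
        return 'relative'    # proportion is a relative weight
    if given:
        return 'absolute'    # repeat and choose are absolute weights
    return 'unweighted'      # nothing given


# Placeholder paths, for illustration only.
print(check_stream_args(remote='s3://bucket/data', proportion=0.25))
print(check_stream_args(local='/tmp/cache', repeat=2.0))
```

Arguments that pass these checks correspond to what would be handed to the Stream init as keyword arguments.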

Parameters
  • remote (str, optional) – Remote path or directory to download the dataset from. If None, its data must exist locally. Defaults to None.

  • local (str, optional) – Local working directory to download shards to. This is where shards are cached while they are being used. Uses a temp directory if not set. Defaults to None.

  • split (str, optional) – Which dataset split to use, if any. If provided, we stream from/to the split subdirs of remote and local. Defaults to None.

  • proportion (float, optional) – How much to upsample or downsample this sub-dataset, as the proportion of the total combined dataset that consists of this sub-dataset. If using proportions, all sub-datasets provided together to the StreamingDataset init must define their proportions. The total combined number of samples is either the StreamingDataset argument "epoch_size" if provided, or the total size of the underlying data if not. If provided, must be non-negative. Defaults to None.

  • repeat (float, optional) – How much to upsample or downsample this sub-dataset, as a multiplier on the number of samples. If provided, must be non-negative. Defaults to None.

  • choose (int, optional) – How much to upsample or downsample this sub-dataset, as the exact number of resulting samples. If provided, must be non-negative. Defaults to None.

  • download_retry (int, optional) – Number of download re-attempts before giving up. Defaults to None.

  • download_timeout (float, optional) – Number of seconds to wait for a shard to download before raising an exception. Defaults to None.

  • validate_hash (str, optional) – Optional hash or checksum algorithm to use to validate shards. Defaults to None.

  • keep_zip (bool, optional) – Whether to keep or delete the compressed form when decompressing downloaded shards. If False, the compressed form is still kept if and only if remote is local or there is no remote. Defaults to None.
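The relationship between repeat and choose described above can be illustrated with a small sketch. The helper derive_absolute is hypothetical, and truncating repeat * samples to an integer is an assumption made here, not a documented rounding rule.

```python
from typing import Optional, Tuple


def derive_absolute(
    samples: int,
    repeat: Optional[float] = None,
    choose: Optional[int] = None,
) -> Tuple[float, int]:
    """Hypothetical helper: return (repeat, choose) for one sub-dataset."""
    if repeat is not None and choose is not None:
        raise ValueError('Provide at most one of repeat and choose')
    if choose is not None:
        # choose is the exact number of resulting samples; derive the multiplier.
        return (choose / samples if samples else 0.0), choose
    if repeat is not None:
        # repeat is a multiplier on the number of underlying samples.
        # Assumption: fractional results are truncated.
        return repeat, int(repeat * samples)
    # Neither given: each underlying sample is seen once.
    return 1.0, samples


print(derive_absolute(1000, repeat=2.5))   # upsample
print(derive_absolute(1000, choose=250))   # downsample
```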

apply_default(default)

Apply defaults, setting any unset fields.

We use pairs of (name, _name) in order to make type checking happy.

Parameters

default (Self) – Stream containing default values for all optional fields.
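As a rough illustration of "setting any unset fields", the sketch below fills None-valued fields from a defaults object. The DownloadOptions class is a hypothetical stand-in for the optional fields of a Stream, the numeric values are illustrative only, and the (name, _name) pairing mentioned above is not modeled.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class DownloadOptions:
    """Hypothetical stand-in for the optional fields of a Stream."""
    split: Optional[str] = None
    download_retry: Optional[int] = None
    download_timeout: Optional[float] = None
    validate_hash: Optional[str] = None
    keep_zip: Optional[bool] = None

    def apply_default(self, default: 'DownloadOptions') -> None:
        # Copy a value from `default` only where this object left the field unset.
        for field in fields(self):
            if getattr(self, field.name) is None:
                setattr(self, field.name, getattr(default, field.name))


# Illustrative values only.
defaults = DownloadOptions(download_retry=2, download_timeout=60.0, keep_zip=False)
options = DownloadOptions(download_retry=5)
options.apply_default(defaults)
print(options)
```

Fields set explicitly on the stream (download_retry here) win over the defaults, while fields unset on both sides stay None.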

classmethod apply_weights(streams, samples_per_stream, choose_per_epoch, seed)

Given samples per stream, derive each stream's proportion/repeat/samples.

Modifies streams to save the derived weights.

Parameters
  • streams (Sequence[Stream]) – The list of streams which comprise the dataset.

  • samples_per_stream (NDArray[np.int64]) – Underlying samples of each stream.

  • choose_per_epoch (int, optional) – Absolute epoch size if weighting relatively.

  • seed (int) – Random number generator seed used to sample evenly.

Returns

int – Number of samples to draw per epoch.
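For relative weighting, the epoch has to be divided among streams according to their proportions. The sketch below shows one way to do that with integer counts. It uses largest-remainder rounding, a technique swapped in here for determinism; it is not taken from the library, and the documented seed argument is not modeled. The function name split_epoch is hypothetical.

```python
from typing import List, Optional


def split_epoch(
    proportions: List[float],
    samples_per_stream: List[int],
    choose_per_epoch: Optional[int] = None,
) -> List[int]:
    """Hypothetical helper: samples to draw from each stream per epoch."""
    total = sum(proportions)
    if total <= 0:
        raise ValueError('Proportions must sum to a positive value')
    if choose_per_epoch is None:
        # No epoch size given: keep the total size of the underlying data.
        choose_per_epoch = sum(samples_per_stream)

    # Ideal (fractional) share of the epoch for each stream.
    exact = [choose_per_epoch * p / total for p in proportions]
    chosen = [int(x) for x in exact]

    # Hand out the samples lost to truncation, largest fractional part first.
    leftover = max(choose_per_epoch - sum(chosen), 0)
    by_remainder = sorted(
        range(len(exact)), key=lambda i: exact[i] - chosen[i], reverse=True)
    for i in by_remainder[:leftover]:
        chosen[i] += 1
    return chosen


print(split_epoch([3, 1], [100, 100], choose_per_epoch=1000))
print(split_epoch([0.5, 0.5], [100, 300]))
```

In the second call no epoch size is given, so the 400 underlying samples are split evenly even though the streams differ in size: the smaller stream is upsampled and the larger one downsampled.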

get_index_size()

Get the size of the index file in bytes.

Returns

int – Size in bytes.
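A minimal sketch of the idea, assuming the index lives in the split subdirectory of local under the file name index.json. Both the helper name index_size and that file name are assumptions made for illustration.

```python
import os
from typing import Optional


def index_size(local: str, split: Optional[str] = None,
               basename: str = 'index.json') -> int:
    """Hypothetical helper: size in bytes of a stream's index file."""
    # An empty split component leaves the path as <local>/<basename>.
    path = os.path.join(local, split or '', basename)
    return os.stat(path).st_size
```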

get_shards(world, allow_unsafe_types)

Load this Stream's index, retrieving its shard readers.

Parameters
  • world (World) – Distributed context.

  • allow_unsafe_types (bool) – If a shard contains Pickle, which allows arbitrary code execution during deserialization, whether to keep going (True) or raise an error (False).

Returns

List[Reader] – Shard readers.
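The allow_unsafe_types behavior can be pictured as a gate on the column encodings found in a shard. This is a sketch under assumptions: the helper name, the UNSAFE_ENCODINGS set, and the 'pkl' label for Pickle columns are illustrative and not confirmed by this page.

```python
from typing import Iterable, List

# Assumption: Pickle-encoded columns are labeled 'pkl'.
UNSAFE_ENCODINGS = {'pkl'}


def check_column_encodings(encodings: Iterable[str],
                           allow_unsafe_types: bool) -> List[str]:
    """Hypothetical helper: return the unsafe encodings found, or raise."""
    found = sorted(set(encodings) & UNSAFE_ENCODINGS)
    if found and not allow_unsafe_types:
        # Unpickling can execute arbitrary code, so refuse by default.
        raise ValueError(f'Shard contains unsafe column encodings: {found}')
    return found


print(check_column_encodings(['int', 'str', 'pkl'], allow_unsafe_types=True))
```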

prepare_shard(shard)

Ensure (download, validate, extract, etc.) that we have the given shard.

Parameters

shard (Reader) – Which shard.

Returns

int – Change in cache usage.
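The download, validate, extract sequence and the returned change in cache usage can be sketched for a single file as follows. Everything here is an assumption for illustration: the helper name prepare_file, the use of a local directory as the "remote", gzip as the compression, and sha1 as the default hash. The real method works on shard Reader objects.

```python
import gzip
import hashlib
import os
import shutil


def prepare_file(remote_dir: str, local_dir: str, zip_name: str,
                 expected_hash: str, algo: str = 'sha1',
                 keep_zip: bool = False) -> int:
    """Hypothetical sketch: fetch, validate, and extract one .gz file.

    Returns the change in local cache usage, in bytes.
    """
    if not zip_name.endswith('.gz'):
        raise ValueError('This sketch only handles .gz files')
    zip_path = os.path.join(local_dir, zip_name)
    raw_path = os.path.join(local_dir, zip_name[:-len('.gz')])
    delta = 0

    if os.path.exists(raw_path):
        return delta  # already prepared, cache usage unchanged

    # Download: here simply a copy from the "remote" directory.
    if not os.path.exists(zip_path):
        shutil.copy(os.path.join(remote_dir, zip_name), zip_path)
        delta += os.stat(zip_path).st_size

    # Validate: compare the digest of the compressed file.
    with open(zip_path, 'rb') as f:
        digest = hashlib.new(algo, f.read()).hexdigest()
    if digest != expected_hash:
        raise ValueError(f'Hash mismatch for {zip_name}')

    # Extract: write the decompressed form next to the compressed one.
    with gzip.open(zip_path, 'rb') as src, open(raw_path, 'wb') as dst:
        shutil.copyfileobj(src, dst)
    delta += os.stat(raw_path).st_size

    # Drop the compressed form unless asked to keep it.
    if not keep_zip:
        delta -= os.stat(zip_path).st_size
        os.remove(zip_path)
    return delta
```

With keep_zip=False the net change equals the decompressed size, since the compressed file is added and then removed again.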

set_up_local(shards, cache_usage_per_shard)

Bring a local directory into a consistent state, getting which shards are present.

Parameters
  • shards (List[Reader]) – List of this stream's shards.

  • cache_usage_per_shard (NDArray[np.int64]) – Cache usage per shard of this stream.

classmethod validate_weights(streams)

Validate stream weights, returning whether relative or absolute weighting was used.

Parameters

streams (Sequence[Stream]) – Every stream comprising the dataset.

Returns

bool – Whether streams are weighted relatively (proportionally).
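The cross-stream check can be sketched as below, with each stream reduced to a (proportion, repeat, choose) tuple. The helper name is_relative and the tuple representation are hypothetical, and the sketch assumes each stream already defines at most one of the three weights.

```python
from typing import Optional, Sequence, Tuple

# (proportion, repeat, choose) for one stream; None means unset.
Weights = Tuple[Optional[float], Optional[float], Optional[int]]


def is_relative(weights: Sequence[Weights]) -> bool:
    """Hypothetical helper: True if streams are weighted by proportion."""
    num_relative = sum(1 for proportion, _, _ in weights if proportion is not None)
    num_absolute = sum(
        1 for _, repeat, choose in weights
        if repeat is not None or choose is not None)

    # Relative and absolute weighting cannot be mixed.
    if num_relative and num_absolute:
        raise ValueError('Cannot mix proportion with repeat/choose')
    # When proportions are used, every stream must define one.
    if num_relative and num_relative != len(weights):
        raise ValueError('All streams must define proportion if any does')
    return num_relative > 0


print(is_relative([(0.3, None, None), (0.7, None, None)]))
print(is_relative([(None, 2.0, None), (None, None, 500), (None, None, None)]))
```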