DistributedItemSampler

class dgl.graphbolt.DistributedItemSampler(item_set: ItemSet | ItemSetDict, batch_size: int, minibatcher: Callable | None = <function minibatcher_default>, drop_last: bool | None = False, shuffle: bool | None = False, drop_uneven_inputs: bool | None = False, buffer_size: int | None = -1)[source]

Bases: ItemSampler

A sampler that iterates over input items and creates subsets in a distributed manner.

This sampler creates a distributed subset of items from the given data set, which can be used for training with PyTorch’s Distributed Data Parallel (DDP). The items can be node IDs, node pairs with or without labels, node pairs with negative sources/destinations, DGLGraphs, or their heterogeneous counterparts. The original item set is split such that each replica (process) receives an exclusive subset.

Note: The items are first split across the replicas, and only then shuffled (if enabled) and batched. As a result, each replica always receives the same subset of items, regardless of shuffling.

Note: The class DistributedItemSampler is deliberately not decorated with torchdata.datapipes.functional_datapipe, which means it cannot be invoked through a function-like call on another datapipe. However, any iterable datapipe from torchdata can still be appended after it, as sketched below.
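
For illustration, once a sampler has been constructed inside a worker process (with the process group already initialized, as in the Examples below), a standard torchdata transform can be chained onto it. This is a minimal sketch; the identity lambda is purely illustrative and not part of this API:

>>> import torch
>>> from dgl import graphbolt as gb
>>> item_set = gb.ItemSet(torch.arange(8))
>>> item_sampler = gb.DistributedItemSampler(item_set, batch_size=2)
>>> # .map() is a regular torchdata IterDataPipe functional, so it can be
>>> # appended even though DistributedItemSampler itself cannot be invoked
>>> # as a functional datapipe.
>>> datapipe = item_sampler.map(lambda minibatch: minibatch)
>>> data_loader = gb.DataLoader(datapipe)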

Parameters:
  • item_set (Union[ItemSet, ItemSetDict]) – Data to be sampled.

  • batch_size (int) – The size of each batch.

  • minibatcher (Optional[Callable]) – A callable that takes in a list of items and returns a MiniBatch.

  • drop_last (bool) – Option to drop the last batch if it’s not full.

  • shuffle (bool) – Option to shuffle the items before sampling.

  • num_replicas (int) – The number of model replicas that will be created during Distributed Data Parallel (DDP) training. It should match the actual world size; otherwise, errors may occur. By default, it is retrieved from the current distributed group.

  • drop_uneven_inputs (bool) – Option to make sure the number of batches on each replica is the same. If some replicas have more batches than others, their redundant batches are dropped. If the drop_last parameter is also set to True, the last batch is dropped before the redundant batches are dropped. Note: When using Distributed Data Parallel (DDP) training, the program may hang or raise an error if a replica has fewer inputs than the others. It is recommended to use the Join Context Manager provided by PyTorch to solve this problem (see the sketch after this parameter list); please refer to https://pytorch.org/tutorials/advanced/generic_join.html. However, this option can be used if the Join Context Manager is not helpful for any reason.

  • buffer_size (int) – The size of the buffer used to store items sliced from the ItemSet or ItemSetDict. By default, it is set to -1, which means the buffer size equals the total number of items in the item set. If the item set is very large, it is recommended to set a smaller buffer size to avoid out-of-memory errors. However, because items are shuffled only within each buffer, a smaller buffer size reduces randomness, which may in turn affect training performance such as convergence speed and accuracy. Therefore, set the buffer size as large as memory allows.
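
The Join Context Manager mentioned under drop_uneven_inputs can be sketched as follows. This is a hedged illustration, not part of this API: model, optimizer, and data_loader are placeholders for whatever the training script defines, and the forward/loss computation is arbitrary.

>>> from torch.distributed.algorithms.join import Join
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> ddp_model = DDP(model)  # `model` is a placeholder nn.Module
>>> # Join lets replicas that exhaust their inputs early shadow the
>>> # collective operations of replicas that still have batches left.
>>> with Join([ddp_model]):
>>>     for minibatch in data_loader:
>>>         loss = ddp_model(minibatch).sum()  # placeholder forward/loss
>>>         loss.backward()
>>>         optimizer.step()       # `optimizer` is a placeholder
>>>         optimizer.zero_grad()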

Examples

0. Preparation: DistributedItemSampler requires a multi-processing environment to work. You need to spawn subprocesses and initialize the process group before running the following examples. Due to randomness, the outputs may differ from those listed below.

>>> import torch
>>> import torch.multiprocessing as mp
>>> from dgl import graphbolt as gb
>>> item_set = gb.ItemSet(torch.arange(15))
>>> num_replicas = 4
>>> batch_size = 2
>>> mp.spawn(...)
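
The spawn call above is intentionally elided; a minimal sketch of what it might look like is shown below. The names run and nprocs, the gloo backend, and the TCP address are illustrative assumptions, not part of this API:

>>> import torch.distributed as dist
>>> import torch.multiprocessing as mp
>>> def run(proc_id, nprocs):
>>>     # Every spawned process joins the same process group; `gloo` is
>>>     # used here for a CPU-only illustration.
>>>     dist.init_process_group(
>>>         backend="gloo", init_method="tcp://127.0.0.1:12345",
>>>         world_size=nprocs, rank=proc_id,
>>>     )
>>>     ...  # construct the sampler and data loader as in the examples below
>>> mp.spawn(run, args=(num_replicas,), nprocs=num_replicas)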
1. shuffle = False, drop_last = False, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)})
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
2. shuffle = False, drop_last = True, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)})
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13])]
3. shuffle = False, drop_last = False, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)})
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
4. shuffle = False, drop_last = True, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)})
Replica#0: [tensor([0, 1])]
Replica#1: [tensor([4, 5])]
Replica#2: [tensor([8, 9])]
Replica#3: [tensor([12, 13])]
5. shuffle = True, drop_last = True, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)})
(One possible output:)
Replica#0: [tensor([3, 2]), tensor([0, 1])]
Replica#1: [tensor([6, 5]), tensor([7, 4])]
Replica#2: [tensor([8, 10])]
Replica#3: [tensor([14, 12])]
6. shuffle = True, drop_last = True, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)})
(One possible output:)
Replica#0: [tensor([1, 3])]
Replica#1: [tensor([7, 5])]
Replica#2: [tensor([11, 9])]
Replica#3: [tensor([13, 14])]