Source code for dgl.graphbolt.internal.sample_utils
"""Utility functions for sampling."""
import copy
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union
import torch
from ..base import CSCFormatBase, etype_str_to_tuple, expand_indptr
def unique_and_compact(
nodes: Union[
List[torch.Tensor],
Dict[str, List[torch.Tensor]],
],
):
"""
Compact a list of node tensors.
Parameters
----------
nodes : List[torch.Tensor] or Dict[str, List[torch.Tensor]]
List of nodes to compact; for a dictionary input, unique and compact are
performed per node type.
- If `nodes` is a list of tensors: all tensors are uniqued and compacted
together. This is typically used for homogeneous graphs.
- If `nodes` is a dictionary: the keys should be node types and the values
the corresponding lists of node tensors. This is typically used for
heterogeneous graphs.
Returns
-------
Tuple[unique_nodes, compacted_node_list]
The unique nodes (per type) of all nodes in the input, and the compacted
node list, where the IDs are replaced with compacted node IDs.
"Compacted node list" means that the node IDs in the input node list are
replaced with mapped node IDs, where each type of node is mapped to a
contiguous space of IDs ranging from 0 to N.
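Examples
--------
A minimal homogeneous sketch. The ordering of the returned unique IDs is
produced by the underlying `torch.ops.graphbolt.unique_and_compact` kernel,
so only the call pattern and sizes are shown; the import path below assumes
direct access to this module.
>>> import torch
>>> from dgl.graphbolt.internal.sample_utils import unique_and_compact
>>> seeds = torch.LongTensor([1, 3, 2])
>>> neighbors = torch.LongTensor([3, 4, 4, 5])
>>> unique, compacted = unique_and_compact([seeds, neighbors])
>>> unique.numel()  # 5 distinct node IDs: {1, 2, 3, 4, 5}
5
>>> [t.numel() for t in compacted]  # one compacted tensor per input tensor
[3, 4]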
"""
is_heterogeneous = isinstance(nodes, dict)
def unique_and_compact_per_type(nodes):
nums = [node.size(0) for node in nodes]
nodes = torch.cat(nodes)
empty_tensor = nodes.new_empty(0)
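# Passing empty tensors for the dst arguments means the kernel simply
# deduplicates the concatenated nodes and relabels them to a contiguous range.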
unique, compacted, _ = torch.ops.graphbolt.unique_and_compact(
nodes, empty_tensor, empty_tensor
)
compacted = compacted.split(nums)
return unique, list(compacted)
if is_heterogeneous:
unique, compacted = {}, {}
for ntype, nodes_of_type in nodes.items():
unique[ntype], compacted[ntype] = unique_and_compact_per_type(
nodes_of_type
)
return unique, compacted
else:
return unique_and_compact_per_type(nodes)
def compact_temporal_nodes(nodes, nodes_timestamp):
"""Compact a list of temporal nodes without unique.
Note that since no deduplication is performed, the nodes and nodes_timestamp
are simply concatenated, and the compacted nodes are consecutive numbers
starting from 0.
Parameters
----------
nodes : List[torch.Tensor] or Dict[str, List[torch.Tensor]]
List of nodes to compact; for a dictionary input, compaction is performed
per node type.
- If `nodes` is a list of tensors: all tensors are compacted together.
This is typically used for homogeneous graphs.
- If `nodes` is a dictionary: the keys should be node types and the values
the corresponding lists of node tensors. This is typically used for
heterogeneous graphs.
nodes_timestamp : List[torch.Tensor] or Dict[str, List[torch.Tensor]]
List of timestamps for compacting.
Returns
-------
Tuple[nodes, nodes_timestamp, compacted_node_list]
The concatenated nodes and nodes_timestamp, and the compacted node list,
where the IDs are replaced with compacted node IDs.
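Examples
--------
An illustrative sketch; this is an internal helper, so the import path below
assumes direct access to this module.
>>> import torch
>>> from dgl.graphbolt.internal.sample_utils import compact_temporal_nodes
>>> nodes = [torch.LongTensor([10, 11]), torch.LongTensor([11, 12, 13])]
>>> timestamps = [torch.LongTensor([1, 2]), torch.LongTensor([3, 4, 5])]
>>> cat_nodes, cat_ts, compacted = compact_temporal_nodes(nodes, timestamps)
>>> cat_nodes
tensor([10, 11, 11, 12, 13])
>>> compacted
[tensor([0, 1]), tensor([2, 3, 4])]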
"""
def _compact_per_type(per_type_nodes, per_type_nodes_timestamp):
nums = [node.size(0) for node in per_type_nodes]
per_type_nodes = torch.cat(per_type_nodes)
per_type_nodes_timestamp = torch.cat(per_type_nodes_timestamp)
compacted_nodes = torch.arange(
0,
per_type_nodes.numel(),
dtype=per_type_nodes.dtype,
device=per_type_nodes.device,
)
compacted_nodes = list(compacted_nodes.split(nums))
return per_type_nodes, per_type_nodes_timestamp, compacted_nodes
if isinstance(nodes, dict):
ret_nodes, ret_timestamp, compacted = {}, {}, {}
for ntype, nodes_of_type in nodes.items():
(
ret_nodes[ntype],
ret_timestamp[ntype],
compacted[ntype],
) = _compact_per_type(nodes_of_type, nodes_timestamp[ntype])
return ret_nodes, ret_timestamp, compacted
else:
return _compact_per_type(nodes, nodes_timestamp)
def unique_and_compact_csc_formats(
csc_formats: Union[
CSCFormatBase,
Dict[str, CSCFormatBase],
],
unique_dst_nodes: Union[
torch.Tensor,
Dict[str, torch.Tensor],
],
):
"""
Compact csc formats and return unique nodes (per type).
Parameters
----------
csc_formats : Union[CSCFormatBase, Dict[str, CSCFormatBase]]
CSC formats representing source-destination edges.
- If `csc_formats` is a CSCFormatBase: the graph is homogeneous. Its
indptr and indices tensors represent source-destination pairs in CSC
format, and the IDs inside are homogeneous IDs.
- If `csc_formats` is a Dict[str, CSCFormatBase]: the keys should be edge
types and the values the corresponding CSC format node pairs, and the IDs
inside are heterogeneous IDs.
unique_dst_nodes: torch.Tensor or Dict[str, torch.Tensor]
Unique nodes of all destination nodes in the node pairs.
- If `unique_dst_nodes` is a tensor: the graph is homogeneous.
- If `unique_dst_nodes` is a dictionary: the keys are node types and the
values the corresponding nodes, and the IDs inside are heterogeneous IDs.
Returns
-------
Tuple[csc_formats, unique_nodes]
The compacted csc formats, where node IDs are replaced with mapped node
IDs, and the unique nodes (per type).
"Compacted csc formats" indicates that the node IDs in the input node
pairs are replaced with mapped node IDs, where each type of node is
mapped to a contiguous space of IDs ranging from 0 to N.
Examples
--------
>>> import dgl.graphbolt as gb
>>> N1 = torch.LongTensor([1, 2, 2])
>>> N2 = torch.LongTensor([5, 5, 6])
>>> unique_dst = {
... "n1": torch.LongTensor([1, 2]),
... "n2": torch.LongTensor([5, 6])}
>>> csc_formats = {
... "n1:e1:n2": gb.CSCFormatBase(indptr=torch.tensor([0, 2, 3]),indices=N1),
... "n2:e2:n1": gb.CSCFormatBase(indptr=torch.tensor([0, 1, 3]),indices=N2)}
>>> unique_nodes, compacted_csc_formats = gb.unique_and_compact_csc_formats(
... csc_formats, unique_dst
... )
>>> print(unique_nodes)
{'n1': tensor([1, 2]), 'n2': tensor([5, 6])}
>>> print(compacted_csc_formats)
{"n1:e1:n2": CSCFormatBase(indptr=torch.tensor([0, 2, 3]),
indices=torch.tensor([0, 1, 1])),
"n2:e2:n1": CSCFormatBase(indptr=torch.tensor([0, 1, 3]),
indices=torch.tensor([0, 0, 1]))}
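For a homogeneous graph, a single CSCFormatBase and a single tensor of seed
(destination) nodes are passed, and a tensor plus a CSCFormatBase are
returned instead of dictionaries. A sketch of the call pattern only, since
the ordering of the returned unique IDs comes from the compaction kernel:
>>> seeds = torch.LongTensor([5, 7])
>>> csc_format = gb.CSCFormatBase(
...     indptr=torch.tensor([0, 1, 3]), indices=torch.LongTensor([7, 5, 7])
... )
>>> unique_nodes, compacted = gb.unique_and_compact_csc_formats(
...     csc_format, seeds
... )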
"""
is_homogeneous = not isinstance(csc_formats, dict)
if is_homogeneous:
csc_formats = {"_N:_E:_N": csc_formats}
if unique_dst_nodes is not None:
assert isinstance(
unique_dst_nodes, torch.Tensor
), "Edge type not supported in homogeneous graph."
unique_dst_nodes = {"_N": unique_dst_nodes}
# Collect all source and destination nodes for each node type.
indices = defaultdict(list)
device = None
for etype, csc_format in csc_formats.items():
if device is None:
device = csc_format.indices.device
src_type, _, dst_type = etype_str_to_tuple(etype)
assert len(unique_dst_nodes.get(dst_type, [])) + 1 == len(
csc_format.indptr
), "The seed nodes should correspond to indptr."
indices[src_type].append(csc_format.indices)
indices = {ntype: torch.cat(nodes) for ntype, nodes in indices.items()}
ntypes = set(indices.keys())
unique_nodes = {}
compacted_indices = {}
dtype = list(indices.values())[0].dtype
default_tensor = torch.tensor([], dtype=dtype, device=device)
indice_list = []
unique_dst_list = []
for ntype in ntypes:
indice_list.append(indices.get(ntype, default_tensor))
unique_dst_list.append(unique_dst_nodes.get(ntype, default_tensor))
dst_list = [torch.tensor([], dtype=dtype, device=device)] * len(
unique_dst_list
)
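# Compact all node types in a single batched kernel call. The destination
# nodes are already unique, so only empty placeholders are passed as the raw
# dst IDs.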
results = torch.ops.graphbolt.unique_and_compact_batched(
indice_list, dst_list, unique_dst_list
)
for i, ntype in enumerate(ntypes):
unique_nodes[ntype], compacted_indices[ntype], _ = results[i]
compacted_csc_formats = {}
# Map back with the same order.
for etype, csc_format in csc_formats.items():
num_elem = csc_format.indices.size(0)
src_type, _, _ = etype_str_to_tuple(etype)
indice = compacted_indices[src_type][:num_elem]
indptr = csc_format.indptr
compacted_csc_formats[etype] = CSCFormatBase(
indptr=indptr, indices=indice
)
compacted_indices[src_type] = compacted_indices[src_type][num_elem:]
# Return singleton for a homogeneous graph.
if is_homogeneous:
compacted_csc_formats = list(compacted_csc_formats.values())[0]
unique_nodes = list(unique_nodes.values())[0]
return unique_nodes, compacted_csc_formats
def _broadcast_timestamps(csc, dst_timestamps):
"""Broadcast the timestamp of each destination node to its corresponding
source nodes."""
return expand_indptr(
csc.indptr, node_ids=dst_timestamps, output_size=len(csc.indices)
)
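# A small worked example of the broadcast above (illustrative values): with
# indptr = [0, 1, 3] and dst_timestamps = [10, 20], the first destination has
# one incoming source and the second has two, so expand_indptr produces
# [10, 20, 20] -- one timestamp per entry of `indices`.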
def compact_csc_format(
csc_formats: Union[CSCFormatBase, Dict[str, CSCFormatBase]],
dst_nodes: Union[torch.Tensor, Dict[str, torch.Tensor]],
dst_timestamps: Optional[
Union[torch.Tensor, Dict[str, torch.Tensor]]
] = None,
):
"""
Relabel the row (source) IDs in the csc formats into a contiguous range from
0 and return the original row node IDs per type.
Note that
1. The column (destination) IDs are included in the relabeled row IDs.
2. Repeated row IDs are not deduplicated and are treated as different nodes.
3. If `dst_timestamps` is given, the timestamp of each destination node is
broadcast to its corresponding source nodes.
Parameters
----------
csc_formats: Union[CSCFormatBase, Dict[str, CSCFormatBase]]
CSC formats representing source-destination edges.
- If `csc_formats` is a CSCFormatBase: the graph is homogeneous. Its
indptr and indices tensors represent source-destination pairs in CSC
format, and the IDs inside are homogeneous IDs.
- If `csc_formats` is a Dict[str, CSCFormatBase]: the keys should be edge
types and the values the corresponding CSC format node pairs, and the IDs
inside are heterogeneous IDs.
dst_nodes: Union[torch.Tensor, Dict[str, torch.Tensor]]
Destination nodes of the node pairs.
- If `dst_nodes` is a tensor: the graph is homogeneous.
- If `dst_nodes` is a dictionary: the keys are node types and the values
the corresponding nodes, and the IDs inside are heterogeneous IDs.
dst_timestamps: Optional[Union[torch.Tensor, Dict[str, torch.Tensor]]]
Timestamps of all destination nodes in the csc formats.
If given, the timestamp of each destination node is broadcast to its
corresponding source nodes.
Returns
-------
Tuple[original_row_node_ids, compacted_csc_formats, ...]
A tensor of original row node IDs (per type) of all nodes in the input.
The compacted CSC formats, where node IDs are replaced with mapped node
IDs ranging from 0 to N.
The source timestamps (per type) of all nodes in the input if
`dst_timestamps` is given.
Examples
--------
>>> import dgl.graphbolt as gb
>>> csc_formats = {
... "n2:e2:n1": gb.CSCFormatBase(
... indptr=torch.tensor([0, 1, 3]), indices=torch.tensor([5, 4, 6])
... ),
... "n1:e1:n1": gb.CSCFormatBase(
... indptr=torch.tensor([0, 1, 3]), indices=torch.tensor([1, 2, 3])
... ),
... }
>>> dst_nodes = {"n1": torch.LongTensor([2, 4])}
>>> original_row_node_ids, compacted_csc_formats = gb.compact_csc_format(
... csc_formats, dst_nodes
... )
>>> original_row_node_ids
{'n1': tensor([2, 4, 1, 2, 3]), 'n2': tensor([5, 4, 6])}
>>> compacted_csc_formats
{'n2:e2:n1': CSCFormatBase(indptr=tensor([0, 1, 3]),
indices=tensor([0, 1, 2]),
), 'n1:e1:n1': CSCFormatBase(indptr=tensor([0, 1, 3]),
indices=tensor([2, 3, 4]),
)}
>>> dst_timestamps = {"n1": torch.LongTensor([10, 20])}
>>> (
... original_row_node_ids,
... compacted_csc_formats,
... src_timestamps,
... ) = gb.compact_csc_format(csc_formats, dst_nodes, dst_timestamps)
>>> src_timestamps
{'n1': tensor([10, 20, 10, 20, 20]), 'n2': tensor([10, 20, 20])}
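For a homogeneous graph, plain tensors and a single CSC format are taken and
returned. Since source IDs are not deduplicated, the original row node IDs
are simply the seeds followed by the raw neighbor IDs; a minimal sketch:
>>> seeds = torch.LongTensor([2, 4])
>>> csc_format = gb.CSCFormatBase(
...     indptr=torch.tensor([0, 1, 3]), indices=torch.LongTensor([4, 1, 4])
... )
>>> original_row_node_ids, compacted = gb.compact_csc_format(csc_format, seeds)
>>> original_row_node_ids
tensor([2, 4, 4, 1, 4])
>>> compacted.indices
tensor([2, 3, 4])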
"""
is_homogeneous = not isinstance(csc_formats, dict)
has_timestamp = dst_timestamps is not None
if is_homogeneous:
if dst_nodes is not None:
assert isinstance(
dst_nodes, torch.Tensor
), "Edge type not supported in homogeneous graph."
assert len(dst_nodes) + 1 == len(
csc_formats.indptr
), "The seed nodes should correspond to indptr."
offset = dst_nodes.size(0)
original_row_ids = torch.cat((dst_nodes, csc_formats.indices))
compacted_csc_formats = CSCFormatBase(
indptr=csc_formats.indptr,
indices=(
torch.arange(
0,
csc_formats.indices.size(0),
device=csc_formats.indices.device,
)
+ offset
),
)
src_timestamps = None
if has_timestamp:
src_timestamps = torch.cat(
[
dst_timestamps,
_broadcast_timestamps(
compacted_csc_formats, dst_timestamps
),
]
)
else:
compacted_csc_formats = {}
src_timestamps = None
original_row_ids = copy.deepcopy(dst_nodes)
if has_timestamp:
src_timestamps = copy.deepcopy(dst_timestamps)
for etype, csc_format in csc_formats.items():
src_type, _, dst_type = etype_str_to_tuple(etype)
assert len(dst_nodes.get(dst_type, [])) + 1 == len(
csc_format.indptr
), "The seed nodes should correspond to indptr."
device = csc_format.indices.device
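# Indices of this edge type are appended after the row IDs already collected
# for the same source node type, so their compacted IDs start at the current
# length of that tensor (the offset).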
offset = original_row_ids.get(
src_type, torch.tensor([], device=device)
).size(0)
original_row_ids[src_type] = torch.cat(
(
original_row_ids.get(
src_type,
torch.tensor(
[], dtype=csc_format.indices.dtype, device=device
),
),
csc_format.indices,
)
)
compacted_csc_formats[etype] = CSCFormatBase(
indptr=csc_format.indptr,
indices=(
torch.arange(
0,
csc_format.indices.size(0),
dtype=csc_format.indices.dtype,
device=device,
)
+ offset
),
)
if has_timestamp:
# If destination timestamps are given, broadcast them to the
# corresponding source nodes.
src_timestamps[src_type] = torch.cat(
(
src_timestamps.get(
src_type,
torch.tensor(
[], dtype=dst_timestamps[dst_type].dtype
),
),
_broadcast_timestamps(
csc_format, dst_timestamps[dst_type]
),
)
)
if has_timestamp:
return original_row_ids, compacted_csc_formats, src_timestamps
return original_row_ids, compacted_csc_formats