"""Neighbor sampling APIs"""
import os
import torch
from .. import backend as F, ndarray as nd, utils
from .._ffi.function import _init_api
from ..base import DGLError, EID
from ..heterograph import DGLBlock, DGLGraph
from .utils import EidExcluder
__all__ = [
"sample_etype_neighbors",
"sample_neighbors",
"sample_neighbors_fused",
"sample_neighbors_biased",
"select_topk",
]
def _prepare_edge_arrays(g, arg):
"""Converts the argument into a list of NDArrays.
    If the argument is already a list of array-like objects, directly converts
    each entry (``None`` entries become empty placeholder arrays).
If the argument is a string, converts g.edata[arg] into a list of NDArrays
ordered by the edge types.
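    A minimal sketch of the string case (assuming ``g`` is a DGLGraph carrying an
    edge feature named ``'prob'`` on at least one edge type):

    >>> arrays = _prepare_edge_arrays(g, 'prob')
    >>> len(arrays) == len(g.canonical_etypes)  # one array (possibly empty) per edge type
    True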
"""
if isinstance(arg, list) and len(arg) > 0:
if isinstance(arg[0], nd.NDArray):
return arg
else:
# The list can have None as placeholders for empty arrays with
# undetermined data type.
dtype = None
ctx = None
result = []
for entry in arg:
if F.is_tensor(entry):
result.append(F.to_dgl_nd(entry))
dtype = F.dtype(entry)
ctx = F.context(entry)
else:
result.append(None)
result = [
F.to_dgl_nd(F.copy_to(F.tensor([], dtype=dtype), ctx))
if x is None
else x
for x in result
]
return result
elif arg is None:
return [nd.array([], ctx=nd.cpu())] * len(g.etypes)
else:
arrays = []
for etype in g.canonical_etypes:
if arg in g.edges[etype].data:
arrays.append(F.to_dgl_nd(g.edges[etype].data[arg]))
else:
arrays.append(nd.array([], ctx=nd.cpu()))
return arrays
def sample_etype_neighbors(
g,
nodes,
etype_offset,
fanout,
edge_dir="in",
prob=None,
replace=False,
copy_ndata=True,
copy_edata=True,
etype_sorted=False,
_dist_training=False,
output_device=None,
):
"""Sample neighboring edges of the given nodes and return the induced subgraph.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
will be randomly chosen. The graph returned will then contain all the nodes in the
original graph, but only the sampled edges.
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the `dgl.EID` feature in the returned graph.
Parameters
----------
g : DGLGraph
        The graph. Must be on CPU and should have only one node type and one edge type.
nodes : tensor or dict
Node IDs to sample neighbors from.
This argument can take a single ID tensor or a dictionary of node types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
etype_offset : list[int]
The offset of each edge type ID.
fanout : Tensor
        The number of edges to be sampled for each node per edge type. Must be
        a 1D tensor whose number of elements equals the number of edge types.
If -1 is given, all of the neighbors with non-zero probability will be selected.
edge_dir : str, optional
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
prob : list[Tensor], optional
The (unnormalized) probabilities associated with each neighboring edge of
a node.
        The features must be non-negative floats or booleans. Otherwise, the
result will be undefined.
replace : bool, optional
If True, sample with replacement.
copy_ndata: bool, optional
If True, the node features of the new graph are copied from
the original graph. If False, the new graph will not have any
node features.
(Default: True)
copy_edata: bool, optional
If True, the edge features of the new graph are copied from
the original graph. If False, the new graph will not have any
edge features.
(Default: True)
_dist_training : bool, optional
Internal argument. Do not use.
(Default: False)
etype_sorted: bool, optional
A hint telling whether the etypes are already sorted.
(Default: False)
output_device : Framework-specific device context object, optional
The output device. Default is the same as the input graph.
Returns
-------
DGLGraph
A sampled subgraph containing only the sampled neighboring edges, with the
same device as the input graph.
Notes
-----
    If :attr:`copy_ndata` or :attr:`copy_edata` is True, the same tensors are used as
the node or edge features of the original graph and the new graph.
As a result, users should avoid performing in-place operations
on the node features of the new graph to avoid feature corruption.
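    Examples
    --------
    A sketch of building the per-edge-type ``fanout`` tensor; ``num_etypes`` is the
    number of edge types of the original heterogeneous graph before homogenization,
    and ``g``, ``nodes`` and ``etype_offset`` are assumed to be prepared by the
    caller (e.g. by DGL's distributed sampling code):

    >>> import torch
    >>> num_etypes = 3
    >>> fanout = torch.full((num_etypes,), 10, dtype=torch.int64)  # 10 edges per node per etype
    >>> # sg = dgl.sampling.sample_etype_neighbors(g, nodes, etype_offset, fanout)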
"""
if g.device != F.cpu():
raise DGLError("The graph should be in cpu.")
# (BarclayII) because the homogenized graph no longer contains the *name* of edge
# types, the fanout argument can no longer be a dict of etypes and ints, as opposed
# to sample_neighbors.
if not F.is_tensor(fanout):
raise DGLError("The fanout should be a tensor")
if isinstance(nodes, dict):
        assert len(nodes) == 1, "The input graph should have only one node type"
nodes = list(nodes.values())[0]
nodes = utils.prepare_tensor(g, nodes, "nodes")
device = utils.context_of(nodes)
nodes = F.to_dgl_nd(nodes)
# treat etypes as int32, it is much cheaper than int64
# TODO(xiangsx): int8 can be a better choice.
fanout = F.to_dgl_nd(fanout)
prob_array = _prepare_edge_arrays(g, prob)
subgidx = _CAPI_DGLSampleNeighborsEType(
g._graph,
nodes,
etype_offset,
fanout,
edge_dir,
prob_array,
replace,
etype_sorted,
)
induced_edges = subgidx.induced_edges
ret = DGLGraph(subgidx.graph, g.ntypes, g.etypes)
# handle features
# (TODO) (BarclayII) DGL distributed fails with bus error, freezes, or other
# incomprehensible errors with lazy feature copy.
# So in distributed training context, we fall back to old behavior where we
# only set the edge IDs.
if not _dist_training:
if copy_ndata:
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
edge_frames = utils.extract_edge_subframes(g, induced_edges)
utils.set_new_frames(ret, edge_frames=edge_frames)
else:
for i, etype in enumerate(ret.canonical_etypes):
ret.edges[etype].data[EID] = induced_edges[i]
return ret if output_device is None else ret.to(output_device)
DGLGraph.sample_etype_neighbors = utils.alias_func(sample_etype_neighbors)
def sample_neighbors(
g,
nodes,
fanout,
edge_dir="in",
prob=None,
replace=False,
copy_ndata=True,
copy_edata=True,
_dist_training=False,
exclude_edges=None,
output_device=None,
):
"""Sample neighboring edges of the given nodes and return the induced subgraph.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
will be randomly chosen. The graph returned will then contain all the nodes in the
original graph, but only the sampled edges.
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the `dgl.EID` feature in the returned graph.
GPU sampling is supported for this function. Refer to :ref:`guide-minibatch-gpu-sampling`
for more details.
Parameters
----------
g : DGLGraph
The graph. Can be either on CPU or GPU.
nodes : tensor or dict
Node IDs to sample neighbors from.
This argument can take a single ID tensor or a dictionary of node types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
fanout : int or dict[etype, int]
The number of edges to be sampled for each node on each edge type.
This argument can take a single int or a dictionary of edge types and ints.
If a single int is given, DGL will sample this number of edges for each node for
every edge type.
If -1 is given for a single edge type, all the neighboring edges with that edge
type and non-zero probability will be selected.
edge_dir : str, optional
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
prob : str, optional
Feature name used as the (unnormalized) probabilities associated with each
neighboring edge of a node. The feature must have only one element for each
edge.
        The features must be non-negative floats or booleans. Otherwise, the result
will be undefined.
exclude_edges: tensor or dict
Edge IDs to exclude during sampling neighbors for the seed nodes.
This argument can take a single ID tensor or a dictionary of edge types and ID tensors.
        If a single tensor is given, the graph must only have one type of edges.
replace : bool, optional
If True, sample with replacement.
copy_ndata: bool, optional
If True, the node features of the new graph are copied from
the original graph. If False, the new graph will not have any
node features.
(Default: True)
copy_edata: bool, optional
If True, the edge features of the new graph are copied from
the original graph. If False, the new graph will not have any
edge features.
(Default: True)
_dist_training : bool, optional
Internal argument. Do not use.
(Default: False)
output_device : Framework-specific device context object, optional
The output device. Default is the same as the input graph.
Returns
-------
DGLGraph
A sampled subgraph containing only the sampled neighboring edges.
Notes
-----
    If :attr:`copy_ndata` or :attr:`copy_edata` is True, the same tensors are used as
the node or edge features of the original graph and the new graph.
As a result, users should avoid performing in-place operations
on the node features of the new graph to avoid feature corruption.
Examples
--------
Assume that you have the following graph
>>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
And the weights
>>> g.edata['prob'] = torch.FloatTensor([0., 1., 0., 1., 0., 1.])
To sample one inbound edge for node 0 and node 1:
>>> sg = dgl.sampling.sample_neighbors(g, [0, 1], 1)
>>> sg.edges(order='eid')
(tensor([1, 0]), tensor([0, 1]))
>>> sg.edata[dgl.EID]
tensor([2, 0])
To sample one inbound edge for node 0 and node 1 with probability in edge feature
``prob``:
>>> sg = dgl.sampling.sample_neighbors(g, [0, 1], 1, prob='prob')
>>> sg.edges(order='eid')
(tensor([2, 1]), tensor([0, 1]))
With ``fanout`` greater than the number of actual neighbors and without replacement,
DGL will take all neighbors instead:
>>> sg = dgl.sampling.sample_neighbors(g, [0, 1], 3)
>>> sg.edges(order='eid')
(tensor([1, 2, 0, 1]), tensor([0, 0, 1, 1]))
To exclude certain EID's during sampling for the seed nodes:
>>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
    >>> g_edges = g.all_edges(form='all')
(tensor([0, 0, 1, 1, 2, 2]), tensor([1, 2, 0, 1, 2, 0]), tensor([0, 1, 2, 3, 4, 5]))
>>> sg = dgl.sampling.sample_neighbors(g, [0, 1], 3, exclude_edges=[0, 1, 2])
>>> sg.all_edges(form='all')
(tensor([2, 1]), tensor([0, 1]), tensor([0, 1]))
>>> sg.has_edges_between(g_edges[0][:3],g_edges[1][:3])
tensor([False, False, False])
>>> g = dgl.heterograph({
... ('drug', 'interacts', 'drug'): ([0, 0, 1, 1, 3, 2], [1, 2, 0, 1, 2, 0]),
... ('drug', 'interacts', 'gene'): ([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]),
... ('drug', 'treats', 'disease'): ([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0])})
>>> g_edges = g.all_edges(form='all', etype=('drug', 'interacts', 'drug'))
(tensor([0, 0, 1, 1, 3, 2]), tensor([1, 2, 0, 1, 2, 0]), tensor([0, 1, 2, 3, 4, 5]))
>>> excluded_edges = {('drug', 'interacts', 'drug'): g_edges[2][:3]}
>>> sg = dgl.sampling.sample_neighbors(g, {'drug':[0, 1]}, 3, exclude_edges=excluded_edges)
>>> sg.all_edges(form='all', etype=('drug', 'interacts', 'drug'))
(tensor([2, 1]), tensor([0, 1]), tensor([0, 1]))
>>> sg.has_edges_between(g_edges[0][:3],g_edges[1][:3],etype=('drug', 'interacts', 'drug'))
tensor([False, False, False])
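    ``fanout`` can also be given per edge type; every edge type must then appear in
    the dict. On the heterogeneous graph above, the sampled edge IDs are random, but
    the count below is deterministic because the fanout does not exceed the number
    of available in-edges:

    >>> fanout = {('drug', 'interacts', 'drug'): 1,
    ...           ('drug', 'interacts', 'gene'): 1,
    ...           ('drug', 'treats', 'disease'): 1}
    >>> sg = dgl.sampling.sample_neighbors(g, {'drug': [0, 1]}, fanout)
    >>> sg.num_edges(('drug', 'interacts', 'drug'))
    2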
"""
if F.device_type(g.device) == "cpu" and not g.is_pinned():
frontier = _sample_neighbors(
g,
nodes,
fanout,
edge_dir=edge_dir,
prob=prob,
replace=replace,
copy_ndata=copy_ndata,
copy_edata=copy_edata,
exclude_edges=exclude_edges,
)
else:
frontier = _sample_neighbors(
g,
nodes,
fanout,
edge_dir=edge_dir,
prob=prob,
replace=replace,
copy_ndata=copy_ndata,
copy_edata=copy_edata,
)
if exclude_edges is not None:
eid_excluder = EidExcluder(exclude_edges)
frontier = eid_excluder(frontier)
return frontier if output_device is None else frontier.to(output_device)
def sample_neighbors_fused(
g,
nodes,
fanout,
edge_dir="in",
prob=None,
replace=False,
copy_ndata=True,
copy_edata=True,
exclude_edges=None,
mapping=None,
):
"""Sample neighboring edges of the given nodes and return the induced subgraph.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
will be randomly chosen. The graph returned will then contain all the nodes in the
    original graph, but only the sampled edges. Nodes are renumbered starting
    from ID 0, which becomes the new node ID of the first seed node.
Parameters
----------
g : DGLGraph
The graph. Can be either on CPU or GPU.
nodes : tensor or dict
Node IDs to sample neighbors from.
This argument can take a single ID tensor or a dictionary of node types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
fanout : int or dict[etype, int]
The number of edges to be sampled for each node on each edge type.
This argument can take a single int or a dictionary of edge types and ints.
If a single int is given, DGL will sample this number of edges for each node for
every edge type.
If -1 is given for a single edge type, all the neighboring edges with that edge
type and non-zero probability will be selected.
edge_dir : str, optional
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
prob : str, optional
Feature name used as the (unnormalized) probabilities associated with each
neighboring edge of a node. The feature must have only one element for each
edge.
        The features must be non-negative floats or booleans. Otherwise, the result
will be undefined.
exclude_edges: tensor or dict
Edge IDs to exclude during sampling neighbors for the seed nodes.
This argument can take a single ID tensor or a dictionary of edge types and ID tensors.
        If a single tensor is given, the graph must only have one type of edges.
replace : bool, optional
If True, sample with replacement.
copy_ndata: bool, optional
If True, the node features of the new graph are copied from
the original graph. If False, the new graph will not have any
node features.
(Default: True)
copy_edata: bool, optional
If True, the edge features of the new graph are copied from
the original graph. If False, the new graph will not have any
edge features.
        (Default: True)
mapping : dictionary, optional
        Used by the fused version of NeighborSampler. To avoid repeated
        allocation, pass an empty dictionary ({}); it will be populated with the
        proper buffers on the first call and reused by subsequent calls.
        (Default: None)
Returns
-------
DGLGraph
A sampled subgraph containing only the sampled neighboring edges.
Notes
-----
    If :attr:`copy_ndata` or :attr:`copy_edata` is True, the same tensors are used as
the node or edge features of the original graph and the new graph.
As a result, users should avoid performing in-place operations
on the node features of the new graph to avoid feature corruption.
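    Examples
    --------
    A minimal usage sketch (assuming the PyTorch backend with a CPU graph; which
    edges get sampled is random, but the counts below are deterministic for this
    graph):

    >>> import dgl
    >>> import torch
    >>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
    >>> mapping = {}  # reused across calls to avoid re-allocating buffers
    >>> block = dgl.sampling.sample_neighbors_fused(g, torch.tensor([0, 1]), 1, mapping=mapping)
    >>> block.num_dst_nodes()  # the seed nodes, renumbered starting from 0
    2
    >>> block.num_edges()  # one sampled in-edge per seed node
    2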
"""
if not g.is_pinned():
frontier = _sample_neighbors(
g,
nodes,
fanout,
edge_dir=edge_dir,
prob=prob,
replace=replace,
copy_ndata=copy_ndata,
copy_edata=copy_edata,
exclude_edges=exclude_edges,
fused=True,
mapping=mapping,
)
else:
frontier = _sample_neighbors(
g,
nodes,
fanout,
edge_dir=edge_dir,
prob=prob,
replace=replace,
copy_ndata=copy_ndata,
copy_edata=copy_edata,
fused=True,
mapping=mapping,
)
if exclude_edges is not None:
eid_excluder = EidExcluder(exclude_edges)
frontier = eid_excluder(frontier)
return frontier
def _sample_neighbors(
g,
nodes,
fanout,
edge_dir="in",
prob=None,
replace=False,
copy_ndata=True,
copy_edata=True,
_dist_training=False,
exclude_edges=None,
fused=False,
mapping=None,
):
if not isinstance(nodes, dict):
if len(g.ntypes) > 1:
raise DGLError(
"Must specify node type when the graph is not homogeneous."
)
nodes = {g.ntypes[0]: nodes}
nodes = utils.prepare_tensor_dict(g, nodes, "nodes")
if len(nodes) == 0:
raise ValueError(
"Got an empty dictionary in the nodes argument. "
"Please pass in a dictionary with empty tensors as values instead."
)
device = utils.context_of(nodes)
ctx = utils.to_dgl_context(device)
nodes_all_types = []
for ntype in g.ntypes:
if ntype in nodes:
nodes_all_types.append(F.to_dgl_nd(nodes[ntype]))
else:
nodes_all_types.append(nd.array([], ctx=ctx))
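    # Convert the fanout argument (int, dict, or NDArray) into a per-edge-type
    # int64 NDArray ordered by edge type ID.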
if isinstance(fanout, nd.NDArray):
fanout_array = fanout
else:
if not isinstance(fanout, dict):
fanout_array = [int(fanout)] * len(g.etypes)
else:
if len(fanout) != len(g.etypes):
raise DGLError(
"Fan-out must be specified for each edge type "
"if a dict is provided."
)
fanout_array = [None] * len(g.etypes)
for etype, value in fanout.items():
fanout_array[g.get_etype_id(etype)] = value
fanout_array = F.to_dgl_nd(F.tensor(fanout_array, dtype=F.int64))
prob_arrays = _prepare_edge_arrays(g, prob)
excluded_edges_all_t = []
if exclude_edges is not None:
if not isinstance(exclude_edges, dict):
if len(g.etypes) > 1:
raise DGLError(
"Must specify etype when the graph is not homogeneous."
)
exclude_edges = {g.canonical_etypes[0]: exclude_edges}
exclude_edges = utils.prepare_tensor_dict(g, exclude_edges, "edges")
for etype in g.canonical_etypes:
if etype in exclude_edges:
excluded_edges_all_t.append(F.to_dgl_nd(exclude_edges[etype]))
else:
excluded_edges_all_t.append(nd.array([], ctx=ctx))
if fused:
if _dist_training:
raise DGLError(
"distributed training not supported in fused sampling"
)
cpu = F.device_type(g.device) == "cpu"
if isinstance(nodes, dict):
for ntype in list(nodes.keys()):
if not cpu:
break
cpu = cpu and F.device_type(nodes[ntype].device) == "cpu"
else:
cpu = cpu and F.device_type(nodes.device) == "cpu"
if not cpu or F.backend_name != "pytorch":
raise DGLError(
"Only PyTorch backend and cpu is supported in fused sampling"
)
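        # Lazily allocate one reusable node-ID mapping buffer per node type,
        # keyed by the current process ID so that worker processes do not share
        # the same buffers.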
if mapping is None:
mapping = {}
mapping_name = "__mapping" + str(os.getpid())
if mapping_name not in mapping.keys():
mapping[mapping_name] = [
torch.LongTensor(g.num_nodes(ntype)).fill_(-1)
for ntype in g.ntypes
]
subgidx, induced_nodes, induced_edges = _CAPI_DGLSampleNeighborsFused(
g._graph,
nodes_all_types,
[F.to_dgl_nd(m) for m in mapping[mapping_name]],
fanout_array,
edge_dir,
prob_arrays,
excluded_edges_all_t,
replace,
)
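        # Reset the mapping entries touched by this call (the induced source
        # nodes) back to -1 so that the preallocated buffers can be reused by
        # the next call.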
for mapping_vector, src_nodes in zip(
mapping[mapping_name], induced_nodes
):
mapping_vector[F.from_dgl_nd(src_nodes).type(F.int64)] = -1
new_ntypes = (g.ntypes, g.ntypes)
ret = DGLBlock(subgidx, new_ntypes, g.etypes)
assert ret.is_unibipartite
else:
subgidx = _CAPI_DGLSampleNeighbors(
g._graph,
nodes_all_types,
fanout_array,
edge_dir,
prob_arrays,
excluded_edges_all_t,
replace,
)
ret = DGLGraph(subgidx.graph, g.ntypes, g.etypes)
induced_edges = subgidx.induced_edges
# handle features
# (TODO) (BarclayII) DGL distributed fails with bus error, freezes, or other
# incomprehensible errors with lazy feature copy.
# So in distributed training context, we fall back to old behavior where we
# only set the edge IDs.
if not _dist_training:
if copy_ndata:
if fused:
src_node_ids = [F.from_dgl_nd(src) for src in induced_nodes]
dst_node_ids = [
utils.toindex(
nodes.get(ntype, []), g._idtype_str
).tousertensor(ctx=F.to_backend_ctx(g._graph.ctx))
for ntype in g.ntypes
]
node_frames = utils.extract_node_subframes_for_block(
g, src_node_ids, dst_node_ids
)
utils.set_new_frames(ret, node_frames=node_frames)
else:
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
if fused:
edge_ids = [F.from_dgl_nd(eid) for eid in induced_edges]
edge_frames = utils.extract_edge_subframes(g, edge_ids)
utils.set_new_frames(ret, edge_frames=edge_frames)
else:
edge_frames = utils.extract_edge_subframes(g, induced_edges)
utils.set_new_frames(ret, edge_frames=edge_frames)
else:
for i, etype in enumerate(ret.canonical_etypes):
ret.edges[etype].data[EID] = induced_edges[i]
return ret
DGLGraph.sample_neighbors = utils.alias_func(sample_neighbors)
DGLGraph.sample_neighbors_fused = utils.alias_func(sample_neighbors_fused)
def sample_neighbors_biased(
g,
nodes,
fanout,
bias,
edge_dir="in",
tag_offset_name="_TAG_OFFSET",
replace=False,
copy_ndata=True,
copy_edata=True,
output_device=None,
):
r"""Sample neighboring edges of the given nodes and return the induced subgraph, where each
neighbor's probability to be picked is determined by its tag.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
will be randomly chosen. The graph returned will then contain all the nodes in the
original graph, but only the sampled edges.
    This version of neighbor sampling supports the scenario where adjacent nodes of
    different types have different sampling probabilities. Each node is assigned an
    integer (called a *tag*) representing its type. A tag is an analogue of a node
    type in a homogeneous graph. Nodes with the same tag share the same probability.
For example, assume a node has :math:`N+M` neighbors, and :math:`N` of them
have tag 0 while :math:`M` of them have tag 1. Assume a node of tag 0 has
an unnormalized probability :math:`p` to be picked while a node of tag 1
has :math:`q`. This function first chooses a tag according to the
unnormalized probability distribution
    :math:`\frac{P(tag=0)}{P(tag=1)}=\frac{Np}{Mq}`, and then runs a uniform
    sampling over the nodes of the chosen tag.
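    For instance, with :math:`N=3`, :math:`M=1`, :math:`p=1` and :math:`q=0.001`,
    tag 0 is chosen with probability :math:`3/(3+0.001)\approx 0.9997`.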
In order to make sampling more efficient, the input graph must have its
    CSC matrix (or CSR matrix if ``edge_dir='out'``) sorted according to the tag. The APIs
:func:`~dgl.sort_csc_by_tag` and
:func:`~dgl.sort_csr_by_tag` are designed for this purpose, which
will internally reorder the neighbors by tags so that neighbors of the same tags are
stored in a consecutive range. The two APIs will also store the offsets of these ranges
in a node feature with :attr:`tag_offset_name` as its name.
**Please make sure that the CSR (or CSC) matrix of the graph has been sorted before
calling this function.** This function itself will not check whether the
input graph is sorted. Note that the input :attr:`tag_offset_name` should
be consistent with that in the sorting function.
Only homogeneous or bipartite graphs are supported. For bipartite graphs,
the tag offsets of the source nodes when ``edge_dir='in'`` (or the destination
nodes when ``edge_dir='out'``) will be used in sampling.
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the ``dgl.EID`` feature in the returned graph.
Parameters
----------
g : DGLGraph
The graph. Must be homogeneous or bipartite (only one edge type). Must be on CPU.
nodes : tensor or list
Node IDs to sample neighbors from.
fanout : int
The number of edges to be sampled for each node on each edge type.
If -1 is given, all the neighboring edges with non-zero probability will be selected.
bias : tensor or list
The (unnormalized) probabilities associated with each tag. Its length should be equal
to the number of tags.
Entries of this array must be non-negative floats. Otherwise, the result will be
undefined.
edge_dir : str, optional
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
tag_offset_name : str, optional
The name of the node feature storing tag offsets.
(Default: "_TAG_OFFSET")
replace : bool, optional
If True, sample with replacement.
copy_ndata: bool, optional
If True, the node features of the new graph are copied from
the original graph. If False, the new graph will not have any
node features.
(Default: True)
copy_edata: bool, optional
If True, the edge features of the new graph are copied from
the original graph. If False, the new graph will not have any
edge features.
(Default: True)
output_device : Framework-specific device context object, optional
The output device. Default is the same as the input graph.
Returns
-------
DGLGraph
A sampled subgraph containing only the sampled neighboring edges. It is on CPU.
Notes
-----
    If :attr:`copy_ndata` or :attr:`copy_edata` is True, the same tensors are used as
the node or edge features of the original graph and the new graph.
As a result, users should avoid performing in-place operations
on the node features of the new graph to avoid feature corruption.
See Also
--------
dgl.sort_csc_by_tag
dgl.sort_csr_by_tag
Examples
--------
Assume that you have the following graph
>>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
And the tags
>>> tag = torch.IntTensor([0, 0, 1])
Sort the graph (necessary!)
>>> g_sorted = dgl.transforms.sort_csr_by_tag(g, tag)
>>> g_sorted.ndata['_TAG_OFFSET']
tensor([[0, 1, 2],
[0, 2, 2],
[0, 1, 2]])
Set the probability of each tag:
>>> bias = torch.tensor([1.0, 0.001])
>>> # node 2 is almost impossible to be sampled because it has tag 1.
    To sample one outbound edge for node 0 and node 2:
>>> sg = dgl.sampling.sample_neighbors_biased(g_sorted, [0, 2], 1, bias, edge_dir='out')
>>> sg.edges(order='eid')
(tensor([0, 2]), tensor([1, 0]))
>>> sg.edata[dgl.EID]
tensor([0, 5])
With ``fanout`` greater than the number of actual neighbors and without replacement,
DGL will take all neighbors instead:
>>> sg = dgl.sampling.sample_neighbors_biased(g_sorted, [0, 2], 3, bias, edge_dir='out')
>>> sg.edges(order='eid')
(tensor([0, 0, 2, 2]), tensor([1, 2, 0, 2]))
"""
if isinstance(nodes, list):
nodes = F.tensor(nodes)
if isinstance(bias, list):
bias = F.tensor(bias)
device = utils.context_of(nodes)
nodes_array = F.to_dgl_nd(nodes)
bias_array = F.to_dgl_nd(bias)
if edge_dir == "in":
tag_offset_array = F.to_dgl_nd(g.dstdata[tag_offset_name])
elif edge_dir == "out":
tag_offset_array = F.to_dgl_nd(g.srcdata[tag_offset_name])
else:
raise DGLError("edge_dir can only be 'in' or 'out'")
subgidx = _CAPI_DGLSampleNeighborsBiased(
g._graph,
nodes_array,
fanout,
bias_array,
tag_offset_array,
edge_dir,
replace,
)
induced_edges = subgidx.induced_edges
ret = DGLGraph(subgidx.graph, g.ntypes, g.etypes)
if copy_ndata:
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
edge_frames = utils.extract_edge_subframes(g, induced_edges)
utils.set_new_frames(ret, edge_frames=edge_frames)
ret.edata[EID] = induced_edges[0]
return ret if output_device is None else ret.to(output_device)
DGLGraph.sample_neighbors_biased = utils.alias_func(sample_neighbors_biased)
def select_topk(
g,
k,
weight,
nodes=None,
edge_dir="in",
ascending=False,
copy_ndata=True,
copy_edata=True,
output_device=None,
):
"""Select the neighboring edges with k-largest (or k-smallest) weights of the given
nodes and return the induced subgraph.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
with the largest (or smallest when ``ascending == True``) weights will be chosen.
The graph returned will then contain all the nodes in the original graph, but only
the sampled edges.
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the `dgl.EID` feature in the returned graph.
Parameters
----------
g : DGLGraph
The graph. Must be on CPU.
k : int or dict[etype, int]
The number of edges to be selected for each node on each edge type.
This argument can take a single int or a dictionary of edge types and ints.
If a single int is given, DGL will select this number of edges for each node for
every edge type.
If -1 is given for a single edge type, all the neighboring edges with that edge
type will be selected.
weight : str
Feature name of the weights associated with each edge. The feature should have only
one element for each edge. The feature can be either int32/64 or float32/64.
nodes : tensor or dict, optional
Node IDs to sample neighbors from.
This argument can take a single ID tensor or a dictionary of node types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
If None, DGL will select the edges for all nodes.
edge_dir : str, optional
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
ascending : bool, optional
If True, DGL will return edges with k-smallest weights instead of
k-largest weights.
copy_ndata: bool, optional
If True, the node features of the new graph are copied from
the original graph. If False, the new graph will not have any
node features.
(Default: True)
copy_edata: bool, optional
If True, the edge features of the new graph are copied from
the original graph. If False, the new graph will not have any
edge features.
(Default: True)
output_device : Framework-specific device context object, optional
The output device. Default is the same as the input graph.
Returns
-------
DGLGraph
A sampled subgraph containing only the sampled neighboring edges. It is on CPU.
Notes
-----
    If :attr:`copy_ndata` or :attr:`copy_edata` is True, the same tensors are used as
the node or edge features of the original graph and the new graph.
As a result, users should avoid performing in-place operations
on the node features of the new graph to avoid feature corruption.
Examples
--------
>>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
>>> g.edata['weight'] = torch.FloatTensor([0, 1, 0, 1, 0, 1])
>>> sg = dgl.sampling.select_topk(g, 1, 'weight')
>>> sg.edges(order='eid')
(tensor([2, 1, 0]), tensor([0, 1, 2]))
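    To keep the smallest-weight inbound edge of each node instead, pass
    ``ascending=True`` (the selection is deterministic; by symmetry with the example
    above this picks the zero-weight in-edge of each node):

    >>> sg = dgl.sampling.select_topk(g, 1, 'weight', ascending=True)
    >>> sg.edges(order='eid')
    (tensor([1, 0, 2]), tensor([0, 1, 2]))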
"""
# Rectify nodes to a dictionary
if nodes is None:
nodes = {
ntype: F.astype(F.arange(0, g.num_nodes(ntype)), g.idtype)
for ntype in g.ntypes
}
elif not isinstance(nodes, dict):
if len(g.ntypes) > 1:
raise DGLError(
"Must specify node type when the graph is not homogeneous."
)
nodes = {g.ntypes[0]: nodes}
assert g.device == F.cpu(), "Graph must be on CPU."
# Parse nodes into a list of NDArrays.
nodes = utils.prepare_tensor_dict(g, nodes, "nodes")
device = utils.context_of(nodes)
nodes_all_types = []
for ntype in g.ntypes:
if ntype in nodes:
nodes_all_types.append(F.to_dgl_nd(nodes[ntype]))
else:
nodes_all_types.append(nd.array([], ctx=nd.cpu()))
if not isinstance(k, dict):
k_array = [int(k)] * len(g.etypes)
else:
if len(k) != len(g.etypes):
raise DGLError(
"K value must be specified for each edge type "
"if a dict is provided."
)
k_array = [None] * len(g.etypes)
for etype, value in k.items():
k_array[g.get_etype_id(etype)] = value
k_array = F.to_dgl_nd(F.tensor(k_array, dtype=F.int64))
weight_arrays = []
for etype in g.canonical_etypes:
if weight in g.edges[etype].data:
weight_arrays.append(F.to_dgl_nd(g.edges[etype].data[weight]))
else:
raise DGLError(
'Edge weights "{}" do not exist for relation graph "{}".'.format(
weight, etype
)
)
subgidx = _CAPI_DGLSampleNeighborsTopk(
g._graph,
nodes_all_types,
k_array,
edge_dir,
weight_arrays,
bool(ascending),
)
induced_edges = subgidx.induced_edges
ret = DGLGraph(subgidx.graph, g.ntypes, g.etypes)
# handle features
if copy_ndata:
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
edge_frames = utils.extract_edge_subframes(g, induced_edges)
utils.set_new_frames(ret, edge_frames=edge_frames)
return ret if output_device is None else ret.to(output_device)
DGLGraph.select_topk = utils.alias_func(select_topk)
_init_api("dgl.sampling.neighbor", __name__)