#
# Copyright (c) 2022 by Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based off of neighbor.py
#
"""Labor sampling APIs"""
from .. import backend as F, ndarray as nd, utils
from .._ffi.function import _init_api
from ..base import DGLError
from ..heterograph import DGLGraph
from ..random import choice
from .utils import EidExcluder
__all__ = ["sample_labors"]
[docs]def sample_labors(
g,
nodes,
fanout,
edge_dir="in",
prob=None,
importance_sampling=0,
random_seed=None,
seed2_contribution=0,
copy_ndata=True,
copy_edata=True,
exclude_edges=None,
output_device=None,
):
"""Sampler that builds computational dependency of node representations via
labor sampling for multilayer GNN from
`(LA)yer-neigh(BOR) Sampling: Defusing Neighborhood Explosion in GNNs
<https://arxiv.org/abs/2210.13339>`
This sampler will make every node gather messages from a fixed number of neighbors
per edge type. The neighbors are picked uniformly with default parameters. For every vertex t
that will be considered to be sampled, there will be a single random variate r_t.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
will be randomly chosen. The graph returned will then contain all the nodes in the
original graph, but only the sampled edges.
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the `dgl.EID` feature in the returned graph.
Parameters
----------
g : DGLGraph
The graph, allowed to have multiple node or edge types. Can be either on CPU or GPU.
nodes : tensor or dict
Node IDs to sample neighbors from.
This argument can take a single ID tensor or a dictionary of node types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
fanout : int or dict[etype, int]
The number of edges to be sampled for each node on each edge type.
This argument can take a single int or a dictionary of edge types and ints.
If a single int is given, DGL will sample this number of edges for each node for
every edge type.
If -1 is given for a single edge type, all the neighboring edges with that edge
type will be selected.
edge_dir : str, optional
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
prob : str, optional
Feature name used as the (unnormalized) probabilities associated with each
neighboring edge of a node. The feature must have only one element for each
edge.
The features must be non-negative floats, and the sum of the features of
inbound/outbound edges for every node must be positive (though they don't have
to sum up to one). Otherwise, the result will be undefined.
If :attr:`prob` is not None, GPU sampling is not supported.
importance_sampling : int, optional
        Whether to use importance sampling or uniform sampling. Negative values optimize
        the importance sampling probabilities until convergence, while a positive value runs
        that many optimization steps. If the value is ``i``, then the LABOR-``i`` variant is used.
    random_seed : tensor, optional
        An int64 tensor with one element.
        The given random_seed makes it so that for any seed vertex ``s`` and its neighbor ``t``,
        the rolled random variate ``r_t`` is the same for any call to this function with the same
        random seed. When sampling as part of the same batch, identical seeds should be used so
        that LABOR can sample globally. For example, for heterogeneous graphs, a single
        random seed is passed for all edge types. This samples far fewer vertices compared to
        using a unique random seed for each edge type. If this function were called individually
        for each edge type of a heterogeneous graph with different random seeds, then LABOR would
        run locally for each edge type, resulting in a larger number of vertices being sampled.
        If this function is called without a ``random_seed``, the random seed is drawn from DGL's
        random number generator. Pass identical ``random_seed`` values if multiple calls to this
        function are used to sample a single batch.
seed2_contribution : float, optional
        A float value in the interval [0, 1) that determines the contribution
        of the second random seed to the generation of the random variates for the
        LABOR sampling algorithm.
copy_ndata: bool, optional
If True, the node features of the new graph are copied from
the original graph. If False, the new graph will not have any
node features.
(Default: True)
copy_edata: bool, optional
If True, the edge features of the new graph are copied from
the original graph. If False, the new graph will not have any
edge features.
(Default: True)
    exclude_edges : tensor or dict, optional
        Edge IDs to exclude when sampling neighbors for the seed nodes.
        This argument can take a single ID tensor or a dictionary of edge types and ID tensors.
        If a single tensor is given, the graph must only have one type of edges.
output_device : Framework-specific device context object, optional
The output device. Default is the same as the input graph.

    Returns
    -------
    tuple(DGLGraph, list[Tensor])
        A sampled subgraph containing only the sampled neighboring edges, along with the
        edge weights.

    Notes
    -----
    If :attr:`copy_ndata` or :attr:`copy_edata` is True, the same tensors are used as
    the node or edge features of both the original graph and the new graph.
    As a result, users should avoid performing in-place operations
    on the node features of the new graph to avoid feature corruption.

    Examples
    --------
    Assume that you have the following graph

    >>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))

    And the weights

    >>> g.edata['prob'] = torch.FloatTensor([0., 1., 0., 1., 0., 1.])

    To sample one inbound edge for node 0 and node 1 (the second element of the returned
    tuple holds the edge weights produced by labor sampling):

    >>> sg, importances = dgl.sampling.sample_labors(g, [0, 1], 1)
    >>> sg.edges(order='eid')
    (tensor([1, 0]), tensor([0, 1]))
    >>> sg.edata[dgl.EID]
    tensor([2, 0])

    To sample one inbound edge for node 0 and node 1 with probability in edge feature
    ``prob``:

    >>> sg, importances = dgl.sampling.sample_labors(g, [0, 1], 1, prob='prob')
    >>> sg.edges(order='eid')
    (tensor([2, 1]), tensor([0, 1]))

    With ``fanout`` greater than the number of actual neighbors and without replacement,
    DGL will take all neighbors instead:

    >>> sg, importances = dgl.sampling.sample_labors(g, [0, 1], 3)
    >>> sg.edges(order='eid')
    (tensor([1, 2, 0, 1]), tensor([0, 0, 1, 1]))

    To exclude certain edge IDs during sampling for the seed nodes:

    >>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
    >>> g_edges = g.all_edges(form='all')
    (tensor([0, 0, 1, 1, 2, 2]), tensor([1, 2, 0, 1, 2, 0]), tensor([0, 1, 2, 3, 4, 5]))
    >>> sg, importances = dgl.sampling.sample_labors(g, [0, 1], 3, exclude_edges=[0, 1, 2])
    >>> sg.all_edges(form='all')
    (tensor([2, 1]), tensor([0, 1]), tensor([0, 1]))
    >>> sg.has_edges_between(g_edges[0][:3], g_edges[1][:3])
    tensor([False, False, False])

    >>> g = dgl.heterograph({
    ...     ('drug', 'interacts', 'drug'): ([0, 0, 1, 1, 3, 2], [1, 2, 0, 1, 2, 0]),
    ...     ('drug', 'interacts', 'gene'): ([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]),
    ...     ('drug', 'treats', 'disease'): ([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0])})
    >>> g_edges = g.all_edges(form='all', etype=('drug', 'interacts', 'drug'))
    (tensor([0, 0, 1, 1, 3, 2]), tensor([1, 2, 0, 1, 2, 0]), tensor([0, 1, 2, 3, 4, 5]))
    >>> excluded_edges = {('drug', 'interacts', 'drug'): g_edges[2][:3]}
    >>> sg, importances = dgl.sampling.sample_labors(
    ...     g, {'drug': [0, 1]}, 3, exclude_edges=excluded_edges)
    >>> sg.all_edges(form='all', etype=('drug', 'interacts', 'drug'))
    (tensor([2, 1]), tensor([0, 1]), tensor([0, 1]))
    >>> sg.has_edges_between(g_edges[0][:3], g_edges[1][:3], etype=('drug', 'interacts', 'drug'))
    tensor([False, False, False])
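
    As a sketch (output omitted), an explicit ``random_seed`` may be shared across separate
    calls so that they roll the same random variates ``r_t``, for example when the edge types
    of one batch are sampled in separate calls; any int64 tensor with a single element can
    serve as the seed:

    >>> seed = torch.randint(0, 2 ** 62, (1,))
    >>> sg1, _ = dgl.sampling.sample_labors(g, {'drug': [0, 1]}, 2, random_seed=seed)
    >>> sg2, _ = dgl.sampling.sample_labors(g, {'drug': [0, 1]}, 2, random_seed=seed)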
"""
if F.device_type(g.device) == "cpu" and not g.is_pinned():
frontier, importances = _sample_labors(
g,
nodes,
fanout,
edge_dir=edge_dir,
prob=prob,
importance_sampling=importance_sampling,
random_seed=random_seed,
seed2_contribution=seed2_contribution,
copy_ndata=copy_ndata,
copy_edata=copy_edata,
exclude_edges=exclude_edges,
)
else:
frontier, importances = _sample_labors(
g,
nodes,
fanout,
edge_dir=edge_dir,
prob=prob,
importance_sampling=importance_sampling,
random_seed=random_seed,
seed2_contribution=seed2_contribution,
copy_ndata=copy_ndata,
copy_edata=copy_edata,
)
if exclude_edges is not None:
eid_excluder = EidExcluder(exclude_edges)
frontier, importances = eid_excluder(frontier, importances)
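    # Optionally move the sampled subgraph and the edge weight tensors to output_device.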
if output_device is None:
return (frontier, importances)
else:
return (
frontier.to(output_device),
list(map(lambda x: x.to(output_device), importances)),
        )


def _sample_labors(
g,
nodes,
fanout,
edge_dir="in",
prob=None,
importance_sampling=0,
random_seed=None,
seed2_contribution=0,
copy_ndata=True,
copy_edata=True,
exclude_edges=None,
):
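    # When no random seed is given, draw one from DGL's random number generator.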
if random_seed is None:
random_seed = F.to_dgl_nd(choice(1e18, 1))
if not isinstance(nodes, dict):
if len(g.ntypes) > 1:
raise DGLError(
"Must specify node type when the graph is not homogeneous."
)
nodes = {g.ntypes[0]: nodes}
nodes = utils.prepare_tensor_dict(g, nodes, "nodes")
if len(nodes) == 0:
raise ValueError(
"Got an empty dictionary in the nodes argument. "
"Please pass in a dictionary with empty tensors as values instead."
)
ctx = utils.to_dgl_context(F.context(next(iter(nodes.values()))))
nodes_all_types = []
# nids_all_types is needed if one wants labor to work for subgraphs whose vertices have
# been renamed and the rolled randoms should be rolled for global vertex ids.
# It is disabled for now below by passing empty ndarrays.
nids_all_types = [nd.array([], ctx=ctx) for _ in g.ntypes]
for ntype in g.ntypes:
if ntype in nodes:
nodes_all_types.append(F.to_dgl_nd(nodes[ntype]))
else:
nodes_all_types.append(nd.array([], ctx=ctx))
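    # Normalize fanout into an int64 NDArray with one entry per edge type.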
if isinstance(fanout, nd.NDArray):
fanout_array = fanout
else:
if not isinstance(fanout, dict):
fanout_array = [int(fanout)] * len(g.etypes)
else:
if len(fanout) != len(g.etypes):
raise DGLError(
"Fan-out must be specified for each edge type "
"if a dict is provided."
)
fanout_array = [None] * len(g.etypes)
for etype, value in fanout.items():
fanout_array[g.get_etype_id(etype)] = value
fanout_array = F.to_dgl_nd(F.tensor(fanout_array, dtype=F.int64))
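    # Collect one probability array per canonical edge type; an empty array is passed for
    # edge types without a prob feature (uniform sampling).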
if (
isinstance(prob, list)
and len(prob) > 0
and isinstance(prob[0], nd.NDArray)
):
prob_arrays = prob
elif prob is None:
prob_arrays = [nd.array([], ctx=nd.cpu())] * len(g.etypes)
else:
prob_arrays = []
for etype in g.canonical_etypes:
if prob in g.edges[etype].data:
prob_arrays.append(F.to_dgl_nd(g.edges[etype].data[prob]))
else:
prob_arrays.append(nd.array([], ctx=nd.cpu()))
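    # Gather excluded edge IDs per canonical edge type; empty arrays mean no exclusions
    # for that edge type.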
excluded_edges_all_t = []
if exclude_edges is not None:
if not isinstance(exclude_edges, dict):
if len(g.etypes) > 1:
raise DGLError(
"Must specify etype when the graph is not homogeneous."
)
exclude_edges = {g.canonical_etypes[0]: exclude_edges}
exclude_edges = utils.prepare_tensor_dict(g, exclude_edges, "edges")
for etype in g.canonical_etypes:
if etype in exclude_edges:
excluded_edges_all_t.append(F.to_dgl_nd(exclude_edges[etype]))
else:
excluded_edges_all_t.append(nd.array([], ctx=ctx))
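    # Call the C++ labor sampling kernel: the first return value is the sampled subgraph
    # index, the remaining ones are the edge weight (importance) arrays.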
ret_val = _CAPI_DGLSampleLabors(
g._graph,
nodes_all_types,
fanout_array,
edge_dir,
prob_arrays,
excluded_edges_all_t,
importance_sampling,
random_seed,
seed2_contribution,
nids_all_types,
)
subgidx = ret_val[0]
importances = [F.from_dgl_nd(importance) for importance in ret_val[1:]]
induced_edges = subgidx.induced_edges
ret = DGLGraph(subgidx.graph, g.ntypes, g.etypes)
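    # Share feature tensors of the original graph with the sampled subgraph if requested.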
if copy_ndata:
node_frames = utils.extract_node_subframes(g, None)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
edge_frames = utils.extract_edge_subframes(g, induced_edges)
utils.set_new_frames(ret, edge_frames=edge_frames)
    return ret, importances


DGLGraph.sample_labors = utils.alias_func(sample_labors)

_init_api("dgl.sampling.labor", __name__)