"""Module for graph partition utilities."""importosimportreimporttimeimportnumpyasnpfrom.importbackendasF,utilsfrom._ffi.functionimport_init_apifrom.baseimportEID,ETYPE,NID,NTYPEfrom.heterographimportDGLGraphfrom.ndarrayimportNDArrayfrom.subgraphimportedge_subgraph__all__=["metis_partition","metis_partition_assignment","partition_graph_with_halo",]defreorder_nodes(g,new_node_ids):"""Generate a new graph with new node IDs. We assign each node in the input graph with a new node ID. This results in a new graph. Parameters ---------- g : DGLGraph The input graph new_node_ids : a tensor The new node IDs Returns ------- DGLGraph The graph with new node IDs. """assert(len(new_node_ids)==g.num_nodes()),"The number of new node ids must match #nodes in the graph."new_node_ids=utils.toindex(new_node_ids)sorted_ids,idx=F.sort_1d(new_node_ids.tousertensor())assert(F.asnumpy(sorted_ids[0])==0andF.asnumpy(sorted_ids[-1])==g.num_nodes()-1),"The new node IDs are incorrect."new_gidx=_CAPI_DGLReorderGraph_Hetero(g._graph,new_node_ids.todgltensor())new_g=DGLGraph(gidx=new_gidx,ntypes=["_N"],etypes=["_E"])new_g.ndata["orig_id"]=idxreturnnew_gdef_get_halo_heterosubgraph_inner_node(halo_subg):return_CAPI_GetHaloSubgraphInnerNodes_Hetero(halo_subg)defreshuffle_graph(g,node_part=None):"""Reshuffle node ids and edge IDs of a graph. This function reshuffles nodes and edges in a graph so that all nodes/edges of the same type have contiguous IDs. If a graph is partitioned and nodes are assigned to different partitions, all nodes/edges in a partition should get contiguous IDs; within a partition, all nodes/edges of the same type have contigous IDs. Parameters ---------- g : DGLGraph The input graph. node_part : Tensor This is a vector whose length is the same as the number of nodes in the input graph. Each element indicates the partition ID the corresponding node is assigned to. Returns ------- (DGLGraph, Tensor) The graph whose nodes and edges are reshuffled. The 1D tensor that indicates the partition IDs of the nodes in the reshuffled graph. """# In this case, we don't need to reshuffle node IDs and edge IDs.ifnode_partisNone:g.ndata["orig_id"]=F.arange(0,g.num_nodes())g.edata["orig_id"]=F.arange(0,g.num_edges())returng,Nonestart=time.time()ifnode_partisnotNone:node_part=utils.toindex(node_part)node_part=node_part.tousertensor()ifNTYPEing.ndata:is_hetero=len(F.unique(g.ndata[NTYPE]))>1else:is_hetero=Falseifis_hetero:num_node_types=F.max(g.ndata[NTYPE],0)+1ifnode_partisnotNone:sorted_part,new2old_map=F.sort_1d(node_part*num_node_types+g.ndata[NTYPE])else:sorted_part,new2old_map=F.sort_1d(g.ndata[NTYPE])sorted_part=F.floor_div(sorted_part,num_node_types)elifnode_partisnotNone:sorted_part,new2old_map=F.sort_1d(node_part)else:g.ndata["orig_id"]=g.ndata[NID]g.edata["orig_id"]=g.edata[EID]returng,Nonenew_node_ids=np.zeros((g.num_nodes(),),dtype=np.int64)new_node_ids[F.asnumpy(new2old_map)]=np.arange(0,g.num_nodes())# If the input graph is homogneous, we only need to create an empty array, so that# _CAPI_DGLReassignEdges_Hetero knows how to handle it.etype=(g.edata[ETYPE]ifETYPEing.edataelseF.zeros((0),F.dtype(sorted_part),F.cpu()))g=reorder_nodes(g,new_node_ids)node_part=utils.toindex(sorted_part)# We reassign edges in in-CSR. In this way, after partitioning, we can ensure# that all edges in a partition are in the contiguous ID space.etype_idx=utils.toindex(etype)orig_eids=_CAPI_DGLReassignEdges_Hetero(g._graph,etype_idx.todgltensor(),node_part.todgltensor(),True)orig_eids=utils.toindex(orig_eids)orig_eids=orig_eids.tousertensor()g.edata["orig_id"]=orig_eidsprint("Reshuffle nodes and edges: {:.3f} seconds".format(time.time()-start))returng,node_part.tousertensor()
[docs]defpartition_graph_with_halo(g,node_part,extra_cached_hops,reshuffle=False):"""Partition a graph. Based on the given node assignments for each partition, the function splits the input graph into subgraphs. A subgraph may contain HALO nodes which does not belong to the partition of a subgraph but are connected to the nodes in the partition within a fixed number of hops. If `reshuffle` is turned on, the function reshuffles node IDs and edge IDs of the input graph before partitioning. After reshuffling, all nodes and edges in a partition fall in a contiguous ID range in the input graph. The partitioend subgraphs have node data 'orig_id', which stores the node IDs in the original input graph. Parameters ------------ g: DGLGraph The graph to be partitioned node_part: 1D tensor Specify which partition a node is assigned to. The length of this tensor needs to be the same as the number of nodes of the graph. Each element indicates the partition ID of a node. extra_cached_hops: int The number of hops a HALO node can be accessed. reshuffle : bool Resuffle nodes so that nodes in the same partition are in the same ID range. Returns -------- a dict of DGLGraphs The key is the partition ID and the value is the DGLGraph of the partition. Tensor 1D tensor that stores the mapping between the reshuffled node IDs and the original node IDs if 'reshuffle=True'. Otherwise, return None. Tensor 1D tensor that stores the mapping between the reshuffled edge IDs and the original edge IDs if 'reshuffle=True'. Otherwise, return None. """assertlen(node_part)==g.num_nodes()ifreshuffle:g,node_part=reshuffle_graph(g,node_part)orig_nids=g.ndata["orig_id"]orig_eids=g.edata["orig_id"]node_part=utils.toindex(node_part)start=time.time()subgs=_CAPI_DGLPartitionWithHalo_Hetero(g._graph,node_part.todgltensor(),extra_cached_hops)# g is no longer needed. Free memory.g=Noneprint("Split the graph: {:.3f} seconds".format(time.time()-start))subg_dict={}node_part=node_part.tousertensor()start=time.time()# This function determines whether an edge belongs to a partition.# An edge is assigned to a partition based on its destination node. If its destination node# is assigned to a partition, we assign the edge to the partition as well.defget_inner_edge(subg,inner_node):inner_edge=F.zeros((subg.num_edges(),),F.int8,F.cpu())inner_nids=F.nonzero_1d(inner_node)# TODO(zhengda) we need to fix utils.toindex() to avoid the dtype cast below.inner_nids=F.astype(inner_nids,F.int64)inner_eids=subg.in_edges(inner_nids,form="eid")inner_edge=F.scatter_row(inner_edge,inner_eids,F.ones((len(inner_eids),),F.dtype(inner_edge),F.cpu()),)returninner_edge# This creaets a subgraph from subgraphs returned from the CAPI above.defcreate_subgraph(subg,induced_nodes,induced_edges,inner_node):subg1=DGLGraph(gidx=subg.graph,ntypes=["_N"],etypes=["_E"])# If IDs are shuffled, we should shuffled edges. This will help us collect edge data# from the distributed graph after training.ifreshuffle:# When we shuffle edges, we need to make sure that the inner edges are assigned with# contiguous edge IDs and their ID range starts with 0. In other words, we want to# place these edge IDs in the front of the edge list. To ensure that, we add the IDs# of outer edges with a large value, so we will get the sorted list as we want.max_eid=F.max(induced_edges[0],0)+1inner_edge=get_inner_edge(subg1,inner_node)eid=F.astype(induced_edges[0],F.int64)+max_eid*F.astype(inner_edge==0,F.int64)_,index=F.sort_1d(eid)subg1=edge_subgraph(subg1,index,relabel_nodes=False)subg1.ndata[NID]=induced_nodes[0]subg1.edata[EID]=F.gather_row(induced_edges[0],index)else:subg1.ndata[NID]=induced_nodes[0]subg1.edata[EID]=induced_edges[0]returnsubg1fori,subginenumerate(subgs):inner_node=_get_halo_heterosubgraph_inner_node(subg)inner_node=F.zerocopy_from_dlpack(inner_node.to_dlpack())subg=create_subgraph(subg,subg.induced_nodes,subg.induced_edges,inner_node)subg.ndata["inner_node"]=inner_nodesubg.ndata["part_id"]=F.gather_row(node_part,subg.ndata[NID])ifreshuffle:subg.ndata["orig_id"]=F.gather_row(orig_nids,subg.ndata[NID])subg.edata["orig_id"]=F.gather_row(orig_eids,subg.edata[EID])ifextra_cached_hops>=1:inner_edge=get_inner_edge(subg,inner_node)else:inner_edge=F.ones((subg.num_edges(),),F.int8,F.cpu())subg.edata["inner_edge"]=inner_edgesubg_dict[i]=subgprint("Construct subgraphs: {:.3f} seconds".format(time.time()-start))ifreshuffle:returnsubg_dict,orig_nids,orig_eidselse:returnsubg_dict,None,None
defget_peak_mem():"""Get the peak memory size. Returns ------- float The peak memory size in GB. """ifnotos.path.exists("/proc/self/status"):return0.0forlineinopen("/proc/self/status","r"):if"VmPeak"inline:mem=re.findall(r"\d+",line)[0]returnint(mem)/1024/1024return0.0
[docs]defmetis_partition_assignment(g,k,balance_ntypes=None,balance_edges=False,mode="k-way",objtype="cut"):"""This assigns nodes to different partitions with Metis partitioning algorithm. When performing Metis partitioning, we can put some constraint on the partitioning. Current, it supports two constrants to balance the partitioning. By default, Metis always tries to balance the number of nodes in each partition. * `balance_ntypes` balances the number of nodes of different types in each partition. * `balance_edges` balances the number of edges in each partition. To balance the node types, a user needs to pass a vector of N elements to indicate the type of each node. N is the number of nodes in the input graph. After the partition assignment, we construct partitions. Parameters ---------- g : DGLGraph The graph to be partitioned k : int The number of partitions. balance_ntypes : tensor Node type of each node balance_edges : bool Indicate whether to balance the edges. mode : str, "k-way" or "recursive" Whether use multilevel recursive bisection or multilevel k-way paritioning. objtype : str, "cut" or "vol" Set the objective as edge-cut minimization or communication volume minimization. This argument is used by the Metis algorithm. Returns ------- a 1-D tensor A vector with each element that indicates the partition ID of a vertex. """assertmodein("k-way","recursive",),"'mode' can only be 'k-way' or 'recursive'"assert(g.idtype==F.int64),"IdType of graph is required to be int64 for now."# METIS works only on symmetric graphs.# The METIS runs on the symmetric graph to generate the node assignment to partitions.start=time.time()sym_gidx=_CAPI_DGLMakeSymmetric_Hetero(g._graph)sym_g=DGLGraph(gidx=sym_gidx)print("Convert a graph into a bidirected graph: {:.3f} seconds, peak memory: {:.3f} GB".format(time.time()-start,get_peak_mem()))vwgt=[]# To balance the node types in each partition, we can take advantage of the vertex weights# in Metis. When vertex weights are provided, Metis will tries to generate partitions with# balanced vertex weights. A vertex can be assigned with multiple weights. The vertex weights# are stored in a vector of N * w elements, where N is the number of vertices and w# is the number of weights per vertex. Metis tries to balance the first weight, and then# the second weight, and so on.# When balancing node types, we use the first weight to indicate the first node type.# if a node belongs to the first node type, its weight is set to 1; otherwise, 0.# Similary, we set the second weight for the second node type and so on. The number# of weights is the same as the number of node types.start=time.time()ifbalance_ntypesisnotNone:assert(len(balance_ntypes)==g.num_nodes()),"The length of balance_ntypes should be equal to #nodes in the graph"balance_ntypes=F.tensor(balance_ntypes)uniq_ntypes=F.unique(balance_ntypes)forntypeinuniq_ntypes:vwgt.append(F.astype(balance_ntypes==ntype,F.int64))# When balancing edges in partitions, we use in-degree as one of the weights.ifbalance_edges:ifbalance_ntypesisNone:vwgt.append(F.astype(g.in_degrees(),F.int64))else:forntypeinuniq_ntypes:nids=F.asnumpy(F.nonzero_1d(balance_ntypes==ntype))degs=np.zeros((g.num_nodes(),),np.int64)degs[nids]=F.asnumpy(g.in_degrees(nids))vwgt.append(F.zerocopy_from_numpy(degs))# The vertex weights have to be stored in a vector.iflen(vwgt)>0:vwgt=F.stack(vwgt,1)shape=(np.prod(F.shape(vwgt),),)vwgt=F.reshape(vwgt,shape)vwgt=F.to_dgl_nd(vwgt)else:vwgt=F.zeros((0,),F.int64,F.cpu())vwgt=F.to_dgl_nd(vwgt)print("Construct multi-constraint weights: {:.3f} seconds, peak memory: {:.3f} GB".format(time.time()-start,get_peak_mem()))start=time.time()node_part=_CAPI_DGLMetisPartition_Hetero(sym_g._graph,k,vwgt,mode,(objtype=="cut"))print("Metis partitioning: {:.3f} seconds, peak memory: {:.3f} GB".format(time.time()-start,get_peak_mem()))iflen(node_part)==0:returnNoneelse:node_part=utils.toindex(node_part)returnnode_part.tousertensor()
[docs]defmetis_partition(g,k,extra_cached_hops=0,reshuffle=False,balance_ntypes=None,balance_edges=False,mode="k-way",):"""This is to partition a graph with Metis partitioning. Metis assigns vertices to partitions. This API constructs subgraphs with the vertices assigned to the partitions and their incoming edges. A subgraph may contain HALO nodes which does not belong to the partition of a subgraph but are connected to the nodes in the partition within a fixed number of hops. When performing Metis partitioning, we can put some constraint on the partitioning. Current, it supports two constrants to balance the partitioning. By default, Metis always tries to balance the number of nodes in each partition. * `balance_ntypes` balances the number of nodes of different types in each partition. * `balance_edges` balances the number of edges in each partition. To balance the node types, a user needs to pass a vector of N elements to indicate the type of each node. N is the number of nodes in the input graph. If `reshuffle` is turned on, the function reshuffles node IDs and edge IDs of the input graph before partitioning. After reshuffling, all nodes and edges in a partition fall in a contiguous ID range in the input graph. The partitioend subgraphs have node data 'orig_id', which stores the node IDs in the original input graph. The partitioned subgraph is stored in DGLGraph. The DGLGraph has the `part_id` node data that indicates the partition a node belongs to. The subgraphs do not contain the node/edge data in the input graph. Parameters ------------ g: DGLGraph The graph to be partitioned k: int The number of partitions. extra_cached_hops: int The number of hops a HALO node can be accessed. reshuffle : bool Resuffle nodes so that nodes in the same partition are in the same ID range. balance_ntypes : tensor Node type of each node balance_edges : bool Indicate whether to balance the edges. mode : str, "k-way" or "recursive" Whether use multilevel recursive bisection or multilevel k-way paritioning. Returns -------- a dict of DGLGraphs The key is the partition ID and the value is the DGLGraph of the partition. """assertmodein("k-way","recursive",),"'mode' can only be 'k-way' or 'recursive'"node_part=metis_partition_assignment(g,k,balance_ntypes,balance_edges,mode)ifnode_partisNone:returnNone# Then we split the original graph into parts based on the METIS partitioning results.returnpartition_graph_with_halo(g,node_part,extra_cached_hops,reshuffle)[0]
classNDArrayPartition(object):"""Create a new partition of an NDArray. That is, an object which assigns each row of an NDArray to a specific partition. Parameters ---------- array_size : int The first dimension of the array being partitioned. num_parts : int The number of parts to divide the array into. mode : String The type of partition. Currently, the only valid values are 'remainder' and 'range'. 'remainder' assigns rows based on remainder when dividing the row id by the number of parts (e.g., i % num_parts). 'range' assigns rows based on which part of the range 'part_ranges' they fall into. part_ranges : Tensor or dgl.NDArray, Optional Should only be specified when the mode is 'range'. Should be of the length `num_parts + 1`, and be the exclusive prefix-sum of the number of nodes in each partition. That is, for 3 partitions, we could have the list [0, a, b, 'array_size'], and all rows with index less than 'a' are assigned to partition 0, all rows with index greater than or equal to 'a' and less than 'b' are in partition 1, and all rows with index greater or equal to 'b' are in partition 2. Should have the same context as the partitioned NDArray (i.e., be on the same GPU). Examples -------- A partition of a homgeonous graph `g`, where the vertices are striped across processes can be generated via: >>> from dgl.partition import NDArrayPartition >>> part = NDArrayPartition(g.num_nodes(), num_parts, mode='remainder' ) A range based partition of a homogenous graph `g`'s nodes, where the nodes are stored in contiguous memory. This converts an existing range based partitioning (e.g. from a dgl.distributed.graph_partition_book.RangePartitionBook) 'max_node_map', to an NDArrayPartition 'part'. >>> part_range = [0] >>> for part in part_book.metadata(): >>> part_range.append(part_range[-1] + part['num_nodes']) >>> part = NDArrayPartition(g.num_nodes(), num_parts, mode='range', ... part_ranges=part_range) """def__init__(self,array_size,num_parts,mode="remainder",part_ranges=None):assertnum_parts>0,'Invalid "num_parts", must be > 0.'ifmode=="remainder":assertpart_rangesisNone,("When using remainder-based "'partitioning, "part_ranges" should not be specified.')self._partition=_CAPI_DGLNDArrayPartitionCreateRemainderBased(array_size,num_parts)elifmode=="range":assertpart_rangesisnotNone,("When using range-based "'partitioning, "part_ranges" must not be None.')assertpart_ranges[0]==0andpart_ranges[-1]==array_size,("part_ranges[0] must be 0, and part_ranges[-1] must be "'"array_size".')ifF.is_tensor(part_ranges):part_ranges=F.zerocopy_to_dgl_ndarray(part_ranges)assertisinstance(part_ranges,NDArray),('"part_ranges" must '"be Tensor or dgl.NDArray.")self._partition=_CAPI_DGLNDArrayPartitionCreateRangeBased(array_size,num_parts,part_ranges)else:assertFalse,'Unknown partition mode "{}"'.format(mode)self._array_size=array_sizeself._num_parts=num_partsdefnum_parts(self):"""Get the number of partitions."""returnself._num_partsdefarray_size(self):"""Get the total size of the first dimension of the partitioned array."""returnself._array_sizedefget(self):"""Get the C-handle for this object."""returnself._partitiondefget_local_indices(self,part,ctx):"""Get the set of global indices in this given partition."""returnself.map_to_global(F.arange(0,self.local_size(part),ctx=ctx),part)deflocal_size(self,part):"""Get the number of rows/items assigned to the given part."""return_CAPI_DGLNDArrayPartitionGetPartSize(self._partition,part)defmap_to_local(self,idxs):"""Convert the set of global indices to local indices"""returnF.zerocopy_from_dgl_ndarray(_CAPI_DGLNDArrayPartitionMapToLocal(self._partition,F.zerocopy_to_dgl_ndarray(idxs)))defmap_to_global(self,idxs,part_id):"""Convert the set of local indices ot global indices"""returnF.zerocopy_from_dgl_ndarray(_CAPI_DGLNDArrayPartitionMapToGlobal(self._partition,F.zerocopy_to_dgl_ndarray(idxs),part_id))defgenerate_permutation(self,idxs):"""Produce a scheme that maps the given indices to separate partitions and the counts of how many indices are in each partition. Parameters ---------- idxs: torch.Tensor. A tensor with shape (`num_indices`,), representing global indices. Return ------ torch.Tensor. A tensor with shape (`num_indices`,), representing the permutation to re-order the indices by partition. torch.Tensor. A tensor with shape (`num_partition`,), representing the number of indices per partition. Examples -------- >>> import torch >>> from dgl.partition import NDArrayPartition >>> part = NDArrayPartition(10, 2, mode="remainder") >>> idx = torch.tensor([0, 2, 4, 5, 8, 8, 9], device="cuda:0") >>> perm, splits_sum = part.generate_permutation(idx) >>> perm tensor([0, 1, 2, 4, 5, 3, 6], device='cuda:0') >>> splits_sum tensor([5, 2], device='cuda:0') """ret=_CAPI_DGLNDArrayPartitionGeneratePermutation(self._partition,F.zerocopy_to_dgl_ndarray(idxs))returnF.zerocopy_from_dgl_ndarray(ret(0)),F.zerocopy_from_dgl_ndarray(ret(1))_init_api("dgl.partition")