Skip to content

Commit 8651be5

Browse files
authored
[Perf] Accelerate block_compute when all nodes are invoked. (#434)
* refactor. * accelerate update_all in nodeflow. * fix. * refactor. * fix lint. * fix lint. * reorganize. * reorg. * remove. * add doc. * impl block_incidence_matrix * fix lint. * fix. * simple fix. * fix test. * fix interface. * fix eid. * fix comments.
1 parent ca2a7e1 commit 8651be5

File tree

10 files changed

+510
-85
lines changed

10 files changed

+510
-85
lines changed

include/dgl/immutable_graph.h

+10-6
Original file line numberDiff line numberDiff line change
@@ -534,23 +534,27 @@ class ImmutableGraph: public GraphInterface {
534534
return edge_list_;
535535
}
536536

537-
protected:
538-
DGLIdIters GetInEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;
539-
DGLIdIters GetOutEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;
540-
541537
/*!
542538
* \brief Get the CSR array that represents the in-edges.
543539
* This method copies data from std::vector to IdArray.
540+
* \param start the first row to copy.
541+
* \param end the last row to copy (exclusive).
544542
* \return the CSR array.
545543
*/
546-
CSRArray GetInCSRArray() const;
544+
CSRArray GetInCSRArray(size_t start, size_t end) const;
547545

548546
/*!
549547
* \brief Get the CSR array that represents the out-edges.
550548
* This method copies data from std::vector to IdArray.
549+
* \param start the first row to copy.
550+
* \param end the last row to copy (exclusive).
551551
* \return the CSR array.
552552
*/
553-
CSRArray GetOutCSRArray() const;
553+
CSRArray GetOutCSRArray(size_t start, size_t end) const;
554+
555+
protected:
556+
DGLIdIters GetInEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;
557+
DGLIdIters GetOutEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;
554558

555559
/*!
556560
* \brief Compact a subgraph.

include/dgl/nodeflow.h

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*!
2+
* Copyright (c) 2019 by Contributors
3+
* \file dgl/nodeflow.h
4+
* \brief DGL NodeFlow class.
5+
*/
6+
#ifndef DGL_NODEFLOW_H_
7+
#define DGL_NODEFLOW_H_
8+
9+
#include <vector>
10+
#include <string>
11+
12+
#include "graph_interface.h"
13+
14+
namespace dgl {
15+
16+
class ImmutableGraph;
17+
18+
/*!
19+
* \brief A NodeFlow graph stores the sampling results for a sampler that samples
20+
* nodes/edges in layers.
21+
*
22+
* We store multiple layers of the sampling results in a single graph, which results
23+
* in a more compact format. We store extra information,
24+
* such as the node and edge mapping from the NodeFlow graph to the parent graph.
25+
*/
26+
struct NodeFlow {
27+
/*! \brief The graph. */
28+
GraphPtr graph;
29+
/*!
30+
* \brief the offsets of each layer.
31+
*/
32+
IdArray layer_offsets;
33+
/*!
34+
* \brief the offsets of each flow.
35+
*/
36+
IdArray flow_offsets;
37+
/*!
38+
* \brief The node mapping from the NodeFlow graph to the parent graph.
39+
*/
40+
IdArray node_mapping;
41+
/*!
42+
* \brief The edge mapping from the NodeFlow graph to the parent graph.
43+
*/
44+
IdArray edge_mapping;
45+
};
46+
47+
/*!
48+
* \brief Get a slice on a graph that represents a NodeFlow.
49+
*
50+
* The entire block has to be taken as a slice. Users have to specify the
51+
* correct starting and ending location of a layer.
52+
*
53+
* If remap is false, the returned arrays can be viewed as a sub-matrix slice
54+
* of the adjmat of the input graph. Let the adjmat of the input graph be A,
55+
* then the slice is equal to (in numpy syntax):
56+
* A[layer1_start:layer1_end, layer0_start:layer0_end]
57+
*
58+
* If remap is true, the returned arrays represent an adjacency matrix
59+
* of shape NxM, where N is the number of nodes in layer1 and M is
60+
* the number of nodes in layer0. Nodes in layer0 will be remapped to
61+
* [0, M) and nodes in layer1 will be remapped to [0, N).
62+
*
63+
* A row of the returned adjacency matrix represents the destination
64+
* of an edge and the column represents the source.
65+
*
66+
* If fmt == "csr", the function returns three arrays: indptr, indices, eid.
67+
* If fmt == "coo", the function returns two arrays: idx, eid. Here, the idx array
68+
* is the concatenation of src and dst node id arrays.
69+
*
70+
* \param graph An immutable graph.
71+
* \param fmt the format of the returned adjacency matrix.
72+
* \param layer0_size the size of the first layer in the block.
73+
* \param layer1_start the location where the second layer starts.
74+
* \param layer1_end the location where the second layer ends.
75+
* \param remap Whether to remap all vertex ids and edge ids to the local id
76+
* space.
77+
* \return a vector of IdArrays.
78+
*/
79+
std::vector<IdArray> GetNodeFlowSlice(const ImmutableGraph &graph, const std::string &fmt,
80+
size_t layer0_size, size_t layer1_start,
81+
size_t layer1_end, bool remap);
82+
83+
} // namespace dgl
84+
85+
#endif // DGL_NODEFLOW_H_
86+

include/dgl/sampler.h

+1-29
Original file line numberDiff line numberDiff line change
@@ -9,40 +9,12 @@
99
#include <vector>
1010
#include <string>
1111
#include "graph_interface.h"
12+
#include "nodeflow.h"
1213

1314
namespace dgl {
1415

1516
class ImmutableGraph;
1617

17-
/*!
18-
* \brief A NodeFlow graph stores the sampling results for a sampler that samples
19-
* nodes/edges in layers.
20-
*
21-
* We store multiple layers of the sampling results in a single graph, which results
22-
* in a more compact format. We store extra information,
23-
* such as the node and edge mapping from the NodeFlow graph to the parent graph.
24-
*/
25-
struct NodeFlow {
26-
/*! \brief The graph. */
27-
GraphPtr graph;
28-
/*!
29-
* \brief the offsets of each layer.
30-
*/
31-
IdArray layer_offsets;
32-
/*!
33-
* \brief the offsets of each flow.
34-
*/
35-
IdArray flow_offsets;
36-
/*!
37-
* \brief The node mapping from the NodeFlow graph to the parent graph.
38-
*/
39-
IdArray node_mapping;
40-
/*!
41-
* \brief The edge mapping from the NodeFlow graph to the parent graph.
42-
*/
43-
IdArray edge_mapping;
44-
};
45-
4618
class SamplerOp {
4719
public:
4820
/*!

python/dgl/nodeflow.py

+174-18
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,161 @@ def block_parent_eid(self, block_id):
395395
assert F.asnumpy(F.sum(ret == -1, 0)) == 0, "The eid in the parent graph is invalid."
396396
return ret
397397

398+
def block_edges(self, block_id):
399+
"""Return the edges in a block.
400+
401+
Parameters
402+
----------
403+
block_id : int
404+
The specified block to return the edges.
405+
406+
Returns
407+
-------
408+
Tensor
409+
The src nodes.
410+
Tensor
411+
The dst nodes.
412+
Tensor
413+
The edge ids.
414+
"""
415+
layer0_size = self._layer_offsets[block_id + 1] - self._layer_offsets[block_id]
416+
rst = _CAPI_NodeFlowGetBlockAdj(self._graph._handle, "coo", layer0_size,
417+
self._layer_offsets[block_id + 1],
418+
self._layer_offsets[block_id + 2])
419+
idx = utils.toindex(rst(0)).tousertensor()
420+
eid = utils.toindex(rst(1))
421+
num_edges = int(len(idx) / 2)
422+
assert len(eid) == num_edges
423+
return idx[num_edges:len(idx)], idx[0:num_edges], eid.tousertensor()
424+
425+
def block_adjacency_matrix(self, block_id, ctx):
426+
"""Return the adjacency matrix representation for a specific block in a NodeFlow.
427+
428+
A row of the returned adjacency matrix represents the destination
429+
of an edge and the column represents the source.
430+
431+
Parameters
432+
----------
433+
block_id : int
434+
The specified block to return the adjacency matrix.
435+
ctx : context
436+
The context of the returned matrix.
437+
438+
Returns
439+
-------
440+
SparseTensor
441+
The adjacency matrix.
442+
Tensor
443+
An index for data shuffling due to sparse format change. Return None
444+
if shuffle is not required.
445+
"""
446+
fmt = F.get_preferred_sparse_format()
447+
# We need to extract two layers.
448+
layer0_size = self._layer_offsets[block_id + 1] - self._layer_offsets[block_id]
449+
rst = _CAPI_NodeFlowGetBlockAdj(self._graph._handle, fmt, layer0_size,
450+
self._layer_offsets[block_id + 1],
451+
self._layer_offsets[block_id + 2])
452+
num_rows = self.layer_size(block_id + 1)
453+
num_cols = self.layer_size(block_id)
454+
455+
if fmt == "csr":
456+
indptr = F.copy_to(utils.toindex(rst(0)).tousertensor(), ctx)
457+
indices = F.copy_to(utils.toindex(rst(1)).tousertensor(), ctx)
458+
shuffle = utils.toindex(rst(2))
459+
dat = F.ones(indices.shape, dtype=F.float32, ctx=ctx)
460+
return F.sparse_matrix(dat, ('csr', indices, indptr),
461+
(num_rows, num_cols))[0], shuffle.tousertensor()
462+
elif fmt == "coo":
463+
## FIXME(minjie): data type
464+
idx = F.copy_to(utils.toindex(rst(0)).tousertensor(), ctx)
465+
m = self.block_size(block_id)
466+
idx = F.reshape(idx, (2, m))
467+
dat = F.ones((m,), dtype=F.float32, ctx=ctx)
468+
adj, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (num_rows, num_cols))
469+
return adj, shuffle_idx
470+
else:
471+
raise Exception("unknown format")
472+
473+
def block_incidence_matrix(self, block_id, typestr, ctx):
474+
"""Return the incidence matrix representation of the block.
475+
476+
An incidence matrix is an n x m sparse matrix, where n is
477+
the number of nodes and m is the number of edges. Each nnz
478+
value indicates whether the edge is incident to the node
479+
or not.
480+
481+
There are three types of incidence matrix `I`:
482+
* "in":
483+
- I[v, e] = 1 if e is the in-edge of v (or v is the dst node of e);
484+
- I[v, e] = 0 otherwise.
485+
* "out":
486+
- I[v, e] = 1 if e is the out-edge of v (or v is the src node of e);
487+
- I[v, e] = 0 otherwise.
488+
* "both":
489+
- I[v, e] = 1 if e is the in-edge of v;
490+
- I[v, e] = -1 if e is the out-edge of v;
491+
- I[v, e] = 0 otherwise (including self-loop).
492+
493+
Parameters
494+
----------
495+
block_id : int
496+
The specified block to return the incidence matrix.
497+
typestr : str
498+
Can be either "in", "out" or "both"
499+
ctx : context
500+
The context of returned incidence matrix.
501+
502+
Returns
503+
-------
504+
SparseTensor
505+
The incidence matrix.
506+
Tensor
507+
An index for data shuffling due to sparse format change. Return None
508+
if shuffle is not required.
509+
"""
510+
src, dst, eid = self.block_edges(block_id)
511+
src = F.copy_to(src, ctx) # the index of the ctx will be cached
512+
dst = F.copy_to(dst, ctx) # the index of the ctx will be cached
513+
eid = F.copy_to(eid, ctx) # the index of the ctx will be cached
514+
if typestr == 'in':
515+
n = self.layer_size(block_id + 1)
516+
m = self.block_size(block_id)
517+
row = F.unsqueeze(dst, 0)
518+
col = F.unsqueeze(eid, 0)
519+
idx = F.cat([row, col], dim=0)
520+
# FIXME(minjie): data type
521+
dat = F.ones((m,), dtype=F.float32, ctx=ctx)
522+
inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
523+
elif typestr == 'out':
524+
n = self.layer_size(block_id)
525+
m = self.block_size(block_id)
526+
row = F.unsqueeze(src, 0)
527+
col = F.unsqueeze(eid, 0)
528+
idx = F.cat([row, col], dim=0)
529+
# FIXME(minjie): data type
530+
dat = F.ones((m,), dtype=F.float32, ctx=ctx)
531+
inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
532+
elif typestr == 'both':
533+
# TODO does it work for bipartite graph?
534+
# first remove entries for self loops
535+
mask = F.logical_not(F.equal(src, dst))
536+
src = F.boolean_mask(src, mask)
537+
dst = F.boolean_mask(dst, mask)
538+
eid = F.boolean_mask(eid, mask)
539+
n_entries = F.shape(src)[0]
540+
# create index
541+
row = F.unsqueeze(F.cat([src, dst], dim=0), 0)
542+
col = F.unsqueeze(F.cat([eid, eid], dim=0), 0)
543+
idx = F.cat([row, col], dim=0)
544+
# FIXME(minjie): data type
545+
x = -F.ones((n_entries,), dtype=F.float32, ctx=ctx)
546+
y = F.ones((n_entries,), dtype=F.float32, ctx=ctx)
547+
dat = F.cat([x, y], dim=0)
548+
inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
549+
else:
550+
raise DGLError('Invalid incidence matrix type: %s' % str(typestr))
551+
return inc, shuffle_idx
552+
398553
def set_n_initializer(self, initializer, layer_id=ALL, field=None):
399554
"""Set the initializer for empty node features.
400555
@@ -651,12 +806,13 @@ def block_compute(self, block_id, message_func="default", reduce_func="default",
651806
assert reduce_func is not None
652807

653808
if is_all(v):
654-
dest_nodes = utils.toindex(self.layer_nid(block_id + 1))
655-
u, v, _ = self._graph.in_edges(dest_nodes)
656-
u = utils.toindex(self._glb2lcl_nid(u.tousertensor(), block_id))
657-
v = utils.toindex(self._glb2lcl_nid(v.tousertensor(), block_id + 1))
658-
dest_nodes = utils.toindex(F.arange(0, self.layer_size(block_id + 1)))
659-
eid = utils.toindex(F.arange(0, self.block_size(block_id)))
809+
with ir.prog() as prog:
810+
scheduler.schedule_nodeflow_update_all(graph=self,
811+
block_id=block_id,
812+
message_func=message_func,
813+
reduce_func=reduce_func,
814+
apply_func=apply_node_func)
815+
Runtime.run(prog)
660816
else:
661817
dest_nodes = utils.toindex(v)
662818
u, v, eid = self._graph.in_edges(dest_nodes)
@@ -667,18 +823,18 @@ def block_compute(self, block_id, message_func="default", reduce_func="default",
667823
block_id + 1))
668824
eid = utils.toindex(self._glb2lcl_eid(eid.tousertensor(), block_id))
669825

670-
with ir.prog() as prog:
671-
scheduler.schedule_nodeflow_compute(graph=self,
672-
block_id=block_id,
673-
u=u,
674-
v=v,
675-
eid=eid,
676-
dest_nodes=dest_nodes,
677-
message_func=message_func,
678-
reduce_func=reduce_func,
679-
apply_func=apply_node_func,
680-
inplace=inplace)
681-
Runtime.run(prog)
826+
with ir.prog() as prog:
827+
scheduler.schedule_nodeflow_compute(graph=self,
828+
block_id=block_id,
829+
u=u,
830+
v=v,
831+
eid=eid,
832+
dest_nodes=dest_nodes,
833+
message_func=message_func,
834+
reduce_func=reduce_func,
835+
apply_func=apply_node_func,
836+
inplace=inplace)
837+
Runtime.run(prog)
682838

683839
def prop_flow(self, message_funcs="default", reduce_funcs="default",
684840
apply_node_funcs="default", flow_range=ALL, inplace=False):

0 commit comments

Comments
 (0)