From 8f9e6393b3f4bfcb8e2c5c18b78c65ddeaa17ef2 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Wed, 14 Aug 2024 02:35:23 +0000
Subject: [PATCH 01/10] partition: convert in-memory partitions directly to
 GraphBolt

---
 python/dgl/distributed/partition.py | 93 ++++++++++++++++++-----------
 1 file changed, 57 insertions(+), 36 deletions(-)

diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py
index 73ea48959597..fd0fcae9d9c2 100644
--- a/python/dgl/distributed/partition.py
+++ b/python/dgl/distributed/partition.py
@@ -1105,7 +1105,7 @@ def get_homogeneous(g, balance_ntypes):
             inner_node_mask = _get_inner_node_mask(parts[i], ntype_id)
             val.append(
                 F.as_scalar(F.sum(F.astype(inner_node_mask, F.int64), 0))
-            )
+            )  # bool mask -> int64 -> sum -> scalar: number of inner nodes in this partition.
             inner_nids = F.boolean_mask(
                 parts[i].ndata[NID], inner_node_mask
             )
@@ -1115,7 +1115,7 @@ def get_homogeneous(g, balance_ntypes):
                     int(F.as_scalar(inner_nids[-1])) + 1,
                 ]
             )
-        val = np.cumsum(val).tolist()
+        val = np.cumsum(val).tolist()  # Cumulative sum turns per-partition counts into ID range boundaries.
         assert val[-1] == g.num_nodes(ntype)
         for etype in g.canonical_etypes:
             etype_id = g.get_etype_id(etype)
@@ -1135,7 +1135,7 @@ def get_homogeneous(g, balance_ntypes):
                     [int(inner_eids[0]), int(inner_eids[-1]) + 1]
                 )
             val = np.cumsum(val).tolist()
-            assert val[-1] == g.num_edges(etype)
+            assert val[-1] == g.num_edges(etype)  # Sanity check: the ranges must cover all edges of this type.
     else:
         node_map_val = {}
         edge_map_val = {}
@@ -1305,32 +1305,52 @@ def get_homogeneous(g, balance_ntypes):
         part_dir = os.path.join(out_path, "part" + str(part_id))
         node_feat_file = os.path.join(part_dir, "node_feat.dgl")
         edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
-        part_graph_file = os.path.join(part_dir, "graph.dgl")
-        part_metadata["part-{}".format(part_id)] = {
-            "node_feats": os.path.relpath(node_feat_file, out_path),
-            "edge_feats": os.path.relpath(edge_feat_file, out_path),
-            "part_graph": os.path.relpath(part_graph_file, out_path),
-        }
+
         os.makedirs(part_dir, mode=0o775, exist_ok=True)
         save_tensors(node_feat_file, node_feats)
         save_tensors(edge_feat_file, edge_feats)

-        sort_etypes = len(g.etypes) > 1
-        _save_graphs(
-            part_graph_file,
-            [part],
-            formats=graph_formats,
-            sort_etypes=sort_etypes,
+        # Save the partition; GraphBolt partitions carry no "part_graph" entry.
+        if use_graphbolt:
+            part_metadata["part-{}".format(part_id)] = {
+                "node_feats": os.path.relpath(node_feat_file, out_path),
+                "edge_feats": os.path.relpath(edge_feat_file, out_path),
+            }
+        else:
+            part_graph_file = os.path.join(part_dir, "graph.dgl")
+
+            part_metadata["part-{}".format(part_id)] = {
+                "node_feats": os.path.relpath(node_feat_file, out_path),
+                "edge_feats": os.path.relpath(edge_feat_file, out_path),
+                "part_graph": os.path.relpath(part_graph_file, out_path),
+            }
+            sort_etypes = len(g.etypes) > 1
+            _save_graphs(
+                part_graph_file,
+                [part],
+                formats=graph_formats,
+                sort_etypes=sort_etypes,
+            )
+
+
+    part_config = os.path.join(out_path, graph_name + ".json")
+    if use_graphbolt:
+        kwargs["graph_formats"] = graph_formats
+        dgl_partition_to_graphbolt(
+            part_config,
+            parts=parts,
+            part_meta=part_metadata,
+            **kwargs,
         )
+    else:
+        _dump_part_config(part_config, part_metadata)
+
     print(
         "Save partitions: {:.3f} seconds, peak memory: {:.3f} GB".format(
             time.time() - start, get_peak_mem()
         )
     )

-    part_config = os.path.join(out_path, graph_name + ".json")
-    _dump_part_config(part_config, part_metadata)
-
     num_cuts = sim_g.num_edges() - tot_num_inner_edges
     if num_parts == 1:
         num_cuts = 0
@@ -1340,13 +1360,6 @@ def get_homogeneous(g, balance_ntypes):
             )
         )

-    if use_graphbolt:
-        kwargs["graph_formats"] = graph_formats
-        dgl_partition_to_graphbolt(
-            part_config,
-            **kwargs,
-        )
-
     if return_mapping:
         return orig_nids, orig_eids
@@ -1392,9 +1405,9 @@ def init_type_per_edge(graph, gpb):
     etype_ids = gpb.map_to_per_etype(graph.edata[EID])[0]
     return etype_ids

-
-def gb_convert_single_dgl_partition(
+def gb_convert_single_dgl_partition(
     part_id,
+    parts,
     graph_formats,
     part_config,
     store_eids,
@@ -1427,14 +1440,18 @@ def gb_convert_single_dgl_partition(
         "Running in debug mode which means all attributes of DGL partitions"
         " will be saved to the new format."
     )
-
+
     part_meta = _load_part_config(part_config)
     num_parts = part_meta["num_parts"]

-    graph, _, _, gpb, _, _, _ = load_partition(
-        part_config, part_id, load_feats=False
-    )
-    _, _, ntypes, etypes = load_partition_book(part_config, part_id)
+    if parts is not None:
+        assert len(parts) == num_parts
+        graph = parts[part_id]
+    else:
+        graph, _, _, gpb, _, _, _ = load_partition(
+            part_config, part_id, load_feats=False
+        )
+    gpb, _, ntypes, etypes = load_partition_book(part_config, part_id)
     is_homo = is_homogeneous(ntypes, etypes)
     node_type_to_id = (
         None if is_homo else {ntype: ntid for ntid, ntype in enumerate(ntypes)}
@@ -1503,7 +1520,7 @@ def gb_convert_single_dgl_partition(
             indptr, dtype=indices.dtype
         )

-    # Cast various data to minimum dtype.
+    # Cast various data to the minimum dtype that can hold them.
     # Cast 1: indptr.
     indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
     # Cast 2: indices.
@@ -1552,7 +1569,6 @@ def gb_convert_single_dgl_partition(
     return os.path.relpath(csc_graph_path, os.path.dirname(part_config))
     # Update graph path.

-
 def dgl_partition_to_graphbolt(
     part_config,
     *,
@@ -1561,7 +1577,10 @@ def dgl_partition_to_graphbolt(
     store_inner_edge=False,
     graph_formats=None,
     n_jobs=1,
-):
+    parts=None,
+    part_meta=None,
+):
+
     """Convert partitions of dgl to FusedCSCSamplingGraph of GraphBolt.

     This API converts `DGLGraph` partitions to `FusedCSCSamplingGraph` which is
@@ -1598,7 +1617,8 @@ def dgl_partition_to_graphbolt(
         "Running in debug mode which means all attributes of DGL partitions"
         " will be saved to the new format."
     )
-    part_meta = _load_part_config(part_config)
+    if part_meta is None:
+        part_meta = _load_part_config(part_config)
     new_part_meta = copy.deepcopy(part_meta)

     num_parts = part_meta["num_parts"]
@@ -1615,6 +1635,7 @@ def dgl_partition_to_graphbolt(
     convert_with_format = partial(
         gb_convert_single_dgl_partition,
         graph_formats=graph_formats,
+        parts=parts,
         part_config=part_config,
         store_eids=store_eids,
         store_inner_node=store_inner_node,

From 7f339b9cd7919098af8aabe63521d52b7d04f319 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Thu, 17 Oct 2024 01:13:29 +0000
Subject: [PATCH 02/10] convert_partition: define sorted_idx when etype
 sorting is skipped

---
 tools/distpartitioning/convert_partition.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tools/distpartitioning/convert_partition.py b/tools/distpartitioning/convert_partition.py
index 5013b6d40f20..b03053ccb6df 100644
--- a/tools/distpartitioning/convert_partition.py
+++ b/tools/distpartitioning/convert_partition.py
@@ -351,6 +351,8 @@ def _process_partition_gb(
         sorted_idx = (
             th.repeat_interleave(indptr[:-1], split_size, dim=0) + sorted_idx
         )
+    else:
+        sorted_idx = th.arange(len(edge_ids))

     return indptr, indices[sorted_idx], edge_ids[sorted_idx]

From 530e5ef7fe501410156eca18b30bc96a9735fe86 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Thu, 17 Oct 2024 01:59:52 +0000
Subject: [PATCH 03/10] partition: drop leftover note comment

---
 python/dgl/distributed/partition.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py
index 2c2f40e9b17b..48005ffb4d27 100644
--- a/python/dgl/distributed/partition.py
+++ b/python/dgl/distributed/partition.py
@@ -1268,7 +1268,7 @@ def get_homogeneous(g, balance_ntypes):
                     int(F.as_scalar(inner_nids[-1])) + 1,
                 ]
             )
-        val = np.cumsum(val).tolist()  # Cumulative sum turns per-partition counts into ID range boundaries.
+        val = np.cumsum(val).tolist()
         assert val[-1] == g.num_nodes(ntype)
         for etype in g.canonical_etypes:
             etype_id = g.get_etype_id(etype)

From 0a107177ea3ac274b5be5fe1dda57b3fb05fa0d7 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Oct 2024 02:02:41 +0000
Subject: [PATCH 04/10] tests: add a homogeneous chunked dataset and cover it
 in the GraphBolt partition tests

---
 python/dgl/distributed/partition.py          |  18 +--
 tests/tools/pytest_utils.py                  | 131 ++++++++++++++++++-
 tests/tools/test_dist_lookup.py              |   4 +-
 tests/tools/test_dist_part.py                |   4 +-
 tests/tools/test_dist_partition_graphbolt.py |  70 +++++++---
 tests/tools/test_parmetis.py                 |   8 +-
 tools/distpartitioning/utils.py              |   2 +-
 7 files changed, 199 insertions(+), 38 deletions(-)

diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py
index 48005ffb4d27..7c6fc7138199 100644
--- a/python/dgl/distributed/partition.py
+++ b/python/dgl/distributed/partition.py
@@ -1417,9 +1417,9 @@ def get_homogeneous(g, balance_ntypes):
             for name in g.edges[etype].data:
                 if name in [EID, "inner_edge"]:
                     continue
-                edge_feats[
-                    _etype_tuple_to_str(etype) + "/" + name
-                ] = F.gather_row(g.edges[etype].data[name], local_edges)
+                edge_feats[_etype_tuple_to_str(etype) + "/" + name] = (
+                    F.gather_row(g.edges[etype].data[name], local_edges)
+                )
     else:
         for ntype in g.ntypes:
             if len(g.ntypes) > 1:
@@ -1454,9 +1454,9 @@ def get_homogeneous(g, balance_ntypes):
             for name in g.edges[etype].data:
                 if name in [EID, "inner_edge"]:
                     continue
-                edge_feats[
-                    _etype_tuple_to_str(etype) + "/" + name
-                ] = F.gather_row(g.edges[etype].data[name], local_edges)
+                edge_feats[_etype_tuple_to_str(etype) + "/" + name] = (
+                    F.gather_row(g.edges[etype].data[name], local_edges)
+                )
         # delete `orig_id` from ndata/edata
         del part.ndata["orig_id"]
         del part.edata["orig_id"]
@@ -1502,9 +1502,9 @@ def get_homogeneous(g, balance_ntypes):
     for part_id, part in parts.items():
         part_dir = os.path.join(out_path, "part" + str(part_id))
         part_graph_file = os.path.join(part_dir, "graph.dgl")
-        part_metadata["part-{}".format(part_id)][
-            "part_graph"
-        ] = os.path.relpath(part_graph_file, out_path)
+        part_metadata["part-{}".format(part_id)]["part_graph"] = (
+            os.path.relpath(part_graph_file, out_path)
+        )
         # save DGLGraph
         _save_dgl_graphs(
             part_graph_file,
diff --git a/tests/tools/pytest_utils.py b/tests/tools/pytest_utils.py
index 7aa1944120a5..c85bc8033e58 100644
--- a/tests/tools/pytest_utils.py
+++ b/tests/tools/pytest_utils.py
@@ -339,7 +339,136 @@ def chunk_graph(
     )


-def create_chunked_dataset(
+def create_homo_chunked_dataset(
+    root_dir,
+    num_chunks,
+    data_fmt="numpy",
+    edges_fmt="csv",
+    vector_rows=False,
+    **kwargs,
+):
+    """
+    This function creates a sample homogeneous chunked dataset.
+
+    Parameters:
+    -----------
+    root_dir : string
+        directory in which all the files for the chunked dataset will be stored.
+    """
+    # Step0: prepare chunked graph data format.
+    # A synthetic mini MAG240.
+    num_N = 1200

+    def rand_edges(num_src, num_dst, num_edges):
+        eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
+        src = torch.from_numpy(eids // num_dst)
+        dst = torch.from_numpy(eids % num_dst)

+        return src, dst

+    num_E = 24 * 1000

+    # Structure.
+ data_dict = {("_N", "_E", "_N"): rand_edges(num_N, num_N, num_E)} + src, dst = data_dict[("_N", "_E", "_N")] + data_dict[("_N", "_E", "_N")] = (dst, src) + g = dgl.heterograph(data_dict) + + # paper feat, label, year + num_paper_feats = 3 + _N_feat = np.random.randn(num_N, num_paper_feats) + num_classes = 4 + _N_label = np.random.choice(num_classes, num_N) + _N_year = np.random.choice(2022, num_N) + _N_orig_ids = np.arange(0, num_N) + + # masks. + _N_train_mask = np.random.choice([True, False], num_N) + _N_test_mask = np.random.choice([True, False], num_N) + _N_val_mask = np.random.choice([True, False], num_N) + + # Edge features. + _E_count = np.random.choice(10, num_E) + + # Save features. + input_dir = os.path.join(root_dir, "data_test") + os.makedirs(input_dir) + for sub_d in ["_N", "_E"]: + os.makedirs(os.path.join(input_dir, sub_d)) + + _N_feat_path = os.path.join(input_dir, "_N/feat.npy") + with open(_N_feat_path, "wb") as f: + np.save(f, _N_feat) + g.nodes["_N"].data["feat"] = torch.from_numpy(_N_feat) + + _N_label_path = os.path.join(input_dir, "_N/label.npy") + with open(_N_label_path, "wb") as f: + np.save(f, _N_label) + g.nodes["_N"].data["label"] = torch.from_numpy(_N_label) + + _N_year_path = os.path.join(input_dir, "_N/year.npy") + with open(_N_year_path, "wb") as f: + np.save(f, _N_year) + g.nodes["_N"].data["year"] = torch.from_numpy(_N_year) + + _N_orig_ids_path = os.path.join(input_dir, "_N/orig_ids.npy") + with open(_N_orig_ids_path, "wb") as f: + np.save(f, _N_orig_ids) + g.nodes["_N"].data["orig_ids"] = torch.from_numpy(_N_orig_ids) + + _E_count_path = os.path.join(input_dir, "_E/count.npy") + with open(_E_count_path, "wb") as f: + np.save(f, _E_count) + g.edges["_E"].data["count"] = torch.from_numpy(_E_count) + + _N_train_mask_path = os.path.join(input_dir, "_N/train_mask.npy") + with open(_N_train_mask_path, "wb") as f: + np.save(f, _N_train_mask) + g.nodes["_N"].data["train_mask"] = torch.from_numpy(_N_train_mask) + + _N_test_mask_path = os.path.join(input_dir, "_N/test_mask.npy") + with open(_N_test_mask_path, "wb") as f: + np.save(f, _N_test_mask) + g.nodes["_N"].data["test_mask"] = torch.from_numpy(_N_test_mask) + + _N_val_mask_path = os.path.join(input_dir, "_N/val_mask.npy") + with open(_N_val_mask_path, "wb") as f: + np.save(f, _N_val_mask) + g.nodes["_N"].data["val_mask"] = torch.from_numpy(_N_val_mask) + + node_data = { + "_N": { + "feat": _N_feat_path, + "train_mask": _N_train_mask_path, + "test_mask": _N_test_mask_path, + "val_mask": _N_val_mask_path, + "label": _N_label_path, + "year": _N_year_path, + "orig_ids": _N_orig_ids_path, + } + } + + edge_data = {"_E": {"count": _E_count_path}} + + output_dir = os.path.join(root_dir, "chunked-data") + chunk_graph( + g, + "mag240m", + node_data, + edge_data, + num_chunks=num_chunks, + output_path=output_dir, + data_fmt=data_fmt, + edges_fmt=edges_fmt, + vector_rows=vector_rows, + **kwargs, + ) + logging.debug("Done with creating chunked graph") + + return g + + +def create_hetero_chunked_dataset( root_dir, num_chunks, data_fmt="numpy", diff --git a/tests/tools/test_dist_lookup.py b/tests/tools/test_dist_lookup.py index f249b51f1cf5..5462f8e9f2ba 100644 --- a/tests/tools/test_dist_lookup.py +++ b/tests/tools/test_dist_lookup.py @@ -13,7 +13,7 @@ import torch.distributed as dist import torch.multiprocessing as mp -from pytest_utils import create_chunked_dataset +from pytest_utils import create_hetero_chunked_dataset from tools.distpartitioning import constants, dist_lookup from tools.distpartitioning.gloo_wrapper 
import allgather_sizes from tools.distpartitioning.utils import ( @@ -210,7 +210,7 @@ def test_lookup_service( ): with tempfile.TemporaryDirectory() as root_dir: - g = create_chunked_dataset( + g = create_hetero_chunked_dataset( root_dir, num_chunks, data_fmt="numpy", diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py index 80b6419d4938..e28c570a5098 100644 --- a/tests/tools/test_dist_part.py +++ b/tests/tools/test_dist_part.py @@ -19,7 +19,7 @@ from distpartitioning import array_readwriter from distpartitioning.utils import generate_read_list -from pytest_utils import create_chunked_dataset +from pytest_utils import create_hetero_chunked_dataset from tools.verification_utils import ( verify_graph_feats, @@ -39,7 +39,7 @@ def _test_chunk_graph( num_chunks_edge_data=None, ): with tempfile.TemporaryDirectory() as root_dir: - g = create_chunked_dataset( + g = create_hetero_chunked_dataset( root_dir, num_chunks, data_fmt=data_fmt, diff --git a/tests/tools/test_dist_partition_graphbolt.py b/tests/tools/test_dist_partition_graphbolt.py index 81c16f8809c3..195bf8a48553 100644 --- a/tests/tools/test_dist_partition_graphbolt.py +++ b/tests/tools/test_dist_partition_graphbolt.py @@ -22,7 +22,10 @@ from distpartitioning import array_readwriter from distpartitioning.utils import generate_read_list -from pytest_utils import create_chunked_dataset +from pytest_utils import ( + create_hetero_chunked_dataset, + create_homo_chunked_dataset, +) def _verify_metadata_gb(gpb, g, num_parts, part_id, part_sizes): @@ -829,24 +832,41 @@ def _test_pipeline_graphbolt( store_eids=True, store_inner_edge=True, store_inner_node=True, + is_homogeneous=False, ): if num_parts % world_size != 0: # num_parts should be a multiple of world_size return with tempfile.TemporaryDirectory() as root_dir: - g = create_chunked_dataset( - root_dir, - num_chunks, - data_fmt=data_fmt, - num_chunks_nodes=num_chunks_nodes, - num_chunks_edges=num_chunks_edges, - num_chunks_node_data=num_chunks_node_data, - num_chunks_edge_data=num_chunks_edge_data, - ) - graph_name = "test" - test_ntype = "paper" - test_etype = ("paper", "cites", "paper") + if is_homogeneous: + g = create_homo_chunked_dataset( + root_dir, + num_chunks, + data_fmt=data_fmt, + num_chunks_nodes=num_chunks_nodes, + num_chunks_edges=num_chunks_edges, + num_chunks_node_data=num_chunks_node_data, + num_chunks_edge_data=num_chunks_edge_data, + ) + graph_name = "test" + test_ntype = "_N" + test_etype = ("_N", "_E", "_N") + ntypes = ["_N"] + else: + g = create_hetero_chunked_dataset( + root_dir, + num_chunks, + data_fmt=data_fmt, + num_chunks_nodes=num_chunks_nodes, + num_chunks_edges=num_chunks_edges, + num_chunks_node_data=num_chunks_node_data, + num_chunks_edge_data=num_chunks_edge_data, + ) + graph_name = "test" + test_ntype = "paper" + test_etype = ("paper", "cites", "paper") + ntypes = ["author", "institution", "paper"] # Step1: graph partition in_dir = os.path.join(root_dir, "chunked-data") @@ -857,7 +877,7 @@ def _test_pipeline_graphbolt( in_dir, output_dir, num_parts ) ) - for ntype in ["author", "institution", "paper"]: + for ntype in ntypes: fname = os.path.join(output_dir, "{}.txt".format(ntype)) with open(fname, "r") as f: header = f.readline().rstrip() @@ -893,7 +913,7 @@ def _test_pipeline_graphbolt( # check if verify_partitions.py is used for validation. 
if use_verify_partitions: - cmd = "python3 tools/verify_partitions.py " + cmd = "fhomopython3 tools/verify_partitions.py " cmd += f" --orig-dataset-dir {in_dir}" cmd += f" --part-graph {out_dir}" cmd += f" --partitions-dir {output_dir}" @@ -952,14 +972,20 @@ def read_orig_ids(fname): "num_chunks, num_parts, world_size", [[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]], ) -def test_pipeline_basics(num_chunks, num_parts, world_size): +@pytest.mark.parametrize("is_homogeneous", [False, True]) +def test_pipeline_basics(num_chunks, num_parts, world_size, is_homogeneous): _test_pipeline_graphbolt( num_chunks, num_parts, world_size, + is_homogeneous=is_homogeneous, ) _test_pipeline_graphbolt( - num_chunks, num_parts, world_size, use_verify_partitions=False + num_chunks, + num_parts, + world_size, + use_verify_partitions=False, + is_homogeneous=is_homogeneous, ) @@ -1001,12 +1027,14 @@ def test_pipeline_attributes(store_inner_node, store_inner_edge, store_eids): [1, 5, 3, 1, 1], ], ) +@pytest.mark.parametrize("is_homogeneous", [False, True]) def test_pipeline_arbitrary_chunks( num_chunks, num_parts, world_size, num_chunks_node_data, num_chunks_edge_data, + is_homogeneous, ): _test_pipeline_graphbolt( @@ -1015,9 +1043,13 @@ def test_pipeline_arbitrary_chunks( world_size, num_chunks_node_data=num_chunks_node_data, num_chunks_edge_data=num_chunks_edge_data, + is_homogeneous=is_homogeneous, ) @pytest.mark.parametrize("data_fmt", ["numpy", "parquet"]) -def test_pipeline_feature_format(data_fmt): - _test_pipeline_graphbolt(4, 4, 4, data_fmt=data_fmt) +@pytest.mark.parametrize("is_homogeneous", [False, True]) +def test_pipeline_feature_format(data_fmt, is_homogeneous): + _test_pipeline_graphbolt( + 4, 4, 4, data_fmt=data_fmt, is_homogeneous=is_homogeneous + ) diff --git a/tests/tools/test_parmetis.py b/tests/tools/test_parmetis.py index 6f96dd60d112..88698576c6be 100644 --- a/tests/tools/test_parmetis.py +++ b/tests/tools/test_parmetis.py @@ -11,7 +11,7 @@ from dgl.data.utils import load_graphs, load_tensors from partition_algo.base import load_partition_meta -from pytest_utils import create_chunked_dataset +from pytest_utils import create_hetero_chunked_dataset """ TODO: skipping this test case since the dependency, mpirun, is @@ -23,7 +23,7 @@ def test_parmetis_preprocessing(): with tempfile.TemporaryDirectory() as root_dir: num_chunks = 2 - g = create_chunked_dataset(root_dir, num_chunks) + g = create_hetero_chunked_dataset(root_dir, num_chunks) # Trigger ParMETIS pre-processing here. 
        input_dir = os.path.join(root_dir, "chunked-data")
@@ -117,7 +117,7 @@ def test_parmetis_preprocessing():
 def test_parmetis_postprocessing():
     with tempfile.TemporaryDirectory() as root_dir:
         num_chunks = 2
-        g = create_chunked_dataset(root_dir, num_chunks)
+        g = create_hetero_chunked_dataset(root_dir, num_chunks)

         num_nodes = g.num_nodes()
         num_institutions = g.num_nodes("institution")
@@ -188,7 +188,7 @@ def test_parmetis_wrapper():
     with tempfile.TemporaryDirectory() as root_dir:
         num_chunks = 2
         graph_name = "mag240m"
-        g = create_chunked_dataset(root_dir, num_chunks)
+        g = create_hetero_chunked_dataset(root_dir, num_chunks)
         all_ntypes = g.ntypes
         all_etypes = g.etypes
         num_constraints = len(all_ntypes) + 3
diff --git a/tools/distpartitioning/utils.py b/tools/distpartitioning/utils.py
index 32292a843bc5..558f11fe8e5c 100644
--- a/tools/distpartitioning/utils.py
+++ b/tools/distpartitioning/utils.py
@@ -1,8 +1,8 @@
 import json
 import logging
 import os
-
 from itertools import cycle
+
 import constants
 import dgl

From 46d9fe80b3da31a42d54b376587ab248a6a50690 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Oct 2024 02:04:52 +0000
Subject: [PATCH 05/10] partition: revert formatting-only changes

---
 python/dgl/distributed/partition.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py
index 7c6fc7138199..48005ffb4d27 100644
--- a/python/dgl/distributed/partition.py
+++ b/python/dgl/distributed/partition.py
@@ -1417,9 +1417,9 @@ def get_homogeneous(g, balance_ntypes):
             for name in g.edges[etype].data:
                 if name in [EID, "inner_edge"]:
                     continue
-                edge_feats[_etype_tuple_to_str(etype) + "/" + name] = (
-                    F.gather_row(g.edges[etype].data[name], local_edges)
-                )
+                edge_feats[
+                    _etype_tuple_to_str(etype) + "/" + name
+                ] = F.gather_row(g.edges[etype].data[name], local_edges)
     else:
         for ntype in g.ntypes:
             if len(g.ntypes) > 1:
@@ -1454,9 +1454,9 @@ def get_homogeneous(g, balance_ntypes):
             for name in g.edges[etype].data:
                 if name in [EID, "inner_edge"]:
                     continue
-                edge_feats[_etype_tuple_to_str(etype) + "/" + name] = (
-                    F.gather_row(g.edges[etype].data[name], local_edges)
-                )
+                edge_feats[
+                    _etype_tuple_to_str(etype) + "/" + name
+                ] = F.gather_row(g.edges[etype].data[name], local_edges)
         # delete `orig_id` from ndata/edata
         del part.ndata["orig_id"]
         del part.edata["orig_id"]
@@ -1502,9 +1502,9 @@ def get_homogeneous(g, balance_ntypes):
     for part_id, part in parts.items():
         part_dir = os.path.join(out_path, "part" + str(part_id))
         part_graph_file = os.path.join(part_dir, "graph.dgl")
-        part_metadata["part-{}".format(part_id)]["part_graph"] = (
-            os.path.relpath(part_graph_file, out_path)
-        )
+        part_metadata["part-{}".format(part_id)][
+            "part_graph"
+        ] = os.path.relpath(part_graph_file, out_path)
         # save DGLGraph
         _save_dgl_graphs(
             part_graph_file,

From ee672006b32c70cdd4b97f962e05bfefb448d36b Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Oct 2024 02:19:15 +0000
Subject: [PATCH 06/10] distpartitioning: revert import reordering in utils.py

---
 tools/distpartitioning/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/distpartitioning/utils.py b/tools/distpartitioning/utils.py
index 558f11fe8e5c..32292a843bc5 100644
--- a/tools/distpartitioning/utils.py
+++ b/tools/distpartitioning/utils.py
@@ -1,8 +1,8 @@
 import json
 import logging
 import os
+
 from itertools import cycle
-
 import constants
 import dgl

From bdffbe606cc6b51a09efcf8074bdc7813fb73b70 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Oct 2024 02:20:13 +0000
Subject: [PATCH 07/10] tests: fix verify_partitions command in
 test_dist_partition_graphbolt

---
 tests/tools/test_dist_partition_graphbolt.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/tools/test_dist_partition_graphbolt.py b/tests/tools/test_dist_partition_graphbolt.py
index 195bf8a48553..94c561e5f4dd 100644
--- a/tests/tools/test_dist_partition_graphbolt.py
+++ b/tests/tools/test_dist_partition_graphbolt.py
@@ -913,7 +913,7 @@ def _test_pipeline_graphbolt(

         # check if verify_partitions.py is used for validation.
         if use_verify_partitions:
-            cmd = "fhomopython3 tools/verify_partitions.py "
+            cmd = "python3 tools/verify_partitions.py "
             cmd += f" --orig-dataset-dir {in_dir}"
             cmd += f" --part-graph {out_dir}"
             cmd += f" --partitions-dir {output_dir}"

From e03e6898fe872e5fe7d13ea22503909409f0464d Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Oct 2024 02:40:27 +0000
Subject: [PATCH 08/10] change test_dist_part.py function name

---
 tests/tools/test_dist_part.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py
index 80b6419d4938..8811f499fa7a 100644
--- a/tests/tools/test_dist_part.py
+++ b/tests/tools/test_dist_part.py
@@ -220,7 +220,7 @@ def _test_pipeline(
         return

     with tempfile.TemporaryDirectory() as root_dir:
-        g = create_chunked_dataset(
+        g = create_hetero_chunked_dataset(
             root_dir,
             num_chunks,
             data_fmt=data_fmt,

From a0b020c6a08f71df6949f95685c4c89f1c3c6e44 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Oct 2024 03:08:35 +0000
Subject: [PATCH 09/10] tests: update test_dist_part.py imports for the
 renamed helper

---
 tests/tools/test_dist_part.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py
index 80b6419d4938..37b841d73016 100644
--- a/tests/tools/test_dist_part.py
+++ b/tests/tools/test_dist_part.py
@@ -19,7 +19,8 @@

 from distpartitioning import array_readwriter
 from distpartitioning.utils import generate_read_list
-from pytest_utils import create_chunked_dataset
+from pytest_utils import chunk_graph, create_hetero_chunked_dataset
+from scipy import sparse as spsp

 from tools.verification_utils import (
     verify_graph_feats,
@@ -39,7 +40,7 @@ def _test_chunk_graph(
     num_chunks_edge_data=None,
 ):
     with tempfile.TemporaryDirectory() as root_dir:
-        g = create_chunked_dataset(
+        g = create_hetero_chunked_dataset(
             root_dir,
             num_chunks,
             data_fmt=data_fmt,
@@ -220,7 +221,7 @@ def _test_pipeline(
         return

     with tempfile.TemporaryDirectory() as root_dir:
-        g = create_chunked_dataset(
+        g = create_hetero_chunked_dataset(
             root_dir,
             num_chunks,
             data_fmt=data_fmt,

From 45c772cd144ec99b721362d584f1a8b18bec54d8 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Oct 2024 03:09:47 +0000
Subject: [PATCH 10/10] tests: revert test_dist_part.py to
 create_chunked_dataset

---
 tests/tools/test_dist_part.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py
index 37b841d73016..80b6419d4938 100644
--- a/tests/tools/test_dist_part.py
+++ b/tests/tools/test_dist_part.py
@@ -19,8 +19,7 @@

 from distpartitioning import array_readwriter
 from distpartitioning.utils import generate_read_list
-from pytest_utils import chunk_graph, create_hetero_chunked_dataset
-from scipy import sparse as spsp
+from pytest_utils import create_chunked_dataset

 from tools.verification_utils import (
     verify_graph_feats,
@@ -40,7 +39,7 @@ def _test_chunk_graph(
     num_chunks_edge_data=None,
 ):
     with tempfile.TemporaryDirectory() as root_dir:
-        g = create_hetero_chunked_dataset(
+
g = create_chunked_dataset( root_dir, num_chunks, data_fmt=data_fmt, @@ -221,7 +220,7 @@ def _test_pipeline( return with tempfile.TemporaryDirectory() as root_dir: - g = create_hetero_chunked_dataset( + g = create_chunked_dataset( root_dir, num_chunks, data_fmt=data_fmt,
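
Usage sketch for the series as a whole: the snippet below is a minimal, illustrative example assuming a DGL build with these patches applied. The toy graph, graph names, sizes, and output paths are placeholders, not taken from the patches; the store_* flags are the GraphBolt options that partition_graph forwards through **kwargs to dgl_partition_to_graphbolt.

    import dgl
    import torch
    from dgl.distributed import dgl_partition_to_graphbolt, partition_graph

    # A small random homogeneous graph stands in for a real dataset.
    g = dgl.rand_graph(1000, 10000)
    g.ndata["feat"] = torch.randn(g.num_nodes(), 4)

    # With use_graphbolt=True, partition_graph() now hands the in-memory
    # partitions (parts) and metadata (part_meta) straight to
    # dgl_partition_to_graphbolt(), so each partition is written once as a
    # FusedCSCSamplingGraph instead of being saved to graph.dgl and then
    # re-loaded from disk for conversion.
    partition_graph(
        g,
        graph_name="toy",
        num_parts=2,
        out_path="/tmp/toy_gb",
        use_graphbolt=True,
        store_eids=True,
        store_inner_node=True,
        store_inner_edge=True,
    )

    # Partitions written in the classic DGL format can still be converted
    # after the fact; with parts=None and part_meta=None the partitions and
    # metadata are loaded from the part_config JSON, as before this series.
    partition_graph(g, graph_name="toy2", num_parts=2, out_path="/tmp/toy_dgl")
    dgl_partition_to_graphbolt("/tmp/toy_dgl/toy2.json", store_eids=True, n_jobs=2)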