-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathparallel_utils.py
184 lines (148 loc) · 5.28 KB
/
parallel_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import math
import re
import subprocess
import time
from typing import List
import torch
import torch.nn as nn
def nvidia_smi_memory_info():
    """Query per-GPU memory statistics via the ``nvidia-smi`` CLI.

    Returns:
        list[dict]: one dict per visible GPU with keys ``id``,
        ``total_memory``, ``used_memory`` and ``free_memory``. Values are
        plain ints in MiB (``nounits`` is passed to nvidia-smi).

    Raises:
        subprocess.CalledProcessError: if nvidia-smi exits non-zero.
        FileNotFoundError: if nvidia-smi is not installed.
    """
    result = subprocess.run(
        [
            "nvidia-smi",
            "--query-gpu=index,memory.total,memory.used,memory.free",
            "--format=csv,noheader,nounits",
        ],
        stdout=subprocess.PIPE,
        text=True,
        check=True,  # fail loudly instead of parsing garbage from a failed run
    )
    gpu_memory_info = []
    # splitlines() handles the trailing newline without the [:-1] slice hack.
    for line in result.stdout.splitlines():
        if not line.strip():
            continue  # tolerate stray blank lines in CLI output
        # r",\s*": raw string fixes the invalid-escape warning of ",\s" and
        # also accepts "a,b" (no space) in addition to "a, b".
        gpu_id, total_memory, used_memory, free_memory = map(
            int, re.split(r",\s*", line)
        )
        gpu_memory_info.append(
            {
                "id": gpu_id,
                "total_memory": total_memory,
                "used_memory": used_memory,
                "free_memory": free_memory,
            }
        )
    return gpu_memory_info
# Number of CUDA devices visible to torch (0 on CPU-only machines).
num_gpus = torch.cuda.device_count()
def get_gpu_memory():
    """Return a list of ``(gpu_id, total_memory, used_memory)`` tuples.

    Thin projection of :func:`nvidia_smi_memory_info`; memory values are
    in MiB as reported by nvidia-smi.
    """
    # Iterate the records directly. The original re-indexed the list with
    # gpu["id"] (silently assuming id == list position) and never used the
    # enumerate counter.
    return [
        (gpu["id"], gpu["total_memory"], gpu["used_memory"])
        for gpu in nvidia_smi_memory_info()
    ]
def get_lowest_occupied_gpu(wait_memory=1000):
    """Block until some GPU uses at most *wait_memory* MiB; return its id.

    Polls nvidia-smi every 10 seconds until the least-occupied GPU's used
    memory drops to ``wait_memory`` or below. NOTE(review): this can wait
    forever if every GPU stays busier than the threshold.

    Args:
        wait_memory: used-memory threshold in MiB.

    Returns:
        The id of the least-occupied GPU once it satisfies the threshold.
    """
    while True:
        # min() by used memory replaces the original full sort + [0].
        gpu_id, _total, used = min(get_gpu_memory(), key=lambda info: info[2])
        if used <= wait_memory:
            return gpu_id
        time.sleep(10)
def sort_layers_by_params(layers: List[nn.Module]):
    """Return *layers* ordered by total parameter count, largest first."""

    def _param_count(module: nn.Module) -> int:
        # Total number of elements across all of the module's parameters.
        return sum(param.numel() for param in module.parameters())

    return sorted(layers, key=_param_count, reverse=True)
def get_all_gpu_free_memory():
    """Return total free memory (MiB) summed across every visible GPU."""
    # Generator expression: no need to materialize a list just to sum it.
    return sum(
        total_memory - used_memory
        for _gpu_id, total_memory, used_memory in get_gpu_memory()
    )
def assign_layers_to_gpus(layers: List[nn.Module]):
    """Greedily place each module in *layers* onto a CUDA device.

    Side effects: moves every layer to its chosen GPU via ``.to`` and tags
    it with a ``.device`` attribute. The final layer is always co-located
    with the first layer's device (presumably so a tied output head shares
    the embedding's GPU — TODO confirm with callers).

    Blocks (sleeping 10 s per poll) until cluster-wide free memory exceeds
    ~1.3x the estimated total weight size before placing anything.

    Returns:
        dict: module -> assigned gpu id.

    Raises:
        RuntimeError: if no single GPU has room for some layer.
    """
    layer_gpu_map = {}
    prev_gpu_id = None
    weight_num = 0
    # Rough footprint estimate: counts only .weight tensors, and the "* 2"
    # assumes 2 bytes per element (fp16/bf16) — TODO confirm dtype.
    for module in layers:
        if hasattr(module, "weight"):
            weight_num += module.weight.numel()
    weight_mb = weight_num * 2 / 1024 / 1024
    all_gpu_mems = get_all_gpu_free_memory()
    # Wait until aggregate free memory comfortably (1.3x margin) covers
    # the estimated weights.
    while all_gpu_mems < weight_mb * 1.3:
        time.sleep(10)
        all_gpu_mems = get_all_gpu_free_memory()
    for i, layer in enumerate(layers):
        if i == len(layers) - 1:
            # Pin the last layer to whatever GPU the first layer received.
            layer_gpu_map[layer] = layer_gpu_map[layers[0]]
            layer.to(layers[0].device)
            layer.device = layers[0].device
            print(f"map last layer {i} to gpu {layer_gpu_map[layer]}")
            continue
        # Exact size of this layer's parameters in MiB (unlike the fp16
        # estimate above, this uses the real element sizes).
        layer_memory = (
            sum(p.element_size() * p.numel() for p in layer.parameters()) / 1024**2
        )
        available_gpus = get_gpu_memory()
        if prev_gpu_id is None:
            # Least-used GPUs first.
            gpus = sorted(available_gpus, key=lambda x: x[2])
        else:
            # Prefer reusing the previous layer's GPU before falling back
            # to the least-used ones. NOTE(review): indexing available_gpus
            # by prev_gpu_id assumes nvidia-smi ids equal list positions —
            # TODO confirm.
            pre_gpu_info = available_gpus[prev_gpu_id]
            gpus = [pre_gpu_info] + sorted(available_gpus, key=lambda x: x[2])
        mapped = False
        for gpu_id, tot_memory, allocated_memory in gpus:
            # Accept a GPU if, applying a 1.35x safety margin to its current
            # usage, it still has room for this layer.
            if (tot_memory - allocated_memory * 1.35) > layer_memory:
                layer_gpu_map[layer] = gpu_id
                layer.to(f"cuda:{gpu_id}")
                layer.device = f"cuda:{gpu_id}"
                print(f"map layer {i} to gpu {gpu_id}, {available_gpus}")
                mapped = True
                prev_gpu_id = gpu_id
                break
        if not mapped:
            raise RuntimeError(f"memory not enough {available_gpus}")
    return layer_gpu_map
def assign_layers_to_num_gpus(layers: List[nn.Module], gpu_num=2):
    """Split *layers* into contiguous chunks, one chunk per GPU.

    Chunk size is ``ceil(len(layers) / gpu_num)``; chunk *k* is moved to
    ``cuda:k`` and each module is tagged with a ``.device`` attribute.

    Returns:
        dict: module -> assigned gpu id.
    """
    per_gpu = math.ceil(len(layers) / gpu_num)
    print(f"Number of layers: {len(layers)}")
    print(f"Using {per_gpu} layers per gpu")
    mapping = {}
    for idx, module in enumerate(layers):
        target = idx // per_gpu
        device = f"cuda:{target}"
        module.to(device)
        module.device = device
        mapping[module] = target
        print(f"map layer {idx} to gpu {target}")
    return mapping
def forward_hook_wrapper(gpu_id):
    """Build a forward-pre-hook that moves a module's inputs to ``cuda:gpu_id``.

    The returned hook relocates every positional input tensor and every
    tensor-valued keyword argument to the target device; non-tensor kwargs
    pass through unchanged.
    """
    device = f"cuda:{gpu_id}"

    def forward_hook(module, input, kwargs):
        moved_args = tuple(tensor.to(device) for tensor in input)
        moved_kwargs = {
            key: value.to(device) if isinstance(value, torch.Tensor) else value
            for key, value in kwargs.items()
        }
        return moved_args, moved_kwargs

    return forward_hook
def add_forward_hooks(layer_gpu_map):
    """Register a device-moving forward-pre-hook on every mapped layer.

    For each ``layer -> gpu_id`` pair, installs a pre-hook (built by
    ``forward_hook_wrapper``) that moves the layer's inputs onto
    ``cuda:gpu_id`` before its forward pass, so layers spread across GPUs
    can be chained transparently.
    """
    for layer, gpu_id in layer_gpu_map.items():
        layer: nn.Module
        # Hook every layer unconditionally. The original tracked prev_gpu_id
        # (for a commented-out hook-only-on-device-change condition) but
        # never used it — dead code removed.
        layer.register_forward_pre_hook(forward_hook_wrapper(gpu_id), with_kwargs=True)
def map_layers_to_multi_gpus(layers):
    """Greedily spread *layers* over available GPUs and attach input hooks.

    Returns:
        dict: module -> assigned gpu id. (Returning the map makes this
        wrapper consistent with ``map_layers_to_num_gpus``, which already
        returned it; previously the mapping was discarded.)
    """
    layer_gpu_map = assign_layers_to_gpus(layers)
    add_forward_hooks(layer_gpu_map)
    return layer_gpu_map
def map_layers_to_num_gpus(layers, gpu_num=2):
    """Evenly split *layers* across *gpu_num* GPUs and attach forward hooks.

    Returns:
        dict: module -> assigned gpu id, as produced by the even split.
    """
    mapping = assign_layers_to_num_gpus(layers, gpu_num=gpu_num)
    print("Add forward hook")
    add_forward_hooks(mapping)
    return mapping
if __name__ == "__main__":
    # Smoke test: dump (gpu_id, total, used) for every visible GPU.
    print(get_gpu_memory())