-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrte.py
330 lines (268 loc) · 9.83 KB
/
rte.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import os
import time
import paramiko
import yaml
from importlib_resources import files
from osfv.libs.rtectrl_api import rtectrl
from voluptuous import Any, Optional, Required, Schema
class RTE(rtectrl):
    """DUT power, reset, SPI and flashing operations driven through an RTE.

    GPIO access (``gpio_get`` / ``gpio_set``) is not defined in this class and
    is expected to be provided by the ``rtectrl`` base class.  Firmware
    operations run ``flashrom`` on the RTE over SSH (see ``flash_cmd``).
    """

    # GPIO numbers handed to gpio_get()/gpio_set().
    GPIO_SPI_ON = 1
    GPIO_SPI_VOLTAGE = 2
    GPIO_SPI_VCC = 3
    GPIO_RELAY = 0
    GPIO_RESET = 8
    GPIO_POWER = 9
    GPIO_CMOS = 12

    # Credentials used by flash_cmd() for the SSH session to the RTE.
    # NOTE(review): hard-coded credentials; confirm this is acceptable for
    # the deployment environment.
    SSH_USER = "root"
    SSH_PWD = "meta-rte"

    # Paths on the RTE where firmware images are uploaded to / read from.
    FW_PATH_WRITE = "/tmp/write.rom"
    FW_PATH_READ = "/tmp/read.rom"

    # flashrom programmer strings and the command template they are used in.
    PROGRAMMER_RTE = "linux_spi:dev=/dev/spidev1.0,spispeed=16000"
    PROGRAMMER_CH341A = "ch341a_spi"
    FLASHROM_CMD = "flashrom -p {programmer} {args}"

    # Translation tables between relay states and GPIO states.
    _RELAY_TO_GPIO = {"on": "high", "off": "low"}
    _GPIO_TO_RELAY = {"high": "on", "low": "off"}

    # GPIO_SPI_VOLTAGE state to apply for each supported flash chip voltage.
    _SPI_VOLTAGE_STATE = {"1.8V": "high-z", "3.3V": "low"}

    def __init__(self, rte_ip, dut_model, sonoff):
        """Load the DUT model configuration and check the Sonoff setup.

        Args:
            rte_ip: Address of the RTE; used as the SSH host by ``flash_cmd``.
            dut_model: Model name; ``models/<dut_model>.yml`` from the
                ``osfv`` package is loaded as the model configuration.
            sonoff: Object exposing ``sonoff_ip``, ``turn_on()``,
                ``turn_off()`` and ``get_state()``.

        Raises:
            UnsupportedDUTModel: If no model file exists for ``dut_model``.
            SystemExit: If the model file fails schema validation.
            SonoffNotFound: If the model is powered through a Sonoff but no
                Sonoff IP is available.
        """
        # NOTE(review): rtectrl.__init__ is not called (same as before this
        # change); confirm the base class needs no further initialisation.
        self.rte_ip = rte_ip
        self.dut_model = dut_model
        self.dut_data = self.load_model_data()
        self.sonoff = sonoff
        if not self.sonoff_sanity_check():
            # Previously exit() was evaluated as the exception argument, so
            # SystemExit fired and SonoffNotFound was never actually raised.
            raise SonoffNotFound(
                "Missing value for 'sonoff_ip' or Sonoff not found in SnipeIT"
            )

    def load_model_data(self):
        """Read and validate the YAML configuration of ``self.dut_model``.

        Returns:
            The validated configuration dict, with schema defaults applied
            (``reset_cmos`` is ``False`` when the file does not set it).

        Raises:
            UnsupportedDUTModel: If the model file does not exist.
            SystemExit: If the file content does not match the schema.
        """
        file_path = os.path.join(files("osfv"), "models", f"{self.dut_model}.yml")

        if not os.path.isfile(file_path):
            raise UnsupportedDUTModel(
                f"The {self.dut_model} model is not yet supported"
            )

        with open(file_path, "r", encoding="utf-8") as file:
            data = yaml.safe_load(file)

        voltage_validator = Any("1.8V", "3.3V")
        programmer_name_validator = Any("rte_1_1", "rte_1_0", "ch341a")
        flashing_power_state_validator = Any("G3", "OFF")

        schema = Schema(
            {
                Required("programmer"): {
                    Required("name"): programmer_name_validator,
                },
                Required("flash_chip"): {
                    Required("voltage"): voltage_validator,
                    Optional("model"): str,
                },
                Required("pwr_ctrl"): {
                    Required("sonoff"): bool,
                    Required("relay"): bool,
                    Required("flashing_power_state"): flashing_power_state_validator,
                },
                Optional("reset_cmos", default=False): bool,
            }
        )

        # Every mandatory key is marked Required in the schema, so a separate
        # hand-written presence check is unnecessary.  Returning the schema's
        # output (rather than the raw input) is what applies the defaults.
        try:
            return schema(data)
        except Exception as e:
            raise SystemExit(f"Model file is invalid: {e}") from e

    def power_on(self, sleep=1):
        """Drive the power GPIO low, then wait ``sleep`` seconds.

        ``sleep`` is also forwarded as the third argument of ``gpio_set``
        (presumably the hold time of the pulse; confirm against rtectrl).
        """
        self.gpio_set(self.GPIO_POWER, "low", sleep)
        time.sleep(sleep)

    def power_off(self, sleep=6):
        """Same GPIO action as ``power_on`` with a longer default duration."""
        self.gpio_set(self.GPIO_POWER, "low", sleep)
        time.sleep(sleep)

    def reset(self, sleep=1):
        """Drive the reset GPIO low, then wait ``sleep`` seconds."""
        self.gpio_set(self.GPIO_RESET, "low", sleep)
        time.sleep(sleep)

    def relay_get(self):
        """Return ``"on"``/``"off"`` for a high/low relay GPIO, else ``None``."""
        return self._GPIO_TO_RELAY.get(self.gpio_get(self.GPIO_RELAY))

    def relay_set(self, relay_state):
        """Set the relay GPIO high for ``"on"`` and low for ``"off"``.

        Any other ``relay_state`` is forwarded to ``gpio_set`` as ``None``,
        exactly as before; validation is left to ``gpio_set``.
        """
        self.gpio_set(self.GPIO_RELAY, self._RELAY_TO_GPIO.get(relay_state))

    def reset_cmos(self):
        """Hold the CMOS GPIO low for 10 seconds, then release it to high-z."""
        self.gpio_set(self.GPIO_CMOS, "low")
        time.sleep(10)
        self.gpio_set(self.GPIO_CMOS, "high-z")

    def spi_enable(self):
        """Configure the SPI voltage GPIO and switch the SPI lines on.

        Raises:
            SPIWrongVoltage: If the configured flash chip voltage is neither
                ``"1.8V"`` nor ``"3.3V"``.
        """
        voltage = self.dut_data["flash_chip"]["voltage"]
        if voltage not in self._SPI_VOLTAGE_STATE:
            raise SPIWrongVoltage

        self.gpio_set(self.GPIO_SPI_VOLTAGE, self._SPI_VOLTAGE_STATE[voltage])
        time.sleep(2)
        self.gpio_set(self.GPIO_SPI_VCC, "low")
        time.sleep(2)
        self.gpio_set(self.GPIO_SPI_ON, "low")
        time.sleep(10)

    def spi_disable(self):
        """Release the SPI VCC and SPI ON GPIOs to high-z."""
        self.gpio_set(self.GPIO_SPI_VCC, "high-z")
        self.gpio_set(self.GPIO_SPI_ON, "high-z")

    def pwr_ctrl_on(self):
        """Switch DUT power on via Sonoff or relay, as the model dictates.

        The Sonoff takes precedence when both are enabled in the model
        configuration.  Always ends with a 5 second wait.

        Raises:
            Exception: If the state read back afterwards is not on.
        """
        pwr_ctrl = self.dut_data["pwr_ctrl"]
        if pwr_ctrl["sonoff"] is True:
            self.sonoff.turn_on()
            if self.sonoff.get_state() != "ON":
                raise Exception("Failed to power control ON")
        elif pwr_ctrl["relay"] is True:
            self.relay_set("on")
            if self.relay_get() != "on":
                raise Exception("Failed to power control ON")
        time.sleep(5)

    def pwr_ctrl_off(self):
        """Switch DUT power off via Sonoff or relay, as the model dictates.

        The Sonoff takes precedence when both are enabled in the model
        configuration.  Always ends with a 2 second wait.

        Raises:
            Exception: If the state read back afterwards is not off.
        """
        pwr_ctrl = self.dut_data["pwr_ctrl"]
        if pwr_ctrl["sonoff"] is True:
            self.sonoff.turn_off()
            if self.sonoff.get_state() != "OFF":
                raise Exception("Failed to power control OFF")
        elif pwr_ctrl["relay"] is True:
            self.relay_set("off")
            if self.relay_get() != "off":
                raise Exception("Failed to power control OFF")
        time.sleep(2)

    def discharge_psu(self):
        """
        Push power button 5 times in the loop to make sure the charge from PSU is dissipated
        """
        for _ in range(5):
            self.power_off(3)

    def pwr_ctrl_before_flash(self, programmer, power_state):
        """Bring the DUT into the power state required for flashing.

        Args:
            programmer: Programmer name from the model configuration; SPI
                lines are enabled only for ``"rte_1_1"``.
            power_state: ``"G3"`` (no further action) or ``"OFF"`` (power is
                cut and the PSU discharged).

        Raises:
            SystemExit: If ``power_state`` is not one of the supported values.
        """
        # Always start from the same state (PSU active)
        self.pwr_ctrl_on()
        time.sleep(5)
        self.power_off(6)
        time.sleep(10)

        # Some platforms need to enable SPI lines at this point
        # when PSU is active (e.g. VP6650). Otherwise the chip is not detected.
        # So we must turn the PSU ON first in the middle of power cycle,
        # even if we perform flashing with the PSU OFF.
        if programmer == "rte_1_1":
            self.spi_enable()
        time.sleep(3)

        if power_state == "G3":
            return
        if power_state == "OFF":
            self.pwr_ctrl_off()
            self.discharge_psu()
            return
        raise SystemExit(
            f"Power state: '{power_state}' is not supported. Please check model config."
        )

    def pwr_ctrl_after_flash(self, programmer):
        """Disable the SPI lines again when the ``rte_1_1`` programmer is used."""
        if programmer == "rte_1_1":
            self.spi_disable()
        time.sleep(2)

    def flash_cmd(self, args, read_file=None, write_file=None):
        """Run ``flashrom`` on the RTE over SSH with the given arguments.

        Args:
            args: Argument string appended to the flashrom command line.
            read_file: Local path; when set, ``FW_PATH_READ`` is downloaded
                from the RTE to it after the command finishes.
            write_file: Local path; when set, it is uploaded to
                ``FW_PATH_WRITE`` on the RTE before the command starts.

        Returns:
            The exit status of the remote flashrom command.
        """
        programmer = self.dut_data["programmer"]["name"]
        self.pwr_ctrl_before_flash(
            programmer,
            self.dut_data["pwr_ctrl"]["flashing_power_state"],
        )

        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; confirm this is intended for the lab network.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            ssh.connect(
                self.rte_ip,
                username=self.SSH_USER,
                password=self.SSH_PWD,
                look_for_keys=False,
            )

            if write_file:
                # Context manager closes the SFTP session even if put() fails.
                with ssh.open_sftp() as sftp:
                    sftp.put(write_file, self.FW_PATH_WRITE)

            if programmer == "ch341a":
                flashrom_programmer = self.PROGRAMMER_CH341A
            else:
                flashrom_programmer = self.PROGRAMMER_RTE

            command = self.FLASHROM_CMD.format(
                programmer=flashrom_programmer, args=args
            )
            print(f"Executing command: {command}")

            channel = ssh.get_transport().open_session()
            # Merge stderr into stdout so error messages are shown and an
            # unread stderr stream cannot stall the remote command.
            channel.set_combine_stderr(True)
            channel.exec_command(command)

            # Stream output until EOF.  Stopping as soon as the exit status
            # is available could drop whatever is still buffered.
            while True:
                chunk = channel.recv(1024)
                if not chunk:
                    break
                # errors="replace": a multi-byte character may be split
                # across two chunks; never abort the flash over that.
                print(chunk.decode(errors="replace"), end="", flush=True)
            exit_status = channel.recv_exit_status()

            if read_file:
                with ssh.open_sftp() as sftp:
                    sftp.get(self.FW_PATH_READ, read_file)

            return exit_status
        finally:
            # Restore the SPI lines and drop the connection even on failure.
            self.pwr_ctrl_after_flash(programmer)
            ssh.close()

    def flash_create_args(self, extra_args=""):
        """Build the flashrom argument string for this DUT.

        Adds ``-c <model>`` when the model configuration names a flash chip,
        followed by ``extra_args`` when given.  Parts are joined with single
        spaces, without a leading or trailing separator.
        """
        parts = []
        # Set chip explicitly, if defined in model configuration
        chip_model = self.dut_data.get("flash_chip", {}).get("model")
        if chip_model:
            parts.extend(["-c", chip_model])
        if extra_args:
            parts.append(extra_args)
        return " ".join(parts)

    def flash_probe(self):
        """Run flashrom without an operation; return its exit status."""
        return self.flash_cmd(self.flash_create_args())

    def flash_read(self, read_file):
        """Read the flash into ``read_file``; return flashrom's exit status."""
        args = self.flash_create_args(f"-r {self.FW_PATH_READ}")
        return self.flash_cmd(args, read_file=read_file)

    def flash_erase(self):
        """Erase the flash chip; return flashrom's exit status."""
        return self.flash_cmd(self.flash_create_args("-E"))

    def flash_write(self, write_file):
        """Write ``write_file`` to the flash; return flashrom's exit status.

        Afterwards the CMOS is reset when the model configuration sets
        ``reset_cmos`` to true.
        """
        args = self.flash_create_args(f"-w {self.FW_PATH_WRITE}")
        exit_status = self.flash_cmd(args, write_file=write_file)
        time.sleep(2)
        if self.dut_data.get("reset_cmos"):
            self.reset_cmos()
        return exit_status

    def sonoff_sanity_check(self):
        """
        Verify that if DUT is powered by Sonoff, Sonoff IP is not None
        """
        if not self.dut_data["pwr_ctrl"]["sonoff"]:
            return True
        # getattr: tolerate sonoff being None instead of raising AttributeError.
        return bool(getattr(self.sonoff, "sonoff_ip", None))
class IncompleteModelData(Exception):
    """Error type for a model configuration that lacks required data.

    NOTE(review): nothing in the code visible in this file raises it;
    confirm whether it is used elsewhere before removing.
    """
class UnsupportedDUTModel(Exception):
    """Raised by RTE.load_model_data when no model file exists for the DUT."""
class SPIWrongVoltage(Exception):
    """Raised by RTE.spi_enable when the flash voltage is not 1.8V or 3.3V."""
class SonoffNotFound(Exception):
    """Error for a Sonoff-powered DUT without a usable Sonoff IP.

    Referenced by RTE.__init__ when its Sonoff sanity check fails.
    """