Skip to content

Commit e562a0c

Browse files
committedJul 16, 2021
Example MIN host program to output CAN frames
1 parent 833de51 commit e562a0c

File tree

1 file changed

+199
-0
lines changed

1 file changed

+199
-0
lines changed
 

‎canpcap.py

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
"""
2+
Simple example program that reads MIN frames that contain captured CAN frames and pushes them to stdout
3+
for tshark or wireshark to use.
4+
5+
To run:
6+
7+
$ python3 canpcap.py -p [port] | tshark -i -
8+
9+
where [port] is the serial port that connects to a MIN device uploading CAN frames. See the function minmon()
10+
in the file canpico.py in the CANHack repository for an example of a simple MicroPython monitor function.
11+
"""
12+
import logging
13+
import struct
14+
from struct import unpack
15+
from time import sleep, time
16+
import sys
17+
import getopt
18+
from min import MINTransportSerial
19+
from binascii import hexlify
20+
21+
# Linux USB serial ports are of the form '/dev/ttyACM*' and the CANPico board
22+
# from Canis Labs runs two serial ports, one for the REPL console and one
23+
# for applications to use; the default CANPico firmware includes MIN support
24+
# on this second port
25+
MIN_PORT = '/dev/ttyACM1'
26+
27+
28+
def wait_for_frames(min_handler: MINTransportSerial):
29+
while True:
30+
# The polling will generally block waiting for characters on a timeout
31+
# How much CPU time this takes depends on the Python serial implementation
32+
# on the target machine
33+
frames = min_handler.poll()
34+
if frames:
35+
return frames
36+
37+
38+
class CANPCAPNG:
39+
"""
40+
The file produces:
41+
42+
- SHB (Section header block)
43+
- IDB (Interface description block)
44+
- Multiple EPBs (Extended packet blocks)
45+
46+
The SHB defines the file size and endianness.
47+
48+
The IDB defines the link type, which will be passed through into protocol decoders.
49+
- The option if_tsresol should be set to select nanosecond timestamp resolution (since 1st Jan 1970)
50+
51+
The EPBs define the CAN frames. The timestamp is 64-bit. The interface ID matches the one in the IBD.
52+
"""
53+
54+
@staticmethod
55+
def get_shb() -> bytes:
56+
return b"".join(struct.pack('>I', i) for i in [0x0a0d0d0a, 28, 0x1a2b3c4d, 0x00010000, 0xffffffff, 0xffffffff, 28])
57+
58+
@staticmethod
59+
def get_idb() -> bytes:
60+
return b"".join(struct.pack('>I', i) for i in [0x00000001, 20, 227 << 16, 0, 20])
61+
62+
@staticmethod
63+
def get_epbs(b) -> bytes:
64+
"""
65+
Returns a block of EPBs from a block of bytes from the CANPico
66+
MicroPython firmware recv() call. Events are always 19 bytes.
67+
68+
Format is:
69+
70+
n+0: flags byte
71+
bits 1 to 0: event type (0=transmitted, 1=received CAN frame, 2=CAN error, 3=FIFO overflow)
72+
bit 7: remote frame
73+
74+
n+1: timestamp (big-endian format)
75+
76+
If received frame:
77+
n+5: dlc
78+
n+6: ID filter hit
79+
n+7: CAN ID (32-bit big-endian word)
80+
n+11: payload bytes
81+
82+
CAN ID 32-bits in the following format:
83+
bit 29: IDE flag
84+
bits 28 to 18: ID A
85+
bits 17 to 0: ID B
86+
"""
87+
epbs = bytes()
88+
89+
while len(b) > 0:
90+
eventb = b[:19]
91+
b = b[19:]
92+
logging.debug("len={}".format(len(eventb)))
93+
94+
if eventb[0] & 0x3 == 1:
95+
rtr = (eventb[0] >> 7) & 1
96+
timestamp = struct.unpack('>I', eventb[1:5])[0]
97+
dlc = eventb[5]
98+
canid = struct.unpack('>I', eventb[7:11])[0]
99+
ida = (canid >> 18) & 0x7ff
100+
idb = canid & 0x3ffff
101+
ide = (canid >> 29) & 1
102+
if dlc > 8:
103+
l = 8
104+
else:
105+
l = dlc
106+
data = eventb[11:11 + l]
107+
108+
logging.debug("dlc={}".format(dlc))
109+
logging.debug("ide={}".format(ide))
110+
logging.debug("rtr={}".format(rtr))
111+
logging.debug("ida={}".format(ida))
112+
logging.debug("idb={}".format(idb))
113+
logging.debug("data={}".format(data))
114+
logging.debug("timestamp={}".format(timestamp))
115+
logging.debug(hexlify(eventb))
116+
117+
epbs += CANPCAPNG.get_epb(ida=ida,
118+
idb=idb,
119+
ide=ide,
120+
rtr=rtr,
121+
data=data,
122+
timestamp=timestamp)
123+
return epbs
124+
125+
126+
@staticmethod
127+
def get_epb(ida: int, idb: int, ide: int, rtr: int, data: bytes, timestamp: int, interface_id=0, error_frame=False) -> bytes:
128+
assert ide in [1, 0]
129+
assert rtr in [1, 0]
130+
assert ida < (1 << 11)
131+
assert idb < (1 << 18)
132+
assert len(data) <= 8
133+
134+
if ide:
135+
canid = ida << 18 | idb | 0x80000000
136+
else:
137+
canid = ida
138+
if rtr:
139+
canid |= 0x40000000
140+
if error_frame:
141+
canid = 0x20000000
142+
data = bytes([8] * 8) # Error reports need to be 8 bytes of data
143+
padded_data = (data + bytes([0] * 8))[:8]
144+
145+
data_h = struct.unpack('>I', padded_data[:4])[0]
146+
data_l = struct.unpack('>I', padded_data[4:])[0]
147+
148+
return b"".join(struct.pack('>I', i) for i in [0x00000006,
149+
48,
150+
interface_id,
151+
timestamp >> 32,
152+
timestamp & 0xffffffff,
153+
16,
154+
16,
155+
canid,
156+
len(data) << 24,
157+
data_h,
158+
data_l,
159+
48])
160+
161+
162+
if __name__ == "__main__":
163+
options = "hp:"
164+
165+
port = MIN_PORT
166+
167+
try:
168+
args, vals = getopt.getopt(sys.argv[1:], options)
169+
for arg, val in args:
170+
if arg in ("-h"):
171+
print ("-p [port]")
172+
quit()
173+
elif arg in ("-p"):
174+
port = val
175+
print("port={}".format(port))
176+
177+
except getopt.error as err:
178+
print (str(err))
179+
quit()
180+
181+
logging.basicConfig(level=logging.WARNING)
182+
183+
min_handler = MINTransportSerial(port=port)
184+
logging.debug("Polling for MIN frames")
185+
186+
187+
hdr = CANPCAPNG.get_shb() + CANPCAPNG.get_idb()
188+
sys.stdout.buffer.write(hdr)
189+
190+
min_handler.transport_reset()
191+
192+
while True:
193+
# Wait for one or more frames to come back from the serial line and print them out
194+
for frame in wait_for_frames(min_handler):
195+
if frame.min_id == 1:
196+
epbs = CANPCAPNG.get_epbs(b=frame.payload)
197+
sys.stdout.buffer.write(epbs)
198+
sys.stdout.buffer.flush()
199+

0 commit comments

Comments
 (0)