-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathl2_switch_app.py
116 lines (101 loc) · 4 KB
/
l2_switch_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio
import logging
import os
import struct
import p4.v1.p4runtime_pb2 as p4r_pb2
from aiop4 import Client
# Layout of every log record: timestamp, level, source file and line,
# thread name, then the message itself.
log_format = (
    "%(asctime)s - %(levelname)s "
    "[%(filename)s:%(lineno)d] (%(threadName)s) "
    "%(message)s"
)

# Configure the root logger at DEBUG with the layout above.
logging.basicConfig(format=log_format, level=logging.DEBUG)

# Module-level logger used by the rest of this file.
log = logging.getLogger(__name__)
class L2SWClient:
"""L2SWClient."""
def __init__(
self,
client: Client,
p4info_path: str,
config_json_path: str,
multicast_group=0xAB,
ports=None,
) -> None:
"""L2SWClient."""
self.client = client
self.p4info_path = p4info_path
self.config_json_path = config_json_path
self.multicast_group = multicast_group
self.ports = ports if ports else list(range(0, 7))
self.consumer_task: asyncio.Task = None
self.keep_consuming = True
async def learn_mac(self, digest: p4r_pb2.DigestList):
"""learn_mac digest task."""
entities = []
for item in digest.data:
src_addr = item.struct.members[0].bitstring
in_port = item.struct.members[1].bitstring
smac_entry = self.client.new_table_entry(
"IngressImpl.smac",
{"hdr.ethernet.srcAddr": p4r_pb2.FieldMatch.Exact(value=src_addr)},
"NoAction",
)
dmac_entry = self.client.new_table_entry(
"IngressImpl.dmac",
{"hdr.ethernet.dstAddr": p4r_pb2.FieldMatch.Exact(value=src_addr)},
"IngressImpl.fwd",
[in_port],
)
entities.extend([smac_entry, dmac_entry])
await self.client.insert_entity(*entities)
await self.client.ack_digest_list(digest)
async def digests_consumer(self) -> None:
"""digests consumer."""
while self.keep_consuming:
msg = await self.client.queue.get()
log.debug(f"Consumer device_id {self.client.device_id} got message {msg}")
match msg.WhichOneof("update"):
case "digest":
asyncio.create_task(self.learn_mac(msg.digest))
case "error":
log.error(f"Got StreamError {msg}")
case _:
pass
async def setup_config(self) -> None:
"""Setup config."""
log.info(f"Setting up config for {self.client.host_device}")
await self.client.become_primary_or_raise(timeout=5)
self.consumer_task = asyncio.create_task(self.digests_consumer())
await self.client.set_fwd_pipeline_from_file(
self.p4info_path, self.config_json_path
)
await self.client.enable_digest(
self.client.elems_info.digests["digest_t"].preamble.id
)
await self.client.insert_multicast_group(self.multicast_group, self.ports)
table_entry = self.client.new_table_entry(
"IngressImpl.dmac",
{},
"IngressImpl.broadcast",
[struct.pack("!h", self.multicast_group)],
)
await self.client.modify_entity(table_entry)
async def main() -> None:
    """Main entry point for linear_topo.py that has two switches.

    Reads the pipeline file locations from the ``P4INFO_FILE`` and
    ``P4CONFIG_JSON`` environment variables (with defaults under
    ``~/repos/aiop4``), configures one `L2SWClient` per switch and then waits
    on their digest consumers.

    Raises:
        FileNotFoundError: if either pipeline file does not exist.
    """
    # Expand "~" once so the path that is checked is also the path that is
    # handed to the clients.
    p4info_path = os.path.expanduser(
        os.getenv(
            "P4INFO_FILE", "~/repos/aiop4/examples/l2_switch/l2_switch.p4info.txt"
        )
    )
    config_json_path = os.path.expanduser(
        os.getenv("P4CONFIG_JSON", "~/repos/aiop4/examples/l2_switch/l2_switch.json")
    )
    # Explicit check instead of assert, which is stripped under "python -O".
    for path in (p4info_path, config_json_path):
        if not os.path.isfile(path):
            raise FileNotFoundError(f"P4 pipeline file not found: {path}")

    client1 = L2SWClient(Client("localhost:9559", 1), p4info_path, config_json_path)
    client2 = L2SWClient(Client("localhost:9560", 2), p4info_path, config_json_path)
    await asyncio.gather(client1.setup_config(), client2.setup_config())
    # setup_config() already started one consumer per client; wait on those
    # tasks instead of starting a second consumer on the same queue.
    await asyncio.gather(client1.consumer_task, client2.consumer_task)
# Script entry point: run main() on a fresh event loop until interrupted.
if __name__ == "__main__":
    try:
        print("Hit <C-c> to stop this app")
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the announced way to stop the app, so exit without a
        # traceback.
        pass