10
10
from dataclasses import dataclass
11
11
from typing import Any , Callable , Coroutine
12
12
13
- from pymodbus .framer import ModbusFramer
14
13
from pymodbus .logging import Log
15
14
from pymodbus .transport .serial_asyncio import create_serial_connection
16
15
17
16
18
- class BaseTransport :
19
- """Base class for transport types .
17
+ class Transport :
18
+ """Transport layer .
20
19
21
- BaseTransport contains functions common to all transport types and client/server.
20
+ Contains pure transport methods needed to connect/listen, send/receive and close connections
21
+ for unix socket, tcp, tls and serial communications.
22
22
23
- This class is not available in the pymodbus API, and should not be referenced in Applications.
23
+ Contains high level methods like reconnect.
24
+
25
+ This class is not available in the pymodbus API, and should not be referenced in Applications
26
+ nor in the pymodbus documentation.
27
+
28
+ The class is designed to be an object in the message level class.
24
29
"""
25
30
26
31
@dataclass
@@ -33,7 +38,6 @@ class CommParamsClass:
33
38
reconnect_delay : float = None
34
39
reconnect_delay_max : float = None
35
40
timeout_connect : float = None
36
- framer : ModbusFramer = None
37
41
38
42
# tcp / tls / udp / serial
39
43
host : str = None
@@ -60,19 +64,19 @@ def check_done(self):
60
64
def __init__ (
61
65
self ,
62
66
comm_name : str ,
63
- reconnect_delay : tuple [int , int ],
67
+ reconnect_delay : int ,
68
+ reconnect_max : int ,
64
69
timeout_connect : int ,
65
- framer : ModbusFramer ,
66
70
callback_connected : Callable [[], None ],
67
71
callback_disconnected : Callable [[Exception ], None ],
68
72
callback_data : Callable [[bytes ], int ],
69
73
) -> None :
70
74
"""Initialize a transport instance.
71
75
72
76
:param comm_name: name of this transport connection
73
- :param reconnect_delay: delay and max in milliseconds for first reconnect (0,0 for no reconnect)
77
+ :param reconnect_delay: delay in milliseconds for first reconnect (0 for no reconnect)
78
+ :param reconnect_delay: max reconnect delay in milliseconds
74
79
:param timeout_connect: Max. time in milliseconds for connect to complete
75
- :param framer: Modbus framer to decode/encode messagees.
76
80
:param callback_connected: Called when connection is established
77
81
:param callback_disconnected: Called when connection is disconnected
78
82
:param callback_data: Called when data is received
@@ -84,25 +88,25 @@ def __init__(
84
88
# properties, can be read, but may not be mingled with
85
89
self .comm_params = self .CommParamsClass (
86
90
comm_name = comm_name ,
87
- reconnect_delay = reconnect_delay [ 0 ] / 1000 ,
88
- reconnect_delay_max = reconnect_delay [ 1 ] / 1000 ,
91
+ reconnect_delay = reconnect_delay / 1000 ,
92
+ reconnect_delay_max = reconnect_max / 1000 ,
89
93
timeout_connect = timeout_connect / 1000 ,
90
- framer = framer ,
91
94
)
92
95
93
- self .reconnect_delay_current : float = 0
96
+ self .reconnect_delay_current : float = 0.0
94
97
self .transport : asyncio .BaseTransport | asyncio .Server = None
95
98
self .protocol : asyncio .BaseProtocol = None
99
+ self .loop : asyncio .AbstractEventLoop = None
96
100
with suppress (RuntimeError ):
97
- self .loop : asyncio . AbstractEventLoop = asyncio .get_running_loop ()
98
- self .reconnect_timer : asyncio .TimerHandle = None
101
+ self .loop = asyncio .get_running_loop ()
102
+ self .reconnect_task : asyncio .Task = None
99
103
self .recv_buffer : bytes = b""
100
104
self .call_connect_listen : Callable [[], Coroutine [Any , Any , Any ]] = lambda : None
101
105
self .use_udp = False
102
106
103
- # ----------------------------- #
104
- # Transport specific parameters #
105
- # ----------------------------- #
107
+ # ------------------------ #
108
+ # Transport specific setup #
109
+ # ------------------------ #
106
110
def setup_unix (self , setup_server : bool , host : str ):
107
111
"""Prepare transport unix"""
108
112
if sys .platform .startswith ("win" ):
@@ -263,6 +267,9 @@ def setup_serial(
263
267
async def transport_connect (self ):
264
268
"""Handle generic connect and call on to specific transport connect."""
265
269
Log .debug ("Connecting {}" , self .comm_params .comm_name )
270
+ if not self .loop :
271
+ self .loop = asyncio .get_running_loop ()
272
+ self .transport , self .protocol = None , None
266
273
try :
267
274
self .transport , self .protocol = await asyncio .wait_for (
268
275
self .call_connect_listen (),
@@ -295,6 +302,8 @@ def connection_made(self, transport: asyncio.BaseTransport):
295
302
:param transport: socket etc. representing the connection.
296
303
"""
297
304
Log .debug ("Connected to {}" , self .comm_params .comm_name )
305
+ if not self .loop :
306
+ self .loop = asyncio .get_running_loop ()
298
307
self .transport = transport
299
308
self .reset_delay ()
300
309
self .cb_connection_made ()
@@ -306,7 +315,9 @@ def connection_lost(self, reason: Exception):
306
315
"""
307
316
Log .debug ("Connection lost {} due to {}" , self .comm_params .comm_name , reason )
308
317
self .cb_connection_lost (reason )
309
- self .close (reconnect = True )
318
+ if self .transport :
319
+ self .close ()
320
+ self .reconnect_task = asyncio .create_task (self .reconnect_connect ())
310
321
311
322
def eof_received (self ):
312
323
"""Call when eof received (other end closed connection).
@@ -352,29 +363,11 @@ def close(self, reconnect: bool = False) -> None:
352
363
self .transport .close ()
353
364
self .transport = None
354
365
self .protocol = None
355
- if self .reconnect_timer :
356
- self .reconnect_timer .cancel ()
357
- self .reconnect_timer = None
366
+ if not reconnect and self .reconnect_task :
367
+ self .reconnect_task .cancel ()
368
+ self .reconnect_task = None
358
369
self .recv_buffer = b""
359
370
360
- if not reconnect or not self .reconnect_delay_current :
361
- self .reconnect_delay_current = 0
362
- return
363
-
364
- Log .debug (
365
- "Waiting {} {} ms reconnecting." ,
366
- self .comm_params .comm_name ,
367
- self .reconnect_delay_current * 1000 ,
368
- )
369
- self .reconnect_timer = self .loop .call_later (
370
- self .reconnect_delay_current ,
371
- asyncio .create_task ,
372
- self .transport_connect (),
373
- )
374
- self .reconnect_delay_current = min (
375
- 2 * self .reconnect_delay_current , self .comm_params .reconnect_delay_max
376
- )
377
-
378
371
def reset_delay (self ) -> None :
379
372
"""Reset wait time before next reconnect to minimal period."""
380
373
self .reconnect_delay_current = self .comm_params .reconnect_delay
@@ -386,6 +379,27 @@ def handle_listen(self):
386
379
"""Handle incoming connect."""
387
380
return self
388
381
382
+ async def reconnect_connect (self ):
383
+ """Handle reconnect as a task."""
384
+ try :
385
+ self .reconnect_delay_current = self .comm_params .reconnect_delay
386
+ transport = None
387
+ while not transport :
388
+ Log .debug (
389
+ "Wait {} {} ms before reconnecting." ,
390
+ self .comm_params .comm_name ,
391
+ self .reconnect_delay_current * 1000 ,
392
+ )
393
+ await asyncio .sleep (self .reconnect_delay_current )
394
+ transport , _protocol = await self .transport_connect ()
395
+ self .reconnect_delay_current = min (
396
+ 2 * self .reconnect_delay_current ,
397
+ self .comm_params .reconnect_delay_max ,
398
+ )
399
+ except asyncio .CancelledError :
400
+ pass
401
+ self .reconnect_task = None
402
+
389
403
# ----------------- #
390
404
# The magic methods #
391
405
# ----------------- #
0 commit comments