diff --git a/PKG-INFO b/PKG-INFO index 940416fe..19b85975 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,12 +1,12 @@ -Metadata-Version: 1.1 -Name: adsPy -Version: 1.0.2 -Summary: Python wrapper for TwinCAT ADS-DLL -Home-page: http://mrleeh.square7.ch/ -Author: Stefan Lehmann -Author-email: mrleeh@gmx.de -License: UNKNOWN -Description: UNKNOWN -Platform: UNKNOWN -Requires: ctypes -Provides: adsPy +Metadata-Version: 1.1 +Name: adsPy +Version: 1.0.2 +Summary: Python wrapper for TwinCAT ADS-DLL +Home-page: http://mrleeh.square7.ch/ +Author: Stefan Lehmann +Author-email: mrleeh@gmx.de +License: UNKNOWN +Description: UNKNOWN +Platform: UNKNOWN +Requires: ctypes +Provides: adsPy diff --git a/README.md b/README.md index a5bc050b..75246296 100644 --- a/README.md +++ b/README.md @@ -1,66 +1,66 @@ -pyads - Python package -====================== - -[![Build Status](https://travis-ci.org/stlehmann/pyads.svg?branch=master)](https://travis-ci.org/stlehmann/pyads) -[![Coverage Status](https://coveralls.io/repos/github/stlehmann/pyads/badge.svg?branch=master)](https://coveralls.io/github/stlehmann/pyads?branch=master) -[![Documentation Status](https://readthedocs.org/projects/pyads/badge/?version=latest)](http://pyads.readthedocs.io/en/latest/?badge=latest) -[![PyPI version](https://badge.fury.io/py/pyads.svg)](https://badge.fury.io/py/pyads) -[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) -[![Downloads](https://pepy.tech/badge/pyads)](https://pepy.tech/project/pyads) -[![Downloads](https://pepy.tech/badge/pyads/week)](https://pepy.tech/project/pyads/week) - -This is a python wrapper for TwinCATs ADS library. It provides python functions -for communicating with TwinCAT devices. *pyads* uses the C API provided by *TcAdsDll.dll* on Windows *adslib.so* on Linux. The documentation for the ADS API is available on [infosys.beckhoff.com](http://infosys.beckhoff.com/english.php?content=../content/1033/tcadsdll2/html/tcadsdll_api_overview.htm&id=20557). - - -Documentation: http://pyads.readthedocs.io/en/latest/index.html - -# Installation - -From PyPi: - -```bash -$ pip install pyads -``` - -From Github: - -```bash -$ git clone https://github.com/MrLeeh/pyads.git --recursive -$ cd pyads -$ python setup.py install -``` - -## Features - -* connect to a remote TwinCAT device like a plc or a PC with TwinCAT -* create routes on Linux devices and on remote plcs -* supports TwinCAT 2 and TwinCAT 3 -* read and write values by name or address -* read DUTs (structures) from the plc -* notification callbacks - -## Basic usage - -```python -import pyads - -# add route to remote plc -pyads.add_route("192.168.1.12.1.1", "192.168.1.12") - -# connect to plc and open connection -plc = pyads.Connection('127.0.0.1.1.1', pyads.PORT_SPS1) -plc.open() - -# read int value by name -i = plc.read_by_name("GVL.int_val", pyads.PLCTYPE_INT) - -# write int value by name -plc.write_by_name("GVL.int_val", i, pyads.PLCTYPE_INT) - -# close connection -plc.close() -``` - -[0]: https://infosys.beckhoff.de/english.php?content=../content/1033/TcSystemManager/Basics/TcSysMgr_AddRouteDialog.htm&id= - +pyads - Python package +====================== + +[![Build Status](https://travis-ci.org/stlehmann/pyads.svg?branch=master)](https://travis-ci.org/stlehmann/pyads) +[![Coverage Status](https://coveralls.io/repos/github/stlehmann/pyads/badge.svg?branch=master)](https://coveralls.io/github/stlehmann/pyads?branch=master) +[![Documentation Status](https://readthedocs.org/projects/pyads/badge/?version=latest)](http://pyads.readthedocs.io/en/latest/?badge=latest) +[![PyPI version](https://badge.fury.io/py/pyads.svg)](https://badge.fury.io/py/pyads) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) +[![Downloads](https://pepy.tech/badge/pyads)](https://pepy.tech/project/pyads) +[![Downloads](https://pepy.tech/badge/pyads/week)](https://pepy.tech/project/pyads/week) + +This is a python wrapper for TwinCATs ADS library. It provides python functions +for communicating with TwinCAT devices. *pyads* uses the C API provided by *TcAdsDll.dll* on Windows *adslib.so* on Linux. The documentation for the ADS API is available on [infosys.beckhoff.com](http://infosys.beckhoff.com/english.php?content=../content/1033/tcadsdll2/html/tcadsdll_api_overview.htm&id=20557). + + +Documentation: http://pyads.readthedocs.io/en/latest/index.html + +# Installation + +From PyPi: + +```bash +$ pip install pyads +``` + +From Github: + +```bash +$ git clone https://github.com/MrLeeh/pyads.git --recursive +$ cd pyads +$ python setup.py install +``` + +## Features + +* connect to a remote TwinCAT device like a plc or a PC with TwinCAT +* create routes on Linux devices and on remote plcs +* supports TwinCAT 2 and TwinCAT 3 +* read and write values by name or address +* read DUTs (structures) from the plc +* notification callbacks + +## Basic usage + +```python +import pyads + +# add route to remote plc +pyads.add_route("192.168.1.12.1.1", "192.168.1.12") + +# connect to plc and open connection +plc = pyads.Connection('127.0.0.1.1.1', pyads.PORT_SPS1) +plc.open() + +# read int value by name +i = plc.read_by_name("GVL.int_val", pyads.PLCTYPE_INT) + +# write int value by name +plc.write_by_name("GVL.int_val", i, pyads.PLCTYPE_INT) + +# close connection +plc.close() +``` + +[0]: https://infosys.beckhoff.de/english.php?content=../content/1033/TcSystemManager/Basics/TcSysMgr_AddRouteDialog.htm&id= + diff --git a/pyads/__init__.py b/pyads/__init__.py index 2a84867f..5cbac81f 100644 --- a/pyads/__init__.py +++ b/pyads/__init__.py @@ -1,45 +1,45 @@ -# -*- coding: utf-8 -*- -"""The pyads package. - -:author: Stefan Lehmann -:license: MIT, see license file or https://opensource.org/licenses/MIT - -:created on: 2018-06-11 18:15:53 - -""" -from .structs import AmsAddr, NotificationAttrib - -from .ads import open_port, close_port, get_local_address, read_state, \ - write_control, read_device_info, write, read_write, read, \ - read_by_name, write_by_name, add_route, add_route_to_plc, delete_route, \ - add_device_notification, del_device_notification, Connection, \ - set_local_address, set_timeout, size_of_structure, dict_from_bytes - -from .pyads_ex import ADSError - -from .constants import PLCTYPE_BOOL, PLCTYPE_BYTE, PLCTYPE_DATE, \ - PLCTYPE_DINT, PLCTYPE_DT, PLCTYPE_DWORD, PLCTYPE_INT, PLCTYPE_LREAL, \ - PLCTYPE_REAL, PLCTYPE_SINT, PLCTYPE_STRING, PLCTYPE_TIME, PLCTYPE_TOD, \ - PLCTYPE_ULINT, PLCTYPE_UDINT, PLCTYPE_UINT, PLCTYPE_USINT, PLCTYPE_WORD, \ - PLCTYPE_ARR_DINT, PLCTYPE_ARR_INT, PLCTYPE_ARR_LREAL, PLCTYPE_ARR_REAL, \ - PLCTYPE_ARR_SHORT, PLC_DEFAULT_STRING_SIZE, DATATYPE_MAP - -from .constants import PORT_EVENTLOGGER, PORT_IO, PORT_LOGGER, PORT_NC, \ - PORT_NOCKE, PORT_SCOPE, PORT_SPECIALTASK1, PORT_SPECIALTASK2, PORT_SPS1, \ - PORT_SPS2, PORT_SPS3, PORT_SPS4, PORT_SYSTEMSERVICE, \ - PORT_CAM, PORT_TC2PLC1, PORT_TC2PLC2, PORT_TC2PLC3, PORT_TC2PLC4, PORT_TC3PLC1 - -from .constants import INDEXGROUP_MEMORYBYTE, INDEXGROUP_MEMORYBIT, \ - INDEXGROUP_MEMORYSIZE, INDEXGROUP_RETAIN, INDEXGROUP_RETAINSIZE, \ - INDEXGROUP_DATA, INDEXGROUP_DATASIZE - -from .constants import ADSSTATE_INVALID, ADSSTATE_IDLE, ADSSTATE_RESET, \ - ADSSTATE_INIT, ADSSTATE_START, ADSSTATE_RUN, ADSSTATE_STOP, \ - ADSSTATE_SAVECFG, ADSSTATE_LOADCFG, ADSSTATE_POWERFAILURE, \ - ADSSTATE_POWERGOOD, ADSSTATE_ERROR, ADSSTATE_SHUTDOWN, ADSSTATE_SUSPEND, \ - ADSSTATE_RESUME, ADSSTATE_CONFIG, ADSSTATE_RECONFIG - -from .constants import ADSTRANS_NOTRANS, ADSTRANS_CLIENTCYCLE, \ - ADSTRANS_CLIENT1REQ, ADSTRANS_SERVERCYCLE, ADSTRANS_SERVERONCHA - -__version__ = '3.2.1' +# -*- coding: utf-8 -*- +"""The pyads package. + +:author: Stefan Lehmann +:license: MIT, see license file or https://opensource.org/licenses/MIT + +:created on: 2018-06-11 18:15:53 + +""" +from .structs import AmsAddr, NotificationAttrib + +from .ads import open_port, close_port, get_local_address, read_state, \ + write_control, read_device_info, write, read_write, read, \ + read_by_name, write_by_name, add_route, add_route_to_plc, delete_route, \ + add_device_notification, del_device_notification, Connection, \ + set_local_address, set_timeout, size_of_structure, dict_from_bytes + +from .pyads_ex import ADSError + +from .constants import PLCTYPE_BOOL, PLCTYPE_BYTE, PLCTYPE_DATE, \ + PLCTYPE_DINT, PLCTYPE_DT, PLCTYPE_DWORD, PLCTYPE_INT, PLCTYPE_LREAL, \ + PLCTYPE_REAL, PLCTYPE_SINT, PLCTYPE_STRING, PLCTYPE_TIME, PLCTYPE_TOD, \ + PLCTYPE_ULINT, PLCTYPE_UDINT, PLCTYPE_UINT, PLCTYPE_USINT, PLCTYPE_WORD, \ + PLCTYPE_ARR_DINT, PLCTYPE_ARR_INT, PLCTYPE_ARR_LREAL, PLCTYPE_ARR_REAL, \ + PLCTYPE_ARR_SHORT, PLC_DEFAULT_STRING_SIZE, DATATYPE_MAP + +from .constants import PORT_EVENTLOGGER, PORT_IO, PORT_LOGGER, PORT_NC, \ + PORT_NOCKE, PORT_SCOPE, PORT_SPECIALTASK1, PORT_SPECIALTASK2, PORT_SPS1, \ + PORT_SPS2, PORT_SPS3, PORT_SPS4, PORT_SYSTEMSERVICE, \ + PORT_CAM, PORT_TC2PLC1, PORT_TC2PLC2, PORT_TC2PLC3, PORT_TC2PLC4, PORT_TC3PLC1 + +from .constants import INDEXGROUP_MEMORYBYTE, INDEXGROUP_MEMORYBIT, \ + INDEXGROUP_MEMORYSIZE, INDEXGROUP_RETAIN, INDEXGROUP_RETAINSIZE, \ + INDEXGROUP_DATA, INDEXGROUP_DATASIZE + +from .constants import ADSSTATE_INVALID, ADSSTATE_IDLE, ADSSTATE_RESET, \ + ADSSTATE_INIT, ADSSTATE_START, ADSSTATE_RUN, ADSSTATE_STOP, \ + ADSSTATE_SAVECFG, ADSSTATE_LOADCFG, ADSSTATE_POWERFAILURE, \ + ADSSTATE_POWERGOOD, ADSSTATE_ERROR, ADSSTATE_SHUTDOWN, ADSSTATE_SUSPEND, \ + ADSSTATE_RESUME, ADSSTATE_CONFIG, ADSSTATE_RECONFIG + +from .constants import ADSTRANS_NOTRANS, ADSTRANS_CLIENTCYCLE, \ + ADSTRANS_CLIENT1REQ, ADSTRANS_SERVERCYCLE, ADSTRANS_SERVERONCHA + +__version__ = '3.2.1' diff --git a/pyads/ads.py b/pyads/ads.py index 75f755da..91509617 100644 --- a/pyads/ads.py +++ b/pyads/ads.py @@ -1,1150 +1,1150 @@ -"""Pythonic ADS functions. - -:author: Stefan Lehmann -:license: MIT, see license file or https://opensource.org/licenses/MIT - -:created on: 2018-06-11 18:15:53 - -""" -from typing import Optional, Union, Tuple, Any, Type, Callable, Dict -from datetime import datetime -import struct -from ctypes import memmove, addressof, c_ubyte, Array, Structure, sizeof -from collections import OrderedDict - -from .utils import platform_is_linux, deprecated -from .filetimes import filetime_to_dt - -from .pyads_ex import ( - adsAddRoute, - adsAddRouteToPLC, - adsDelRoute, - adsPortOpenEx, - adsPortCloseEx, - adsGetLocalAddressEx, - adsSyncReadStateReqEx, - adsSyncReadDeviceInfoReqEx, - adsSyncWriteControlReqEx, - adsSyncWriteReqEx, - adsSyncReadWriteReqEx2, - adsSyncReadReqEx2, - adsGetHandle, - adsReleaseHandle, - adsSyncReadByNameEx, - adsSyncWriteByNameEx, - adsSyncAddDeviceNotificationReqEx, - adsSyncDelDeviceNotificationReqEx, - adsSyncSetTimeoutEx, - adsSetLocalAddress, - ADSError, -) - -from .constants import ( - PLCTYPE_BOOL, - PLCTYPE_BYTE, - PLCTYPE_DATE, - PLCTYPE_DINT, - PLCTYPE_DT, - PLCTYPE_DWORD, - PLCTYPE_INT, - PLCTYPE_LREAL, - PLCTYPE_REAL, - PLCTYPE_SINT, - PLCTYPE_STRING, - PLCTYPE_TIME, - PLCTYPE_TOD, - PLCTYPE_UDINT, - PLCTYPE_UINT, - PLCTYPE_USINT, - PLCTYPE_WORD, - PLC_DEFAULT_STRING_SIZE, - DATATYPE_MAP, -) - -from .structs import ( - AmsAddr, - SAmsNetId, - AdsVersion, - NotificationAttrib, - SAdsNotificationHeader, -) - -linux = platform_is_linux() -port = None # type: int - - -def _parse_ams_netid(ams_netid): - # type: (str) -> SAmsNetId - """Parse an AmsNetId from *str* to *SAmsNetId*. - - :param str ams_netid: NetId as a string - :rtype: SAmsNetId - :return: NetId as a struct - - """ - try: - id_numbers = list(map(int, ams_netid.split("."))) - except ValueError: - raise ValueError("no valid netid") - - if len(id_numbers) != 6: - raise ValueError("no valid netid") - - # Fill the netId struct with data - ams_netid_st = SAmsNetId() - ams_netid_st.b = (c_ubyte * 6)(*id_numbers) - return ams_netid_st - - -def open_port(): - # type: () -> int - """Connect to the TwinCAT message router. - - :rtype: int - :return: port number - - """ - global port - - port = port or adsPortOpenEx() - return port - - -def close_port(): - # type: () -> None - """Close the connection to the TwinCAT message router.""" - global port - - if port is not None: - adsPortCloseEx(port) - port = None - - -def get_local_address(): - # type: () -> Optional[AmsAddr] - """Return the local AMS-address and the port number. - - :rtype: AmsAddr - - """ - if port is not None: - return adsGetLocalAddressEx(port) - - return None - - -def set_local_address(ams_netid): - # type: (Union[str, SAmsNetId]) -> None - """Set the local NetID (**Linux only**). - - :param str: new AmsNetID - :rtype: None - - **Usage:** - - >>> import pyads - >>> pyads.open_port() - >>> pyads.set_local_address('0.0.0.0.1.1') - - """ - if isinstance(ams_netid, str): - ams_netid_st = _parse_ams_netid(ams_netid) - else: - ams_netid_st = ams_netid - - assert isinstance(ams_netid_st, SAmsNetId) - - if linux: - return adsSetLocalAddress(ams_netid_st) - else: - raise ADSError( - text="SetLocalAddress is not supported for Windows clients." - ) # pragma: no cover - - -@deprecated() -def read_state(adr): - # type: (AmsAddr) -> Optional[Tuple[int, int]] - """Read the current ADS-state and the machine-state. - - Read the current ADS-state and the machine-state from the - ADS-server. - - :param AmsAddr adr: local or remote AmsAddr - :rtype: (int, int) - :return: adsState, deviceState - - """ - if port is not None: - return adsSyncReadStateReqEx(port, adr) - - return None - - -@deprecated() -def write_control(adr, ads_state, device_state, data, plc_datatype): - # type: (AmsAddr, int, int, Any, Type) -> None - """Change the ADS state and the machine-state of the ADS-server. - - :param AmsAddr adr: local or remote AmsAddr - :param int ads_state: new ADS-state, according to ADSTATE constants - :param int device_state: new machine-state - :param data: additional data - :param int plc_datatype: datatype, according to PLCTYPE constants - - :note: Despite changing the ADS-state and the machine-state it is possible - to send additional data to the ADS-server. For current ADS-devices - additional data is not progressed. - Every ADS-device is able to communicate its current state to other - devices. - There is a difference between the device-state and the state of the - ADS-interface (AdsState). The possible states of an ADS-interface - are defined in the ADS-specification. - - """ - if port is not None: - return adsSyncWriteControlReqEx( - port, adr, ads_state, device_state, data, plc_datatype - ) - - -@deprecated() -def read_device_info(adr): - # type: (AmsAddr) -> Optional[Tuple[str, AdsVersion]] - """Read the name and the version number of the ADS-server. - - :param AmsAddr adr: local or remote AmsAddr - :rtype: string, AdsVersion - :return: device name, version - - """ - if port is not None: - return adsSyncReadDeviceInfoReqEx(port, adr) - - return None - - -@deprecated() -def write(adr, index_group, index_offset, value, plc_datatype): - # type: (AmsAddr, int, int, Any, Type) -> None - """Send data synchronous to an ADS-device. - - :param AmsAddr adr: local or remote AmsAddr - :param int index_group: PLC storage area, according to the INDEXGROUP - constants - :param int index_offset: PLC storage address - :param value: value to write to the storage address of the PLC - :param Type plc_datatype: type of the data given to the PLC, - according to PLCTYPE constants - - """ - if port is not None: - return adsSyncWriteReqEx( - port, adr, index_group, index_offset, value, plc_datatype - ) - - -@deprecated() -def read_write( - adr, - index_group, - index_offset, - plc_read_datatype, - value, - plc_write_datatype, - return_ctypes=False, - check_length=True, -): - # type: (AmsAddr, int, int, Type, Any, Type, bool, bool) -> Any - """Read and write data synchronous from/to an ADS-device. - - :param AmsAddr adr: local or remote AmsAddr - :param int index_group: PLC storage area, according to the INDEXGROUP - constants - :param int index_offset: PLC storage address - :param Type plc_read_datatype: type of the data given to the PLC to respond - to, according to PLCTYPE constants - :param value: value to write to the storage address of the PLC - :param Type plc_write_datatype: type of the data given to the PLC, according to - PLCTYPE constants - :param bool return_ctypes: return ctypes instead of python types if True - (default: False) - :param bool check_length: check whether the amount of bytes read matches the size - of the read data type (default: True) - :rtype: PLCTYPE - :return: value: **value** - - """ - if port is not None: - return adsSyncReadWriteReqEx2( - port, - adr, - index_group, - index_offset, - plc_read_datatype, - value, - plc_write_datatype, - return_ctypes, - check_length, - ) - - return None - - -@deprecated() -def read( - adr, index_group, index_offset, plc_datatype, return_ctypes=False, check_length=True -): - # type: (AmsAddr, int, int, Type, bool, bool) -> Any - """Read data synchronous from an ADS-device. - - :param AmsAddr adr: local or remote AmsAddr - :param int index_group: PLC storage area, according to the INDEXGROUP - constants - :param int index_offset: PLC storage address - :param int plc_datatype: type of the data given to the PLC, according to - PLCTYPE constants - :param bool return_ctypes: return ctypes instead of python types if True - (default: False) - :param bool check_length: check whether the amount of bytes read matches the size - of the read data type (default: True) - :return: value: **value** - - """ - if port is not None: - return adsSyncReadReqEx2( - port, - adr, - index_group, - index_offset, - plc_datatype, - return_ctypes, - check_length, - ) - - return None - - -@deprecated() -def read_by_name(adr, data_name, plc_datatype, return_ctypes=False, check_length=True): - # type: (AmsAddr, str, Type, bool) -> Any - """Read data synchronous from an ADS-device from data name. - - :param AmsAddr adr: local or remote AmsAddr - :param string data_name: data name - :param int plc_datatype: type of the data given to the PLC, according to - PLCTYPE constants - :param bool return_ctypes: return ctypes instead of python types if True - (default: False) - :param bool check_length: check whether the amount of bytes read matches the size - of the read data type (default: True) - :return: value: **value** - - """ - if port is not None: - return adsSyncReadByNameEx( - port, adr, data_name, plc_datatype, return_ctypes, check_length=check_length - ) - - return None - - -@deprecated() -def write_by_name(adr, data_name, value, plc_datatype): - # type: (AmsAddr, str, Any, Type) -> None - """Send data synchronous to an ADS-device from data name. - - :param AmsAddr adr: local or remote AmsAddr - :param string data_name: PLC storage address - :param value: value to write to the storage address of the PLC - :param int plc_datatype: type of the data given to the PLC, - according to PLCTYPE constants - - """ - if port is not None: - return adsSyncWriteByNameEx(port, adr, data_name, value, plc_datatype) - - -def add_route(adr, ip_address): - # type: (Union[str, AmsAddr], str) -> None - """Establish a new route in the AMS Router (linux Only). - - :param adr: AMS Address of routing endpoint as str or AmsAddr object - :param str ip_address: ip address of the routing endpoint - """ - if isinstance(adr, str): - adr = AmsAddr(adr) - - return adsAddRoute(adr.netIdStruct(), ip_address) - - -def add_route_to_plc( - sending_net_id, - adding_host_name, - ip_address, - username, - password, - route_name=None, - added_net_id=None, -): - # type: (str, str, str, str, str, str, str) -> bool - """Embed a new route in the PLC. - - :param pyads.structs.SAmsNetId sending_net_id: sending net id - :param str adding_host_name: host name (or IP) of the PC being added - :param str ip_address: ip address of the PLC - :param str username: username for PLC - :param str password: password for PLC - :param str route_name: PLC side name for route, defaults to adding_host_name or the current hostname of this PC - :param pyads.structs.SAmsNetId added_net_id: net id that is being added to the PLC, defaults to sending_net_id - - """ - return adsAddRouteToPLC( - sending_net_id, - adding_host_name, - ip_address, - username, - password, - route_name=route_name, - added_net_id=added_net_id, - ) - - -def delete_route(adr): - # type: (AmsAddr) -> None - """Remove existing route from the AMS Router (Linux Only). - - :param pyads.structs.AmsAddr adr: AMS Address associated with the routing - entry which is to be removed from the router. - """ - return adsDelRoute(adr.netIdStruct()) - - -@deprecated() -def add_device_notification(adr, data, attr, callback, user_handle=None): - # type: (AmsAddr, Union[str, Tuple[int, int]], NotificationAttrib, Callable, int) -> Optional[Tuple[int, int]] # noqa: E501 - """Add a device notification. - - :param pyads.structs.AmsAddr adr: AMS Address associated with the routing - entry which is to be removed from the router. - :param Union[str, Tuple[int, int] data: PLC storage address as string or Tuple with index group and offset - :param pyads.structs.NotificationAttrib attr: object that contains - all the attributes for the definition of a notification - :param callback: callback function that gets executed in the event of a notification - - :rtype: (int, int) - :returns: notification handle, user handle - - Save the notification handle and the user handle on creating a - notification if you want to be able to remove the notification - later in your code. - - """ - if port is not None: - return adsSyncAddDeviceNotificationReqEx( - port, adr, data, attr, callback, user_handle - ) - - return None - - -@deprecated() -def del_device_notification(adr, notification_handle, user_handle): - # type: (AmsAddr, int, int) -> None - """Remove a device notification. - - :param pyads.structs.AmsAddr adr: AMS Address associated with the routing - entry which is to be removed from the router. - :param notification_handle: address of the variable that contains - the handle of the notification - :param user_handle: user handle - - """ - if port is not None: - return adsSyncDelDeviceNotificationReqEx( - port, adr, notification_handle, user_handle - ) - - -def set_timeout(ms): - # type: (int) -> None - """Set timeout.""" - if port is not None: - return adsSyncSetTimeoutEx(port, ms) - - -def size_of_structure(structure_def): - """Calculate the size of a structure in number of BYTEs. - - :param tuple structure_def: special tuple defining the structure and - types contained within it according o PLCTYPE constants - - Expected input example: - - structure_def = ( - ('rVar', pyads.PLCTYPE_LREAL, 1), - ('sVar', pyads.PLCTYPE_STRING, 2, 35), - ('rVar1', pyads.PLCTYPE_REAL, 1), - ('iVar', pyads.PLCTYPE_DINT, 1), - ('iVar1', pyads.PLCTYPE_INT, 3), - ('ivar2', pyads.PLCTYPE_UDINT, 1), - ('iVar3', pyads.PLCTYPE_UINT, 1), - ('iVar4', pyads.PLCTYPE_BYTE, 1), - ('iVar5', pyads.PLCTYPE_SINT, 1), - ('iVar6', pyads.PLCTYPE_USINT, 1), - ('bVar', pyads.PLCTYPE_BOOL, 4), - ('iVar7', pyads.PLCTYPE_WORD, 1), - ('iVar8', pyads.PLCTYPE_DWORD, 1), - ) - i.e ('Variable Name', variable type, arr size (1 if not array), - length of string (if defined in PLC)) - - If array of structure multiply structure_def input by array size - - :return: c_ubyte_Array: data size required to read/write a structure of multiple types - """ - num_of_bytes = 0 - for item in structure_def: - try: - var, plc_datatype, size = item - str_len = None - except ValueError: - var, plc_datatype, size, str_len = item - - if plc_datatype == PLCTYPE_STRING: - if str_len is not None: - num_of_bytes += (str_len + 1) * size - else: - num_of_bytes += (PLC_DEFAULT_STRING_SIZE + 1) * size - elif plc_datatype not in DATATYPE_MAP: - raise RuntimeError("Datatype not found") - else: - num_of_bytes += sizeof(plc_datatype) * size - - return c_ubyte * num_of_bytes - - -def dict_from_bytes(byte_list, structure_def, array_size=1): - """Return an ordered dict of PLC values from a list of BYTE values read from PLC. - - :param byte_list: list of byte values for an entire structure - :param tuple structure_def: special tuple defining the structure and - types contained within it according o PLCTYPE constants - - Expected input example: - - structure_def = ( - ('rVar', pyads.PLCTYPE_LREAL, 1), - ('sVar', pyads.PLCTYPE_STRING, 2, 35), - ('rVar1', pyads.PLCTYPE_REAL, 1), - ('iVar', pyads.PLCTYPE_DINT, 1), - ('iVar1', pyads.PLCTYPE_INT, 3), - ('ivar2', pyads.PLCTYPE_UDINT, 1), - ('iVar3', pyads.PLCTYPE_UINT, 1), - ('iVar4', pyads.PLCTYPE_BYTE, 1), - ('iVar5', pyads.PLCTYPE_SINT, 1), - ('iVar6', pyads.PLCTYPE_USINT, 1), - ('bVar', pyads.PLCTYPE_BOOL, 4), - ('iVar7', pyads.PLCTYPE_WORD, 1), - ('iVar8', pyads.PLCTYPE_DWORD, 1), - ) - i.e ('Variable Name', variable type, arr size (1 if not array), - length of string (if defined in PLC)) - - :param array_size: size of array if reading array of structure, defaults to 1 - :type array_size: int, optional - - :return: ordered dictionary of values for each variable type in order of structure - """ - values_list = [] - index = 0 - for structure in range(0, array_size): - values = OrderedDict() - for item in structure_def: - try: - var, plc_datatype, size = item - str_len = None - except ValueError: - var, plc_datatype, size, str_len = item - - var_array = [] - for i in range(size): - if plc_datatype == PLCTYPE_STRING: - if str_len is None: - str_len = PLC_DEFAULT_STRING_SIZE - var_array.append( - bytearray(byte_list[index : (index + (str_len + 1))]) - .partition(b"\0")[0] - .decode("utf-8") - ) - index += str_len + 1 - elif plc_datatype not in DATATYPE_MAP: - raise RuntimeError("Datatype not found. Check structure definition") - else: - n_bytes = sizeof(plc_datatype) - var_array.append( - struct.unpack( - DATATYPE_MAP[plc_datatype], - bytearray(byte_list[index : (index + n_bytes)]), - )[0] - ) - index += n_bytes - if size == 1: # if not an array, don't want a list in the dict return - values[var] = var_array[0] - else: - values[var] = var_array - values_list.append(values) - - if array_size != 1: - return values_list - else: - return values_list[0] - - -class Connection(object): - """Class for managing the connection to an ADS device. - - :ivar str ams_net_id: AMS net id of the remote device - :ivar int ams_net_port: port of the remote device - :ivar str ip_address: the ip address of the device - - :note: If no IP address is given the ip address is automatically set - to first 4 parts of the Ams net id. - - """ - - def __init__(self, ams_net_id, ams_net_port, ip_address=None): - # type: (str, int, str) -> None - self._port = None # type: Optional[int] - self._adr = AmsAddr(ams_net_id, ams_net_port) - if ip_address is None: - self.ip_address = ".".join(ams_net_id.split(".")[:4]) - else: - self.ip_address = ip_address - self._open = False - self._notifications = {} # type: Dict[int, str] - - @property - def ams_netid(self): - # type: () -> str - return self._adr.netid - - @ams_netid.setter - def ams_netid(self, netid): - # type: (str) -> None - if self._open: - raise AttributeError("Setting netid is not allowed while connection is open.") - self._adr.netid = netid - - @property - def ams_port(self): - # type: () -> int - return self._adr.port - - @ams_port.setter - def ams_port(self, port): - # type: (int) -> None - if self._open: - raise AttributeError("Setting port is not allowed while connection is open.") - self._adr.port = port - - def __enter__(self): - # type: () -> Connection - """Open on entering with-block.""" - self.open() - return self - - def __exit__(self, _type, _val, _traceback): - # type: (Type, Any, Any) -> None - """Close on leaving with-block.""" - self.close() - - def open(self): - # type: () -> None - """Connect to the TwinCAT message router.""" - if self._open: - return - - self._port = adsPortOpenEx() - - if linux: - adsAddRoute(self._adr.netIdStruct(), self.ip_address) - - self._open = True - - def close(self): - # type: () -> None - """:summary: Close the connection to the TwinCAT message router.""" - if not self._open: - return - - if linux: - adsDelRoute(self._adr.netIdStruct()) - - if self._port is not None: - adsPortCloseEx(self._port) - self._port = None - - self._open = False - - def get_local_address(self): - # type: () -> Optional[AmsAddr] - """Return the local AMS-address and the port number. - - :rtype: AmsAddr - - """ - if self._port is not None: - return adsGetLocalAddressEx(self._port) - - return None - - def read_state(self): - # type: () -> Optional[Tuple[int, int]] - """Read the current ADS-state and the machine-state. - - Read the current ADS-state and the machine-state from the ADS-server. - - :rtype: (int, int) - :return: adsState, deviceState - - """ - if self._port is not None: - return adsSyncReadStateReqEx(self._port, self._adr) - - return None - - def write_control(self, ads_state, device_state, data, plc_datatype): - # type: (int, int, Any, Type) -> None - """Change the ADS state and the machine-state of the ADS-server. - - :param int ads_state: new ADS-state, according to ADSTATE constants - :param int device_state: new machine-state - :param data: additional data - :param int plc_datatype: datatype, according to PLCTYPE constants - - :note: Despite changing the ADS-state and the machine-state it is - possible to send additional data to the ADS-server. For current - ADS-devices additional data is not progressed. - Every ADS-device is able to communicate its current state to other - devices. There is a difference between the device-state and the - state of the ADS-interface (AdsState). The possible states of an - ADS-interface are defined in the ADS-specification. - - """ - if self._port is not None: - return adsSyncWriteControlReqEx( - self._port, self._adr, ads_state, device_state, data, plc_datatype - ) - - def read_device_info(self): - # type: () -> Optional[Tuple[str, AdsVersion]] - """Read the name and the version number of the ADS-server. - - :rtype: string, AdsVersion - :return: device name, version - - """ - if self._port is not None: - return adsSyncReadDeviceInfoReqEx(self._port, self._adr) - - return None - - def write(self, index_group, index_offset, value, plc_datatype): - # type: (int, int, Any, Type) -> None - """Send data synchronous to an ADS-device. - - :param int index_group: PLC storage area, according to the INDEXGROUP - constants - :param int index_offset: PLC storage address - :param value: value to write to the storage address of the PLC - :param int plc_datatype: type of the data given to the PLC, - according to PLCTYPE constants - - """ - if self._port is not None: - return adsSyncWriteReqEx( - self._port, self._adr, index_group, index_offset, value, plc_datatype - ) - - def read_write( - self, - index_group, - index_offset, - plc_read_datatype, - value, - plc_write_datatype, - return_ctypes=False, - check_length=True, - ): - # type: (int, int, Optional[Type], Any, Optional[Type], bool, bool) -> Any - """Read and write data synchronous from/to an ADS-device. - - :param int index_group: PLC storage area, according to the INDEXGROUP - constants - :param int index_offset: PLC storage address - :param Type plc_read_datatype: type of the data given to the PLC to respond to, - according to PLCTYPE constants, or None to not read anything - :param value: value to write to the storage address of the PLC - :param Type plc_write_datatype: type of the data given to the PLC, according to - PLCTYPE constants, or None to not write anything - :param bool return_ctypes: return ctypes instead of python types if True - (default: False) - :param bool check_length: check whether the amount of bytes read matches the size - of the read data type (default: True) - :return: value: **value** - - """ - if self._port is not None: - return adsSyncReadWriteReqEx2( - self._port, - self._adr, - index_group, - index_offset, - plc_read_datatype, - value, - plc_write_datatype, - return_ctypes, - check_length, - ) - - return None - - def read( - self, - index_group, - index_offset, - plc_datatype, - return_ctypes=False, - check_length=True, - ): - # type: (int, int, Type, bool, bool) -> Any - """Read data synchronous from an ADS-device. - - :param int index_group: PLC storage area, according to the INDEXGROUP - constants - :param int index_offset: PLC storage address - :param int plc_datatype: type of the data given to the PLC, according - to PLCTYPE constants - :return: value: **value** - :param bool return_ctypes: return ctypes instead of python types if True - (default: False) - :param bool check_length: check whether the amount of bytes read matches the size - of the read data type (default: True) - - """ - if self._port is not None: - return adsSyncReadReqEx2( - self._port, - self._adr, - index_group, - index_offset, - plc_datatype, - return_ctypes, - check_length, - ) - - return None - - def get_handle(self, data_name): - # type: (str) -> int - """Get the handle of the PLC-variable, handles obtained using this - method should be released using method 'release_handle'. - - :param string data_name: data name - - :rtype: int - :return: int: PLC-variable handle - """ - if self._port is not None: - return adsGetHandle(self._port, self._adr, data_name) - - return None - - def release_handle(self, handle): - # type: (int) -> None - """ Release handle of a PLC-variable. - - :param int handle: handle of PLC-variable to be released - """ - if self._port is not None: - adsReleaseHandle(self._port, self._adr, handle) - - def read_by_name( - self, - data_name, - plc_datatype, - return_ctypes=False, - handle=None, - check_length=True, - ): - # type: (str, Type, bool, int) -> Any - """Read data synchronous from an ADS-device from data name. - - :param string data_name: data name, can be empty string if handle is used - :param int plc_datatype: type of the data given to the PLC, according - to PLCTYPE constants - :return: value: **value** - :param bool return_ctypes: return ctypes instead of python types if True - (default: False) - :param int handle: PLC-variable handle, pass in handle if previously - obtained to speed up reading (default: None) - :param bool check_length: check whether the amount of bytes read matches the size - of the read data type (default: True) - - """ - if self._port: - return adsSyncReadByNameEx( - self._port, - self._adr, - data_name, - plc_datatype, - return_ctypes=return_ctypes, - handle=handle, - check_length=check_length, - ) - - return None - - def read_structure_by_name( - self, data_name, structure_def, array_size=1, structure_size=None, handle=None - ): - """Read a structure of multiple types. - - :param string data_name: data name - :param tuple structure_def: special tuple defining the structure and - types contained within it according to PLCTYPE constants, must match - the structure defined in the PLC, PLC structure must be defined with - {attribute 'pack_mode' := '1'} - - Expected input example: - structure_def = ( - ('rVar', pyads.PLCTYPE_LREAL, 1), - ('sVar', pyads.PLCTYPE_STRING, 2, 35), - ('rVar1', pyads.PLCTYPE_REAL, 1), - ('iVar', pyads.PLCTYPE_DINT, 1), - ('iVar1', pyads.PLCTYPE_INT, 3), - ('ivar2', pyads.PLCTYPE_UDINT, 1), - ('iVar3', pyads.PLCTYPE_UINT, 1), - ('iVar4', pyads.PLCTYPE_BYTE, 1), - ('iVar5', pyads.PLCTYPE_SINT, 1), - ('iVar6', pyads.PLCTYPE_USINT, 1), - ('bVar', pyads.PLCTYPE_BOOL, 4), - ('iVar7', pyads.PLCTYPE_WORD, 1), - ('iVar8', pyads.PLCTYPE_DWORD, 1), - ) - i.e ('Variable Name', variable type, arr size (1 if not array), - length of string (if defined in PLC)) - - :param array_size: size of array if reading array of structure, defaults to 1 - :type array_size: int, optional - :param structure_size: size of structure if known by previous use of - size_of_structure, defaults to None - :type structure_size: , optional - :param handle: PLC-variable handle, pass in handle if previously - obtained to speed up reading, defaults to None - :type handle: int, optional - - :return: values_dict: ordered dictionary of all values corresponding to the structure - definition - """ - if structure_size is None: - structure_size = size_of_structure(structure_def * array_size) - values = self.read_by_name(data_name, structure_size, handle=handle) - if values is not None: - return dict_from_bytes(values, structure_def, array_size=array_size) - - return None - - def write_by_name(self, data_name, value, plc_datatype, handle=None): - # type: (str, Any, Type, int) -> None - """Send data synchronous to an ADS-device from data name. - - :param string data_name: data name, can be empty string if handle is used - :param value: value to write to the storage address of the PLC - :param int plc_datatype: type of the data given to the PLC, - according to PLCTYPE constants - :param int handle: PLC-variable handle, pass in handle if previously - obtained to speed up writing (default: None) - """ - if self._port: - return adsSyncWriteByNameEx( - self._port, self._adr, data_name, value, plc_datatype, handle=handle - ) - - def add_device_notification(self, data, attr, callback, user_handle=None): - # type: (Union[str, Tuple[int, int]], NotificationAttrib, Callable, int) -> Optional[Tuple[int, int]] - """Add a device notification. - - :param Union[str, Tuple[int, int] data: PLC storage address as string or Tuple with index group and offset - :param pyads.structs.NotificationAttrib attr: object that contains - all the attributes for the definition of a notification - :param callback: callback function that gets executed in the event of a notification - - :rtype: (int, int) - :returns: notification handle, user handle - - Save the notification handle and the user handle on creating a - notification if you want to be able to remove the notification - later in your code. - - **Usage**: - - >>> import pyads - >>> from ctypes import size_of - >>> - >>> # Connect to the local TwinCAT PLC - >>> plc = pyads.Connection('127.0.0.1.1.1', 851) - >>> - >>> # Create callback function that prints the value - >>> def mycallback(notification, data): - >>> contents = notification.contents - >>> value = next( - >>> map(int, - >>> bytearray(contents.data)[0:contents.cbSampleSize]) - >>> ) - >>> print(value) - >>> - >>> with plc: - >>> # Add notification with default settings - >>> attr = pyads.NotificationAttrib(size_of(pyads.PLCTYPE_INT)) - >>> - >>> handles = plc.add_device_notification("GVL.myvalue", attr, mycallback) - >>> - >>> # Remove notification - >>> plc.del_device_notification(handles) - - """ - if self._port is not None: - notification_handle, user_handle = adsSyncAddDeviceNotificationReqEx( - self._port, self._adr, data, attr, callback, user_handle - ) - return notification_handle, user_handle - - return None - - def del_device_notification(self, notification_handle, user_handle): - # type: (int, int) -> None - """Remove a device notification. - - :param notification_handle: address of the variable that contains - the handle of the notification - :param user_handle: user handle - - """ - if self._port is not None: - adsSyncDelDeviceNotificationReqEx( - self._port, self._adr, notification_handle, user_handle - ) - - @property - def is_open(self): - # type: () -> bool - """Show the current connection state. - - :return: True if connection is open - - """ - return self._open - - def set_timeout(self, ms): - # type: (int) -> None - """Set Timeout.""" - if self._port is not None: - adsSyncSetTimeoutEx(self._port, ms) - - def notification(self, plc_datatype=None, timestamp_as_filetime=False): - # type: (Optional[Type], bool) -> Callable - """Decorate a callback function. - - **Decorator**. - - A decorator that can be used for callback functions in order to - convert the data of the NotificationHeader into the fitting - Python type. - - :param plc_datatype: The PLC datatype that needs to be converted. This can - be any basic PLC datatype or a `ctypes.Structure`. - :param timestamp_as_filetime: Whether the notification timestamp should be returned - as `datetime.datetime` (False) or Windows `FILETIME` as originally transmitted - via ADS (True). Be aware that the precision of `datetime.datetime` is limited to - microseconds, while FILETIME allows for 100 ns. This may be relevant when using - task cycle times such as 62.5 µs. Default: False. - - The callback functions need to be of the following type: - - >>> def callback(handle, name, timestamp, value) - - * `handle`: the notification handle - * `name`: the variable name - * `timestamp`: the timestamp as datetime value - * `value`: the converted value of the variable - - **Usage**: - - >>> import pyads - >>> - >>> plc = pyads.Connection('172.18.3.25.1.1', 851) - >>> - >>> - >>> @plc.notification(pyads.PLCTYPE_STRING) - >>> def callback(handle, name, timestamp, value): - >>> print(handle, name, timestamp, value) - >>> - >>> - >>> with plc: - >>> attr = pyads.NotificationAttrib(20, - >>> pyads.ADSTRANS_SERVERCYCLE) - >>> handles = plc.add_device_notification('GVL.test', attr, - >>> callback) - >>> while True: - >>> pass - - """ - - def notification_decorator(func): - # type: (Union[Callable[[int, str, datetime, Any], None], Callable[[int, str, int, Any], None]]) -> Callable[[Any, str], None] # noqa: E501 - - def func_wrapper(notification, data_name): - # type: (Any, str) -> None - contents = notification.contents - data_size = contents.cbSampleSize - # Get dynamically sized data array - data = (c_ubyte * data_size).from_address( - addressof(contents) + SAdsNotificationHeader.data.offset - ) - - if plc_datatype == PLCTYPE_STRING: - # read only until null-termination character - value = bytearray(data).split(b"\0", 1)[0].decode("utf-8") - - elif plc_datatype is not None and issubclass(plc_datatype, Structure): - value = plc_datatype() - fit_size = min(data_size, sizeof(value)) - memmove(addressof(value), addressof(data), fit_size) - - elif plc_datatype is not None and issubclass(plc_datatype, Array): - if data_size == sizeof(plc_datatype): - value = list(plc_datatype.from_buffer_copy(bytes(data))) - else: - # invalid size - value = None - - elif plc_datatype not in DATATYPE_MAP: - value = bytearray(data) - - else: - value = struct.unpack(DATATYPE_MAP[plc_datatype], bytearray(data))[ - 0 - ] - - if timestamp_as_filetime: - timestamp = contents.nTimeStamp - else: - timestamp = filetime_to_dt(contents.nTimeStamp) - - return func(contents.hNotification, data_name, timestamp, value) - - return func_wrapper - - return notification_decorator +"""Pythonic ADS functions. + +:author: Stefan Lehmann +:license: MIT, see license file or https://opensource.org/licenses/MIT + +:created on: 2018-06-11 18:15:53 + +""" +from typing import Optional, Union, Tuple, Any, Type, Callable, Dict +from datetime import datetime +import struct +from ctypes import memmove, addressof, c_ubyte, Array, Structure, sizeof +from collections import OrderedDict + +from .utils import platform_is_linux, deprecated +from .filetimes import filetime_to_dt + +from .pyads_ex import ( + adsAddRoute, + adsAddRouteToPLC, + adsDelRoute, + adsPortOpenEx, + adsPortCloseEx, + adsGetLocalAddressEx, + adsSyncReadStateReqEx, + adsSyncReadDeviceInfoReqEx, + adsSyncWriteControlReqEx, + adsSyncWriteReqEx, + adsSyncReadWriteReqEx2, + adsSyncReadReqEx2, + adsGetHandle, + adsReleaseHandle, + adsSyncReadByNameEx, + adsSyncWriteByNameEx, + adsSyncAddDeviceNotificationReqEx, + adsSyncDelDeviceNotificationReqEx, + adsSyncSetTimeoutEx, + adsSetLocalAddress, + ADSError, +) + +from .constants import ( + PLCTYPE_BOOL, + PLCTYPE_BYTE, + PLCTYPE_DATE, + PLCTYPE_DINT, + PLCTYPE_DT, + PLCTYPE_DWORD, + PLCTYPE_INT, + PLCTYPE_LREAL, + PLCTYPE_REAL, + PLCTYPE_SINT, + PLCTYPE_STRING, + PLCTYPE_TIME, + PLCTYPE_TOD, + PLCTYPE_UDINT, + PLCTYPE_UINT, + PLCTYPE_USINT, + PLCTYPE_WORD, + PLC_DEFAULT_STRING_SIZE, + DATATYPE_MAP, +) + +from .structs import ( + AmsAddr, + SAmsNetId, + AdsVersion, + NotificationAttrib, + SAdsNotificationHeader, +) + +linux = platform_is_linux() +port = None # type: int + + +def _parse_ams_netid(ams_netid): + # type: (str) -> SAmsNetId + """Parse an AmsNetId from *str* to *SAmsNetId*. + + :param str ams_netid: NetId as a string + :rtype: SAmsNetId + :return: NetId as a struct + + """ + try: + id_numbers = list(map(int, ams_netid.split("."))) + except ValueError: + raise ValueError("no valid netid") + + if len(id_numbers) != 6: + raise ValueError("no valid netid") + + # Fill the netId struct with data + ams_netid_st = SAmsNetId() + ams_netid_st.b = (c_ubyte * 6)(*id_numbers) + return ams_netid_st + + +def open_port(): + # type: () -> int + """Connect to the TwinCAT message router. + + :rtype: int + :return: port number + + """ + global port + + port = port or adsPortOpenEx() + return port + + +def close_port(): + # type: () -> None + """Close the connection to the TwinCAT message router.""" + global port + + if port is not None: + adsPortCloseEx(port) + port = None + + +def get_local_address(): + # type: () -> Optional[AmsAddr] + """Return the local AMS-address and the port number. + + :rtype: AmsAddr + + """ + if port is not None: + return adsGetLocalAddressEx(port) + + return None + + +def set_local_address(ams_netid): + # type: (Union[str, SAmsNetId]) -> None + """Set the local NetID (**Linux only**). + + :param str: new AmsNetID + :rtype: None + + **Usage:** + + >>> import pyads + >>> pyads.open_port() + >>> pyads.set_local_address('0.0.0.0.1.1') + + """ + if isinstance(ams_netid, str): + ams_netid_st = _parse_ams_netid(ams_netid) + else: + ams_netid_st = ams_netid + + assert isinstance(ams_netid_st, SAmsNetId) + + if linux: + return adsSetLocalAddress(ams_netid_st) + else: + raise ADSError( + text="SetLocalAddress is not supported for Windows clients." + ) # pragma: no cover + + +@deprecated() +def read_state(adr): + # type: (AmsAddr) -> Optional[Tuple[int, int]] + """Read the current ADS-state and the machine-state. + + Read the current ADS-state and the machine-state from the + ADS-server. + + :param AmsAddr adr: local or remote AmsAddr + :rtype: (int, int) + :return: adsState, deviceState + + """ + if port is not None: + return adsSyncReadStateReqEx(port, adr) + + return None + + +@deprecated() +def write_control(adr, ads_state, device_state, data, plc_datatype): + # type: (AmsAddr, int, int, Any, Type) -> None + """Change the ADS state and the machine-state of the ADS-server. + + :param AmsAddr adr: local or remote AmsAddr + :param int ads_state: new ADS-state, according to ADSTATE constants + :param int device_state: new machine-state + :param data: additional data + :param int plc_datatype: datatype, according to PLCTYPE constants + + :note: Despite changing the ADS-state and the machine-state it is possible + to send additional data to the ADS-server. For current ADS-devices + additional data is not progressed. + Every ADS-device is able to communicate its current state to other + devices. + There is a difference between the device-state and the state of the + ADS-interface (AdsState). The possible states of an ADS-interface + are defined in the ADS-specification. + + """ + if port is not None: + return adsSyncWriteControlReqEx( + port, adr, ads_state, device_state, data, plc_datatype + ) + + +@deprecated() +def read_device_info(adr): + # type: (AmsAddr) -> Optional[Tuple[str, AdsVersion]] + """Read the name and the version number of the ADS-server. + + :param AmsAddr adr: local or remote AmsAddr + :rtype: string, AdsVersion + :return: device name, version + + """ + if port is not None: + return adsSyncReadDeviceInfoReqEx(port, adr) + + return None + + +@deprecated() +def write(adr, index_group, index_offset, value, plc_datatype): + # type: (AmsAddr, int, int, Any, Type) -> None + """Send data synchronous to an ADS-device. + + :param AmsAddr adr: local or remote AmsAddr + :param int index_group: PLC storage area, according to the INDEXGROUP + constants + :param int index_offset: PLC storage address + :param value: value to write to the storage address of the PLC + :param Type plc_datatype: type of the data given to the PLC, + according to PLCTYPE constants + + """ + if port is not None: + return adsSyncWriteReqEx( + port, adr, index_group, index_offset, value, plc_datatype + ) + + +@deprecated() +def read_write( + adr, + index_group, + index_offset, + plc_read_datatype, + value, + plc_write_datatype, + return_ctypes=False, + check_length=True, +): + # type: (AmsAddr, int, int, Type, Any, Type, bool, bool) -> Any + """Read and write data synchronous from/to an ADS-device. + + :param AmsAddr adr: local or remote AmsAddr + :param int index_group: PLC storage area, according to the INDEXGROUP + constants + :param int index_offset: PLC storage address + :param Type plc_read_datatype: type of the data given to the PLC to respond + to, according to PLCTYPE constants + :param value: value to write to the storage address of the PLC + :param Type plc_write_datatype: type of the data given to the PLC, according to + PLCTYPE constants + :param bool return_ctypes: return ctypes instead of python types if True + (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) + :rtype: PLCTYPE + :return: value: **value** + + """ + if port is not None: + return adsSyncReadWriteReqEx2( + port, + adr, + index_group, + index_offset, + plc_read_datatype, + value, + plc_write_datatype, + return_ctypes, + check_length, + ) + + return None + + +@deprecated() +def read( + adr, index_group, index_offset, plc_datatype, return_ctypes=False, check_length=True +): + # type: (AmsAddr, int, int, Type, bool, bool) -> Any + """Read data synchronous from an ADS-device. + + :param AmsAddr adr: local or remote AmsAddr + :param int index_group: PLC storage area, according to the INDEXGROUP + constants + :param int index_offset: PLC storage address + :param int plc_datatype: type of the data given to the PLC, according to + PLCTYPE constants + :param bool return_ctypes: return ctypes instead of python types if True + (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) + :return: value: **value** + + """ + if port is not None: + return adsSyncReadReqEx2( + port, + adr, + index_group, + index_offset, + plc_datatype, + return_ctypes, + check_length, + ) + + return None + + +@deprecated() +def read_by_name(adr, data_name, plc_datatype, return_ctypes=False, check_length=True): + # type: (AmsAddr, str, Type, bool) -> Any + """Read data synchronous from an ADS-device from data name. + + :param AmsAddr adr: local or remote AmsAddr + :param string data_name: data name + :param int plc_datatype: type of the data given to the PLC, according to + PLCTYPE constants + :param bool return_ctypes: return ctypes instead of python types if True + (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) + :return: value: **value** + + """ + if port is not None: + return adsSyncReadByNameEx( + port, adr, data_name, plc_datatype, return_ctypes, check_length=check_length + ) + + return None + + +@deprecated() +def write_by_name(adr, data_name, value, plc_datatype): + # type: (AmsAddr, str, Any, Type) -> None + """Send data synchronous to an ADS-device from data name. + + :param AmsAddr adr: local or remote AmsAddr + :param string data_name: PLC storage address + :param value: value to write to the storage address of the PLC + :param int plc_datatype: type of the data given to the PLC, + according to PLCTYPE constants + + """ + if port is not None: + return adsSyncWriteByNameEx(port, adr, data_name, value, plc_datatype) + + +def add_route(adr, ip_address): + # type: (Union[str, AmsAddr], str) -> None + """Establish a new route in the AMS Router (linux Only). + + :param adr: AMS Address of routing endpoint as str or AmsAddr object + :param str ip_address: ip address of the routing endpoint + """ + if isinstance(adr, str): + adr = AmsAddr(adr) + + return adsAddRoute(adr.netIdStruct(), ip_address) + + +def add_route_to_plc( + sending_net_id, + adding_host_name, + ip_address, + username, + password, + route_name=None, + added_net_id=None, +): + # type: (str, str, str, str, str, str, str) -> bool + """Embed a new route in the PLC. + + :param pyads.structs.SAmsNetId sending_net_id: sending net id + :param str adding_host_name: host name (or IP) of the PC being added + :param str ip_address: ip address of the PLC + :param str username: username for PLC + :param str password: password for PLC + :param str route_name: PLC side name for route, defaults to adding_host_name or the current hostname of this PC + :param pyads.structs.SAmsNetId added_net_id: net id that is being added to the PLC, defaults to sending_net_id + + """ + return adsAddRouteToPLC( + sending_net_id, + adding_host_name, + ip_address, + username, + password, + route_name=route_name, + added_net_id=added_net_id, + ) + + +def delete_route(adr): + # type: (AmsAddr) -> None + """Remove existing route from the AMS Router (Linux Only). + + :param pyads.structs.AmsAddr adr: AMS Address associated with the routing + entry which is to be removed from the router. + """ + return adsDelRoute(adr.netIdStruct()) + + +@deprecated() +def add_device_notification(adr, data, attr, callback, user_handle=None): + # type: (AmsAddr, Union[str, Tuple[int, int]], NotificationAttrib, Callable, int) -> Optional[Tuple[int, int]] # noqa: E501 + """Add a device notification. + + :param pyads.structs.AmsAddr adr: AMS Address associated with the routing + entry which is to be removed from the router. + :param Union[str, Tuple[int, int] data: PLC storage address as string or Tuple with index group and offset + :param pyads.structs.NotificationAttrib attr: object that contains + all the attributes for the definition of a notification + :param callback: callback function that gets executed in the event of a notification + + :rtype: (int, int) + :returns: notification handle, user handle + + Save the notification handle and the user handle on creating a + notification if you want to be able to remove the notification + later in your code. + + """ + if port is not None: + return adsSyncAddDeviceNotificationReqEx( + port, adr, data, attr, callback, user_handle + ) + + return None + + +@deprecated() +def del_device_notification(adr, notification_handle, user_handle): + # type: (AmsAddr, int, int) -> None + """Remove a device notification. + + :param pyads.structs.AmsAddr adr: AMS Address associated with the routing + entry which is to be removed from the router. + :param notification_handle: address of the variable that contains + the handle of the notification + :param user_handle: user handle + + """ + if port is not None: + return adsSyncDelDeviceNotificationReqEx( + port, adr, notification_handle, user_handle + ) + + +def set_timeout(ms): + # type: (int) -> None + """Set timeout.""" + if port is not None: + return adsSyncSetTimeoutEx(port, ms) + + +def size_of_structure(structure_def): + """Calculate the size of a structure in number of BYTEs. + + :param tuple structure_def: special tuple defining the structure and + types contained within it according o PLCTYPE constants + + Expected input example: + + structure_def = ( + ('rVar', pyads.PLCTYPE_LREAL, 1), + ('sVar', pyads.PLCTYPE_STRING, 2, 35), + ('rVar1', pyads.PLCTYPE_REAL, 1), + ('iVar', pyads.PLCTYPE_DINT, 1), + ('iVar1', pyads.PLCTYPE_INT, 3), + ('ivar2', pyads.PLCTYPE_UDINT, 1), + ('iVar3', pyads.PLCTYPE_UINT, 1), + ('iVar4', pyads.PLCTYPE_BYTE, 1), + ('iVar5', pyads.PLCTYPE_SINT, 1), + ('iVar6', pyads.PLCTYPE_USINT, 1), + ('bVar', pyads.PLCTYPE_BOOL, 4), + ('iVar7', pyads.PLCTYPE_WORD, 1), + ('iVar8', pyads.PLCTYPE_DWORD, 1), + ) + i.e ('Variable Name', variable type, arr size (1 if not array), + length of string (if defined in PLC)) + + If array of structure multiply structure_def input by array size + + :return: c_ubyte_Array: data size required to read/write a structure of multiple types + """ + num_of_bytes = 0 + for item in structure_def: + try: + var, plc_datatype, size = item + str_len = None + except ValueError: + var, plc_datatype, size, str_len = item + + if plc_datatype == PLCTYPE_STRING: + if str_len is not None: + num_of_bytes += (str_len + 1) * size + else: + num_of_bytes += (PLC_DEFAULT_STRING_SIZE + 1) * size + elif plc_datatype not in DATATYPE_MAP: + raise RuntimeError("Datatype not found") + else: + num_of_bytes += sizeof(plc_datatype) * size + + return c_ubyte * num_of_bytes + + +def dict_from_bytes(byte_list, structure_def, array_size=1): + """Return an ordered dict of PLC values from a list of BYTE values read from PLC. + + :param byte_list: list of byte values for an entire structure + :param tuple structure_def: special tuple defining the structure and + types contained within it according o PLCTYPE constants + + Expected input example: + + structure_def = ( + ('rVar', pyads.PLCTYPE_LREAL, 1), + ('sVar', pyads.PLCTYPE_STRING, 2, 35), + ('rVar1', pyads.PLCTYPE_REAL, 1), + ('iVar', pyads.PLCTYPE_DINT, 1), + ('iVar1', pyads.PLCTYPE_INT, 3), + ('ivar2', pyads.PLCTYPE_UDINT, 1), + ('iVar3', pyads.PLCTYPE_UINT, 1), + ('iVar4', pyads.PLCTYPE_BYTE, 1), + ('iVar5', pyads.PLCTYPE_SINT, 1), + ('iVar6', pyads.PLCTYPE_USINT, 1), + ('bVar', pyads.PLCTYPE_BOOL, 4), + ('iVar7', pyads.PLCTYPE_WORD, 1), + ('iVar8', pyads.PLCTYPE_DWORD, 1), + ) + i.e ('Variable Name', variable type, arr size (1 if not array), + length of string (if defined in PLC)) + + :param array_size: size of array if reading array of structure, defaults to 1 + :type array_size: int, optional + + :return: ordered dictionary of values for each variable type in order of structure + """ + values_list = [] + index = 0 + for structure in range(0, array_size): + values = OrderedDict() + for item in structure_def: + try: + var, plc_datatype, size = item + str_len = None + except ValueError: + var, plc_datatype, size, str_len = item + + var_array = [] + for i in range(size): + if plc_datatype == PLCTYPE_STRING: + if str_len is None: + str_len = PLC_DEFAULT_STRING_SIZE + var_array.append( + bytearray(byte_list[index : (index + (str_len + 1))]) + .partition(b"\0")[0] + .decode("utf-8") + ) + index += str_len + 1 + elif plc_datatype not in DATATYPE_MAP: + raise RuntimeError("Datatype not found. Check structure definition") + else: + n_bytes = sizeof(plc_datatype) + var_array.append( + struct.unpack( + DATATYPE_MAP[plc_datatype], + bytearray(byte_list[index : (index + n_bytes)]), + )[0] + ) + index += n_bytes + if size == 1: # if not an array, don't want a list in the dict return + values[var] = var_array[0] + else: + values[var] = var_array + values_list.append(values) + + if array_size != 1: + return values_list + else: + return values_list[0] + + +class Connection(object): + """Class for managing the connection to an ADS device. + + :ivar str ams_net_id: AMS net id of the remote device + :ivar int ams_net_port: port of the remote device + :ivar str ip_address: the ip address of the device + + :note: If no IP address is given the ip address is automatically set + to first 4 parts of the Ams net id. + + """ + + def __init__(self, ams_net_id, ams_net_port, ip_address=None): + # type: (str, int, str) -> None + self._port = None # type: Optional[int] + self._adr = AmsAddr(ams_net_id, ams_net_port) + if ip_address is None: + self.ip_address = ".".join(ams_net_id.split(".")[:4]) + else: + self.ip_address = ip_address + self._open = False + self._notifications = {} # type: Dict[int, str] + + @property + def ams_netid(self): + # type: () -> str + return self._adr.netid + + @ams_netid.setter + def ams_netid(self, netid): + # type: (str) -> None + if self._open: + raise AttributeError("Setting netid is not allowed while connection is open.") + self._adr.netid = netid + + @property + def ams_port(self): + # type: () -> int + return self._adr.port + + @ams_port.setter + def ams_port(self, port): + # type: (int) -> None + if self._open: + raise AttributeError("Setting port is not allowed while connection is open.") + self._adr.port = port + + def __enter__(self): + # type: () -> Connection + """Open on entering with-block.""" + self.open() + return self + + def __exit__(self, _type, _val, _traceback): + # type: (Type, Any, Any) -> None + """Close on leaving with-block.""" + self.close() + + def open(self): + # type: () -> None + """Connect to the TwinCAT message router.""" + if self._open: + return + + self._port = adsPortOpenEx() + + if linux: + adsAddRoute(self._adr.netIdStruct(), self.ip_address) + + self._open = True + + def close(self): + # type: () -> None + """:summary: Close the connection to the TwinCAT message router.""" + if not self._open: + return + + if linux: + adsDelRoute(self._adr.netIdStruct()) + + if self._port is not None: + adsPortCloseEx(self._port) + self._port = None + + self._open = False + + def get_local_address(self): + # type: () -> Optional[AmsAddr] + """Return the local AMS-address and the port number. + + :rtype: AmsAddr + + """ + if self._port is not None: + return adsGetLocalAddressEx(self._port) + + return None + + def read_state(self): + # type: () -> Optional[Tuple[int, int]] + """Read the current ADS-state and the machine-state. + + Read the current ADS-state and the machine-state from the ADS-server. + + :rtype: (int, int) + :return: adsState, deviceState + + """ + if self._port is not None: + return adsSyncReadStateReqEx(self._port, self._adr) + + return None + + def write_control(self, ads_state, device_state, data, plc_datatype): + # type: (int, int, Any, Type) -> None + """Change the ADS state and the machine-state of the ADS-server. + + :param int ads_state: new ADS-state, according to ADSTATE constants + :param int device_state: new machine-state + :param data: additional data + :param int plc_datatype: datatype, according to PLCTYPE constants + + :note: Despite changing the ADS-state and the machine-state it is + possible to send additional data to the ADS-server. For current + ADS-devices additional data is not progressed. + Every ADS-device is able to communicate its current state to other + devices. There is a difference between the device-state and the + state of the ADS-interface (AdsState). The possible states of an + ADS-interface are defined in the ADS-specification. + + """ + if self._port is not None: + return adsSyncWriteControlReqEx( + self._port, self._adr, ads_state, device_state, data, plc_datatype + ) + + def read_device_info(self): + # type: () -> Optional[Tuple[str, AdsVersion]] + """Read the name and the version number of the ADS-server. + + :rtype: string, AdsVersion + :return: device name, version + + """ + if self._port is not None: + return adsSyncReadDeviceInfoReqEx(self._port, self._adr) + + return None + + def write(self, index_group, index_offset, value, plc_datatype): + # type: (int, int, Any, Type) -> None + """Send data synchronous to an ADS-device. + + :param int index_group: PLC storage area, according to the INDEXGROUP + constants + :param int index_offset: PLC storage address + :param value: value to write to the storage address of the PLC + :param int plc_datatype: type of the data given to the PLC, + according to PLCTYPE constants + + """ + if self._port is not None: + return adsSyncWriteReqEx( + self._port, self._adr, index_group, index_offset, value, plc_datatype + ) + + def read_write( + self, + index_group, + index_offset, + plc_read_datatype, + value, + plc_write_datatype, + return_ctypes=False, + check_length=True, + ): + # type: (int, int, Optional[Type], Any, Optional[Type], bool, bool) -> Any + """Read and write data synchronous from/to an ADS-device. + + :param int index_group: PLC storage area, according to the INDEXGROUP + constants + :param int index_offset: PLC storage address + :param Type plc_read_datatype: type of the data given to the PLC to respond to, + according to PLCTYPE constants, or None to not read anything + :param value: value to write to the storage address of the PLC + :param Type plc_write_datatype: type of the data given to the PLC, according to + PLCTYPE constants, or None to not write anything + :param bool return_ctypes: return ctypes instead of python types if True + (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) + :return: value: **value** + + """ + if self._port is not None: + return adsSyncReadWriteReqEx2( + self._port, + self._adr, + index_group, + index_offset, + plc_read_datatype, + value, + plc_write_datatype, + return_ctypes, + check_length, + ) + + return None + + def read( + self, + index_group, + index_offset, + plc_datatype, + return_ctypes=False, + check_length=True, + ): + # type: (int, int, Type, bool, bool) -> Any + """Read data synchronous from an ADS-device. + + :param int index_group: PLC storage area, according to the INDEXGROUP + constants + :param int index_offset: PLC storage address + :param int plc_datatype: type of the data given to the PLC, according + to PLCTYPE constants + :return: value: **value** + :param bool return_ctypes: return ctypes instead of python types if True + (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) + + """ + if self._port is not None: + return adsSyncReadReqEx2( + self._port, + self._adr, + index_group, + index_offset, + plc_datatype, + return_ctypes, + check_length, + ) + + return None + + def get_handle(self, data_name): + # type: (str) -> int + """Get the handle of the PLC-variable, handles obtained using this + method should be released using method 'release_handle'. + + :param string data_name: data name + + :rtype: int + :return: int: PLC-variable handle + """ + if self._port is not None: + return adsGetHandle(self._port, self._adr, data_name) + + return None + + def release_handle(self, handle): + # type: (int) -> None + """ Release handle of a PLC-variable. + + :param int handle: handle of PLC-variable to be released + """ + if self._port is not None: + adsReleaseHandle(self._port, self._adr, handle) + + def read_by_name( + self, + data_name, + plc_datatype, + return_ctypes=False, + handle=None, + check_length=True, + ): + # type: (str, Type, bool, int) -> Any + """Read data synchronous from an ADS-device from data name. + + :param string data_name: data name, can be empty string if handle is used + :param int plc_datatype: type of the data given to the PLC, according + to PLCTYPE constants + :return: value: **value** + :param bool return_ctypes: return ctypes instead of python types if True + (default: False) + :param int handle: PLC-variable handle, pass in handle if previously + obtained to speed up reading (default: None) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) + + """ + if self._port: + return adsSyncReadByNameEx( + self._port, + self._adr, + data_name, + plc_datatype, + return_ctypes=return_ctypes, + handle=handle, + check_length=check_length, + ) + + return None + + def read_structure_by_name( + self, data_name, structure_def, array_size=1, structure_size=None, handle=None + ): + """Read a structure of multiple types. + + :param string data_name: data name + :param tuple structure_def: special tuple defining the structure and + types contained within it according to PLCTYPE constants, must match + the structure defined in the PLC, PLC structure must be defined with + {attribute 'pack_mode' := '1'} + + Expected input example: + structure_def = ( + ('rVar', pyads.PLCTYPE_LREAL, 1), + ('sVar', pyads.PLCTYPE_STRING, 2, 35), + ('rVar1', pyads.PLCTYPE_REAL, 1), + ('iVar', pyads.PLCTYPE_DINT, 1), + ('iVar1', pyads.PLCTYPE_INT, 3), + ('ivar2', pyads.PLCTYPE_UDINT, 1), + ('iVar3', pyads.PLCTYPE_UINT, 1), + ('iVar4', pyads.PLCTYPE_BYTE, 1), + ('iVar5', pyads.PLCTYPE_SINT, 1), + ('iVar6', pyads.PLCTYPE_USINT, 1), + ('bVar', pyads.PLCTYPE_BOOL, 4), + ('iVar7', pyads.PLCTYPE_WORD, 1), + ('iVar8', pyads.PLCTYPE_DWORD, 1), + ) + i.e ('Variable Name', variable type, arr size (1 if not array), + length of string (if defined in PLC)) + + :param array_size: size of array if reading array of structure, defaults to 1 + :type array_size: int, optional + :param structure_size: size of structure if known by previous use of + size_of_structure, defaults to None + :type structure_size: , optional + :param handle: PLC-variable handle, pass in handle if previously + obtained to speed up reading, defaults to None + :type handle: int, optional + + :return: values_dict: ordered dictionary of all values corresponding to the structure + definition + """ + if structure_size is None: + structure_size = size_of_structure(structure_def * array_size) + values = self.read_by_name(data_name, structure_size, handle=handle) + if values is not None: + return dict_from_bytes(values, structure_def, array_size=array_size) + + return None + + def write_by_name(self, data_name, value, plc_datatype, handle=None): + # type: (str, Any, Type, int) -> None + """Send data synchronous to an ADS-device from data name. + + :param string data_name: data name, can be empty string if handle is used + :param value: value to write to the storage address of the PLC + :param int plc_datatype: type of the data given to the PLC, + according to PLCTYPE constants + :param int handle: PLC-variable handle, pass in handle if previously + obtained to speed up writing (default: None) + """ + if self._port: + return adsSyncWriteByNameEx( + self._port, self._adr, data_name, value, plc_datatype, handle=handle + ) + + def add_device_notification(self, data, attr, callback, user_handle=None): + # type: (Union[str, Tuple[int, int]], NotificationAttrib, Callable, int) -> Optional[Tuple[int, int]] + """Add a device notification. + + :param Union[str, Tuple[int, int] data: PLC storage address as string or Tuple with index group and offset + :param pyads.structs.NotificationAttrib attr: object that contains + all the attributes for the definition of a notification + :param callback: callback function that gets executed in the event of a notification + + :rtype: (int, int) + :returns: notification handle, user handle + + Save the notification handle and the user handle on creating a + notification if you want to be able to remove the notification + later in your code. + + **Usage**: + + >>> import pyads + >>> from ctypes import size_of + >>> + >>> # Connect to the local TwinCAT PLC + >>> plc = pyads.Connection('127.0.0.1.1.1', 851) + >>> + >>> # Create callback function that prints the value + >>> def mycallback(notification, data): + >>> contents = notification.contents + >>> value = next( + >>> map(int, + >>> bytearray(contents.data)[0:contents.cbSampleSize]) + >>> ) + >>> print(value) + >>> + >>> with plc: + >>> # Add notification with default settings + >>> attr = pyads.NotificationAttrib(size_of(pyads.PLCTYPE_INT)) + >>> + >>> handles = plc.add_device_notification("GVL.myvalue", attr, mycallback) + >>> + >>> # Remove notification + >>> plc.del_device_notification(handles) + + """ + if self._port is not None: + notification_handle, user_handle = adsSyncAddDeviceNotificationReqEx( + self._port, self._adr, data, attr, callback, user_handle + ) + return notification_handle, user_handle + + return None + + def del_device_notification(self, notification_handle, user_handle): + # type: (int, int) -> None + """Remove a device notification. + + :param notification_handle: address of the variable that contains + the handle of the notification + :param user_handle: user handle + + """ + if self._port is not None: + adsSyncDelDeviceNotificationReqEx( + self._port, self._adr, notification_handle, user_handle + ) + + @property + def is_open(self): + # type: () -> bool + """Show the current connection state. + + :return: True if connection is open + + """ + return self._open + + def set_timeout(self, ms): + # type: (int) -> None + """Set Timeout.""" + if self._port is not None: + adsSyncSetTimeoutEx(self._port, ms) + + def notification(self, plc_datatype=None, timestamp_as_filetime=False): + # type: (Optional[Type], bool) -> Callable + """Decorate a callback function. + + **Decorator**. + + A decorator that can be used for callback functions in order to + convert the data of the NotificationHeader into the fitting + Python type. + + :param plc_datatype: The PLC datatype that needs to be converted. This can + be any basic PLC datatype or a `ctypes.Structure`. + :param timestamp_as_filetime: Whether the notification timestamp should be returned + as `datetime.datetime` (False) or Windows `FILETIME` as originally transmitted + via ADS (True). Be aware that the precision of `datetime.datetime` is limited to + microseconds, while FILETIME allows for 100 ns. This may be relevant when using + task cycle times such as 62.5 µs. Default: False. + + The callback functions need to be of the following type: + + >>> def callback(handle, name, timestamp, value) + + * `handle`: the notification handle + * `name`: the variable name + * `timestamp`: the timestamp as datetime value + * `value`: the converted value of the variable + + **Usage**: + + >>> import pyads + >>> + >>> plc = pyads.Connection('172.18.3.25.1.1', 851) + >>> + >>> + >>> @plc.notification(pyads.PLCTYPE_STRING) + >>> def callback(handle, name, timestamp, value): + >>> print(handle, name, timestamp, value) + >>> + >>> + >>> with plc: + >>> attr = pyads.NotificationAttrib(20, + >>> pyads.ADSTRANS_SERVERCYCLE) + >>> handles = plc.add_device_notification('GVL.test', attr, + >>> callback) + >>> while True: + >>> pass + + """ + + def notification_decorator(func): + # type: (Union[Callable[[int, str, datetime, Any], None], Callable[[int, str, int, Any], None]]) -> Callable[[Any, str], None] # noqa: E501 + + def func_wrapper(notification, data_name): + # type: (Any, str) -> None + contents = notification.contents + data_size = contents.cbSampleSize + # Get dynamically sized data array + data = (c_ubyte * data_size).from_address( + addressof(contents) + SAdsNotificationHeader.data.offset + ) + + if plc_datatype == PLCTYPE_STRING: + # read only until null-termination character + value = bytearray(data).split(b"\0", 1)[0].decode("utf-8") + + elif plc_datatype is not None and issubclass(plc_datatype, Structure): + value = plc_datatype() + fit_size = min(data_size, sizeof(value)) + memmove(addressof(value), addressof(data), fit_size) + + elif plc_datatype is not None and issubclass(plc_datatype, Array): + if data_size == sizeof(plc_datatype): + value = list(plc_datatype.from_buffer_copy(bytes(data))) + else: + # invalid size + value = None + + elif plc_datatype not in DATATYPE_MAP: + value = bytearray(data) + + else: + value = struct.unpack(DATATYPE_MAP[plc_datatype], bytearray(data))[ + 0 + ] + + if timestamp_as_filetime: + timestamp = contents.nTimeStamp + else: + timestamp = filetime_to_dt(contents.nTimeStamp) + + return func(contents.hNotification, data_name, timestamp, value) + + return func_wrapper + + return notification_decorator diff --git a/pyads/constants.py b/pyads/constants.py index c92bc5ac..d42ed82a 100644 --- a/pyads/constants.py +++ b/pyads/constants.py @@ -1,217 +1,217 @@ -# -*- coding: utf-8 -*- -"""Constants for the work with the ADS API. - -:author: Stefan Lehmann -:license: MIT, see license file or https://opensource.org/licenses/MIT - -:created on 2018-06-11 18:15:53 - -""" -from typing import Type -from ctypes import ( - Array, - c_bool, - c_ubyte, - c_int8, - c_uint8, - c_int16, - c_uint16, - c_int32, - c_uint32, - c_float, - c_double, - c_char, - c_int64, - c_uint64, -) - -STRING_BUFFER = 1024 -PLC_DEFAULT_STRING_SIZE = 80 - -# plc data types: -PLCTYPE_BOOL = c_bool -PLCTYPE_BYTE = c_ubyte -PLCTYPE_DATE = c_int32 -PLCTYPE_DINT = c_int32 -PLCTYPE_DT = c_int32 -PLCTYPE_DWORD = c_uint32 -PLCTYPE_INT = c_int16 -PLCTYPE_LREAL = c_double -PLCTYPE_REAL = c_float -PLCTYPE_SINT = c_int8 -PLCTYPE_STRING = c_char -PLCTYPE_TIME = c_int32 -PLCTYPE_TOD = c_int32 -PLCTYPE_UDINT = c_uint32 -PLCTYPE_UINT = c_uint16 -PLCTYPE_USINT = c_uint8 -PLCTYPE_WORD = c_uint16 -PLCTYPE_LINT = c_int64 -PLCTYPE_ULINT = c_uint64 - -# Datatype unpacking values -DATATYPE_MAP = { - PLCTYPE_BOOL: " Type[Array[c_float]] - """Return an array with n float values.""" - return c_float * n - - -def PLCTYPE_ARR_LREAL(n): - # type: (int) -> Type[Array[c_double]] - """Return an array with n double values.""" - return c_double * n - - -def PLCTYPE_ARR_INT(n): - # type: (int) -> Type[Array[c_int16]] - """Return an array with n int16 values.""" - return c_int16 * n - - -def PLCTYPE_ARR_DINT(n): - # type: (int) -> Type[Array[c_int32]] - """Return an array with n int32 values.""" - return c_int32 * n - - -def PLCTYPE_ARR_SHORT(n): - # type: (int) -> Type[Array[c_int16]] - """Return an array with n short values.""" - return c_int16 * n - - -# Index Group -# READ_M - WRITE_M -INDEXGROUP_MEMORYBYTE = 0x4020 #: plc memory area (%M), offset means byte-offset -# READ_MX - WRITE_MX -INDEXGROUP_MEMORYBIT = ( - 0x4021 -) #: plc memory area (%MX), offset means the bit address, calculatedb by bytenumber * 8 + bitnumber # noqa: E501 -# PLCADS_IGR_RMSIZE -INDEXGROUP_MEMORYSIZE = 0x4025 #: size of the memory area in bytes -# PLCADS_IGR_RWRB -INDEXGROUP_RETAIN = 0x4030 #: plc retain memory area, offset means byte-offset -# PLCADS_IGR_RRSIZE -INDEXGROUP_RETAINSIZE = 0x4035 #: size of the retain area in bytes -# PLCADS_IGR_RWDB -INDEXGROUP_DATA = 0x4040 #: data area, offset means byte-offset -# PLCADS_IGR_RDSIZE -INDEXGROUP_DATASIZE = 0x4045 #: size of the data area in bytes - - -ADSIGRP_SYMTAB = 0xF000 -ADSIGRP_SYMNAME = 0xF001 -ADSIGRP_SYMVAL = 0xF002 - -ADSIGRP_SYM_HNDBYNAME = 0xF003 -ADSIGRP_SYM_VALBYNAME = 0xF004 -ADSIGRP_SYM_VALBYHND = 0xF005 -ADSIGRP_SYM_RELEASEHND = 0xF006 -ADSIGRP_SYM_INFOBYNAME = 0xF007 -ADSIGRP_SYM_VERSION = 0xF008 -ADSIGRP_SYM_INFOBYNAMEEX = 0xF009 - -ADSIGRP_SYM_DOWNLOAD = 0xF00A -ADSIGRP_SYM_UPLOAD = 0xF00B -ADSIGRP_SYM_UPLOADINFO = 0xF00C - -ADSIGRP_SYMNOTE = 0xF010 #: notification of named handle -ADSIGRP_IOIMAGE_RWIB = 0xF020 #: read/write input byte(s) -ADSIGRP_IOIMAGE_RWIX = 0xF021 #: read/write input bit -ADSIGRP_IOIMAGE_RWOB = 0xF030 #: read/write output byte(s) -ADSIGRP_IOIMAGE_RWOX = 0xF031 #: read/write output bit -ADSIGRP_IOIMAGE_CLEARI = 0xF040 #: write inputs to null -ADSIGRP_IOIMAGE_CLEARO = 0xF050 #: write outputs to null - -ADSIGRP_DEVICE_DATA = 0xF100 #: state, name, etc... -ADSIOFFS_DEVDATA_ADSSTATE = 0x0000 #: ads state of device -ADSIOFFS_DEVDATA_DEVSTATE = 0x0002 #: device state - - -# PORTS -PORT_LOGGER = 100 -PORT_EVENTLOGGER = 110 -PORT_IO = 300 -PORT_SPECIALTASK1 = 301 -PORT_SPECIALTASK2 = 302 -PORT_NC = 500 -PORT_SPS1 = 801 -PORT_SPS2 = 811 -PORT_SPS3 = 821 -PORT_SPS4 = 831 -PORT_TC2PLC1 = PORT_SPS1 -PORT_TC2PLC2 = PORT_SPS2 -PORT_TC2PLC3 = PORT_SPS3 -PORT_TC2PLC4 = PORT_SPS4 -PORT_TC3PLC1 = 851 -PORT_NOCKE = 900 -PORT_CAM = PORT_NOCKE -PORT_SYSTEMSERVICE = 10000 -PORT_SCOPE = 14000 - -# ADSState-constants -ADSSTATE_INVALID = 0 -ADSSTATE_IDLE = 1 -ADSSTATE_RESET = 2 -ADSSTATE_INIT = 3 -ADSSTATE_START = 4 -ADSSTATE_RUN = 5 -ADSSTATE_STOP = 6 -ADSSTATE_SAVECFG = 7 -ADSSTATE_LOADCFG = 8 -ADSSTATE_POWERFAILURE = 9 -ADSSTATE_POWERGOOD = 10 -ADSSTATE_ERROR = 11 -ADSSTATE_SHUTDOWN = 12 -ADSSTATE_SUSPEND = 13 -ADSSTATE_RESUME = 14 -ADSSTATE_CONFIG = 15 -ADSSTATE_RECONFIG = 16 - -# ADSTransmode -ADSTRANS_NOTRANS = 0 -ADSTRANS_CLIENTCYCLE = 1 -ADSTRANS_CLIENT1REQ = 2 -ADSTRANS_SERVERCYCLE = 3 -ADSTRANS_SERVERONCHA = 4 - -# symbol flags -ADSSYMBOLFLAG_PERSISTENT = 0x00000001 -ADSSYMBOLFLAG_BITVALUE = 0x00000002 -ADSSYMBOLFLAG_REFERENCETO = 0x0004 -ADSSYMBOLFLAG_TYPEGUID = 0x0008 -ADSSYMBOLFLAG_TCCOMIFACEPTR = 0x0010 -ADSSYMBOLFLAG_READONLY = 0x0020 -ADSSYMBOLFLAG_CONTEXTMASK = 0x0F00 - -# ADS Command IDs -ADSCOMMAND_INVALID = 0x00 -ADSCOMMAND_READDEVICEINFO = 0x01 -ADSCOMMAND_READ = 0x02 -ADSCOMMAND_WRITE = 0x03 -ADSCOMMAND_READSTATE = 0x04 -ADSCOMMAND_WRITECTRL = 0x05 -ADSCOMMAND_ADDDEVICENOTE = 0x06 -ADSCOMMAND_DELDEVICENOTE = 0x07 -ADSCOMMAND_DEVICENOTE = 0x08 -ADSCOMMAND_READWRITE = 0x09 - -# STATE Flags -ADSSTATEFLAG_REQRESP = 0x0001 -ADSSTATEFLAG_COMMAND = 0x0004 +# -*- coding: utf-8 -*- +"""Constants for the work with the ADS API. + +:author: Stefan Lehmann +:license: MIT, see license file or https://opensource.org/licenses/MIT + +:created on 2018-06-11 18:15:53 + +""" +from typing import Type +from ctypes import ( + Array, + c_bool, + c_ubyte, + c_int8, + c_uint8, + c_int16, + c_uint16, + c_int32, + c_uint32, + c_float, + c_double, + c_char, + c_int64, + c_uint64, +) + +STRING_BUFFER = 1024 +PLC_DEFAULT_STRING_SIZE = 80 + +# plc data types: +PLCTYPE_BOOL = c_bool +PLCTYPE_BYTE = c_ubyte +PLCTYPE_DATE = c_int32 +PLCTYPE_DINT = c_int32 +PLCTYPE_DT = c_int32 +PLCTYPE_DWORD = c_uint32 +PLCTYPE_INT = c_int16 +PLCTYPE_LREAL = c_double +PLCTYPE_REAL = c_float +PLCTYPE_SINT = c_int8 +PLCTYPE_STRING = c_char +PLCTYPE_TIME = c_int32 +PLCTYPE_TOD = c_int32 +PLCTYPE_UDINT = c_uint32 +PLCTYPE_UINT = c_uint16 +PLCTYPE_USINT = c_uint8 +PLCTYPE_WORD = c_uint16 +PLCTYPE_LINT = c_int64 +PLCTYPE_ULINT = c_uint64 + +# Datatype unpacking values +DATATYPE_MAP = { + PLCTYPE_BOOL: " Type[Array[c_float]] + """Return an array with n float values.""" + return c_float * n + + +def PLCTYPE_ARR_LREAL(n): + # type: (int) -> Type[Array[c_double]] + """Return an array with n double values.""" + return c_double * n + + +def PLCTYPE_ARR_INT(n): + # type: (int) -> Type[Array[c_int16]] + """Return an array with n int16 values.""" + return c_int16 * n + + +def PLCTYPE_ARR_DINT(n): + # type: (int) -> Type[Array[c_int32]] + """Return an array with n int32 values.""" + return c_int32 * n + + +def PLCTYPE_ARR_SHORT(n): + # type: (int) -> Type[Array[c_int16]] + """Return an array with n short values.""" + return c_int16 * n + + +# Index Group +# READ_M - WRITE_M +INDEXGROUP_MEMORYBYTE = 0x4020 #: plc memory area (%M), offset means byte-offset +# READ_MX - WRITE_MX +INDEXGROUP_MEMORYBIT = ( + 0x4021 +) #: plc memory area (%MX), offset means the bit address, calculatedb by bytenumber * 8 + bitnumber # noqa: E501 +# PLCADS_IGR_RMSIZE +INDEXGROUP_MEMORYSIZE = 0x4025 #: size of the memory area in bytes +# PLCADS_IGR_RWRB +INDEXGROUP_RETAIN = 0x4030 #: plc retain memory area, offset means byte-offset +# PLCADS_IGR_RRSIZE +INDEXGROUP_RETAINSIZE = 0x4035 #: size of the retain area in bytes +# PLCADS_IGR_RWDB +INDEXGROUP_DATA = 0x4040 #: data area, offset means byte-offset +# PLCADS_IGR_RDSIZE +INDEXGROUP_DATASIZE = 0x4045 #: size of the data area in bytes + + +ADSIGRP_SYMTAB = 0xF000 +ADSIGRP_SYMNAME = 0xF001 +ADSIGRP_SYMVAL = 0xF002 + +ADSIGRP_SYM_HNDBYNAME = 0xF003 +ADSIGRP_SYM_VALBYNAME = 0xF004 +ADSIGRP_SYM_VALBYHND = 0xF005 +ADSIGRP_SYM_RELEASEHND = 0xF006 +ADSIGRP_SYM_INFOBYNAME = 0xF007 +ADSIGRP_SYM_VERSION = 0xF008 +ADSIGRP_SYM_INFOBYNAMEEX = 0xF009 + +ADSIGRP_SYM_DOWNLOAD = 0xF00A +ADSIGRP_SYM_UPLOAD = 0xF00B +ADSIGRP_SYM_UPLOADINFO = 0xF00C + +ADSIGRP_SYMNOTE = 0xF010 #: notification of named handle +ADSIGRP_IOIMAGE_RWIB = 0xF020 #: read/write input byte(s) +ADSIGRP_IOIMAGE_RWIX = 0xF021 #: read/write input bit +ADSIGRP_IOIMAGE_RWOB = 0xF030 #: read/write output byte(s) +ADSIGRP_IOIMAGE_RWOX = 0xF031 #: read/write output bit +ADSIGRP_IOIMAGE_CLEARI = 0xF040 #: write inputs to null +ADSIGRP_IOIMAGE_CLEARO = 0xF050 #: write outputs to null + +ADSIGRP_DEVICE_DATA = 0xF100 #: state, name, etc... +ADSIOFFS_DEVDATA_ADSSTATE = 0x0000 #: ads state of device +ADSIOFFS_DEVDATA_DEVSTATE = 0x0002 #: device state + + +# PORTS +PORT_LOGGER = 100 +PORT_EVENTLOGGER = 110 +PORT_IO = 300 +PORT_SPECIALTASK1 = 301 +PORT_SPECIALTASK2 = 302 +PORT_NC = 500 +PORT_SPS1 = 801 +PORT_SPS2 = 811 +PORT_SPS3 = 821 +PORT_SPS4 = 831 +PORT_TC2PLC1 = PORT_SPS1 +PORT_TC2PLC2 = PORT_SPS2 +PORT_TC2PLC3 = PORT_SPS3 +PORT_TC2PLC4 = PORT_SPS4 +PORT_TC3PLC1 = 851 +PORT_NOCKE = 900 +PORT_CAM = PORT_NOCKE +PORT_SYSTEMSERVICE = 10000 +PORT_SCOPE = 14000 + +# ADSState-constants +ADSSTATE_INVALID = 0 +ADSSTATE_IDLE = 1 +ADSSTATE_RESET = 2 +ADSSTATE_INIT = 3 +ADSSTATE_START = 4 +ADSSTATE_RUN = 5 +ADSSTATE_STOP = 6 +ADSSTATE_SAVECFG = 7 +ADSSTATE_LOADCFG = 8 +ADSSTATE_POWERFAILURE = 9 +ADSSTATE_POWERGOOD = 10 +ADSSTATE_ERROR = 11 +ADSSTATE_SHUTDOWN = 12 +ADSSTATE_SUSPEND = 13 +ADSSTATE_RESUME = 14 +ADSSTATE_CONFIG = 15 +ADSSTATE_RECONFIG = 16 + +# ADSTransmode +ADSTRANS_NOTRANS = 0 +ADSTRANS_CLIENTCYCLE = 1 +ADSTRANS_CLIENT1REQ = 2 +ADSTRANS_SERVERCYCLE = 3 +ADSTRANS_SERVERONCHA = 4 + +# symbol flags +ADSSYMBOLFLAG_PERSISTENT = 0x00000001 +ADSSYMBOLFLAG_BITVALUE = 0x00000002 +ADSSYMBOLFLAG_REFERENCETO = 0x0004 +ADSSYMBOLFLAG_TYPEGUID = 0x0008 +ADSSYMBOLFLAG_TCCOMIFACEPTR = 0x0010 +ADSSYMBOLFLAG_READONLY = 0x0020 +ADSSYMBOLFLAG_CONTEXTMASK = 0x0F00 + +# ADS Command IDs +ADSCOMMAND_INVALID = 0x00 +ADSCOMMAND_READDEVICEINFO = 0x01 +ADSCOMMAND_READ = 0x02 +ADSCOMMAND_WRITE = 0x03 +ADSCOMMAND_READSTATE = 0x04 +ADSCOMMAND_WRITECTRL = 0x05 +ADSCOMMAND_ADDDEVICENOTE = 0x06 +ADSCOMMAND_DELDEVICENOTE = 0x07 +ADSCOMMAND_DEVICENOTE = 0x08 +ADSCOMMAND_READWRITE = 0x09 + +# STATE Flags +ADSSTATEFLAG_REQRESP = 0x0001 +ADSSTATEFLAG_COMMAND = 0x0004 diff --git a/setup.py b/setup.py index c6e729ab..2693ec35 100644 --- a/setup.py +++ b/setup.py @@ -1,179 +1,179 @@ -#! /usr/bin/env python -# -*-coding: utf-8 -*- -import io -import glob -import os -import re -import sys -import shutil -import subprocess -import functools -import operator -from setuptools import setup -from setuptools.command.test import test as TestCommand -from setuptools.command.install import install as _install -from distutils.command.build import build as _build -from distutils.command.clean import clean as _clean -from distutils.command.sdist import sdist as _sdist - - -def read(*names, **kwargs): - try: - with io.open( - os.path.join(os.path.dirname(__file__), *names), - encoding=kwargs.get("encoding", "utf8") - ) as fp: - return fp.read() - except IOError: - return '' - - -def find_version(*file_paths): - version_file = read(*file_paths) - version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", - version_file, re.M) - if version_match: - return version_match.group(1) - raise RuntimeError("Unable to find version string.") - - -def platform_is_linux(): - return sys.platform.startswith('linux') or \ - sys.platform.startswith('darwin') - - -def get_files_rec(directory): - res = [] - for (path, directory, filenames) in os.walk(directory): - files = [os.path.join(path, fn) for fn in filenames] - res.append((path, files)) - return res - - -data_files = get_files_rec('adslib') - - -def create_binaries(): - subprocess.call(['make', '-C', 'adslib']) - - -def remove_binaries(): - """Remove all binary files in the adslib directory.""" - patterns = ( - "adslib/*.a", - "adslib/*.o", - "adslib/obj/*.o", - "adslib/*.bin", - "adslib/*.so", - ) - - for f in functools.reduce(operator.iconcat, [glob.glob(p) for p in patterns]): - os.remove(f) - - -def copy_sharedlib(): - try: - shutil.copy('adslib/adslib.so', 'pyads/adslib.so') - except OSError: - pass - - -def remove_sharedlib(): - try: - os.remove('pyads/adslib.so') - except OSError: - pass - - -class build(_build): - def run(self): - if platform_is_linux(): - remove_binaries() - create_binaries() - copy_sharedlib() - remove_binaries() - _build.run(self) - - -class clean(_clean): - def run(self): - if platform_is_linux(): - remove_binaries() - remove_sharedlib() - _clean.run(self) - - -class sdist(_sdist): - def run(self): - if platform_is_linux(): - remove_binaries() - _sdist.run(self) - - -class install(_install): - def run(self): - if platform_is_linux(): - create_binaries() - copy_sharedlib() - _install.run(self) - - -class PyTest(TestCommand): - user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")] - - def initialize_options(self): - TestCommand.initialize_options(self) - self.pytest_args = ['--cov-report', 'html', '--cov-report', 'term', - '--cov=pyads'] - - def finalize_options(self): - TestCommand.finalize_options(self) - self.test_args = [] - self.test_suite = True - - def run_tests(self): - import pytest - errno = pytest.main(self.pytest_args) - sys.exit(errno) - - -cmdclass = { - 'test': PyTest, - 'build': build, - 'clean': clean, - 'sdist': sdist, - 'install': install, -} - - -long_description = read('README.md') - - -setup( - name="pyads", - version=find_version('pyads', '__init__.py'), - description="Python wrapper for TwinCAT ADS library", - long_description=long_description, - long_description_content_type='text/markdown', - author="Stefan Lehmann", - author_email="Stefan.St.Lehmann@gmail.com", - packages=["pyads", "pyads.testserver_ex"], - package_data={'pyads': ['adslib.so']}, - requires=[], - install_requires=[], - provides=['pyads'], - url='https://github.com/MrLeeh/pyads', - classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3', - 'Topic :: Software Development :: Libraries', - 'Operating System :: Microsoft :: Windows', - 'Operating System :: Microsoft :: Windows :: Windows 7', - 'Operating System :: POSIX :: Linux', - ], - cmdclass=cmdclass, - data_files=data_files, - tests_require=['pytest', 'pytest-cov'], -) +#! /usr/bin/env python +# -*-coding: utf-8 -*- +import io +import glob +import os +import re +import sys +import shutil +import subprocess +import functools +import operator +from setuptools import setup +from setuptools.command.test import test as TestCommand +from setuptools.command.install import install as _install +from distutils.command.build import build as _build +from distutils.command.clean import clean as _clean +from distutils.command.sdist import sdist as _sdist + + +def read(*names, **kwargs): + try: + with io.open( + os.path.join(os.path.dirname(__file__), *names), + encoding=kwargs.get("encoding", "utf8") + ) as fp: + return fp.read() + except IOError: + return '' + + +def find_version(*file_paths): + version_file = read(*file_paths) + version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", + version_file, re.M) + if version_match: + return version_match.group(1) + raise RuntimeError("Unable to find version string.") + + +def platform_is_linux(): + return sys.platform.startswith('linux') or \ + sys.platform.startswith('darwin') + + +def get_files_rec(directory): + res = [] + for (path, directory, filenames) in os.walk(directory): + files = [os.path.join(path, fn) for fn in filenames] + res.append((path, files)) + return res + + +data_files = get_files_rec('adslib') + + +def create_binaries(): + subprocess.call(['make', '-C', 'adslib']) + + +def remove_binaries(): + """Remove all binary files in the adslib directory.""" + patterns = ( + "adslib/*.a", + "adslib/*.o", + "adslib/obj/*.o", + "adslib/*.bin", + "adslib/*.so", + ) + + for f in functools.reduce(operator.iconcat, [glob.glob(p) for p in patterns]): + os.remove(f) + + +def copy_sharedlib(): + try: + shutil.copy('adslib/adslib.so', 'pyads/adslib.so') + except OSError: + pass + + +def remove_sharedlib(): + try: + os.remove('pyads/adslib.so') + except OSError: + pass + + +class build(_build): + def run(self): + if platform_is_linux(): + remove_binaries() + create_binaries() + copy_sharedlib() + remove_binaries() + _build.run(self) + + +class clean(_clean): + def run(self): + if platform_is_linux(): + remove_binaries() + remove_sharedlib() + _clean.run(self) + + +class sdist(_sdist): + def run(self): + if platform_is_linux(): + remove_binaries() + _sdist.run(self) + + +class install(_install): + def run(self): + if platform_is_linux(): + create_binaries() + copy_sharedlib() + _install.run(self) + + +class PyTest(TestCommand): + user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")] + + def initialize_options(self): + TestCommand.initialize_options(self) + self.pytest_args = ['--cov-report', 'html', '--cov-report', 'term', + '--cov=pyads'] + + def finalize_options(self): + TestCommand.finalize_options(self) + self.test_args = [] + self.test_suite = True + + def run_tests(self): + import pytest + errno = pytest.main(self.pytest_args) + sys.exit(errno) + + +cmdclass = { + 'test': PyTest, + 'build': build, + 'clean': clean, + 'sdist': sdist, + 'install': install, +} + + +long_description = read('README.md') + + +setup( + name="pyads", + version=find_version('pyads', '__init__.py'), + description="Python wrapper for TwinCAT ADS library", + long_description=long_description, + long_description_content_type='text/markdown', + author="Stefan Lehmann", + author_email="Stefan.St.Lehmann@gmail.com", + packages=["pyads", "pyads.testserver_ex"], + package_data={'pyads': ['adslib.so']}, + requires=[], + install_requires=[], + provides=['pyads'], + url='https://github.com/MrLeeh/pyads', + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python :: 3', + 'Topic :: Software Development :: Libraries', + 'Operating System :: Microsoft :: Windows', + 'Operating System :: Microsoft :: Windows :: Windows 7', + 'Operating System :: POSIX :: Linux', + ], + cmdclass=cmdclass, + data_files=data_files, + tests_require=['pytest', 'pytest-cov'], +) diff --git a/tests/test_ads.py b/tests/test_ads.py index 2c70eb84..39ac6b51 100644 --- a/tests/test_ads.py +++ b/tests/test_ads.py @@ -1,388 +1,388 @@ -"""Test ADS module. - -:author: Stefan Lehmann -:license: MIT, see license file or https://opensource.org/licenses/MIT - -:created on: 2018-06-11 18:15:58 - -""" -import pyads -from pyads import AmsAddr -from pyads.utils import platform_is_linux -from ctypes import c_ubyte -from collections import OrderedDict -import unittest - - -class AdsTest(unittest.TestCase): - """Unittests for ADS module.""" - - def test_AmsAddr(self): - # type: () -> None - """Test ams address - related functions.""" - netid = "5.33.160.54.1.1" - port = 851 - - # test default init values - adr = AmsAddr() - self.assertEqual(adr.netid, "0.0.0.0.0.0") - self.assertEqual(adr.port, 0) - - # test given init values - adr = AmsAddr(netid, port) - self.assertEqual(adr.netid, netid) - self.assertEqual(adr.port, port) - - # check if ams addr struct has been changed - ams_addr_numbers = [x for x in adr._ams_addr.netId.b] - netid_numbers = [int(x) for x in netid.split(".")] - self.assertEqual(len(ams_addr_numbers), len(netid_numbers)) - for i in range(len(ams_addr_numbers)): - self.assertEqual(ams_addr_numbers[i], netid_numbers[i]) - self.assertEqual(adr.port, adr._ams_addr.port) - - def test_set_local_address(self): - # type: () -> None - """Test set_local_address function. - - Skip test on Windows as set_local_address is not supported for Windows. - - """ - if platform_is_linux(): - pyads.open_port() - org_adr = pyads.get_local_address() - self.assertIsNotNone(org_adr) - org_netid = org_adr.netid - - # Set netid to specific value - pyads.set_local_address("0.0.0.0.1.5") - netid = pyads.get_local_address().netid - self.assertEqual(netid, "0.0.0.0.1.5") - - # Change netid by String - pyads.set_local_address("0.0.0.0.1.6") - netid = pyads.get_local_address().netid - self.assertEqual(netid, "0.0.0.0.1.6") - - # Change netid by Struct - pyads.set_local_address(org_adr.netIdStruct()) - netid = pyads.get_local_address().netid - self.assertEqual(netid, org_netid) - - # Check raised error on short netid - with self.assertRaises(ValueError): - pyads.ads.set_local_address("1.2.3") - - # Check raised error on invalid netid - with self.assertRaises(ValueError): - pyads.set_local_address("1.2.3.a") - - # Check wrong netid datatype - with self.assertRaises(AssertionError): - pyads.set_local_address(123) - - def test_functions_with_closed_port(self): - # type: () -> None - """Test pyads functions with no open port.""" - pyads.open_port() - adr = pyads.get_local_address() - pyads.close_port() - - self.assertIsNotNone(adr) - self.assertIsNone(pyads.get_local_address()) - self.assertIsNone(pyads.read_state(adr)) - self.assertIsNone(pyads.read_device_info(adr)) - self.assertIsNone( - pyads.read_write(adr, 1, 2, pyads.PLCTYPE_INT, 1, pyads.PLCTYPE_INT) - ) - self.assertIsNone(pyads.read(adr, 1, 2, pyads.PLCTYPE_INT)) - self.assertIsNone(pyads.read_by_name(adr, "hello", pyads.PLCTYPE_INT)) - self.assertIsNone( - pyads.add_device_notification( - adr, "test", pyads.NotificationAttrib(4), lambda x: x - ) - ) - - def test_set_timeout(self): - # type: () -> None - """Test timeout function.""" - pyads.open_port() - self.assertIsNone(pyads.set_timeout(100)) - pyads.open_port() - - def test_size_of_structure(self): - # type: () -> None - """Test size_of_structure function""" - # known structure size with defined string - structure_def = ( - ("rVar", pyads.PLCTYPE_LREAL, 1), - ("sVar", pyads.PLCTYPE_STRING, 2, 35), - ("rVar1", pyads.PLCTYPE_REAL, 4), - ("iVar", pyads.PLCTYPE_DINT, 5), - ("iVar1", pyads.PLCTYPE_INT, 3), - ("ivar2", pyads.PLCTYPE_UDINT, 6), - ("iVar3", pyads.PLCTYPE_UINT, 7), - ("iVar4", pyads.PLCTYPE_BYTE, 1), - ("iVar5", pyads.PLCTYPE_SINT, 1), - ("iVar6", pyads.PLCTYPE_USINT, 1), - ("bVar", pyads.PLCTYPE_BOOL, 4), - ("iVar7", pyads.PLCTYPE_WORD, 1), - ("iVar8", pyads.PLCTYPE_DWORD, 1), - ) - self.assertEqual(pyads.size_of_structure(structure_def), c_ubyte * 173) - - # test for PLC_DEFAULT_STRING_SIZE - structure_def = (("sVar", pyads.PLCTYPE_STRING, 4),) - self.assertEqual( - pyads.size_of_structure(structure_def), - c_ubyte * ((pyads.PLC_DEFAULT_STRING_SIZE + 1) * 4), - ) - - # tests for incorrect definitions - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 4), - ("rVar", 1, 1), - ("iVar", pyads.PLCTYPE_DINT, 1), - ) - with self.assertRaises(RuntimeError): - pyads.size_of_structure(structure_def) - - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 4), - (pyads.PLCTYPE_REAL, 1), - ("iVar", pyads.PLCTYPE_DINT, 1), - ) - with self.assertRaises(ValueError): - pyads.size_of_structure(structure_def) - - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 4), - ("rVar", pyads.PLCTYPE_REAL, ""), - ("iVar", pyads.PLCTYPE_DINT, 1), - ("iVar1", pyads.PLCTYPE_INT, 3), - ) - with self.assertRaises(TypeError): - pyads.size_of_structure(structure_def) - - # test another correct definition with array of structure - structure_def = ( - ("bVar", pyads.PLCTYPE_BOOL, 1), - ("rVar", pyads.PLCTYPE_LREAL, 3), - ("sVar", pyads.PLCTYPE_STRING, 2), - ("iVar", pyads.PLCTYPE_DINT, 10), - ("iVar1", pyads.PLCTYPE_INT, 3), - ("bVar1", pyads.PLCTYPE_BOOL, 4), - ) - self.assertEqual(pyads.size_of_structure(structure_def * 5), c_ubyte * 1185) - - def test_dict_from_bytes(self): - # type: () -> None - """Test dict_from_bytes function""" - # tests for known values - structure_def = ( - ("rVar", pyads.PLCTYPE_LREAL, 1), - ("sVar", pyads.PLCTYPE_STRING, 2, 35), - ("rVar1", pyads.PLCTYPE_REAL, 4), - ("iVar", pyads.PLCTYPE_DINT, 5), - ("iVar1", pyads.PLCTYPE_INT, 3), - ("ivar2", pyads.PLCTYPE_UDINT, 6), - ("iVar3", pyads.PLCTYPE_UINT, 7), - ("iVar4", pyads.PLCTYPE_BYTE, 1), - ("iVar5", pyads.PLCTYPE_SINT, 1), - ("iVar6", pyads.PLCTYPE_USINT, 1), - ("bVar", pyads.PLCTYPE_BOOL, 4), - ("iVar7", pyads.PLCTYPE_WORD, 1), - ("iVar8", pyads.PLCTYPE_DWORD, 1), - ) - values = OrderedDict( - [ - ("rVar", 1.11), - ("sVar", ["Hello", "World"]), - ("rVar1", [2.25, 2.25, 2.5, 2.75]), - ("iVar", [3, 4, 5, 6, 7]), - ("iVar1", [8, 9, 10]), - ("ivar2", [11, 12, 13, 14, 15, 16]), - ("iVar3", [17, 18, 19, 20, 21, 22, 23]), - ("iVar4", 24), - ("iVar5", 25), - ("iVar6", 26), - ("bVar", [True, False, True, False]), - ("iVar7", 27), - ("iVar8", 28), - ] - ) - # fmt: off - bytes_list = [195, 245, 40, 92, 143, 194, 241, 63, 72, 101, 108, 108, 111, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 87, 111, 114, 108, 100, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 64, 0, 0, 16, 64, 0, - 0, 32, 64, 0, 0, 48, 64, 3, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0, 0, - 6, 0, 0, 0, 7, 0, 0, 0, 8, 0, 9, 0, 10, 0, 11, 0, 0, 0, 12, - 0, 0, 0, 13, 0, 0, 0, 14, 0, 0, 0, 15, 0, 0, 0, 16, 0, 0, 0, - 17, 0, 18, 0, 19, 0, 20, 0, 21, 0, 22, 0, 23, 0, 24, 25, 26, - 1, 0, 1, 0, 27, 0, 28, 0, 0, 0] - # fmt: on - self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) - - values = OrderedDict( - [ - ("rVar", 780245.5678), - ("sVar", ["TwinCat works", "with Python using pyads"]), - ("rVar1", [65.5, 89.75, 999.5, 55555.0]), - ("iVar", [24567, -5678988, 12, -393, 0]), - ("iVar1", [-20563, 32765, -1]), - ("ivar2", [100001, 1234567890, 76, 582, 94034536, 2167]), - ("iVar3", [2167, 987, 63000, 5648, 678, 2734, 43768]), - ("iVar4", 200), - ("iVar5", 127), - ("iVar6", 255), - ("bVar", [True, False, True, False]), - ("iVar7", 45367), - ("iVar8", 256000000), - ] - ) - # fmt: off - bytes_list = [125, 174, 182, 34, 171, 207, 39, 65, 84, 119, 105, 110, 67, - 97, 116, 32, 119, 111, 114, 107, 115, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 119, 105, - 116, 104, 32, 80, 121, 116, 104, 111, 110, 32, 117, 115, 105, - 110, 103, 32, 112, 121, 97, 100, 115, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 131, 66, 0, 128, 179, 66, 0, 224, 121, - 68, 0, 3, 89, 71, 247, 95, 0, 0, 116, 88, 169, 255, 12, 0, 0, - 0, 119, 254, 255, 255, 0, 0, 0, 0, 173, 175, 253, 127, 255, - 255, 161, 134, 1, 0, 210, 2, 150, 73, 76, 0, 0, 0, 70, 2, 0, - 0, 104, 218, 154, 5, 119, 8, 0, 0, 119, 8, 219, 3, 24, 246, - 16, 22, 166, 2, 174, 10, 248, 170, 200, 127, 255, 1, 0, 1, 0, - 55, 177, 0, 64, 66, 15] - # fmt: on - self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) - - # test for PLC_DEFAULT_STRING_SIZE - structure_def = ( - ("iVar", pyads.PLCTYPE_INT, 1), - ("bVar", pyads.PLCTYPE_BOOL, 1), - ("sVar", pyads.PLCTYPE_STRING, 1), - ("iVar2", pyads.PLCTYPE_DINT, 1), - ) - values = OrderedDict( - [ - ("iVar", 32767), - ("bVar", True), - ("sVar", "Testing the default string size of 80"), - ("iVar2", -25600000), - ] - ) - # fmt: off - bytes_list = [255, 127, 1, 84, 101, 115, 116, 105, 110, 103, 32, 116, 104, - 101, 32, 100, 101, 102, 97, 117, 108, 116, 32, 115, 116, 114, - 105, 110, 103, 32, 115, 105, 122, 101, 32, 111, 102, 32, 56, - 48, 0, 72, 137, 131, 240, 3, 6, 0, 72, 141, 131, 104, 115, 6, - 0, 72, 137, 131, 248, 3, 6, 0, 72, 141, 131, 104, 12, 6, 0, - 72, 137, 131, 0, 4, 6, 0, 72, 141, 131, 224, 114, 6, 0, 72, - 0, 96, 121, 254] - # fmt: on - self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) - - # test another correct definition with array of structure - values_list = [ - OrderedDict( - [ - ("iVar", 32767), - ("bVar", True), - ("sVar", "Testing the default string size of 80"), - ("iVar2", -25600000), - ] - ), - OrderedDict( - [ - ("iVar", -32768), - ("bVar", True), - ("sVar", "Another Test using the default string size of 80"), - ("iVar2", -25600000), - ] - ), - OrderedDict( - [ - ("iVar", 0), - ("bVar", False), - ("sVar", "Last Test String of Array"), - ("iVar2", 1234567890), - ] - ), - ] - # fmt: off - bytes_list = [255, 127, 1, 84, 101, 115, 116, 105, 110, 103, 32, 116, 104, - 101, 32, 100, 101, 102, 97, 117, 108, 116, 32, 115, 116, 114, - 105, 110, 103, 32, 115, 105, 122, 101, 32, 111, 102, 32, 56, - 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 96, 121, 254, 0, 128, 1, 65, 110, 111, 116, - 104, 101, 114, 32, 84, 101, 115, 116, 32, 117, 115, 105, 110, - 103, 32, 116, 104, 101, 32, 100, 101, 102, 97, 117, 108, 116, - 32, 115, 116, 114, 105, 110, 103, 32, 115, 105, 122, 101, 32, - 111, 102, 32, 56, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 96, 121, 254, 0, 0, 0, 76, 97, 115, 116, 32, 84, 101, 115, - 116, 32, 83, 116, 114, 105, 110, 103, 32, 111, 102, 32, 65, - 114, 114, 97, 121, 0, 72, 137, 131, 240, 4, 6, 0, 72, 141, - 131, 80, 19, 6, 0, 72, 137, 131, 248, 4, 6, 0, 72, 141, 131, - 168, 19, 6, 0, 72, 137, 131, 0, 5, 6, 0, 72, 141, 131, 0, 20, - 6, 0, 72, 137, 131, 8, 5, 6, 0, 72, 141, 131, 96, 20, 6, 210, - 2, 150, 73] - # fmt: on - self.assertEqual( - values_list, pyads.dict_from_bytes(bytes_list, structure_def, array_size=3) - ) - - # test for not default string and array of LREALs - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 1, 20), - ("rVar", pyads.PLCTYPE_LREAL, 4), - ) - values = OrderedDict([("sVar", "pyads"), ("rVar", [1.11, 2.22, 3.33, 4.44])]) - # fmt: off - bytes_list = [112, 121, 97, 100, 115, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 195, 245, 40, 92, 143, 194, 241, 63, 195, 245, - 40, 92, 143, 194, 1, 64, 164, 112, 61, 10, 215, 163, 10, 64, - 195, 245, 40, 92, 143, 194, 17, 64] - # fmt: on - self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) - - # tests for incorrect definitions - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 4), - ("rVar", 1, 1), - ("iVar", pyads.PLCTYPE_DINT, 1), - ) - with self.assertRaises(RuntimeError): - pyads.dict_from_bytes([], structure_def) - - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 4), - ("rVar", 1, 2), - ("iVar", pyads.PLCTYPE_DINT, 1), - ) - with self.assertRaises(RuntimeError): - pyads.dict_from_bytes([], structure_def) - - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 4), - (pyads.PLCTYPE_REAL, 1), - ("iVar", pyads.PLCTYPE_DINT, 1), - ) - with self.assertRaises(ValueError): - pyads.dict_from_bytes([], structure_def) - - structure_def = ( - ("sVar", pyads.PLCTYPE_STRING, 4), - ("rVar", pyads.PLCTYPE_REAL, ""), - ("iVar", pyads.PLCTYPE_DINT, 1), - ("iVar1", pyads.PLCTYPE_INT, 3), - ) - with self.assertRaises(TypeError): - pyads.dict_from_bytes([], structure_def) - - -if __name__ == "__main__": - unittest.main() +"""Test ADS module. + +:author: Stefan Lehmann +:license: MIT, see license file or https://opensource.org/licenses/MIT + +:created on: 2018-06-11 18:15:58 + +""" +import pyads +from pyads import AmsAddr +from pyads.utils import platform_is_linux +from ctypes import c_ubyte +from collections import OrderedDict +import unittest + + +class AdsTest(unittest.TestCase): + """Unittests for ADS module.""" + + def test_AmsAddr(self): + # type: () -> None + """Test ams address - related functions.""" + netid = "5.33.160.54.1.1" + port = 851 + + # test default init values + adr = AmsAddr() + self.assertEqual(adr.netid, "0.0.0.0.0.0") + self.assertEqual(adr.port, 0) + + # test given init values + adr = AmsAddr(netid, port) + self.assertEqual(adr.netid, netid) + self.assertEqual(adr.port, port) + + # check if ams addr struct has been changed + ams_addr_numbers = [x for x in adr._ams_addr.netId.b] + netid_numbers = [int(x) for x in netid.split(".")] + self.assertEqual(len(ams_addr_numbers), len(netid_numbers)) + for i in range(len(ams_addr_numbers)): + self.assertEqual(ams_addr_numbers[i], netid_numbers[i]) + self.assertEqual(adr.port, adr._ams_addr.port) + + def test_set_local_address(self): + # type: () -> None + """Test set_local_address function. + + Skip test on Windows as set_local_address is not supported for Windows. + + """ + if platform_is_linux(): + pyads.open_port() + org_adr = pyads.get_local_address() + self.assertIsNotNone(org_adr) + org_netid = org_adr.netid + + # Set netid to specific value + pyads.set_local_address("0.0.0.0.1.5") + netid = pyads.get_local_address().netid + self.assertEqual(netid, "0.0.0.0.1.5") + + # Change netid by String + pyads.set_local_address("0.0.0.0.1.6") + netid = pyads.get_local_address().netid + self.assertEqual(netid, "0.0.0.0.1.6") + + # Change netid by Struct + pyads.set_local_address(org_adr.netIdStruct()) + netid = pyads.get_local_address().netid + self.assertEqual(netid, org_netid) + + # Check raised error on short netid + with self.assertRaises(ValueError): + pyads.ads.set_local_address("1.2.3") + + # Check raised error on invalid netid + with self.assertRaises(ValueError): + pyads.set_local_address("1.2.3.a") + + # Check wrong netid datatype + with self.assertRaises(AssertionError): + pyads.set_local_address(123) + + def test_functions_with_closed_port(self): + # type: () -> None + """Test pyads functions with no open port.""" + pyads.open_port() + adr = pyads.get_local_address() + pyads.close_port() + + self.assertIsNotNone(adr) + self.assertIsNone(pyads.get_local_address()) + self.assertIsNone(pyads.read_state(adr)) + self.assertIsNone(pyads.read_device_info(adr)) + self.assertIsNone( + pyads.read_write(adr, 1, 2, pyads.PLCTYPE_INT, 1, pyads.PLCTYPE_INT) + ) + self.assertIsNone(pyads.read(adr, 1, 2, pyads.PLCTYPE_INT)) + self.assertIsNone(pyads.read_by_name(adr, "hello", pyads.PLCTYPE_INT)) + self.assertIsNone( + pyads.add_device_notification( + adr, "test", pyads.NotificationAttrib(4), lambda x: x + ) + ) + + def test_set_timeout(self): + # type: () -> None + """Test timeout function.""" + pyads.open_port() + self.assertIsNone(pyads.set_timeout(100)) + pyads.open_port() + + def test_size_of_structure(self): + # type: () -> None + """Test size_of_structure function""" + # known structure size with defined string + structure_def = ( + ("rVar", pyads.PLCTYPE_LREAL, 1), + ("sVar", pyads.PLCTYPE_STRING, 2, 35), + ("rVar1", pyads.PLCTYPE_REAL, 4), + ("iVar", pyads.PLCTYPE_DINT, 5), + ("iVar1", pyads.PLCTYPE_INT, 3), + ("ivar2", pyads.PLCTYPE_UDINT, 6), + ("iVar3", pyads.PLCTYPE_UINT, 7), + ("iVar4", pyads.PLCTYPE_BYTE, 1), + ("iVar5", pyads.PLCTYPE_SINT, 1), + ("iVar6", pyads.PLCTYPE_USINT, 1), + ("bVar", pyads.PLCTYPE_BOOL, 4), + ("iVar7", pyads.PLCTYPE_WORD, 1), + ("iVar8", pyads.PLCTYPE_DWORD, 1), + ) + self.assertEqual(pyads.size_of_structure(structure_def), c_ubyte * 173) + + # test for PLC_DEFAULT_STRING_SIZE + structure_def = (("sVar", pyads.PLCTYPE_STRING, 4),) + self.assertEqual( + pyads.size_of_structure(structure_def), + c_ubyte * ((pyads.PLC_DEFAULT_STRING_SIZE + 1) * 4), + ) + + # tests for incorrect definitions + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 4), + ("rVar", 1, 1), + ("iVar", pyads.PLCTYPE_DINT, 1), + ) + with self.assertRaises(RuntimeError): + pyads.size_of_structure(structure_def) + + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 4), + (pyads.PLCTYPE_REAL, 1), + ("iVar", pyads.PLCTYPE_DINT, 1), + ) + with self.assertRaises(ValueError): + pyads.size_of_structure(structure_def) + + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 4), + ("rVar", pyads.PLCTYPE_REAL, ""), + ("iVar", pyads.PLCTYPE_DINT, 1), + ("iVar1", pyads.PLCTYPE_INT, 3), + ) + with self.assertRaises(TypeError): + pyads.size_of_structure(structure_def) + + # test another correct definition with array of structure + structure_def = ( + ("bVar", pyads.PLCTYPE_BOOL, 1), + ("rVar", pyads.PLCTYPE_LREAL, 3), + ("sVar", pyads.PLCTYPE_STRING, 2), + ("iVar", pyads.PLCTYPE_DINT, 10), + ("iVar1", pyads.PLCTYPE_INT, 3), + ("bVar1", pyads.PLCTYPE_BOOL, 4), + ) + self.assertEqual(pyads.size_of_structure(structure_def * 5), c_ubyte * 1185) + + def test_dict_from_bytes(self): + # type: () -> None + """Test dict_from_bytes function""" + # tests for known values + structure_def = ( + ("rVar", pyads.PLCTYPE_LREAL, 1), + ("sVar", pyads.PLCTYPE_STRING, 2, 35), + ("rVar1", pyads.PLCTYPE_REAL, 4), + ("iVar", pyads.PLCTYPE_DINT, 5), + ("iVar1", pyads.PLCTYPE_INT, 3), + ("ivar2", pyads.PLCTYPE_UDINT, 6), + ("iVar3", pyads.PLCTYPE_UINT, 7), + ("iVar4", pyads.PLCTYPE_BYTE, 1), + ("iVar5", pyads.PLCTYPE_SINT, 1), + ("iVar6", pyads.PLCTYPE_USINT, 1), + ("bVar", pyads.PLCTYPE_BOOL, 4), + ("iVar7", pyads.PLCTYPE_WORD, 1), + ("iVar8", pyads.PLCTYPE_DWORD, 1), + ) + values = OrderedDict( + [ + ("rVar", 1.11), + ("sVar", ["Hello", "World"]), + ("rVar1", [2.25, 2.25, 2.5, 2.75]), + ("iVar", [3, 4, 5, 6, 7]), + ("iVar1", [8, 9, 10]), + ("ivar2", [11, 12, 13, 14, 15, 16]), + ("iVar3", [17, 18, 19, 20, 21, 22, 23]), + ("iVar4", 24), + ("iVar5", 25), + ("iVar6", 26), + ("bVar", [True, False, True, False]), + ("iVar7", 27), + ("iVar8", 28), + ] + ) + # fmt: off + bytes_list = [195, 245, 40, 92, 143, 194, 241, 63, 72, 101, 108, 108, 111, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 87, 111, 114, 108, 100, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 64, 0, 0, 16, 64, 0, + 0, 32, 64, 0, 0, 48, 64, 3, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0, 0, + 6, 0, 0, 0, 7, 0, 0, 0, 8, 0, 9, 0, 10, 0, 11, 0, 0, 0, 12, + 0, 0, 0, 13, 0, 0, 0, 14, 0, 0, 0, 15, 0, 0, 0, 16, 0, 0, 0, + 17, 0, 18, 0, 19, 0, 20, 0, 21, 0, 22, 0, 23, 0, 24, 25, 26, + 1, 0, 1, 0, 27, 0, 28, 0, 0, 0] + # fmt: on + self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) + + values = OrderedDict( + [ + ("rVar", 780245.5678), + ("sVar", ["TwinCat works", "with Python using pyads"]), + ("rVar1", [65.5, 89.75, 999.5, 55555.0]), + ("iVar", [24567, -5678988, 12, -393, 0]), + ("iVar1", [-20563, 32765, -1]), + ("ivar2", [100001, 1234567890, 76, 582, 94034536, 2167]), + ("iVar3", [2167, 987, 63000, 5648, 678, 2734, 43768]), + ("iVar4", 200), + ("iVar5", 127), + ("iVar6", 255), + ("bVar", [True, False, True, False]), + ("iVar7", 45367), + ("iVar8", 256000000), + ] + ) + # fmt: off + bytes_list = [125, 174, 182, 34, 171, 207, 39, 65, 84, 119, 105, 110, 67, + 97, 116, 32, 119, 111, 114, 107, 115, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 119, 105, + 116, 104, 32, 80, 121, 116, 104, 111, 110, 32, 117, 115, 105, + 110, 103, 32, 112, 121, 97, 100, 115, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 131, 66, 0, 128, 179, 66, 0, 224, 121, + 68, 0, 3, 89, 71, 247, 95, 0, 0, 116, 88, 169, 255, 12, 0, 0, + 0, 119, 254, 255, 255, 0, 0, 0, 0, 173, 175, 253, 127, 255, + 255, 161, 134, 1, 0, 210, 2, 150, 73, 76, 0, 0, 0, 70, 2, 0, + 0, 104, 218, 154, 5, 119, 8, 0, 0, 119, 8, 219, 3, 24, 246, + 16, 22, 166, 2, 174, 10, 248, 170, 200, 127, 255, 1, 0, 1, 0, + 55, 177, 0, 64, 66, 15] + # fmt: on + self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) + + # test for PLC_DEFAULT_STRING_SIZE + structure_def = ( + ("iVar", pyads.PLCTYPE_INT, 1), + ("bVar", pyads.PLCTYPE_BOOL, 1), + ("sVar", pyads.PLCTYPE_STRING, 1), + ("iVar2", pyads.PLCTYPE_DINT, 1), + ) + values = OrderedDict( + [ + ("iVar", 32767), + ("bVar", True), + ("sVar", "Testing the default string size of 80"), + ("iVar2", -25600000), + ] + ) + # fmt: off + bytes_list = [255, 127, 1, 84, 101, 115, 116, 105, 110, 103, 32, 116, 104, + 101, 32, 100, 101, 102, 97, 117, 108, 116, 32, 115, 116, 114, + 105, 110, 103, 32, 115, 105, 122, 101, 32, 111, 102, 32, 56, + 48, 0, 72, 137, 131, 240, 3, 6, 0, 72, 141, 131, 104, 115, 6, + 0, 72, 137, 131, 248, 3, 6, 0, 72, 141, 131, 104, 12, 6, 0, + 72, 137, 131, 0, 4, 6, 0, 72, 141, 131, 224, 114, 6, 0, 72, + 0, 96, 121, 254] + # fmt: on + self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) + + # test another correct definition with array of structure + values_list = [ + OrderedDict( + [ + ("iVar", 32767), + ("bVar", True), + ("sVar", "Testing the default string size of 80"), + ("iVar2", -25600000), + ] + ), + OrderedDict( + [ + ("iVar", -32768), + ("bVar", True), + ("sVar", "Another Test using the default string size of 80"), + ("iVar2", -25600000), + ] + ), + OrderedDict( + [ + ("iVar", 0), + ("bVar", False), + ("sVar", "Last Test String of Array"), + ("iVar2", 1234567890), + ] + ), + ] + # fmt: off + bytes_list = [255, 127, 1, 84, 101, 115, 116, 105, 110, 103, 32, 116, 104, + 101, 32, 100, 101, 102, 97, 117, 108, 116, 32, 115, 116, 114, + 105, 110, 103, 32, 115, 105, 122, 101, 32, 111, 102, 32, 56, + 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 96, 121, 254, 0, 128, 1, 65, 110, 111, 116, + 104, 101, 114, 32, 84, 101, 115, 116, 32, 117, 115, 105, 110, + 103, 32, 116, 104, 101, 32, 100, 101, 102, 97, 117, 108, 116, + 32, 115, 116, 114, 105, 110, 103, 32, 115, 105, 122, 101, 32, + 111, 102, 32, 56, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 96, 121, 254, 0, 0, 0, 76, 97, 115, 116, 32, 84, 101, 115, + 116, 32, 83, 116, 114, 105, 110, 103, 32, 111, 102, 32, 65, + 114, 114, 97, 121, 0, 72, 137, 131, 240, 4, 6, 0, 72, 141, + 131, 80, 19, 6, 0, 72, 137, 131, 248, 4, 6, 0, 72, 141, 131, + 168, 19, 6, 0, 72, 137, 131, 0, 5, 6, 0, 72, 141, 131, 0, 20, + 6, 0, 72, 137, 131, 8, 5, 6, 0, 72, 141, 131, 96, 20, 6, 210, + 2, 150, 73] + # fmt: on + self.assertEqual( + values_list, pyads.dict_from_bytes(bytes_list, structure_def, array_size=3) + ) + + # test for not default string and array of LREALs + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 1, 20), + ("rVar", pyads.PLCTYPE_LREAL, 4), + ) + values = OrderedDict([("sVar", "pyads"), ("rVar", [1.11, 2.22, 3.33, 4.44])]) + # fmt: off + bytes_list = [112, 121, 97, 100, 115, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 195, 245, 40, 92, 143, 194, 241, 63, 195, 245, + 40, 92, 143, 194, 1, 64, 164, 112, 61, 10, 215, 163, 10, 64, + 195, 245, 40, 92, 143, 194, 17, 64] + # fmt: on + self.assertEqual(values, pyads.dict_from_bytes(bytes_list, structure_def)) + + # tests for incorrect definitions + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 4), + ("rVar", 1, 1), + ("iVar", pyads.PLCTYPE_DINT, 1), + ) + with self.assertRaises(RuntimeError): + pyads.dict_from_bytes([], structure_def) + + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 4), + ("rVar", 1, 2), + ("iVar", pyads.PLCTYPE_DINT, 1), + ) + with self.assertRaises(RuntimeError): + pyads.dict_from_bytes([], structure_def) + + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 4), + (pyads.PLCTYPE_REAL, 1), + ("iVar", pyads.PLCTYPE_DINT, 1), + ) + with self.assertRaises(ValueError): + pyads.dict_from_bytes([], structure_def) + + structure_def = ( + ("sVar", pyads.PLCTYPE_STRING, 4), + ("rVar", pyads.PLCTYPE_REAL, ""), + ("iVar", pyads.PLCTYPE_DINT, 1), + ("iVar1", pyads.PLCTYPE_INT, 3), + ) + with self.assertRaises(TypeError): + pyads.dict_from_bytes([], structure_def) + + +if __name__ == "__main__": + unittest.main()