|
4 | 4 | import struct
|
5 | 5 | from abc import abstractmethod
|
6 | 6 | from enum import Enum
|
7 |
| -from typing import Generic, TypeVar |
| 7 | +from typing import Generic, Literal, TypeVar, cast |
8 | 8 |
|
9 | 9 | import pymodbus.pdu.bit_message as pdu_bit
|
10 | 10 | import pymodbus.pdu.diag_message as pdu_diag
|
@@ -691,61 +691,83 @@ class DATATYPE(Enum):
|
691 | 691 | FLOAT32 = ("f", 2)
|
692 | 692 | FLOAT64 = ("d", 4)
|
693 | 693 | STRING = ("s", 0)
|
694 |
| - BITS = "bits" |
| 694 | + BITS = ("bits", 0) |
695 | 695 |
|
696 | 696 | @classmethod
|
697 | 697 | def convert_from_registers(
|
698 |
| - cls, registers: list[int], data_type: DATATYPE |
699 |
| - ) -> int | float | str | list[bool]: |
| 698 | + cls, registers: list[int], data_type: DATATYPE, word_order: Literal["big", "little"] = "big" |
| 699 | + ) -> int | float | str | list[bool] | list[int] | list[float]: |
700 | 700 | """Convert registers to int/float/str.
|
701 | 701 |
|
702 | 702 | :param registers: list of registers received from e.g. read_holding_registers()
|
703 | 703 | :param data_type: data type to convert to
|
704 |
| - :returns: int, float, str or list[bool] depending on "data_type" |
705 |
| - :raises ModbusException: when size of registers is not 1, 2 or 4 |
706 |
| - """ |
707 |
| - byte_list = bytearray() |
708 |
| - for x in registers: |
709 |
| - byte_list.extend(int.to_bytes(x, 2, "big")) |
710 |
| - if data_type == cls.DATATYPE.STRING: |
711 |
| - if byte_list[-1:] == b"\00": |
712 |
| - byte_list = byte_list[:-1] |
713 |
| - return byte_list.decode("utf-8") |
714 |
| - if data_type == cls.DATATYPE.BITS: |
| 704 | + :param word_order: "big"/"little" order of words/registers |
| 705 | + :returns: scalar or array of "data_type" |
| 706 | + :raises ModbusException: when size of registers is not a multiple of data_type |
| 707 | + """ |
| 708 | + if not (data_len := data_type.value[1]): |
| 709 | + byte_list = bytearray() |
| 710 | + if word_order == "little": |
| 711 | + registers.reverse() |
| 712 | + for x in registers: |
| 713 | + byte_list.extend(int.to_bytes(x, 2, "big")) |
| 714 | + if data_type == cls.DATATYPE.STRING: |
| 715 | + trailing_nulls_begin = len(byte_list) |
| 716 | + while trailing_nulls_begin > 0 and not byte_list[trailing_nulls_begin - 1]: |
| 717 | + trailing_nulls_begin -= 1 |
| 718 | + byte_list = byte_list[:trailing_nulls_begin] |
| 719 | + return byte_list.decode("utf-8") |
715 | 720 | return unpack_bitstring(byte_list)
|
716 |
| - if len(registers) != data_type.value[1]: |
| 721 | + if (reg_len := len(registers)) % data_len: |
717 | 722 | raise ModbusException(
|
718 |
| - f"Illegal size ({len(registers)}) of register array, cannot convert!" |
| 723 | + f"Registers illegal size ({len(registers)}) expected multiple of {data_len}!" |
719 | 724 | )
|
720 |
| - return struct.unpack(f">{data_type.value[0]}", byte_list)[0] |
| 725 | + |
| 726 | + result = [] |
| 727 | + for i in range(0, reg_len, data_len): |
| 728 | + regs = registers[i:i+data_len] |
| 729 | + if word_order == "little": |
| 730 | + regs.reverse() |
| 731 | + byte_list = bytearray() |
| 732 | + for x in regs: |
| 733 | + byte_list.extend(int.to_bytes(x, 2, "big")) |
| 734 | + result.append(struct.unpack(f">{data_type.value[0]}", byte_list)[0]) |
| 735 | + return result if len(result) != 1 else result[0] |
721 | 736 |
|
722 | 737 | @classmethod
|
723 | 738 | def convert_to_registers(
|
724 |
| - cls, value: int | float | str | list[bool], data_type: DATATYPE |
| 739 | + cls, value: int | float | str | list[bool] | list[int] | list[float] , data_type: DATATYPE, word_order: Literal["big", "little"] = "big" |
725 | 740 | ) -> list[int]:
|
726 | 741 | """Convert int/float/str to registers (16/32/64 bit).
|
727 | 742 |
|
728 | 743 | :param value: value to be converted
|
729 |
| - :param data_type: data type to be encoded as registers |
| 744 | + :param data_type: data type to convert from |
| 745 | + :param word_order: "big"/"little" order of words/registers |
730 | 746 | :returns: List of registers, can be used directly in e.g. write_registers()
|
731 | 747 | :raises TypeError: when there is a mismatch between data_type and value
|
732 | 748 | """
|
733 | 749 | if data_type == cls.DATATYPE.BITS:
|
734 | 750 | if not isinstance(value, list):
|
735 |
| - raise TypeError(f"Value should be string but is {type(value)}.") |
| 751 | + raise TypeError(f"Value should be list of bool but is {type(value)}.") |
736 | 752 | if (missing := len(value) % 16):
|
737 | 753 | value = value + [False] * (16 - missing)
|
738 |
| - byte_list = pack_bitstring(value) |
| 754 | + byte_list = pack_bitstring(cast(list[bool], value)) |
739 | 755 | elif data_type == cls.DATATYPE.STRING:
|
740 | 756 | if not isinstance(value, str):
|
741 | 757 | raise TypeError(f"Value should be string but is {type(value)}.")
|
742 | 758 | byte_list = value.encode()
|
743 | 759 | if len(byte_list) % 2:
|
744 | 760 | byte_list += b"\x00"
|
745 | 761 | else:
|
746 |
| - byte_list = struct.pack(f">{data_type.value[0]}", value) |
| 762 | + if not isinstance(value, list): |
| 763 | + value = cast(list[int], [value]) |
| 764 | + byte_list = bytearray() |
| 765 | + for v in value: |
| 766 | + byte_list.extend(struct.pack(f">{data_type.value[0]}", v)) |
747 | 767 | regs = [
|
748 | 768 | int.from_bytes(byte_list[x : x + 2], "big")
|
749 | 769 | for x in range(0, len(byte_list), 2)
|
750 | 770 | ]
|
| 771 | + if word_order == "little": |
| 772 | + regs.reverse() |
751 | 773 | return regs
|
0 commit comments