From f8776f4aa699b1eefe13513c8838fd71b40803cc Mon Sep 17 00:00:00 2001 From: Mateusz Maciejewski Date: Tue, 18 Feb 2025 10:33:46 +0100 Subject: [PATCH 1/3] osfv_cli/src/osfv/libs/snipeit_api.py: IP exclusivity protection CLI client-side mechanism, enforcing IP exclusivity in asset data from Snipe-IT, covering system IP, RTE IP, Sonoff IP & PiKVM IP. Throws DuplicatedIpException to prevent user from remote hardware manipulation if duplicated IP entries exist. Signed-off-by: Mateusz Maciejewski --- osfv_cli/src/osfv/libs/snipeit_api.py | 118 ++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/osfv_cli/src/osfv/libs/snipeit_api.py b/osfv_cli/src/osfv/libs/snipeit_api.py index 16552fb..4e94a09 100644 --- a/osfv_cli/src/osfv/libs/snipeit_api.py +++ b/osfv_cli/src/osfv/libs/snipeit_api.py @@ -1,6 +1,7 @@ import os import secrets import string +import sys import requests import unidecode @@ -8,6 +9,27 @@ class SnipeIT: + class DuplicatedIpException(Exception): + def __init__(self, counts, field_name, expected_ip): + # filling some data and composing an error message + self.count = int(counts[field_name]) + self.field_name = field_name + self.expected_ip = expected_ip + self.message = ( + "FATAL: You are trying to access an asset with " + + self.field_name + + ": " + + self.expected_ip + + " which is not exclusive. Please check Snipe-IT data." + ) + + # disabling traceback, this is intentional failure. + sys.tracebacklimit = 0 + super().__init__(self.message) + + def __str__(self): + return self.message + def __init__(self): snipeit_cfg = self.load_snipeit_config() self.cfg_api_url = snipeit_cfg["url"] @@ -76,9 +98,94 @@ def get_all_assets(self): return all_assets + def __retieve_custom_field_value(self, custom_fields, expected_field_name): + my_field = next( + ( + field_data["value"] + for field_name, field_data in custom_fields.items() + if field_name == expected_field_name + ), + None, + ) + if my_field: + return my_field + else: + return None + + def __count_customField( + self, custom_fields, counts, field_name, expected_ip + ): + my_field = self.__retieve_custom_field_value(custom_fields, field_name) + + if my_field: + if my_field == expected_ip: + counts[field_name] += 1 + if counts[field_name] > 1: + raise self.DuplicatedIpException( + counts, field_name, expected_ip + ) + return None + + # check by selected IP-fields (continue until second occurrence of any) + def check_asset_for_ip_exclusivity( + self, all_assets, ip=None, rte_ip=None, sonoff_ip=None, pikvm_ip=None + ): + # all IP-containing data fields + occurence_counts = { + "IP": 0, + "RTE IP": 0, + "Sonoff IP": 0, + "PiKVM IP": 0, + } + for asset in all_assets: + custom_fields = asset.get("custom_fields", {}) + if custom_fields: + if ip: + self.__count_customField( + custom_fields, occurence_counts, "IP", ip + ) + if rte_ip: + self.__count_customField( + custom_fields, occurence_counts, "RTE IP", rte_ip + ) + if sonoff_ip: + self.__count_customField( + custom_fields, occurence_counts, "Sonoff IP", sonoff_ip + ) + if pikvm_ip: + self.__count_customField( + custom_fields, occurence_counts, "PiKVM IP", pikvm_ip + ) + return None + + # check by asset ID, on any non-empty IP field + def check_asset_for_ip_exclusivity_by_id(self, asset_id): + status, asset_data = self.get_asset(asset_id) + if not status: + return None + + custom_fields = asset_data.get("custom_fields", {}) + if custom_fields: + ip = self.__retieve_custom_field_value(custom_fields, "IP") + rte_ip = self.__retieve_custom_field_value(custom_fields, "RTE IP") + sonoff_ip = self.__retieve_custom_field_value( + custom_fields, "Sonoff IP" + ) + pikvm_ip = self.__retieve_custom_field_value( + custom_fields, "PiKVM IP" + ) + + self.check_asset_for_ip_exclusivity( + self.get_all_assets(), ip, rte_ip, sonoff_ip, pikvm_ip + ) + return None + def get_asset_id_by_rte_ip(self, rte_ip): # Retrieve all assets all_assets = self.get_all_assets() + self.check_asset_for_ip_exclusivity( + all_assets, None, rte_ip, None, None + ) # Search for asset with matching RTE IP for asset in all_assets: @@ -101,6 +208,9 @@ def get_asset_id_by_rte_ip(self, rte_ip): def get_asset_id_by_sonoff_ip(self, rte_ip): # Retrieve all assets all_assets = self.get_all_assets() + self.check_asset_for_ip_exclusivity( + all_assets, None, None, rte_ip, None + ) # Search for asset with matching RTE IP for asset in all_assets: @@ -123,6 +233,9 @@ def get_asset_id_by_sonoff_ip(self, rte_ip): def get_sonoff_ip_by_rte_ip(self, rte_ip): # Retrieve all assets all_assets = self.get_all_assets() + self.check_asset_for_ip_exclusivity( + all_assets, None, rte_ip, None, None + ) # Search for asset with matching RTE IP for asset in all_assets: @@ -146,6 +259,9 @@ def get_sonoff_ip_by_rte_ip(self, rte_ip): def get_pikvm_ip_by_rte_ip(self, rte_ip): # Retrieve all assets all_assets = self.get_all_assets() + self.check_asset_for_ip_exclusivity( + all_assets, None, rte_ip, None, None + ) # Search for asset with matching RTE IP for asset in all_assets: @@ -187,6 +303,8 @@ def check_out_asset(self, asset_id): Raises: requests.exceptions.RequestException: If the HTTP request to the API fails. """ + self.check_asset_for_ip_exclusivity_by_id(asset_id) + status, asset_data = self.get_asset(asset_id) if not status: From 1d3ee21d1fddb341ed77b65c32b89c1e04d4d5f7 Mon Sep 17 00:00:00 2001 From: Mateusz Maciejewski Date: Mon, 24 Feb 2025 11:55:30 +0100 Subject: [PATCH 2/3] src/osfv/libs/snipeit_api.py: IP exclusivity re-check 'by-ID'. Bugfix for cases, when asset ID is retrieved from Snipe-IT data by RTE IP or Sonoff IP. Signed-off-by: Mateusz Maciejewski --- osfv_cli/src/osfv/libs/snipeit_api.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/osfv_cli/src/osfv/libs/snipeit_api.py b/osfv_cli/src/osfv/libs/snipeit_api.py index 4e94a09..1ba1ce3 100644 --- a/osfv_cli/src/osfv/libs/snipeit_api.py +++ b/osfv_cli/src/osfv/libs/snipeit_api.py @@ -200,6 +200,8 @@ def get_asset_id_by_rte_ip(self, rte_ip): None, ) if rte_ip_field == rte_ip: + # re-run exclusivty check by asset ID + self.check_asset_for_ip_exclusivity_by_id(asset["id"]) return asset["id"] # No asset found with matching RTE IP @@ -225,6 +227,8 @@ def get_asset_id_by_sonoff_ip(self, rte_ip): None, ) if rte_ip_field == rte_ip: + # re-run exclusivty check by asset ID + self.check_asset_for_ip_exclusivity_by_id(asset["id"]) return asset["id"] # No asset found with matching RTE IP From 75e6c3f78a3af0861ee34e08dd701194c6e8d804 Mon Sep 17 00:00:00 2001 From: Mateusz Maciejewski Date: Mon, 24 Feb 2025 12:00:47 +0100 Subject: [PATCH 3/3] test/exclusive_ip.robot: IP exclusivity test suite Test suite covering some possible use cases od IP exclusivity detection. IP/ID fake DUT data in robot file must be kept in sync with Snipe-IT. Added Check Out By Asset ID to common keywords. Signed-off-by: Mateusz Maciejewski --- osfv_cli/test/common/keywords.robot | 17 +++++ osfv_cli/test/exclusive_ip.robot | 112 ++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 osfv_cli/test/exclusive_ip.robot diff --git a/osfv_cli/test/common/keywords.robot b/osfv_cli/test/common/keywords.robot index 1d25030..6ba60eb 100644 --- a/osfv_cli/test/common/keywords.robot +++ b/osfv_cli/test/common/keywords.robot @@ -45,6 +45,23 @@ Check Out ... --rte_ip ... ${rte_ip} ... env:SNIPEIT_CONFIG_FILE_PATH=${snipeit_config} + Log ${snipeit_config} + Log ${result.stdout} + Log ${result.stderr} + RETURN ${result} + +Check Out By Asset ID + [Documentation] Check out asset using osfv_cli as user defined + ... in ~/.osfv.snipeit.yml + [Arguments] ${asset_id} ${snipeit_config}=%{HOME}/.osfv/snipeit.yml + ${result}= Run Process + ... osfv_cli + ... snipeit + ... check_out + ... --asset_id + ... ${asset_id} + ... env:SNIPEIT_CONFIG_FILE_PATH=${snipeit_config} + Log ${snipeit_config} Log ${result.stdout} Log ${result.stderr} RETURN ${result} diff --git a/osfv_cli/test/exclusive_ip.robot b/osfv_cli/test/exclusive_ip.robot new file mode 100644 index 0000000..a8c2330 --- /dev/null +++ b/osfv_cli/test/exclusive_ip.robot @@ -0,0 +1,112 @@ +*** Settings *** +Documentation IP duplication protection (IP, RTE IP, Sonoff IP, PiKVM IP) test suite. + +Library Process +Resource ../test/common/keywords.robot + + +*** Variables *** +${UNIT_00_ID}= 226 +${UNIT_01_ID}= 227 +${UNIT_02_ID}= 228 +${UNIT_03_ID}= 229 + +${UNIT_00_IP}= 127.1.0.1 +${UNIT_01_IP}= 127.1.1.1 +${UNIT_02_IP}= 127.1.2.1 +${UNIT_03_IP}= 127.1.0.1 + +${UNIT_00_RTE_IP}= 127.100.1.1 +${UNIT_01_RTE_IP}= 127.100.1.1 +${UNIT_02_RTE_IP}= 127.100.2.1 +${UNIT_03_RTE_IP}= 127.100.1.1 + +${UNIT_00_SONOFF_IP}= 127.7.0.1 +${UNIT_01_SONOFF_IP}= 127.7.1.1 +${UNIT_02_SONOFF_IP}= 127.7.0.1 +${UNIT_03_SONOFF_IP}= 127.7.0.1 + +${UNIT_00_PIKVM_IP}= 127.200.1.1 +${UNIT_01_PIKVM_IP}= 127.200.1.1 +${UNIT_02_PIKVM_IP}= 127.200.1.1 +${UNIT_03_PIKVM_IP}= 127.200.1.1 + +${BY_RTE_FAILURE_PRE}= SEPARATOR= osfv.libs.snipeit_api.SnipeIT. +... DuplicatedIpException: FATAL: You are trying to access an asset with +${BY_RTE_FAILURE_POST}= SEPARATOR= which is not exclusive. Please check Snipe-IT data. + + +*** Test Cases *** *** +Check Out By RTE IP Negative failing on RTE IP + [Documentation] Should fail due to RTE IP non-exclusivity + ${checkout_result}= Check Out rte_ip=${UNIT_00_RTE_IP} + + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... RTE IP: ${UNIT_00_RTE_IP} ${BY_RTE_FAILURE_POST} + Should Match ${checkout_result.stderr} ${expected} + +Check Out By RTE Negative failing on Sonoff IP + [Documentation] Should fail due to Sonoff IP non-exclusivity + ${checkout_result}= Check Out rte_ip=${UNIT_02_RTE_IP} + + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... Sonoff IP: ${UNIT_02_SONOFF_IP} ${BY_RTE_FAILURE_POST} + + Should Match ${checkout_result.stderr} ${expected} + +Check Out By ID Negative failing on PiKVM IP + [Documentation] Should fail due to PiKVM IP non-exclusivity + ${checkout_result}= Check Out By Asset ID asset_id=${UNIT_01_ID} + + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... PiKVM IP: ${UNIT_02_PIKVM_IP} ${BY_RTE_FAILURE_POST} + + Should Match ${checkout_result.stderr} ${expected} + +Check Out By ID Negative failing on Sonoff IP + [Documentation] Should fail due to Sonoff IP non-exclusivity + ${checkout_result}= Check Out By Asset ID asset_id=${UNIT_03_ID} + + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... Sonoff IP: ${UNIT_03_SONOFF_IP} ${BY_RTE_FAILURE_POST} + + Should Match ${checkout_result.stderr} ${expected} + +Flash Probe Negative by RTE IP failing on RTE IP + [Documentation] Should fail due to RTE IP non-exclusivity + ${probe_result}= Run Process osfv_cli rte --rte_ip + ... ${UNIT_00_RTE_IP} flash probe + Should Be Equal As Integers ${probe_result.rc} 1 + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... RTE IP: ${UNIT_00_RTE_IP} ${BY_RTE_FAILURE_POST} + + Should Match ${probe_result.stderr} ${expected} + +Flash Probe Negative by RTE IP failing on Sonoff IP + ${probe_result}= Run Process osfv_cli rte --rte_ip + ... ${UNIT_02_RTE_IP} flash probe + Should Be Equal As Integers ${probe_result.rc} 1 + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... Sonoff IP: ${UNIT_02_SONOFF_IP} ${BY_RTE_FAILURE_POST} + + Should Match ${probe_result.stderr} ${expected} + +Sonoff Get Negative by Sonoff IP failing on PiKVM IP + [Documentation] Should fail due to PiKVM IP non-exclusivity + ${probe_result}= Run Process osfv_cli sonoff --sonoff_ip + ... ${UNIT_01_SONOFF_IP} get + Should Be Equal As Integers ${probe_result.rc} 1 + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... PiKVM IP: ${UNIT_01_PIKVM_IP} ${BY_RTE_FAILURE_POST} + + Should Match ${probe_result.stderr} ${expected} + +Snoff Get Negative by RTE IP failing on Sonoff IP + [Documentation] Should fail due to PiKVM IP non-exclusivity + ${probe_result}= Run Process osfv_cli sonoff --rte_ip + ... ${UNIT_02_RTE_IP} get + Should Be Equal As Integers ${probe_result.rc} 1 + ${expected}= Catenate SEPARATOR=${SPACE} ${BY_RTE_FAILURE_PRE} + ... Sonoff IP: ${UNIT_02_SONOFF_IP} ${BY_RTE_FAILURE_POST} + + Should Match ${probe_result.stderr} ${expected}