From 7eeb18893883451b465079d1cf0e01e4fe451fbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frieder=20Sch=C3=BCler?= Date: Tue, 28 Apr 2026 14:11:15 +0200 Subject: [PATCH 1/7] Add tests for LSS master (lss.py coverage 42% -> 93%) Add 28 unit tests covering: - Switch state global/selective commands - Node ID and bit timing configuration - Activate bit timing and store configuration - LSS address inquiry (vendor/product/revision/serial) - Timeout and error handling - Fast scan protocol - Obsolete method aliases - LssError exception --- test/test_lss.py | 337 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 337 insertions(+) create mode 100644 test/test_lss.py diff --git a/test/test_lss.py b/test/test_lss.py new file mode 100644 index 00000000..e3336210 --- /dev/null +++ b/test/test_lss.py @@ -0,0 +1,337 @@ +import struct +import unittest +from unittest.mock import MagicMock + +from canopen.lss import ( + LssMaster, + LssError, + CS_SWITCH_STATE_GLOBAL, + CS_CONFIGURE_NODE_ID, + CS_CONFIGURE_BIT_TIMING, + CS_STORE_CONFIGURATION, + CS_SWITCH_STATE_SELECTIVE_VENDOR_ID, + CS_SWITCH_STATE_SELECTIVE_PRODUCT_CODE, + CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER, + CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER, + CS_SWITCH_STATE_SELECTIVE_RESPONSE, + CS_INQUIRE_NODE_ID, + CS_INQUIRE_VENDOR_ID, + CS_INQUIRE_PRODUCT_CODE, + CS_INQUIRE_REVISION_NUMBER, + CS_INQUIRE_SERIAL_NUMBER, + CS_IDENTIFY_SLAVE, + CS_FAST_SCAN, + CS_ACTIVATE_BIT_TIMING, + ERROR_NONE, + ERROR_INADMISSIBLE, + ListMessageNeedResponse, +) + + +class TestLssMaster(unittest.TestCase): + """Tests for LssMaster message encoding, decoding, and error handling. + + Follows the same pattern as test_sdo.py: replace network.send_message + with a custom method that records sent data and injects responses + synchronously. + """ + + def setUp(self): + self.lss = LssMaster() + self.lss.RESPONSE_TIMEOUT = 0.1 + self.network = MagicMock() + self.lss.network = self.network + self.sent_messages = [] + + def _send_and_respond(self, response): + """Return a send_message side_effect that always injects the given response.""" + def side_effect(cob_id, data): + self.sent_messages.append((cob_id, bytes(data))) + if data[0] in ListMessageNeedResponse: + self.lss.on_message_received(LssMaster.LSS_RX_COBID, response, 0.0) + return side_effect + + def _send_no_response(self, cob_id, data): + """A send_message side_effect that records but sends no response.""" + self.sent_messages.append((cob_id, bytes(data))) + + # ---- switch state global ---- + + def test_send_switch_state_global_configuration(self): + self.network.send_message.side_effect = self._send_no_response + self.lss.send_switch_state_global(LssMaster.CONFIGURATION_STATE) + self.assertEqual(len(self.sent_messages), 1) + cob_id, data = self.sent_messages[0] + self.assertEqual(cob_id, LssMaster.LSS_TX_COBID) + self.assertEqual(len(data), 8) + self.assertEqual(data[0], CS_SWITCH_STATE_GLOBAL) + self.assertEqual(data[1], LssMaster.CONFIGURATION_STATE) + + def test_send_switch_state_global_waiting(self): + self.network.send_message.side_effect = self._send_no_response + self.lss.send_switch_state_global(LssMaster.WAITING_STATE) + _, data = self.sent_messages[0] + self.assertEqual(data[0], CS_SWITCH_STATE_GLOBAL) + self.assertEqual(data[1], LssMaster.WAITING_STATE) + + def test_send_switch_state_global_no_response_expected(self): + self.network.send_message.side_effect = self._send_no_response + self.lss.send_switch_state_global(LssMaster.CONFIGURATION_STATE) + + # ---- configure node ID ---- + + def test_configure_node_id_success(self): + response = bytearray(8) + response[0] = CS_CONFIGURE_NODE_ID + response[1] = ERROR_NONE + self.network.send_message.side_effect = self._send_and_respond(response) + + self.lss.configure_node_id(5) + _, data = self.sent_messages[0] + self.assertEqual(data[0], CS_CONFIGURE_NODE_ID) + self.assertEqual(data[1], 5) + + def test_configure_node_id_error(self): + response = bytearray(8) + response[0] = CS_CONFIGURE_NODE_ID + response[1] = ERROR_INADMISSIBLE + self.network.send_message.side_effect = self._send_and_respond(response) + + with self.assertRaises(LssError): + self.lss.configure_node_id(200) + + def test_configure_node_id_wrong_cs(self): + response = bytearray(8) + response[0] = 0xFF + response[1] = ERROR_NONE + self.network.send_message.side_effect = self._send_and_respond(response) + + with self.assertRaises(LssError): + self.lss.configure_node_id(5) + + # ---- configure bit timing ---- + + def test_configure_bit_timing_success(self): + response = bytearray(8) + response[0] = CS_CONFIGURE_BIT_TIMING + response[1] = ERROR_NONE + self.network.send_message.side_effect = self._send_and_respond(response) + + self.lss.configure_bit_timing(4) + _, data = self.sent_messages[0] + self.assertEqual(data[0], CS_CONFIGURE_BIT_TIMING) + self.assertEqual(data[1], 0) + self.assertEqual(data[2], 4) + + # ---- activate bit timing ---- + + def test_activate_bit_timing(self): + self.network.send_message.side_effect = self._send_no_response + self.lss.activate_bit_timing(500) + _, data = self.sent_messages[0] + self.assertEqual(data[0], CS_ACTIVATE_BIT_TIMING) + delay = struct.unpack_from(' Date: Wed, 29 Apr 2026 08:11:54 +0200 Subject: [PATCH 2/7] black --- test/test_lss.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/test/test_lss.py b/test/test_lss.py index e3336210..1407109e 100644 --- a/test/test_lss.py +++ b/test/test_lss.py @@ -45,10 +45,12 @@ def setUp(self): def _send_and_respond(self, response): """Return a send_message side_effect that always injects the given response.""" + def side_effect(cob_id, data): self.sent_messages.append((cob_id, bytes(data))) if data[0] in ListMessageNeedResponse: self.lss.on_message_received(LssMaster.LSS_RX_COBID, response, 0.0) + return side_effect def _send_no_response(self, cob_id, data): @@ -224,8 +226,7 @@ def test_send_switch_state_selective_success(self): response[0] = CS_SWITCH_STATE_SELECTIVE_RESPONSE self.network.send_message.side_effect = self._send_and_respond(response) - result = self.lss.send_switch_state_selective( - 0x1111, 0x2222, 0x3333, 0x4444) + result = self.lss.send_switch_state_selective(0x1111, 0x2222, 0x3333, 0x4444) self.assertTrue(result) self.assertEqual(len(self.sent_messages), 4) @@ -239,8 +240,7 @@ def test_send_switch_state_selective_no_match(self): response[0] = 0x00 self.network.send_message.side_effect = self._send_and_respond(response) - result = self.lss.send_switch_state_selective( - 0x1111, 0x2222, 0x3333, 0x4444) + result = self.lss.send_switch_state_selective(0x1111, 0x2222, 0x3333, 0x4444) self.assertFalse(result) # ---- timeout / error handling ---- @@ -301,8 +301,7 @@ def test_lss_address_encoding(self): response[0] = CS_SWITCH_STATE_SELECTIVE_RESPONSE self.network.send_message.side_effect = self._send_and_respond(response) - self.lss.send_switch_state_selective( - 0xDEADBEEF, 0xCAFEBABE, 0x12345678, 0x9ABCDEF0) + self.lss.send_switch_state_selective(0xDEADBEEF, 0xCAFEBABE, 0x12345678, 0x9ABCDEF0) data = self.sent_messages[0][1] packed = struct.unpack_from(' Date: Wed, 29 Apr 2026 08:17:23 +0200 Subject: [PATCH 3/7] docstring mood --- test/test_lss.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_lss.py b/test/test_lss.py index 1407109e..f8919b54 100644 --- a/test/test_lss.py +++ b/test/test_lss.py @@ -54,7 +54,7 @@ def side_effect(cob_id, data): return side_effect def _send_no_response(self, cob_id, data): - """A send_message side_effect that records but sends no response.""" + """Record but do not send a response.""" self.sent_messages.append((cob_id, bytes(data))) # ---- switch state global ---- From 73814ac9c2692b1967c509699d89b626faaf23f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Colomb?= Date: Wed, 29 Apr 2026 08:17:38 +0200 Subject: [PATCH 4/7] unused import --- test/test_lss.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_lss.py b/test/test_lss.py index f8919b54..3ad80b6f 100644 --- a/test/test_lss.py +++ b/test/test_lss.py @@ -20,7 +20,6 @@ CS_INQUIRE_REVISION_NUMBER, CS_INQUIRE_SERIAL_NUMBER, CS_IDENTIFY_SLAVE, - CS_FAST_SCAN, CS_ACTIVATE_BIT_TIMING, ERROR_NONE, ERROR_INADMISSIBLE, From 9273fbcc3a1f7e4b1f30ef35f40fa22474f91b51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Colomb?= Date: Wed, 29 Apr 2026 09:52:17 +0200 Subject: [PATCH 5/7] Compare against raw bytes, avoid module constants. --- test/test_lss.py | 126 ++++++++++++----------------------------------- 1 file changed, 31 insertions(+), 95 deletions(-) diff --git a/test/test_lss.py b/test/test_lss.py index 3ad80b6f..04de14ba 100644 --- a/test/test_lss.py +++ b/test/test_lss.py @@ -5,24 +5,10 @@ from canopen.lss import ( LssMaster, LssError, - CS_SWITCH_STATE_GLOBAL, - CS_CONFIGURE_NODE_ID, - CS_CONFIGURE_BIT_TIMING, - CS_STORE_CONFIGURATION, - CS_SWITCH_STATE_SELECTIVE_VENDOR_ID, - CS_SWITCH_STATE_SELECTIVE_PRODUCT_CODE, - CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER, - CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER, - CS_SWITCH_STATE_SELECTIVE_RESPONSE, - CS_INQUIRE_NODE_ID, CS_INQUIRE_VENDOR_ID, CS_INQUIRE_PRODUCT_CODE, CS_INQUIRE_REVISION_NUMBER, CS_INQUIRE_SERIAL_NUMBER, - CS_IDENTIFY_SLAVE, - CS_ACTIVATE_BIT_TIMING, - ERROR_NONE, - ERROR_INADMISSIBLE, ListMessageNeedResponse, ) @@ -65,15 +51,13 @@ def test_send_switch_state_global_configuration(self): cob_id, data = self.sent_messages[0] self.assertEqual(cob_id, LssMaster.LSS_TX_COBID) self.assertEqual(len(data), 8) - self.assertEqual(data[0], CS_SWITCH_STATE_GLOBAL) - self.assertEqual(data[1], LssMaster.CONFIGURATION_STATE) + self.assertEqual(data[:2], b'\x04\x01') def test_send_switch_state_global_waiting(self): self.network.send_message.side_effect = self._send_no_response self.lss.send_switch_state_global(LssMaster.WAITING_STATE) _, data = self.sent_messages[0] - self.assertEqual(data[0], CS_SWITCH_STATE_GLOBAL) - self.assertEqual(data[1], LssMaster.WAITING_STATE) + self.assertEqual(data[:2], b'\x04\x00') def test_send_switch_state_global_no_response_expected(self): self.network.send_message.side_effect = self._send_no_response @@ -82,29 +66,21 @@ def test_send_switch_state_global_no_response_expected(self): # ---- configure node ID ---- def test_configure_node_id_success(self): - response = bytearray(8) - response[0] = CS_CONFIGURE_NODE_ID - response[1] = ERROR_NONE + response = b'\x11\x00\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - self.lss.configure_node_id(5) _, data = self.sent_messages[0] - self.assertEqual(data[0], CS_CONFIGURE_NODE_ID) - self.assertEqual(data[1], 5) + self.assertEqual(data[:2], b'\x11\x05') def test_configure_node_id_error(self): - response = bytearray(8) - response[0] = CS_CONFIGURE_NODE_ID - response[1] = ERROR_INADMISSIBLE + response = b'\x11\x01\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) with self.assertRaises(LssError): self.lss.configure_node_id(200) def test_configure_node_id_wrong_cs(self): - response = bytearray(8) - response[0] = 0xFF - response[1] = ERROR_NONE + response = b'\xFF\x00\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) with self.assertRaises(LssError): @@ -113,16 +89,12 @@ def test_configure_node_id_wrong_cs(self): # ---- configure bit timing ---- def test_configure_bit_timing_success(self): - response = bytearray(8) - response[0] = CS_CONFIGURE_BIT_TIMING - response[1] = ERROR_NONE + response = b'\x13\x00\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) self.lss.configure_bit_timing(4) _, data = self.sent_messages[0] - self.assertEqual(data[0], CS_CONFIGURE_BIT_TIMING) - self.assertEqual(data[1], 0) - self.assertEqual(data[2], 4) + self.assertEqual(data[:3], b'\x13\x00\x04') # ---- activate bit timing ---- @@ -130,23 +102,17 @@ def test_activate_bit_timing(self): self.network.send_message.side_effect = self._send_no_response self.lss.activate_bit_timing(500) _, data = self.sent_messages[0] - self.assertEqual(data[0], CS_ACTIVATE_BIT_TIMING) - delay = struct.unpack_from(' Date: Wed, 29 Apr 2026 09:53:45 +0200 Subject: [PATCH 6/7] More robust checking of exception messages (using regex). --- test/test_lss.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/test/test_lss.py b/test/test_lss.py index 04de14ba..87b1b06c 100644 --- a/test/test_lss.py +++ b/test/test_lss.py @@ -1,3 +1,4 @@ +import re import struct import unittest from unittest.mock import MagicMock @@ -75,15 +76,13 @@ def test_configure_node_id_success(self): def test_configure_node_id_error(self): response = b'\x11\x01\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - - with self.assertRaises(LssError): + with self.assertRaisesRegex(LssError, re.compile('error.*1', re.I)): self.lss.configure_node_id(200) def test_configure_node_id_wrong_cs(self): response = b'\xFF\x00\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - - with self.assertRaises(LssError): + with self.assertRaisesRegex(LssError, re.compile('not for.*request', re.I)): self.lss.configure_node_id(5) # ---- configure bit timing ---- @@ -114,8 +113,7 @@ def test_store_configuration_success(self): def test_store_configuration_error(self): response = b'\x17\x01\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - - with self.assertRaises(LssError): + with self.assertRaisesRegex(LssError, re.compile('error.*1', re.I)): self.lss.store_configuration() # ---- inquire node ID ---- @@ -129,8 +127,7 @@ def test_inquire_node_id(self): def test_inquire_node_id_wrong_cs(self): response = b'\xFF\x2A\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - - with self.assertRaises(LssError): + with self.assertRaisesRegex(LssError, re.compile('not for.*request', re.I)): self.lss.inquire_node_id() # ---- inquire LSS address ---- @@ -162,8 +159,7 @@ def test_inquire_serial_number(self): def test_inquire_lss_address_wrong_cs(self): response = b'\xFF\x00\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - - with self.assertRaises(LssError): + with self.assertRaisesRegex(LssError, re.compile('not for.*request', re.I)): self.lss.inquire_lss_address(CS_INQUIRE_VENDOR_ID) # ---- switch state selective ---- @@ -190,9 +186,8 @@ def test_send_switch_state_selective_no_match(self): def test_no_response_timeout(self): self.network.send_message.side_effect = self._send_no_response - with self.assertRaises(LssError) as ctx: + with self.assertRaisesRegex(LssError, re.compile('no LSS response', re.I)): self.lss.inquire_node_id() - self.assertIn("No LSS response", str(ctx.exception)) def test_unexpected_messages_cleared(self): """Stale messages in queue should be cleared before sending.""" From 367788c4fc09265e2e330e1ba6446e037dd92099 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frieder=20Sch=C3=BCler?= Date: Wed, 29 Apr 2026 10:07:23 +0200 Subject: [PATCH 7/7] test: address review comments on LSS master tests - Remove duplicate test_send_switch_state_global_no_response_expected - Remove duplicate test_lss_address_encoding and unused struct import - Restructure imports: use 'from canopen import lss' for constants, keep class imports on one line, sort alphabetically Co-authored-by: GitHub Copilot Tool-assisted: tests were written with AI tooling assistance --- test/test_lss.py | 45 ++++++++------------------------------------- 1 file changed, 8 insertions(+), 37 deletions(-) diff --git a/test/test_lss.py b/test/test_lss.py index 87b1b06c..53c63852 100644 --- a/test/test_lss.py +++ b/test/test_lss.py @@ -1,17 +1,9 @@ import re -import struct import unittest from unittest.mock import MagicMock -from canopen.lss import ( - LssMaster, - LssError, - CS_INQUIRE_VENDOR_ID, - CS_INQUIRE_PRODUCT_CODE, - CS_INQUIRE_REVISION_NUMBER, - CS_INQUIRE_SERIAL_NUMBER, - ListMessageNeedResponse, -) +from canopen import lss +from canopen.lss import LssError, LssMaster class TestLssMaster(unittest.TestCase): @@ -34,7 +26,7 @@ def _send_and_respond(self, response): def side_effect(cob_id, data): self.sent_messages.append((cob_id, bytes(data))) - if data[0] in ListMessageNeedResponse: + if data[0] in lss.ListMessageNeedResponse: self.lss.on_message_received(LssMaster.LSS_RX_COBID, response, 0.0) return side_effect @@ -60,10 +52,6 @@ def test_send_switch_state_global_waiting(self): _, data = self.sent_messages[0] self.assertEqual(data[:2], b'\x04\x00') - def test_send_switch_state_global_no_response_expected(self): - self.network.send_message.side_effect = self._send_no_response - self.lss.send_switch_state_global(LssMaster.CONFIGURATION_STATE) - # ---- configure node ID ---- def test_configure_node_id_success(self): @@ -135,32 +123,32 @@ def test_inquire_node_id_wrong_cs(self): def test_inquire_vendor_id(self): response = b'\x5A\x78\x56\x34\x12\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - result = self.lss.inquire_lss_address(CS_INQUIRE_VENDOR_ID) + result = self.lss.inquire_lss_address(lss.CS_INQUIRE_VENDOR_ID) self.assertEqual(result, 0x12345678) def test_inquire_product_code(self): response = b'\x5B\xCD\xAB\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - result = self.lss.inquire_lss_address(CS_INQUIRE_PRODUCT_CODE) + result = self.lss.inquire_lss_address(lss.CS_INQUIRE_PRODUCT_CODE) self.assertEqual(result, 0xABCD) def test_inquire_revision_number(self): response = b'\x5C\x63\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - result = self.lss.inquire_lss_address(CS_INQUIRE_REVISION_NUMBER) + result = self.lss.inquire_lss_address(lss.CS_INQUIRE_REVISION_NUMBER) self.assertEqual(result, 99) def test_inquire_serial_number(self): response = b'\x5D\xE9\x03\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) - result = self.lss.inquire_lss_address(CS_INQUIRE_SERIAL_NUMBER) + result = self.lss.inquire_lss_address(lss.CS_INQUIRE_SERIAL_NUMBER) self.assertEqual(result, 1001) def test_inquire_lss_address_wrong_cs(self): response = b'\xFF\x00\x00\x00\x00\x00\x00\x00' self.network.send_message.side_effect = self._send_and_respond(response) with self.assertRaisesRegex(LssError, re.compile('not for.*request', re.I)): - self.lss.inquire_lss_address(CS_INQUIRE_VENDOR_ID) + self.lss.inquire_lss_address(lss.CS_INQUIRE_VENDOR_ID) # ---- switch state selective ---- @@ -225,23 +213,6 @@ def test_fast_scan_finds_slave(self): self.assertTrue(result) self.assertEqual(lss_id, [0, 0, 0, 0]) - # ---- LSS address encoding ---- - - def test_lss_address_encoding(self): - """Verify the 4-byte address is packed correctly in messages.""" - response = b'\x44\x00\x00\x00\x00\x00\x00\x00' - self.network.send_message.side_effect = self._send_and_respond(response) - - self.lss.send_switch_state_selective(0xDEADBEEF, 0xCAFEBABE, 0x12345678, 0x9ABCDEF0) - - data = self.sent_messages[0][1] - packed = struct.unpack_from('