From 37a28d68f2c98f051d19f3c496f4e9ed788a6d5d Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Thu, 11 Jul 2024 01:24:46 +0200 Subject: [PATCH 1/4] Tests: improve coverage of canopen.Network --- test/test_network.py | 202 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 189 insertions(+), 13 deletions(-) diff --git a/test/test_network.py b/test/test_network.py index d488a2bc..1ffc8a08 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -1,3 +1,4 @@ +import logging import unittest from threading import Event @@ -9,20 +10,76 @@ class TestNetwork(unittest.TestCase): def setUp(self): - network = canopen.Network() - with self.assertLogs(): - network.add_node(2, SAMPLE_EDS) - network.add_node(3, network[2].object_dictionary) - self.network = network + self.network = canopen.Network() - def test_add_node(self): - node = self.network[2] - self.assertIsInstance(node, canopen.Node) - self.assertEqual(node.id, 2) + def test_network_add_node(self): + # Add using str. + with self.assertLogs(): + node = self.network.add_node(2, SAMPLE_EDS) self.assertEqual(self.network[2], node) - self.assertEqual(len(self.network), 2) + self.assertEqual(node.id, 2) + self.assertIsInstance(node, canopen.RemoteNode) + + # Add using OD. + node = self.network.add_node(3, self.network[2].object_dictionary) + self.assertEqual(self.network[3], node) + self.assertEqual(node.id, 3) + self.assertIsInstance(node, canopen.RemoteNode) + + # Add using RemoteNode. + with self.assertLogs(): + node = canopen.RemoteNode(4, SAMPLE_EDS) + self.network.add_node(node) + self.assertEqual(self.network[4], node) + self.assertEqual(node.id, 4) + self.assertIsInstance(node, canopen.RemoteNode) + + # Add using LocalNode. + with self.assertLogs(): + node = canopen.LocalNode(5, SAMPLE_EDS) + self.network.add_node(node) + self.assertEqual(self.network[5], node) + self.assertEqual(node.id, 5) + self.assertIsInstance(node, canopen.LocalNode) + + # Verify that we've got the correct number of nodes. + self.assertEqual(len(self.network), 4) + + def test_network_add_node_upload_eds(self): + # Will err because we're not connected to a real network. + with self.assertLogs(level=logging.ERROR): + self.network.add_node(2, SAMPLE_EDS, upload_eds=True) + + def test_network_create_node(self): + with self.assertLogs(): + self.network.create_node(2, SAMPLE_EDS) + self.network.create_node(3, SAMPLE_EDS) + node = canopen.RemoteNode(4, SAMPLE_EDS) + self.network.create_node(node) + self.assertIsInstance(self.network[2], canopen.LocalNode) + self.assertIsInstance(self.network[3], canopen.LocalNode) + self.assertIsInstance(self.network[4], canopen.RemoteNode) + + def test_network_check(self): + self.network.connect(interface="virtual", channel="test") + self.addCleanup(self.network.disconnect) + self.assertIsNone(self.network.check()) + + class Custom(Exception): + pass - def test_notify(self): + self.network.notifier.exception = Custom("fake") + with self.assertRaisesRegex(Custom, "fake"): + with self.assertLogs(level=logging.ERROR): + self.network.check() + with self.assertRaisesRegex(Custom, "fake"): + with self.assertLogs(level=logging.ERROR): + self.network.disconnect() + self.network.notifier.exception = None + + def test_network_notify(self): + with self.assertLogs(): + self.network.add_node(2, SAMPLE_EDS) node = self.network[2] self.network.notify(0x82, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1473418396.0) self.assertEqual(len(node.emcy.active), 1) @@ -30,7 +87,7 @@ def test_notify(self): self.assertEqual(node.nmt.state, 'OPERATIONAL') self.assertListEqual(self.network.scanner.nodes, [2]) - def test_send(self): + def test_network_send_message(self): bus = can.interface.Bus(interface="virtual", channel=1) self.addCleanup(bus.shutdown) @@ -52,7 +109,126 @@ def test_send(self): self.assertEqual(msg.arbitration_id, 0x12345) self.assertTrue(msg.is_extended_id) - def test_send_periodic(self): + def test_network_subscribe_unsubscribe(self): + N_HOOKS = 3 + accumulators = [] * N_HOOKS + + self.network.connect( + interface="virtual", + channel="test", + receive_own_messages=True + ) + self.addCleanup(self.network.disconnect) + + for i in range(N_HOOKS): + accumulators.append([]) + def hook(*args, i=i): + accumulators[i].append(args) + self.network.subscribe(i, hook) + + self.network.notify(0, bytes([1, 2, 3]), 1000) + self.network.notify(1, bytes([2, 3, 4]), 1001) + self.network.notify(1, bytes([3, 4, 5]), 1002) + self.network.notify(2, bytes([4, 5, 6]), 1003) + + self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)]) + self.assertEqual(accumulators[1], [ + (1, bytes([2, 3, 4]), 1001), + (1, bytes([3, 4, 5]), 1002), + ]) + self.assertEqual(accumulators[2], [(2, bytes([4, 5, 6]), 1003)]) + + self.network.unsubscribe(0) + self.network.notify(0, bytes([7, 7, 7]), 1004) + # Verify that no new data was added to the accumulator. + self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)]) + + def test_network_subscribe_multiple(self): + N_HOOKS = 3 + self.network.connect( + interface="virtual", + channel="test", + receive_own_messages=True + ) + self.addCleanup(self.network.disconnect) + + accumulators = [] + hooks = [] + for i in range(N_HOOKS): + accumulators.append([]) + def hook(*args, i=i): + accumulators[i].append(args) + hooks.append(hook) + self.network.subscribe(0x20, hook) + + self.network.notify(0xaa, bytes([1, 1, 1]), 2000) + self.network.notify(0x20, bytes([2, 3, 4]), 2001) + self.network.notify(0xbb, bytes([2, 2, 2]), 2002) + self.network.notify(0x20, bytes([3, 4, 5]), 2003) + self.network.notify(0xcc, bytes([3, 3, 3]), 2004) + + BATCH1 = [ + (0x20, bytes([2, 3, 4]), 2001), + (0x20, bytes([3, 4, 5]), 2003), + ] + for n, acc in enumerate(accumulators): + with self.subTest(hook=n): + self.assertEqual(acc, BATCH1) + + # Unsubscribe the second hook; dispatch a new message. + self.network.unsubscribe(0x20, hooks[1]) + + BATCH2 = 0x20, bytes([4, 5, 6]), 2005 + self.network.notify(*BATCH2) + self.assertEqual(accumulators[0], BATCH1 + [BATCH2]) + self.assertEqual(accumulators[1], BATCH1) + self.assertEqual(accumulators[2], BATCH1 + [BATCH2]) + + # Unsubscribe the first hook; dispatch yet another message. + self.network.unsubscribe(0x20, hooks[0]) + + BATCH3 = 0x20, bytes([5, 6, 7]), 2006 + self.network.notify(*BATCH3) + self.assertEqual(accumulators[0], BATCH1 + [BATCH2]) + self.assertEqual(accumulators[1], BATCH1) + self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3]) + + # Unsubscribe the rest (only one remaining); dispatch a new message. + self.network.unsubscribe(0x20) + self.network.notify(0x20, bytes([7, 7, 7]), 2007) + self.assertEqual(accumulators[0], BATCH1 + [BATCH2]) + self.assertEqual(accumulators[1], BATCH1) + self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3]) + + def test_network_context_manager(self): + with self.network.connect(interface="virtual", channel=1): + pass + with self.assertRaisesRegex(RuntimeError, "Not connected"): + self.network.send_message(0, []) + + def test_network_item_access(self): + with self.assertLogs(): + self.network.add_node(2, SAMPLE_EDS) + self.network.add_node(3, SAMPLE_EDS) + self.assertEqual([2, 3], [node for node in self.network]) + + # Check __delitem__. + del self.network[2] + self.assertEqual([3], [node for node in self.network]) + with self.assertRaises(KeyError): + del self.network[2] + + # Check __setitem__. + old = self.network[3] + with self.assertLogs(): + new = canopen.Node(3, SAMPLE_EDS) + self.network[3] = new + + # Check __getitem__. + self.assertNotEqual(self.network[3], old) + self.assertEqual([3], [node for node in self.network]) + + def test_network_send_periodic(self): DATA1 = bytes([1, 2, 3]) DATA2 = bytes([4, 5, 6]) COB_ID = 0x123 From 08ebe40fea110929b5f528a3f161dc3716a569c4 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Thu, 11 Jul 2024 09:38:25 +0200 Subject: [PATCH 2/4] Better cleanup for test_network_check --- test/test_network.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/test/test_network.py b/test/test_network.py index 1ffc8a08..fe354d9a 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -62,20 +62,28 @@ def test_network_create_node(self): def test_network_check(self): self.network.connect(interface="virtual", channel="test") - self.addCleanup(self.network.disconnect) + + def cleanup(): + # We must clear the fake exception installed below, since + # .disconnect() implicitly calls .check() during test tear down. + self.network.notifier.exception = None + self.network.disconnect() + + self.addCleanup(cleanup) self.assertIsNone(self.network.check()) class Custom(Exception): pass + self.network.notifier.exception = Custom("fake") + with self.assertRaisesRegex(Custom, "fake"): with self.assertLogs(level=logging.ERROR): self.network.check() with self.assertRaisesRegex(Custom, "fake"): with self.assertLogs(level=logging.ERROR): self.network.disconnect() - self.network.notifier.exception = None def test_network_notify(self): with self.assertLogs(): From d940d0105d1335cd21567ac627ce98c6aa0c7e0f Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Thu, 11 Jul 2024 09:38:57 +0200 Subject: [PATCH 3/4] Whitespace --- test/test_network.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/test_network.py b/test/test_network.py index fe354d9a..998f1d2e 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -75,9 +75,7 @@ def cleanup(): class Custom(Exception): pass - self.network.notifier.exception = Custom("fake") - with self.assertRaisesRegex(Custom, "fake"): with self.assertLogs(level=logging.ERROR): self.network.check() From d8d8d3cbefc871c5d48b937626ed5afcc72367bb Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Thu, 11 Jul 2024 12:31:57 +0200 Subject: [PATCH 4/4] Simplify --- test/test_network.py | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/test/test_network.py b/test/test_network.py index 998f1d2e..16c7616c 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -61,7 +61,7 @@ def test_network_create_node(self): self.assertIsInstance(self.network[4], canopen.RemoteNode) def test_network_check(self): - self.network.connect(interface="virtual", channel="test") + self.network.connect(interface="virtual") def cleanup(): # We must clear the fake exception installed below, since @@ -94,10 +94,10 @@ def test_network_notify(self): self.assertListEqual(self.network.scanner.nodes, [2]) def test_network_send_message(self): - bus = can.interface.Bus(interface="virtual", channel=1) + bus = can.interface.Bus(interface="virtual") self.addCleanup(bus.shutdown) - self.network.connect(interface="virtual", channel=1) + self.network.connect(interface="virtual") self.addCleanup(self.network.disconnect) # Send standard ID @@ -119,11 +119,7 @@ def test_network_subscribe_unsubscribe(self): N_HOOKS = 3 accumulators = [] * N_HOOKS - self.network.connect( - interface="virtual", - channel="test", - receive_own_messages=True - ) + self.network.connect(interface="virtual", receive_own_messages=True) self.addCleanup(self.network.disconnect) for i in range(N_HOOKS): @@ -151,11 +147,7 @@ def hook(*args, i=i): def test_network_subscribe_multiple(self): N_HOOKS = 3 - self.network.connect( - interface="virtual", - channel="test", - receive_own_messages=True - ) + self.network.connect(interface="virtual", receive_own_messages=True) self.addCleanup(self.network.disconnect) accumulators = [] @@ -207,7 +199,7 @@ def hook(*args, i=i): self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3]) def test_network_context_manager(self): - with self.network.connect(interface="virtual", channel=1): + with self.network.connect(interface="virtual"): pass with self.assertRaisesRegex(RuntimeError, "Not connected"): self.network.send_message(0, []) @@ -239,11 +231,7 @@ def test_network_send_periodic(self): DATA2 = bytes([4, 5, 6]) COB_ID = 0x123 PERIOD = 0.1 - self.network.connect( - interface="virtual", - channel=1, - receive_own_messages=True - ) + self.network.connect(interface="virtual", receive_own_messages=True) self.addCleanup(self.network.disconnect) acc = []