Skip to content

Commit 37a28d6

Browse files
Tests: improve coverage of canopen.Network
1 parent a196e1e commit 37a28d6

File tree

1 file changed

+189
-13
lines changed

1 file changed

+189
-13
lines changed

test/test_network.py

Lines changed: 189 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import unittest
23
from threading import Event
34

@@ -9,28 +10,84 @@
910
class TestNetwork(unittest.TestCase):
1011

1112
def setUp(self):
12-
network = canopen.Network()
13-
with self.assertLogs():
14-
network.add_node(2, SAMPLE_EDS)
15-
network.add_node(3, network[2].object_dictionary)
16-
self.network = network
13+
self.network = canopen.Network()
1714

18-
def test_add_node(self):
19-
node = self.network[2]
20-
self.assertIsInstance(node, canopen.Node)
21-
self.assertEqual(node.id, 2)
15+
def test_network_add_node(self):
16+
# Add using str.
17+
with self.assertLogs():
18+
node = self.network.add_node(2, SAMPLE_EDS)
2219
self.assertEqual(self.network[2], node)
23-
self.assertEqual(len(self.network), 2)
20+
self.assertEqual(node.id, 2)
21+
self.assertIsInstance(node, canopen.RemoteNode)
22+
23+
# Add using OD.
24+
node = self.network.add_node(3, self.network[2].object_dictionary)
25+
self.assertEqual(self.network[3], node)
26+
self.assertEqual(node.id, 3)
27+
self.assertIsInstance(node, canopen.RemoteNode)
28+
29+
# Add using RemoteNode.
30+
with self.assertLogs():
31+
node = canopen.RemoteNode(4, SAMPLE_EDS)
32+
self.network.add_node(node)
33+
self.assertEqual(self.network[4], node)
34+
self.assertEqual(node.id, 4)
35+
self.assertIsInstance(node, canopen.RemoteNode)
36+
37+
# Add using LocalNode.
38+
with self.assertLogs():
39+
node = canopen.LocalNode(5, SAMPLE_EDS)
40+
self.network.add_node(node)
41+
self.assertEqual(self.network[5], node)
42+
self.assertEqual(node.id, 5)
43+
self.assertIsInstance(node, canopen.LocalNode)
44+
45+
# Verify that we've got the correct number of nodes.
46+
self.assertEqual(len(self.network), 4)
47+
48+
def test_network_add_node_upload_eds(self):
49+
# Will err because we're not connected to a real network.
50+
with self.assertLogs(level=logging.ERROR):
51+
self.network.add_node(2, SAMPLE_EDS, upload_eds=True)
52+
53+
def test_network_create_node(self):
54+
with self.assertLogs():
55+
self.network.create_node(2, SAMPLE_EDS)
56+
self.network.create_node(3, SAMPLE_EDS)
57+
node = canopen.RemoteNode(4, SAMPLE_EDS)
58+
self.network.create_node(node)
59+
self.assertIsInstance(self.network[2], canopen.LocalNode)
60+
self.assertIsInstance(self.network[3], canopen.LocalNode)
61+
self.assertIsInstance(self.network[4], canopen.RemoteNode)
62+
63+
def test_network_check(self):
64+
self.network.connect(interface="virtual", channel="test")
65+
self.addCleanup(self.network.disconnect)
66+
self.assertIsNone(self.network.check())
67+
68+
class Custom(Exception):
69+
pass
2470

25-
def test_notify(self):
71+
self.network.notifier.exception = Custom("fake")
72+
with self.assertRaisesRegex(Custom, "fake"):
73+
with self.assertLogs(level=logging.ERROR):
74+
self.network.check()
75+
with self.assertRaisesRegex(Custom, "fake"):
76+
with self.assertLogs(level=logging.ERROR):
77+
self.network.disconnect()
78+
self.network.notifier.exception = None
79+
80+
def test_network_notify(self):
81+
with self.assertLogs():
82+
self.network.add_node(2, SAMPLE_EDS)
2683
node = self.network[2]
2784
self.network.notify(0x82, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1473418396.0)
2885
self.assertEqual(len(node.emcy.active), 1)
2986
self.network.notify(0x702, b'\x05', 1473418396.0)
3087
self.assertEqual(node.nmt.state, 'OPERATIONAL')
3188
self.assertListEqual(self.network.scanner.nodes, [2])
3289

33-
def test_send(self):
90+
def test_network_send_message(self):
3491
bus = can.interface.Bus(interface="virtual", channel=1)
3592
self.addCleanup(bus.shutdown)
3693

@@ -52,7 +109,126 @@ def test_send(self):
52109
self.assertEqual(msg.arbitration_id, 0x12345)
53110
self.assertTrue(msg.is_extended_id)
54111

55-
def test_send_periodic(self):
112+
def test_network_subscribe_unsubscribe(self):
113+
N_HOOKS = 3
114+
accumulators = [] * N_HOOKS
115+
116+
self.network.connect(
117+
interface="virtual",
118+
channel="test",
119+
receive_own_messages=True
120+
)
121+
self.addCleanup(self.network.disconnect)
122+
123+
for i in range(N_HOOKS):
124+
accumulators.append([])
125+
def hook(*args, i=i):
126+
accumulators[i].append(args)
127+
self.network.subscribe(i, hook)
128+
129+
self.network.notify(0, bytes([1, 2, 3]), 1000)
130+
self.network.notify(1, bytes([2, 3, 4]), 1001)
131+
self.network.notify(1, bytes([3, 4, 5]), 1002)
132+
self.network.notify(2, bytes([4, 5, 6]), 1003)
133+
134+
self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)])
135+
self.assertEqual(accumulators[1], [
136+
(1, bytes([2, 3, 4]), 1001),
137+
(1, bytes([3, 4, 5]), 1002),
138+
])
139+
self.assertEqual(accumulators[2], [(2, bytes([4, 5, 6]), 1003)])
140+
141+
self.network.unsubscribe(0)
142+
self.network.notify(0, bytes([7, 7, 7]), 1004)
143+
# Verify that no new data was added to the accumulator.
144+
self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)])
145+
146+
def test_network_subscribe_multiple(self):
147+
N_HOOKS = 3
148+
self.network.connect(
149+
interface="virtual",
150+
channel="test",
151+
receive_own_messages=True
152+
)
153+
self.addCleanup(self.network.disconnect)
154+
155+
accumulators = []
156+
hooks = []
157+
for i in range(N_HOOKS):
158+
accumulators.append([])
159+
def hook(*args, i=i):
160+
accumulators[i].append(args)
161+
hooks.append(hook)
162+
self.network.subscribe(0x20, hook)
163+
164+
self.network.notify(0xaa, bytes([1, 1, 1]), 2000)
165+
self.network.notify(0x20, bytes([2, 3, 4]), 2001)
166+
self.network.notify(0xbb, bytes([2, 2, 2]), 2002)
167+
self.network.notify(0x20, bytes([3, 4, 5]), 2003)
168+
self.network.notify(0xcc, bytes([3, 3, 3]), 2004)
169+
170+
BATCH1 = [
171+
(0x20, bytes([2, 3, 4]), 2001),
172+
(0x20, bytes([3, 4, 5]), 2003),
173+
]
174+
for n, acc in enumerate(accumulators):
175+
with self.subTest(hook=n):
176+
self.assertEqual(acc, BATCH1)
177+
178+
# Unsubscribe the second hook; dispatch a new message.
179+
self.network.unsubscribe(0x20, hooks[1])
180+
181+
BATCH2 = 0x20, bytes([4, 5, 6]), 2005
182+
self.network.notify(*BATCH2)
183+
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
184+
self.assertEqual(accumulators[1], BATCH1)
185+
self.assertEqual(accumulators[2], BATCH1 + [BATCH2])
186+
187+
# Unsubscribe the first hook; dispatch yet another message.
188+
self.network.unsubscribe(0x20, hooks[0])
189+
190+
BATCH3 = 0x20, bytes([5, 6, 7]), 2006
191+
self.network.notify(*BATCH3)
192+
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
193+
self.assertEqual(accumulators[1], BATCH1)
194+
self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3])
195+
196+
# Unsubscribe the rest (only one remaining); dispatch a new message.
197+
self.network.unsubscribe(0x20)
198+
self.network.notify(0x20, bytes([7, 7, 7]), 2007)
199+
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
200+
self.assertEqual(accumulators[1], BATCH1)
201+
self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3])
202+
203+
def test_network_context_manager(self):
204+
with self.network.connect(interface="virtual", channel=1):
205+
pass
206+
with self.assertRaisesRegex(RuntimeError, "Not connected"):
207+
self.network.send_message(0, [])
208+
209+
def test_network_item_access(self):
210+
with self.assertLogs():
211+
self.network.add_node(2, SAMPLE_EDS)
212+
self.network.add_node(3, SAMPLE_EDS)
213+
self.assertEqual([2, 3], [node for node in self.network])
214+
215+
# Check __delitem__.
216+
del self.network[2]
217+
self.assertEqual([3], [node for node in self.network])
218+
with self.assertRaises(KeyError):
219+
del self.network[2]
220+
221+
# Check __setitem__.
222+
old = self.network[3]
223+
with self.assertLogs():
224+
new = canopen.Node(3, SAMPLE_EDS)
225+
self.network[3] = new
226+
227+
# Check __getitem__.
228+
self.assertNotEqual(self.network[3], old)
229+
self.assertEqual([3], [node for node in self.network])
230+
231+
def test_network_send_periodic(self):
56232
DATA1 = bytes([1, 2, 3])
57233
DATA2 = bytes([4, 5, 6])
58234
COB_ID = 0x123

0 commit comments

Comments
 (0)