Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 190 additions & 20 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import unittest
from threading import Event

Expand All @@ -9,32 +10,94 @@
class TestNetwork(unittest.TestCase):

def setUp(self):
network = canopen.Network()
with self.assertLogs():
network.add_node(2, SAMPLE_EDS)
network.add_node(3, network[2].object_dictionary)
self.network = network
self.network = canopen.Network()

def test_add_node(self):
node = self.network[2]
self.assertIsInstance(node, canopen.Node)
self.assertEqual(node.id, 2)
def test_network_add_node(self):
# Add using str.
with self.assertLogs():
node = self.network.add_node(2, SAMPLE_EDS)
self.assertEqual(self.network[2], node)
self.assertEqual(len(self.network), 2)
self.assertEqual(node.id, 2)
self.assertIsInstance(node, canopen.RemoteNode)

# Add using OD.
node = self.network.add_node(3, self.network[2].object_dictionary)
self.assertEqual(self.network[3], node)
self.assertEqual(node.id, 3)
self.assertIsInstance(node, canopen.RemoteNode)

# Add using RemoteNode.
with self.assertLogs():
node = canopen.RemoteNode(4, SAMPLE_EDS)
self.network.add_node(node)
self.assertEqual(self.network[4], node)
self.assertEqual(node.id, 4)
self.assertIsInstance(node, canopen.RemoteNode)

# Add using LocalNode.
with self.assertLogs():
node = canopen.LocalNode(5, SAMPLE_EDS)
self.network.add_node(node)
self.assertEqual(self.network[5], node)
self.assertEqual(node.id, 5)
self.assertIsInstance(node, canopen.LocalNode)

# Verify that we've got the correct number of nodes.
self.assertEqual(len(self.network), 4)

def test_notify(self):
def test_network_add_node_upload_eds(self):
# Will err because we're not connected to a real network.
with self.assertLogs(level=logging.ERROR):
self.network.add_node(2, SAMPLE_EDS, upload_eds=True)
Comment on lines +49 to +51
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test can be enhanced for example by mocking canopen.sdo.SdoClient.open to return a pre-filled StringIO buffer.


def test_network_create_node(self):
with self.assertLogs():
self.network.create_node(2, SAMPLE_EDS)
self.network.create_node(3, SAMPLE_EDS)
node = canopen.RemoteNode(4, SAMPLE_EDS)
self.network.create_node(node)
self.assertIsInstance(self.network[2], canopen.LocalNode)
self.assertIsInstance(self.network[3], canopen.LocalNode)
self.assertIsInstance(self.network[4], canopen.RemoteNode)

def test_network_check(self):
self.network.connect(interface="virtual")

def cleanup():
# We must clear the fake exception installed below, since
# .disconnect() implicitly calls .check() during test tear down.
self.network.notifier.exception = None
self.network.disconnect()

self.addCleanup(cleanup)
self.assertIsNone(self.network.check())

class Custom(Exception):
pass

self.network.notifier.exception = Custom("fake")
with self.assertRaisesRegex(Custom, "fake"):
with self.assertLogs(level=logging.ERROR):
self.network.check()
with self.assertRaisesRegex(Custom, "fake"):
with self.assertLogs(level=logging.ERROR):
self.network.disconnect()

def test_network_notify(self):
with self.assertLogs():
self.network.add_node(2, SAMPLE_EDS)
node = self.network[2]
self.network.notify(0x82, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1473418396.0)
self.assertEqual(len(node.emcy.active), 1)
self.network.notify(0x702, b'\x05', 1473418396.0)
self.assertEqual(node.nmt.state, 'OPERATIONAL')
self.assertListEqual(self.network.scanner.nodes, [2])

def test_send(self):
bus = can.interface.Bus(interface="virtual", channel=1)
def test_network_send_message(self):
bus = can.interface.Bus(interface="virtual")
self.addCleanup(bus.shutdown)

self.network.connect(interface="virtual", channel=1)
self.network.connect(interface="virtual")
self.addCleanup(self.network.disconnect)

# Send standard ID
Expand All @@ -52,16 +115,123 @@ def test_send(self):
self.assertEqual(msg.arbitration_id, 0x12345)
self.assertTrue(msg.is_extended_id)

def test_send_periodic(self):
def test_network_subscribe_unsubscribe(self):
N_HOOKS = 3
accumulators = [] * N_HOOKS

self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

for i in range(N_HOOKS):
accumulators.append([])
def hook(*args, i=i):
accumulators[i].append(args)
self.network.subscribe(i, hook)

self.network.notify(0, bytes([1, 2, 3]), 1000)
self.network.notify(1, bytes([2, 3, 4]), 1001)
self.network.notify(1, bytes([3, 4, 5]), 1002)
self.network.notify(2, bytes([4, 5, 6]), 1003)

self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)])
self.assertEqual(accumulators[1], [
(1, bytes([2, 3, 4]), 1001),
(1, bytes([3, 4, 5]), 1002),
])
self.assertEqual(accumulators[2], [(2, bytes([4, 5, 6]), 1003)])

self.network.unsubscribe(0)
self.network.notify(0, bytes([7, 7, 7]), 1004)
# Verify that no new data was added to the accumulator.
self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)])

def test_network_subscribe_multiple(self):
N_HOOKS = 3
self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

accumulators = []
hooks = []
for i in range(N_HOOKS):
accumulators.append([])
def hook(*args, i=i):
accumulators[i].append(args)
hooks.append(hook)
self.network.subscribe(0x20, hook)

self.network.notify(0xaa, bytes([1, 1, 1]), 2000)
self.network.notify(0x20, bytes([2, 3, 4]), 2001)
self.network.notify(0xbb, bytes([2, 2, 2]), 2002)
self.network.notify(0x20, bytes([3, 4, 5]), 2003)
self.network.notify(0xcc, bytes([3, 3, 3]), 2004)

BATCH1 = [
(0x20, bytes([2, 3, 4]), 2001),
(0x20, bytes([3, 4, 5]), 2003),
]
for n, acc in enumerate(accumulators):
with self.subTest(hook=n):
self.assertEqual(acc, BATCH1)

# Unsubscribe the second hook; dispatch a new message.
self.network.unsubscribe(0x20, hooks[1])

BATCH2 = 0x20, bytes([4, 5, 6]), 2005
self.network.notify(*BATCH2)
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
self.assertEqual(accumulators[1], BATCH1)
self.assertEqual(accumulators[2], BATCH1 + [BATCH2])

# Unsubscribe the first hook; dispatch yet another message.
self.network.unsubscribe(0x20, hooks[0])

BATCH3 = 0x20, bytes([5, 6, 7]), 2006
self.network.notify(*BATCH3)
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
self.assertEqual(accumulators[1], BATCH1)
self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3])

# Unsubscribe the rest (only one remaining); dispatch a new message.
self.network.unsubscribe(0x20)
self.network.notify(0x20, bytes([7, 7, 7]), 2007)
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
self.assertEqual(accumulators[1], BATCH1)
self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3])

def test_network_context_manager(self):
with self.network.connect(interface="virtual"):
pass
with self.assertRaisesRegex(RuntimeError, "Not connected"):
self.network.send_message(0, [])

def test_network_item_access(self):
with self.assertLogs():
self.network.add_node(2, SAMPLE_EDS)
self.network.add_node(3, SAMPLE_EDS)
self.assertEqual([2, 3], [node for node in self.network])

# Check __delitem__.
del self.network[2]
self.assertEqual([3], [node for node in self.network])
with self.assertRaises(KeyError):
del self.network[2]

# Check __setitem__.
old = self.network[3]
with self.assertLogs():
new = canopen.Node(3, SAMPLE_EDS)
self.network[3] = new

# Check __getitem__.
self.assertNotEqual(self.network[3], old)
self.assertEqual([3], [node for node in self.network])

def test_network_send_periodic(self):
DATA1 = bytes([1, 2, 3])
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
self.network.connect(
interface="virtual",
channel=1,
receive_own_messages=True
)
self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

acc = []
Expand Down