Skip to content

Commit 8bffbec

Browse files
committed
test: improve code coverage
1 parent bec7613 commit 8bffbec

File tree

5 files changed

+152
-57
lines changed

5 files changed

+152
-57
lines changed

feeph/i2c/burst_handler.py

100644100755
Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ def read_register(self, register: int, byte_count: int = 1, max_tries: int = 5)
6666
buf_r = bytearray(byte_count)
6767
buf_r[0] = register
6868
buf_w = bytearray(byte_count)
69+
cur_try = 0
6970
for cur_try in range(1, 1 + max_tries):
7071
try:
7172
self._i2c_bus.writeto_then_readfrom(address=self._i2c_adr, buffer_out=buf_r, buffer_in=buf_w)
7273
return convert_bytearry_to_uint(buf_w)
73-
except OSError as e:
74+
# protect against sporadic errors on actual devices
75+
# (maybe we can do something to prevent these errors?)
76+
except (OSError, RuntimeError) as e:
7477
# [Errno 121] Remote I/O error
75-
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
76-
time.sleep(0.001)
77-
except RuntimeError as e:
7878
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
7979
time.sleep(0.001)
8080
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
@@ -93,16 +93,16 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
9393
buf = bytearray([register])
9494
for byte in ba:
9595
buf.append(byte)
96+
cur_try = 0
9697
for cur_try in range(1, 1 + max_tries):
9798
try:
9899
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
99100
return
100-
except OSError as e:
101+
# protect against sporadic errors on actual devices
102+
# (maybe we can do something to prevent these errors?)
103+
except (OSError, RuntimeError) as e:
101104
# [Errno 121] Remote I/O error
102-
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
103-
time.sleep(0.1)
104-
except RuntimeError as e:
105-
LH.warning("[%s] Unable to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
105+
LH.warning("[%s] Unable to write register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
106106
time.sleep(0.1)
107107
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
108108

@@ -121,15 +121,15 @@ def get_state(self, byte_count: int = 1, max_tries: int = 5) -> int:
121121
LH.warning("Multi byte reads are not implemented yet! Returning a single byte instead.")
122122
byte_count = 1
123123
buf = bytearray(byte_count)
124+
cur_try = 0
124125
for cur_try in range(1, 1 + max_tries):
125126
try:
126127
self._i2c_bus.readfrom_into(address=self._i2c_adr, buffer=buf)
127128
return buf[0]
128-
except OSError as e:
129+
# protect against sporadic errors on actual devices
130+
# (maybe we can do something to prevent these errors?)
131+
except (OSError, RuntimeError) as e:
129132
# [Errno 121] Remote I/O error
130-
LH.warning("[%s] Failed to read state (%i/%i): %s", __name__, cur_try, max_tries, e)
131-
time.sleep(0.001)
132-
except RuntimeError as e:
133133
LH.warning("[%s] Unable to read state (%i/%i): %s", __name__, cur_try, max_tries, e)
134134
time.sleep(0.001)
135135
raise RuntimeError(f"Unable to read state after {cur_try} attempts. Giving up.")
@@ -145,15 +145,15 @@ def set_state(self, value: int, byte_count: int = 1, max_tries: int = 3):
145145
LH.warning("Multi byte writes are not implemented yet! Returning a single byte instead.")
146146
byte_count = 1
147147
buf = convert_uint_to_bytearry(value, byte_count)
148+
cur_try = 0
148149
for cur_try in range(1, 1 + max_tries):
149150
try:
150151
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
151152
return
152-
except OSError as e:
153+
# protect against sporadic errors on actual devices
154+
# (maybe we can do something to prevent these errors?)
155+
except (OSError, RuntimeError) as e:
153156
# [Errno 121] Remote I/O error
154-
LH.warning("[%s] Failed to write state (%i/%i): %s", __name__, cur_try, max_tries, e)
155-
time.sleep(0.1)
156-
except RuntimeError as e:
157157
LH.warning("[%s] Unable to write state (%i/%i): %s", __name__, cur_try, max_tries, e)
158158
time.sleep(0.1)
159159
raise RuntimeError(f"Unable to write state after {cur_try} attempts. Giving up.")

feeph/i2c/emulation.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,18 @@ def unlock(self):
5555

5656
# replicate the signature of busio.I2C
5757
# pylint: disable=too-many-arguments
58-
def readfrom_into(self, address, buffer, *, start=0, end=None, stop=True):
58+
def readfrom_into(self, address: int, buffer: bytearray, *, start=0, end=None, stop=True):
5959
"""
6060
read device state
6161
6262
(buffer is used as an output parameter)
6363
"""
64+
# make sure that buffer truly is a bytearray
65+
# (we really want this to be a bytearray because bytearray performs
66+
# its own input validation and automatically guarantees we're not
67+
# getting negative byte values or other unexpected data as input)
68+
if not isinstance(buffer, bytearray):
69+
raise ValueError("buffer must be of type 'bytearray'")
6470
i2c_device_address = address
6571
i2c_device_register = -1
6672
value = self._state[i2c_device_address][i2c_device_register]
@@ -76,6 +82,12 @@ def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
7682
"""
7783
write device state or register
7884
"""
85+
# make sure that buffer truly is a bytearray
86+
# (we really want this to be a bytearray because bytearray performs
87+
# its own input validation and automatically guarantees we're not
88+
# getting negative byte values or other unexpected data as input)
89+
if not isinstance(buffer, bytearray):
90+
raise ValueError("buffer must be of type 'bytearray'")
7991
if len(buffer) == 1:
8092
# device status
8193
i2c_device_address = address
@@ -85,8 +97,6 @@ def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
8597
# device register
8698
i2c_device_address = address
8799
i2c_device_register = buffer[0]
88-
if i2c_device_register < 0:
89-
raise ValueError("device register can't be negative")
90100
value = convert_bytearry_to_uint(buffer[1:])
91101
self._state[i2c_device_address][i2c_device_register] = value
92102

@@ -96,10 +106,16 @@ def writeto_then_readfrom(self, address: int, buffer_out: bytearray, buffer_in:
96106
97107
(buffer_in is used as an output parameter)
98108
"""
109+
# make sure that buffer_in and buffer_out truly are a bytearray
110+
# (we really want this to be a bytearray because bytearray performs
111+
# its own input validation and automatically guarantees we're not
112+
# getting negative byte values or other unexpected data as input)
113+
if not isinstance(buffer_in, bytearray):
114+
raise ValueError("buffer_in must be of type 'bytearray'")
115+
if not isinstance(buffer_out, bytearray):
116+
raise ValueError("buffer_out must be of type 'bytearray'")
99117
i2c_device_address = address
100118
i2c_device_register = buffer_out[0]
101-
if i2c_device_register < 0:
102-
raise ValueError("device register can't be negative")
103119
value = self._state[i2c_device_address][i2c_device_register]
104120
ba = convert_uint_to_bytearry(value, len(buffer_in))
105121
# copy computed result to output parameter

feeph/i2c/utility.py

Lines changed: 0 additions & 32 deletions
This file was deleted.

tests/test_burst_handler.py

100644100755
Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import feeph.i2c as sut # sytem under test
99

1010

11-
# pylint: disable=protected-access
11+
# pylint: disable=protected-access,too-many-public-methods
1212
class TestBurstHandler(unittest.TestCase):
1313

1414
def test_read_device_register(self):
@@ -39,6 +39,15 @@ def test_read_device_register_multibyte(self):
3939
# -----------------------------------------------------------------
4040
self.assertEqual(computed, expected)
4141

42+
def test_read_device_register_insufficient_tries(self):
43+
i2c_bus = sut.EmulatedI2C(state={})
44+
# -----------------------------------------------------------------
45+
# -----------------------------------------------------------------
46+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
47+
# under realistic circumstances the max_tries would be a positive
48+
# value but we're intentionally setting it to 0 to force an error
49+
self.assertRaises(RuntimeError, bh.read_register, 0x00, max_tries=0)
50+
4251
def test_read_device_registers(self):
4352
state = {
4453
0x4C: {
@@ -105,6 +114,15 @@ def test_write_device_register_multibyte(self):
105114
# -----------------------------------------------------------------
106115
self.assertEqual(computed, expected)
107116

117+
def test_write_device_register_insufficient_tries(self):
118+
i2c_bus = sut.EmulatedI2C(state={})
119+
# -----------------------------------------------------------------
120+
# -----------------------------------------------------------------
121+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
122+
# under realistic circumstances the max_tries would be a positive
123+
# value but we're intentionally setting it to 0 to force an error
124+
self.assertRaises(RuntimeError, bh.write_register, 0x00, value=0x12, max_tries=0)
125+
108126
def test_write_device_registers(self):
109127
state = {
110128
0x4C: {
@@ -188,6 +206,15 @@ def test_get_state_multibyte(self):
188206
# -----------------------------------------------------------------
189207
self.assertEqual(computed, expected)
190208

209+
def test_get_state_insufficient_tries(self):
210+
i2c_bus = sut.EmulatedI2C(state={})
211+
# -----------------------------------------------------------------
212+
# -----------------------------------------------------------------
213+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
214+
# under realistic circumstances the max_tries would be a positive
215+
# value but we're intentionally setting it to 0 to force an error
216+
self.assertRaises(RuntimeError, bh.get_state, max_tries=0)
217+
191218
def test_set_state(self):
192219
state = {
193220
0x70: {-1: 0x00},
@@ -206,11 +233,49 @@ def test_set_state_multibyte(self):
206233
# -----------------------------------------------------------------
207234
# -----------------------------------------------------------------
208235
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x70) as bh:
209-
self.assertRaises(ValueError, bh.set_state, value=0x0102)
236+
self.assertRaises(ValueError, bh.set_state, value=0x0102, byte_count=2)
237+
238+
def test_set_state_insufficient_tries(self):
239+
i2c_bus = sut.EmulatedI2C(state={})
240+
# -----------------------------------------------------------------
241+
# -----------------------------------------------------------------
242+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
243+
# under realistic circumstances the max_tries would be a positive
244+
# value but we're intentionally setting it to 0 to force an error
245+
self.assertRaises(RuntimeError, bh.set_state, value=0x12, max_tries=0)
210246

211247
# ---------------------------------------------------------------------
212248

213-
def test_no_timeout(self):
249+
def test_invalid_device_address(self):
250+
# this code tests the equivalent of:
251+
# with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0xFFFF) as bh:
252+
# ...
253+
i2c_bus = sut.EmulatedI2C(state={})
254+
bh = sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0xFFFF)
255+
# -----------------------------------------------------------------
256+
# -----------------------------------------------------------------
257+
self.assertRaises(ValueError, bh.__enter__)
258+
259+
def test_invalid_device_register(self):
260+
# this code tests the equivalent of:
261+
# with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0xFFFF) as bh:
262+
# ...
263+
i2c_bus = sut.EmulatedI2C(state={})
264+
# -----------------------------------------------------------------
265+
# -----------------------------------------------------------------
266+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x70) as bh:
267+
self.assertRaises(ValueError, bh.read_register, register=0xFFFF)
268+
269+
def test_invalid_timeout(self):
270+
# this code tests the equivalent of:
271+
# with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C, timeout_ms=0) as bh:
272+
# ...
273+
i2c_bus = sut.EmulatedI2C(state={}, lock_chance=1)
274+
# -----------------------------------------------------------------
275+
# -----------------------------------------------------------------
276+
self.assertRaises(ValueError, sut.BurstHandler, i2c_bus=i2c_bus, i2c_adr=0x4C, timeout_ms=0)
277+
278+
def test_hard_to_lock(self):
214279
state = {
215280
0x4C: {
216281
0x00: 0x12,
@@ -225,3 +290,13 @@ def test_no_timeout(self):
225290
expected = 0x12
226291
# -----------------------------------------------------------------
227292
self.assertEqual(computed, expected)
293+
294+
def test_unable_to_lock(self):
295+
# this code tests the equivalent of:
296+
# with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
297+
# ...
298+
i2c_bus = sut.EmulatedI2C(state={}, lock_chance=0) # impossible to acquire a lock
299+
bh = sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C)
300+
# -----------------------------------------------------------------
301+
# -----------------------------------------------------------------
302+
self.assertRaises(RuntimeError, bh.__enter__)

tests/test_emulation.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#!/usr/bin/env python3
2+
"""
3+
perform I²C bus related tests
4+
"""
5+
6+
import unittest
7+
8+
import feeph.i2c as sut # sytem under test
9+
10+
11+
# pylint: disable=protected-access
12+
class TestEmulatedI2C(unittest.TestCase):
13+
14+
def test_input_validation1(self):
15+
i2c_bus = sut.EmulatedI2C(state={})
16+
# -----------------------------------------------------------------
17+
# -----------------------------------------------------------------
18+
self.assertRaises(ValueError, i2c_bus.readfrom_into, 0x12, [-1, 0x00])
19+
20+
def test_input_validation2(self):
21+
i2c_bus = sut.EmulatedI2C(state={})
22+
# -----------------------------------------------------------------
23+
# -----------------------------------------------------------------
24+
self.assertRaises(ValueError, i2c_bus.writeto, 0x12, [-1, 0x00])
25+
26+
def test_input_validation3(self):
27+
i2c_bus = sut.EmulatedI2C(state={})
28+
# -----------------------------------------------------------------
29+
# -----------------------------------------------------------------
30+
self.assertRaises(ValueError, i2c_bus.writeto_then_readfrom, 0x12, bytearray(0), [-1, 0x00])
31+
32+
def test_input_validation4(self):
33+
i2c_bus = sut.EmulatedI2C(state={})
34+
# -----------------------------------------------------------------
35+
# -----------------------------------------------------------------
36+
self.assertRaises(ValueError, i2c_bus.writeto_then_readfrom, 0x12, [-1, 0x00], bytearray(0))

0 commit comments

Comments
 (0)