Skip to content

Commit 6bc1ff5

Browse files
authored
feat: support multi-byte reads for device registers (#31)
It is unclear if it's possible to have a multi-byte state since 'register' writes look exactly like 'multi-byte state' writes.
1 parent 206971b commit 6bc1ff5

File tree

3 files changed

+81
-18
lines changed

3 files changed

+81
-18
lines changed

feeph/i2c/burst_handler.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,16 @@ def read_register(self, register: int, byte_count: int = 1, max_tries: int = 5)
6262
- may raise a RuntimeError if there were too many errors
6363
"""
6464
_validate_inputs(register=register, value=0, byte_count=byte_count, max_tries=max_tries)
65-
if byte_count > 1:
66-
LH.warning("Multi byte reads are not implemented yet! Returning a single byte instead.")
67-
byte_count = 1
6865
for cur_try in range(1, 1 + max_tries):
6966
try:
70-
buf_r = bytearray(1)
67+
buf_r = bytearray(byte_count)
7168
buf_r[0] = register
7269
buf_w = bytearray(byte_count)
7370
self._i2c_bus.writeto_then_readfrom(address=self._i2c_adr, buffer_out=buf_r, buffer_in=buf_w)
74-
# TODO properly handle multi byte reads
75-
return buf_w[0]
71+
value = 0
72+
for i in range(byte_count):
73+
value += buf_w[i] << i*8
74+
return value
7675
except OSError as e:
7776
# [Errno 121] Remote I/O error
7877
LH.warning("[%s] Failed to read register 0x%02X (%i/%i): %s", __name__, register, cur_try, max_tries, e)
@@ -91,15 +90,16 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
9190
- may raise a RuntimeError if there were too many errors
9291
"""
9392
_validate_inputs(register=register, value=value, byte_count=byte_count, max_tries=max_tries)
94-
if byte_count > 1:
95-
LH.warning("Multi byte writes are not implemented yet! Returning a single byte instead.")
96-
byte_count = 1
9793
for cur_try in range(1, 1 + max_tries):
9894
try:
99-
buf = bytearray(1 + byte_count)
95+
buf = bytearray(1 + byte_count) # buf[0], buf[1], buf[2]
10096
buf[0] = register
101-
buf[1] = value & 0xFF
102-
# TODO properly handle multi byte reads
97+
# need to populate the bytes in reverse order:
98+
# 0x##.. => buf[2]
99+
# 0x..## => buf[1]
100+
for i in range(byte_count, 0, -1):
101+
buf[i] = value & 0xFF
102+
value = value >> 8
103103
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
104104
return
105105
except OSError as e:
@@ -112,6 +112,9 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
112112
else:
113113
raise RuntimeError(f"Unable to read register 0x{register:02X} after {cur_try} attempts. Giving up.")
114114

115+
# it is unclear if it's possible to have a multi-byte state registers
116+
# (a register write looks exactly like a multi-byte state write)
117+
115118
def get_state(self, byte_count: int = 1, max_tries: int = 5) -> int:
116119
"""
117120
get current state of I²C device identified by `i2c_adr` and
@@ -127,7 +130,6 @@ def get_state(self, byte_count: int = 1, max_tries: int = 5) -> int:
127130
for cur_try in range(1, 1 + max_tries):
128131
try:
129132
buf = bytearray(byte_count)
130-
# TODO properly handle multi byte reads
131133
self._i2c_bus.readfrom_into(address=self._i2c_adr, buffer=buf)
132134
return buf[0]
133135
except OSError as e:
@@ -155,7 +157,6 @@ def set_state(self, value: int, byte_count: int = 1, max_tries: int = 3):
155157
try:
156158
buf = bytearray(byte_count)
157159
buf[0] = value & 0xFF
158-
# TODO properly handle multi byte reads
159160
self._i2c_bus.writeto(address=self._i2c_adr, buffer=buf)
160161
return
161162
except OSError as e:
@@ -228,7 +229,7 @@ def __exit__(self, exc_type, exc_value, exc_tb):
228229

229230

230231
def _validate_inputs(register: int, value: int, byte_count: int = 1, max_tries: int = 3):
231-
if register < 0 or register > 255:
232+
if register < 0 or register > pow(255, byte_count):
232233
raise ValueError(f"Provided I²C device register {register} is out of range! (allowed range: 0 ≤ x ≤ 255)")
233234
max_value = pow(256, byte_count) - 1
234235
if value < 0 or value > max_value:

feeph/i2c/emulation.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ def readfrom_into(self, address, buffer, *, start=0, end=None, stop=True):
5353
"""
5454
i2c_device_address = address
5555
i2c_device_register = -1
56-
buffer[0] = self._state[i2c_device_address][i2c_device_register]
56+
value = self._state[i2c_device_address][i2c_device_register]
57+
for i in range(len(buffer)):
58+
buffer[i] = value & 0xff
59+
value = value >> 8
5760

5861
def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
5962
"""
@@ -70,7 +73,15 @@ def writeto(self, address: int, buffer: bytearray, *, start=0, end=None):
7073
i2c_device_register = buffer[0]
7174
if i2c_device_register < 0:
7275
raise ValueError("device register can't be negative")
73-
value = buffer[1]
76+
value = 0
77+
# i must count up (0->len-1) to calculate the correct bitshift
78+
# offset must count down (len->1) when populating the buffer
79+
# 0x..## -> buf[2] -> i = 0, offset = 2
80+
# 0x##.. -> buf[1] -> i = 1, offset = 1
81+
# (buf[0] contains the register address)
82+
for i in range(len(buffer) - 1):
83+
offset = len(buffer) - 1 - i
84+
value += buffer[offset] << i*8
7485
self._state[i2c_device_address][i2c_device_register] = value
7586

7687
def writeto_then_readfrom(self, address: int, buffer_out: bytearray, buffer_in: bytearray, *, out_start=0, out_end=None, in_start=0, in_end=None, stop=False):
@@ -81,4 +92,7 @@ def writeto_then_readfrom(self, address: int, buffer_out: bytearray, buffer_in:
8192
i2c_device_register = buffer_out[0]
8293
if i2c_device_register < 0:
8394
raise ValueError("device register can't be negative")
84-
buffer_in[0] = self._state[i2c_device_address][i2c_device_register]
95+
value = self._state[i2c_device_address][i2c_device_register]
96+
for i in range(len(buffer_in)):
97+
buffer_in[i] = value & 0xff
98+
value = value >> 8

tests/test_burst_handler.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,20 @@ def test_read_device_register(self):
2424
# -----------------------------------------------------------------
2525
self.assertEqual(computed, expected)
2626

27+
def test_read_device_register_multibyte(self):
28+
state = {
29+
0x4C: {
30+
0x00: 0x1234,
31+
},
32+
}
33+
i2c_bus = sut.EmulatedI2C(state=state)
34+
# -----------------------------------------------------------------
35+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
36+
computed = bh.read_register(0x00, byte_count=2)
37+
expected = 0x1234
38+
# -----------------------------------------------------------------
39+
self.assertEqual(computed, expected)
40+
2741
def test_read_device_registers(self):
2842
state = {
2943
0x4C: {
@@ -75,6 +89,21 @@ def test_write_device_register(self):
7589
# -----------------------------------------------------------------
7690
self.assertEqual(computed, expected)
7791

92+
def test_write_device_register_multibyte(self):
93+
state = {
94+
0x4C: {
95+
0x00: 0x0000,
96+
},
97+
}
98+
i2c_bus = sut.EmulatedI2C(state=state)
99+
# -----------------------------------------------------------------
100+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
101+
bh.write_register(register=0x00, value=0x1234, byte_count=2)
102+
computed = i2c_bus._state[0x4C]
103+
expected = {0x00: 0x1234}
104+
# -----------------------------------------------------------------
105+
self.assertEqual(computed, expected)
106+
78107
def test_write_device_registers(self):
79108
state = {
80109
0x4C: {
@@ -146,6 +175,18 @@ def test_get_state(self):
146175
# -----------------------------------------------------------------
147176
self.assertEqual(computed, expected)
148177

178+
def test_get_state_multibyte(self):
179+
state = {
180+
0x70: {-1: 0x01},
181+
}
182+
i2c_bus = sut.EmulatedI2C(state=state)
183+
# -----------------------------------------------------------------
184+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x70) as bh:
185+
computed = bh.get_state(byte_count=2)
186+
expected = 0x01 # byte count was ignored
187+
# -----------------------------------------------------------------
188+
self.assertEqual(computed, expected)
189+
149190
def test_set_state(self):
150191
state = {
151192
0x70: {-1: 0x00},
@@ -159,6 +200,13 @@ def test_set_state(self):
159200
# -----------------------------------------------------------------
160201
self.assertEqual(computed, expected)
161202

203+
def test_set_state_multibyte(self):
204+
i2c_bus = sut.EmulatedI2C(state={0x70: {-1: 0x00}})
205+
# -----------------------------------------------------------------
206+
# -----------------------------------------------------------------
207+
with sut.BurstHandler(i2c_bus=i2c_bus, i2c_adr=0x70) as bh:
208+
self.assertRaises(ValueError, bh.set_state, value=0x0102)
209+
162210
# ---------------------------------------------------------------------
163211

164212
def test_no_timeout(self):

0 commit comments

Comments
 (0)