1212with feeph.i2c.Burst(i2c_bus=i2c_bus, i2c_adr=0x4C) as bh:
1313 value = bh.read_register(register)
1414 bh.write_register(register, value + 1)
15+
16+ with feeph.i2c.Burst(i2c_bus=i2c_bus, i2c_adr=0x70) as bh:
17+ value = bh.get_state()
18+ bh.set_state(value + 1)
1519```
1620"""
1721
@@ -38,6 +42,17 @@ def __init__(self, i2c_bus: busio.I2C, i2c_adr: int):
3842 else :
3943 raise ValueError (f"Provided I²C address { i2c_adr } is out of range! (allowed range: 0 ≤ x ≤ 255)" )
4044
45+ # fundamentally there isn't actually much difference between accesses
46+ # to a device's register or internal state
47+ #
48+ # read from register write to register
49+ # 1. write one byte write one byte (device address)
50+ # 2. write one byte write one byte (register address)
51+ # 2. read one or more bytes write one or more bytes (register value)
52+ #
53+ # step 2. is skipped when accessing the internal state,
54+ # step 1. and 3. are kept
55+
4156 def read_register (self , register : int , byte_count : int = 1 , max_tries : int = 5 ) -> int :
4257 """
4358 read a single register from I²C device identified by `i2c_adr` and
@@ -97,6 +112,62 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
97112 else :
98113 raise RuntimeError (f"Unable to read register 0x{ register :02X} after { cur_try } attempts. Giving up." )
99114
115+ def get_state (self , byte_count : int = 1 , max_tries : int = 5 ) -> int :
116+ """
117+ get current state of I²C device identified by `i2c_adr` and
118+ return its contents as an integer value
119+ - may raise a RuntimeError if it was not possible to acquire
120+ the bus within allowed time
121+ - may raise a RuntimeError if there were too many errors
122+ """
123+ _validate_inputs (register = 0 , value = 0 , byte_count = byte_count , max_tries = max_tries )
124+ if byte_count > 1 :
125+ LH .warning ("Multi byte reads are not implemented yet! Returning a single byte instead." )
126+ byte_count = 1
127+ for cur_try in range (1 , 1 + max_tries ):
128+ try :
129+ buf = bytearray (byte_count )
130+ # TODO properly handle multi byte reads
131+ self ._i2c_bus .readfrom_into (address = self ._i2c_adr , buffer = buf )
132+ return buf [0 ]
133+ except OSError as e :
134+ # [Errno 121] Remote I/O error
135+ LH .warning ("[%s] Failed to read state (%i/%i): %s" , __name__ , cur_try , max_tries , e )
136+ time .sleep (0.001 )
137+ except RuntimeError as e :
138+ LH .warning ("[%s] Unable to read state (%i/%i): %s" , __name__ , cur_try , max_tries , e )
139+ time .sleep (0.001 )
140+ else :
141+ raise RuntimeError (f"Unable to read state after { cur_try } attempts. Giving up." )
142+
143+ def set_state (self , value : int , byte_count : int = 1 , max_tries : int = 3 ):
144+ """
145+ set current state of I²C device identified by `i2c_adr`
146+ - may raise a RuntimeError if it was not possible to acquire
147+ the bus within allowed time
148+ - may raise a RuntimeError if there were too many errors
149+ """
150+ _validate_inputs (register = 0 , value = value , byte_count = byte_count , max_tries = max_tries )
151+ if byte_count > 1 :
152+ LH .warning ("Multi byte writes are not implemented yet! Returning a single byte instead." )
153+ byte_count = 1
154+ for cur_try in range (1 , 1 + max_tries ):
155+ try :
156+ buf = bytearray (byte_count )
157+ buf [0 ] = value & 0xFF
158+ # TODO properly handle multi byte reads
159+ self ._i2c_bus .writeto (address = self ._i2c_adr , buffer = buf )
160+ return
161+ except OSError as e :
162+ # [Errno 121] Remote I/O error
163+ LH .warning ("[%s] Failed to write state (%i/%i): %s" , __name__ , cur_try , max_tries , e )
164+ time .sleep (0.1 )
165+ except RuntimeError as e :
166+ LH .warning ("[%s] Unable to write state 0x%02X (%i/%i): %s" , __name__ , cur_try , max_tries , e )
167+ time .sleep (0.1 )
168+ else :
169+ raise RuntimeError (f"Unable to write state after { cur_try } attempts. Giving up." )
170+
100171
101172class BurstHandler :
102173 """
0 commit comments