|
| 1 | +# Python interface for AXI-Lite reads and writes |
| 2 | + |
| 3 | +# Copyright (c) 2024 Zero ASIC Corporation |
| 4 | +# This code is licensed under Apache License 2.0 (see LICENSE for details) |
| 5 | + |
| 6 | +import numpy as np |
| 7 | + |
| 8 | +from math import floor, ceil, log2 |
| 9 | +from numbers import Integral |
| 10 | + |
| 11 | +from _switchboard import PySbPacket, PySbTx, PySbRx |
| 12 | + |
| 13 | + |
| 14 | +class ApbTxRx: |
| 15 | + def __init__( |
| 16 | + self, |
| 17 | + uri: str, |
| 18 | + fresh: bool = True, |
| 19 | + data_width: int = 32, |
| 20 | + addr_width: int = 16, |
| 21 | + prot: int = 0, |
| 22 | + slv_err_expected: bool = False, |
| 23 | + queue_suffix: str = '.q', |
| 24 | + max_rate: float = -1 |
| 25 | + ): |
| 26 | + """ |
| 27 | + Parameters |
| 28 | + ---------- |
| 29 | + uri: str |
| 30 | + Base name of for switchboard queues used to convey APB transactions. |
| 31 | + fresh: bool, optional |
| 32 | + If True (default), the queue specified by the uri parameter will get cleared |
| 33 | + before executing the simulation. |
| 34 | + data_width: int, optional |
| 35 | + Width the write and read data buses, in bits. |
| 36 | + addr_width: int, optional |
| 37 | + Width the write and read address buses, in bits. |
| 38 | + prot: int, optional |
| 39 | + Default value of PROT to use for read and write transactions. Can be |
| 40 | + overridden on a transaction-by-transaction basis. |
| 41 | + slv_err_expected: bool, optional |
| 42 | + Default response to expect from reads and writes. |
| 43 | + None means "don't check the response". |
| 44 | + This default can be overridden on a transaction-by-transaction basis. |
| 45 | + queue_suffix: str, optional |
| 46 | + File extension/suffix to use when naming switchboard queues that carry |
| 47 | + APB transactions. For example, if set to ".queue", the write address |
| 48 | + queue name will be "{uri}-aw.queue" |
| 49 | + """ |
| 50 | + |
| 51 | + # check data types |
| 52 | + assert isinstance(data_width, Integral), 'data_width must be an integer' |
| 53 | + assert isinstance(addr_width, Integral), 'addr_width must be an integer' |
| 54 | + |
| 55 | + # check that data width is a multiple of a byte |
| 56 | + data_width_choices = [8, 16, 32, 64, 128, 256, 512, 1024] |
| 57 | + assert data_width in data_width_choices, \ |
| 58 | + f'data_width must be in {data_width_choices}' |
| 59 | + |
| 60 | + # check that data and address widths are supported |
| 61 | + SBDW = 416 |
| 62 | + assert 0 < data_width <= floor(SBDW / (1 + (1 / 8))), 'data_width out of range' |
| 63 | + assert 0 < addr_width <= SBDW - 3, 'addr_width out of range' |
| 64 | + |
| 65 | + # save settings |
| 66 | + self.data_width = data_width |
| 67 | + self.addr_width = addr_width |
| 68 | + self.default_prot = prot |
| 69 | + self.default_slv_err_expected = slv_err_expected |
| 70 | + |
| 71 | + # create the queues |
| 72 | + self.apb_req = PySbTx(f'{uri}-apb-req{queue_suffix}', fresh=fresh, max_rate=max_rate) |
| 73 | + self.apb_resp = PySbRx(f'{uri}-apb-resp{queue_suffix}', fresh=fresh, max_rate=max_rate) |
| 74 | + |
| 75 | + @property |
| 76 | + def strb_width(self): |
| 77 | + return self.data_width // 8 |
| 78 | + |
| 79 | + def write( |
| 80 | + self, |
| 81 | + addr: Integral, |
| 82 | + data, |
| 83 | + prot: Integral = None, |
| 84 | + slv_err_expected: bool = None |
| 85 | + ): |
| 86 | + """ |
| 87 | + Parameters |
| 88 | + ---------- |
| 89 | + addr: int |
| 90 | + Address to write to |
| 91 | +
|
| 92 | + data: np.uint8, np.uint16, np.uint32, np.uint64, or np.array |
| 93 | + Data to write |
| 94 | +
|
| 95 | + prot: Integral |
| 96 | + Value of PROT for this transaction. Defaults to the value provided in the |
| 97 | + ApbTxRx constructor if not provided, which in turn defaults to 0. |
| 98 | +
|
| 99 | + slv_err_expected: str, optional |
| 100 | + Response to expect for this transaction. |
| 101 | + None means, "don't check the response". Defaults to the |
| 102 | + value provided in the ApbTxRx constructor if not provided, which in turn |
| 103 | + defaults to False. |
| 104 | +
|
| 105 | + Returns |
| 106 | + ------- |
| 107 | + bool |
| 108 | + slv_err: True if SLVERR was received, False otherwise. |
| 109 | + """ |
| 110 | + |
| 111 | + # set defaults |
| 112 | + |
| 113 | + if prot is None: |
| 114 | + prot = self.default_prot |
| 115 | + |
| 116 | + if slv_err_expected is None: |
| 117 | + slv_err_expected = self.default_resp_expected |
| 118 | + |
| 119 | + # check/standardize data types |
| 120 | + |
| 121 | + assert isinstance(addr, Integral), 'addr must be an integer' |
| 122 | + addr = int(addr) |
| 123 | + |
| 124 | + assert isinstance(prot, Integral), 'prot must be an integer' |
| 125 | + prot = int(prot) |
| 126 | + |
| 127 | + if isinstance(data, np.ndarray): |
| 128 | + if data.ndim == 0: |
| 129 | + write_data = np.atleast_1d(data) |
| 130 | + elif data.ndim == 1: |
| 131 | + write_data = data |
| 132 | + else: |
| 133 | + raise ValueError(f'Can only write 1D arrays (got ndim={data.ndim})') |
| 134 | + |
| 135 | + if not np.issubdtype(write_data.dtype, np.integer): |
| 136 | + raise ValueError('Can only write integer dtypes such as uint8, uint16, etc.' |
| 137 | + f' (got dtype "{data.dtype}")') |
| 138 | + elif isinstance(data, np.integer): |
| 139 | + write_data = np.array(data, ndmin=1) |
| 140 | + else: |
| 141 | + raise TypeError(f"Unknown data type: {type(data)}") |
| 142 | + |
| 143 | + write_data = write_data.view(np.uint8) |
| 144 | + bytes_to_send = write_data.size |
| 145 | + |
| 146 | + # range validation |
| 147 | + |
| 148 | + assert 0 <= addr < (1 << self.addr_width), 'addr out of range' |
| 149 | + assert addr + bytes_to_send <= (1 << self.addr_width), \ |
| 150 | + "transaction exceeds the address space." |
| 151 | + |
| 152 | + assert 0 <= prot < (1 << 3), 'prot out of range' |
| 153 | + |
| 154 | + # loop until all data is sent |
| 155 | + # TODO: move to C++? |
| 156 | + |
| 157 | + bytes_sent = 0 |
| 158 | + |
| 159 | + strb_bytes = (self.strb_width + 7) // 8 |
| 160 | + header_bytes = ceil((1 + 3 + self.strb_width) / 8.0) |
| 161 | + data_bytes = self.data_width // 8 |
| 162 | + addr_bytes = self.addr_width // 8 |
| 163 | + |
| 164 | + addr_mask = (1 << self.addr_width) - 1 |
| 165 | + addr_mask >>= ceil(log2(data_bytes)) |
| 166 | + addr_mask <<= ceil(log2(data_bytes)) |
| 167 | + |
| 168 | + while bytes_sent < bytes_to_send: |
| 169 | + # find the offset into the data bus for this cycle. bytes below |
| 170 | + # the offset will have write strobe de-asserted. |
| 171 | + offset = addr % data_bytes |
| 172 | + |
| 173 | + # determine how many bytes we're sending in this cycle |
| 174 | + bytes_this_cycle = min(bytes_to_send - bytes_sent, data_bytes - offset) |
| 175 | + |
| 176 | + # extract those bytes from the whole input data array, picking |
| 177 | + # up where we left off from the last iteration |
| 178 | + data_this_cycle = write_data[bytes_sent:bytes_sent + bytes_this_cycle] |
| 179 | + |
| 180 | + # calculate strobe value based on the offset and number |
| 181 | + # of bytes that we're writing. |
| 182 | + strb = ((1 << bytes_this_cycle) - 1) << offset |
| 183 | + header = 1 |
| 184 | + header = (header << 3) | prot |
| 185 | + header = (header << self.strb_width) | strb |
| 186 | + header = np.frombuffer(header, dtype=np.uint8) |
| 187 | + |
| 188 | + addr_bytes = np.frombuffer( |
| 189 | + (addr & addr_mask).to_bytes((self.addr_width + 7) // 8, 'little'), |
| 190 | + dtype=np.uint8 |
| 191 | + ) |
| 192 | + |
| 193 | + pack = np.empty((header_bytes + addr_bytes + data_bytes,), dtype=np.uint8) |
| 194 | + pack[0:data_bytes] = data_this_cycle |
| 195 | + pack[data_bytes:data_bytes + addr_bytes] = addr_bytes |
| 196 | + pack[data_bytes + addr_bytes:] = header |
| 197 | + pack = PySbPacket(data=pack, flags=1, destination=0) |
| 198 | + # Transmit request |
| 199 | + self.apb_req.send(pack, True) |
| 200 | + |
| 201 | + # wait for response |
| 202 | + pack = self.apb_resp.recv(True) |
| 203 | + pack = pack.data.tobytes() |
| 204 | + pack = int.from_bytes(pack, 'little') |
| 205 | + |
| 206 | + # decode the response |
| 207 | + slv_err = bool(pack & 0b1) |
| 208 | + |
| 209 | + # check the response if desired |
| 210 | + if slv_err_expected is not None: |
| 211 | + assert slv_err == slv_err_expected, f'Unexpected response: slv_err = {slv_err}' |
| 212 | + |
| 213 | + # increment pointers |
| 214 | + bytes_sent += bytes_this_cycle |
| 215 | + addr += bytes_this_cycle |
| 216 | + |
| 217 | + # return the last reponse |
| 218 | + return slv_err |
0 commit comments