|
| 1 | +import logging |
| 2 | +import time |
| 3 | +from typing import Any, Iterable, Optional, List, Tuple |
| 4 | +from smartcard.CardType import AnyCardType, CardType |
| 5 | +from serial import SerialException |
| 6 | +from .at_command_client import ATCommandClient |
| 7 | + |
| 8 | +logger = logging.getLogger("modem") |
| 9 | + |
| 10 | +class ModemCardRequest: |
| 11 | + def __init__(self, modem_device_path, timeout: int = 1, cardType: CardType = AnyCardType, readers: Optional[Iterable[str]] = None) -> None: |
| 12 | + self._readers = readers or [''] |
| 13 | + self._timeout = timeout |
| 14 | + self._client = ATCommandClient(modem_device_path, timeout=float(timeout*15)) |
| 15 | + |
| 16 | + @property |
| 17 | + def connection(self) -> Any: |
| 18 | + return self |
| 19 | + |
| 20 | + def waitforcard(self) -> None: |
| 21 | + self.connect() |
| 22 | + return self |
| 23 | + |
| 24 | + def connect(self) -> None: |
| 25 | + self._client.connect() |
| 26 | + |
| 27 | + def getReader(self) -> Any: |
| 28 | + return self._readers |
| 29 | + |
| 30 | + def getATR(self) -> Any: |
| 31 | + return None |
| 32 | + |
| 33 | + def transmit(self, apdu: List[int]) -> Any: |
| 34 | + """ |
| 35 | + Transmits SIM APDU to the modem. |
| 36 | + """ |
| 37 | + |
| 38 | + at_command = self._to_csim_command(apdu) |
| 39 | + data, sw1, sw2 = [], 0xff, 0xff |
| 40 | + |
| 41 | + attempt_until = time.time() + self._timeout |
| 42 | + try: |
| 43 | + while sw1 == 0xff and sw2 == 0xff: |
| 44 | + data, sw1, sw2 = self._client.transmit(at_command, self._at_response_to_card_response) |
| 45 | + except SerialException as e: |
| 46 | + logger.debug("Serial communication error << {e} ... retrying") |
| 47 | + if time.time() > attempt_until: |
| 48 | + raise |
| 49 | + |
| 50 | + logger.debug(f""" |
| 51 | + APDU << {apdu} |
| 52 | + AT Command << {at_command} |
| 53 | + Ret << data:{data}, sw1:{sw1}, sw2:{sw2} |
| 54 | + """) |
| 55 | + return (data, sw1, sw2) |
| 56 | + |
| 57 | + def _to_csim_command(self, apdu: List[int]) -> str: |
| 58 | + """ |
| 59 | + Transforms a SIM APDU represented as a list of integers (bytes data) |
| 60 | + into its corresponding AT+CSIM command format. |
| 61 | + """ |
| 62 | + |
| 63 | + at_command = ("").join(map(lambda x: "%0.2X" % x, apdu)) |
| 64 | + at_command = f'AT+CSIM={len(at_command)},"{at_command}"' |
| 65 | + return at_command |
| 66 | + |
| 67 | + def _at_response_to_card_response(self, at_command: str, at_response: str) -> Tuple[List[int], int, int]: |
| 68 | + """ |
| 69 | + Transforms AT response to the expected CardService format. |
| 70 | + """ |
| 71 | + |
| 72 | + parts = list(filter(lambda x: x != '', at_response.split("\r\n"))) |
| 73 | + if len(parts) == 0: |
| 74 | + return [], 0xff, 0xff # communication error |
| 75 | + |
| 76 | + if not parts[-1] or 'ERROR' in parts[-1]: |
| 77 | + return [], 0x6f, 0x0 # checking error: no precise diagnosis |
| 78 | + |
| 79 | + res = parts[0] |
| 80 | + res = res[res.find('"')+1:-1:] |
| 81 | + |
| 82 | + return ( |
| 83 | + self._hexstream_to_bytes(res[:-4:]), |
| 84 | + int(res[-4:-2:], 16), |
| 85 | + int(res[-2::], 16) |
| 86 | + ) |
| 87 | + |
| 88 | + def _hexstream_to_bytes(self, hexstream: str) -> List[int]: |
| 89 | + """ |
| 90 | + Returns a list of integers representing byte data from a hexadecimal stream. |
| 91 | + """ |
| 92 | + |
| 93 | + return list( |
| 94 | + map( |
| 95 | + lambda x: int(x, 16), |
| 96 | + [hexstream[i:i+2] for i in range(0, len(hexstream), 2)] |
| 97 | + ) |
| 98 | + ) |
| 99 | + |
0 commit comments