diff --git a/.pylintrc b/.pylintrc index 05d31e49..f37dae20 100644 --- a/.pylintrc +++ b/.pylintrc @@ -12,6 +12,7 @@ disable= too-many-lines, too-many-locals, too-many-nested-blocks, + too-many-positional-arguments, too-many-public-methods, too-many-return-statements, too-many-statements, diff --git a/pyftdi/ftdi.py b/pyftdi/ftdi.py index 26e1da6a..891546e2 100644 --- a/pyftdi/ftdi.py +++ b/pyftdi/ftdi.py @@ -1781,8 +1781,9 @@ def write_data(self, data: Union[bytes, bytearray]) -> int: raise FtdiError(f'UsbError: {exc}') from exc def read_data_bytes(self, size: int, attempt: int = 1, - request_gen: Optional[Callable[[int], bytes]] = None) \ - -> bytes: + request_gen: Optional[Callable[[int], + Union[bytes, bytearray]]] = None) \ + -> bytearray: """Read data from the FTDI interface In UART mode, data contains the serial stream read from the UART diff --git a/pyftdi/i2c.py b/pyftdi/i2c.py index 7231dea5..b758b53b 100644 --- a/pyftdi/i2c.py +++ b/pyftdi/i2c.py @@ -653,7 +653,7 @@ def width(self) -> int: """ return 16 if self._wide_port else 8 - def read(self, address: int, readlen: int = 1, + def read(self, address: Optional[int], readlen: int = 1, relax: bool = True) -> bytes: """Read one or more bytes from a remote slave @@ -695,7 +695,8 @@ def read(self, address: int, readlen: int = 1, if do_epilog: self._do_epilog() - def write(self, address: int, out: Union[bytes, bytearray, Iterable[int]], + def write(self, address: Optional[int], + out: Union[bytes, bytearray, Iterable[int]], relax: bool = True) -> None: """Write one or more bytes to a remote slave @@ -735,9 +736,9 @@ def write(self, address: int, out: Union[bytes, bytearray, Iterable[int]], if do_epilog: self._do_epilog() - def exchange(self, address: int, + def exchange(self, address: Optional[int], out: Union[bytes, bytearray, Iterable[int]], - readlen: int = 0, relax: bool = True) -> bytes: + readlen: int = 0, relax: bool = True) -> bytearray: """Send a byte sequence to a remote slave followed with a read request of one or more bytes. @@ -767,12 +768,14 @@ def exchange(self, address: int, i2caddress = (address << 1) & self.HIGH retries = self._retry_count do_epilog = True + data = bytearray() with self._lock: while True: try: self._do_prolog(i2caddress) self._do_write(out) - self._do_prolog(i2caddress | self.BIT0) + if i2caddress is not None: + self._do_prolog(i2caddress | self.BIT0) if readlen: data = self._do_read(readlen) do_epilog = relax @@ -1017,7 +1020,7 @@ def _write_raw(self, data: int, write_high: bool): cmd = bytes([Ftdi.SET_BITS_LOW, low_data, low_dir]) self._ftdi.write_data(cmd) - def _do_prolog(self, i2caddress: int) -> None: + def _do_prolog(self, i2caddress: Optional[int]) -> None: if i2caddress is None: return self.log.debug(' prolog 0x%x', i2caddress >> 1) @@ -1061,7 +1064,7 @@ def _send_check_ack(self, cmd: bytearray): if ack[0] & self.BIT0: raise I2cNackError('NACK from slave') - def _do_read(self, readlen: int) -> bytes: + def _do_read(self, readlen: int) -> bytearray: self.log.debug('- read %d byte(s)', readlen) if not readlen: # force a real read request on device, but discard any result @@ -1101,10 +1104,10 @@ def _do_read(self, readlen: int) -> bytes: cmd_chunk.extend(read_not_last * chunk_size) cmd_chunk.extend(self._immediate) - def write_command_gen(length: int): + def write_command_gen(length: int) -> bytearray: if length <= 0: # no more data - return b'' + return bytearray() if length <= chunk_size: cmd = bytearray() cmd.extend(read_not_last * (length-1)) @@ -1120,6 +1123,7 @@ def write_command_gen(length: int): rem -= len(buf) else: while rem: + size = rem if rem > chunk_size: if not cmd: # build the command sequence only once, as it may be @@ -1132,7 +1136,6 @@ def write_command_gen(length: int): cmd.extend(read_not_last * (rem-1)) cmd.extend(read_last) cmd.extend(self._immediate) - size = rem self._ftdi.write_data(cmd) buf = self._ftdi.read_data_bytes(size, 4) self.log.debug('- read %d byte(s): %s', diff --git a/pyftdi/usbtools.py b/pyftdi/usbtools.py index b8a593fd..15b8bd61 100644 --- a/pyftdi/usbtools.py +++ b/pyftdi/usbtools.py @@ -291,6 +291,7 @@ def parse_url(cls, urlstr: str, scheme: str, path = urlparts.path.strip('/') if path == '?' or (not path and urlstr.endswith('?')): report_devices = True + interface = -1 else: interface = to_int(path) report_devices = False