Skip to content

Commit

Permalink
pylint: address several issues that were not previously reported.
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Blot <[email protected]>
  • Loading branch information
eblot committed Nov 19, 2024
1 parent 981cd16 commit 3321e21
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ disable=
too-many-lines,
too-many-locals,
too-many-nested-blocks,
too-many-positional-arguments,
too-many-public-methods,
too-many-return-statements,
too-many-statements,
Expand Down
5 changes: 3 additions & 2 deletions pyftdi/ftdi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1781,8 +1781,9 @@ def write_data(self, data: Union[bytes, bytearray]) -> int:
raise FtdiError(f'UsbError: {exc}') from exc

def read_data_bytes(self, size: int, attempt: int = 1,
request_gen: Optional[Callable[[int], bytes]] = None) \
-> bytes:
request_gen: Optional[Callable[[int],
Union[bytes, bytearray]]] = None) \
-> bytearray:
"""Read data from the FTDI interface
In UART mode, data contains the serial stream read from the UART
Expand Down
28 changes: 14 additions & 14 deletions pyftdi/i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def width(self) -> int:
"""
return 16 if self._wide_port else 8

def read(self, address: int, readlen: int = 1,
def read(self, address: Optional[int], readlen: int = 1,
relax: bool = True) -> bytes:
"""Read one or more bytes from a remote slave
Expand Down Expand Up @@ -695,7 +695,8 @@ def read(self, address: int, readlen: int = 1,
if do_epilog:
self._do_epilog()

def write(self, address: int, out: Union[bytes, bytearray, Iterable[int]],
def write(self, address: Optional[int],
out: Union[bytes, bytearray, Iterable[int]],
relax: bool = True) -> None:
"""Write one or more bytes to a remote slave
Expand All @@ -713,10 +714,7 @@ def write(self, address: int, out: Union[bytes, bytearray, Iterable[int]],
if not self.configured:
raise I2cIOError("FTDI controller not initialized")
self.validate_address(address)
if address is None:
i2caddress = None
else:
i2caddress = (address << 1) & self.HIGH
i2caddress = (address << 1) & self.HIGH if address is not None else None
retries = self._retry_count
do_epilog = True
with self._lock:
Expand All @@ -735,9 +733,9 @@ def write(self, address: int, out: Union[bytes, bytearray, Iterable[int]],
if do_epilog:
self._do_epilog()

def exchange(self, address: int,
def exchange(self, address: Optional[int],
out: Union[bytes, bytearray, Iterable[int]],
readlen: int = 0, relax: bool = True) -> bytes:
readlen: int = 0, relax: bool = True) -> bytearray:
"""Send a byte sequence to a remote slave followed with
a read request of one or more bytes.
Expand Down Expand Up @@ -767,12 +765,14 @@ def exchange(self, address: int,
i2caddress = (address << 1) & self.HIGH
retries = self._retry_count
do_epilog = True
data = bytearray()
with self._lock:
while True:
try:
self._do_prolog(i2caddress)
self._do_write(out)
self._do_prolog(i2caddress | self.BIT0)
if i2caddress is not None:
self._do_prolog(i2caddress | self.BIT0)
if readlen:
data = self._do_read(readlen)
do_epilog = relax
Expand Down Expand Up @@ -1017,7 +1017,7 @@ def _write_raw(self, data: int, write_high: bool):
cmd = bytes([Ftdi.SET_BITS_LOW, low_data, low_dir])
self._ftdi.write_data(cmd)

def _do_prolog(self, i2caddress: int) -> None:
def _do_prolog(self, i2caddress: Optional[int]) -> None:
if i2caddress is None:
return
self.log.debug(' prolog 0x%x', i2caddress >> 1)
Expand Down Expand Up @@ -1061,7 +1061,7 @@ def _send_check_ack(self, cmd: bytearray):
if ack[0] & self.BIT0:
raise I2cNackError('NACK from slave')

def _do_read(self, readlen: int) -> bytes:
def _do_read(self, readlen: int) -> bytearray:
self.log.debug('- read %d byte(s)', readlen)
if not readlen:
# force a real read request on device, but discard any result
Expand Down Expand Up @@ -1101,10 +1101,10 @@ def _do_read(self, readlen: int) -> bytes:
cmd_chunk.extend(read_not_last * chunk_size)
cmd_chunk.extend(self._immediate)

def write_command_gen(length: int):
def write_command_gen(length: int) -> bytearray:
if length <= 0:
# no more data
return b''
return bytearray()
if length <= chunk_size:
cmd = bytearray()
cmd.extend(read_not_last * (length-1))
Expand All @@ -1120,6 +1120,7 @@ def write_command_gen(length: int):
rem -= len(buf)
else:
while rem:
size = rem
if rem > chunk_size:
if not cmd:
# build the command sequence only once, as it may be
Expand All @@ -1132,7 +1133,6 @@ def write_command_gen(length: int):
cmd.extend(read_not_last * (rem-1))
cmd.extend(read_last)
cmd.extend(self._immediate)
size = rem
self._ftdi.write_data(cmd)
buf = self._ftdi.read_data_bytes(size, 4)
self.log.debug('- read %d byte(s): %s',
Expand Down
1 change: 1 addition & 0 deletions pyftdi/usbtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ def parse_url(cls, urlstr: str, scheme: str,
path = urlparts.path.strip('/')
if path == '?' or (not path and urlstr.endswith('?')):
report_devices = True
interface = -1
else:
interface = to_int(path)
report_devices = False
Expand Down

0 comments on commit 3321e21

Please sign in to comment.