From 60ed83629e030ccdb0e2126e4a6d2cb48f6dc0c4 Mon Sep 17 00:00:00 2001 From: "EV34F39AA350A7\\peter" <psub@fieber.se> Date: Sat, 11 Mar 2023 11:08:37 +0100 Subject: [PATCH 01/13] =?UTF-8?q?Support=20f=C3=B6r=20BMS4S=20from=20hlpda?= =?UTF-8?q?ta.se,=20a=20BMS=20developed=20for=20use=20in=20boats.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etc/dbus-serialbattery/dbus-serialbattery.py | 2 + etc/dbus-serialbattery/hlpdatabms4s.py | 197 +++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 etc/dbus-serialbattery/hlpdatabms4s.py diff --git a/etc/dbus-serialbattery/dbus-serialbattery.py b/etc/dbus-serialbattery/dbus-serialbattery.py index 29a85bab..230ed2ca 100644 --- a/etc/dbus-serialbattery/dbus-serialbattery.py +++ b/etc/dbus-serialbattery/dbus-serialbattery.py @@ -28,8 +28,10 @@ from renogy import Renogy from ecs import Ecs from lifepower import Lifepower +from hlpdatabms4s import HLPdataBMS4S supported_bms_types = [ + {'bms': HLPdataBMS4S, "baud": 9600}, {"bms": LltJbd, "baud": 9600}, {"bms": Ant, "baud": 19200}, {"bms": Daly, "baud": 9600, "address": b"\x40"}, diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py new file mode 100644 index 00000000..b543d651 --- /dev/null +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +from battery import Protection, Battery, Cell +from utils import * +from struct import * +import locale +import serial +from time import sleep + + +class HLPdataBMS4S(Battery): + def __init__(self, port, baud): + super(HLPdataBMS4S, self).__init__(port, baud) + self.type = self.BATTERYTYPE + + BATTERYTYPE = "HLPdataBMS4S" + + def test_connection(self): + # call a function that will connect to the battery, send a command and retrieve the result. + # The result or call should be unique to this BMS. Battery name or version, etc. + # Return True if success, False for failure + locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') + result = False + try: + result = self.read_test_data() + except Exception as e: + logger.info(e, exc_info=True) + pass + + return result + + def get_settings(self): + # After successful connection get_settings will be call to set up the battery. + # Set the current limits, populate cell count, etc + # Return True if success, False for failure + result = False + try: + result = self.read_settings_data() + except Exception as e: +# logger.info(e, exc_info=True) + pass + return result + + def refresh_data(self): + # call all functions that will refresh the battery data. + # This will be called for every iteration (1 second) + # Return True if success, False for failure + result = False + try: + result = self.read_status_data() + except Exception as e: +# logger.info(e, exc_info=True) + pass + + return result + + def read_test_data(self): + test_data = self.read_serial_data_HLPdataBMS4S(b"pv\n", 0.08) + if test_data is False: + return False + if len(test_data) >= 15: + s1 = str(test_data) + ix = s1.find("BMS4S") + if ix > 0: + self.hardware_version = s1[ix:len(s1)-1] + self.version = self.hardware_version + return True + return False + + def read_settings_data(self): + self.max_battery_charge_current = MAX_BATTERY_CHARGE_CURRENT + self.max_battery_discharge_current = MAX_BATTERY_DISCHARGE_CURRENT + self.poll_interval = 5000 + self.control_discharge_current = 1000 + self.control_charge_current = 1000 + self.soc = 50 + self.voltage = 13.2 + self.current = 0 + self.min_battery_voltage = 12.0 + self.max_battery_voltage = 14.4 + + if self.cell_count is None: + self.cell_count = 4 + for c in range(self.cell_count): + self.cells.append(Cell(False)) + test_data = self.read_serial_data_HLPdataBMS4S(b"ps\n", 1.0) + if test_data is False: + return False + if len(test_data) < 500: + return False + s = str(test_data) + s = s.replace(",", ".") + par = get_par("BatterySize= ", s) + if par is False: + return False + self.capacity = int(par) + v = get_par("VoltHigh= ", s) + if v is False: + return False + self.max_battery_voltage = float(v)*float(4) + v = get_par("VoltLow= ", s) + if v is False: + return False + self.min_battery_voltage = float(v)*float(4) + + return True + + + def read_status_data(self): + status_data = self.read_serial_data_HLPdataBMS4S(b"m1\n", 0.1) + if status_data is False: + return False + par1 = str(status_data) + par = par1.split(",") + if len(par) < 8: + return False + if len(par[0]) < 7: + return False + p0 = str(par[0]) + ix = p0.find('.') + par0 = p0[ix-1:len(p0)] + +# v1,v2,v3,v4,current,soc,chargeoff,loadoff,vbat2,socnow,adj,beep,led,temp1,temp2... +# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14... + + self.voltage = float(par0) + float(par[1]) + float(par[2]) + float(par[3]) + self.cells[0].voltage = float(par0) +# self.cells[0].temp = 20 + self.cells[1].voltage = float(par[1]) +# self.cells[0].temp = 20 + self.cells[2].voltage = float(par[2]) +# self.cells[0].temp = 20 + self.cells[3].voltage = float(par[3]) +# self.cells[0].temp = 20 + + self.current = float(par[4]) + self.soc = int(par[5]) + + if par[6] == "0": + self.control_allow_charge = False + else: + self.control_allow_charge = True + if par[7] == "0": + self.control_allow_discharge = False + else: + self.control_allow_discharge = True + return True + + def manage_charge_voltage(self): + self.allow_max_voltage = True + self.control_voltage = self.max_battery_voltage + + def manage_charge_current(self): + self.control_charge_current = 1000 + self.control_discharge_current = 1000 + + def read_serial_data_HLPdataBMS4S(self, command, time): + data = read_serial_data2(command, self.port, self.baud_rate, time) + if data is False: + return False + return data + + + +def read_serial_data2(command, port, baud, time): + try: + with serial.Serial(port, baudrate=baud, timeout=0.1) as ser: + return read_serialport_data2(ser, command, time) + + except serial.SerialException as e: + logger.error(e) + return False + +def read_serialport_data2(ser, command, time): + try: + ser.flushOutput() + ser.flushInput() + ser.write(command) + + sleep(time) + res = ser.read(1000) + if len(res) > 0: + return res + return False + + except serial.SerialException as e: + logger.error(e) + return False + +def get_par(p, s): + ix = s.find(p) + if ix > 0: + ix += len(p) + for i in range(ix, len(s)): + if s[i] == ' ' or s[i] == 10: + ret = s[ix:i] + return ret + return False From 699d116044d347504f3058156bc45acdbb0520c0 Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Sat, 11 Mar 2023 13:37:21 +0100 Subject: [PATCH 02/13] log entry removed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Support för BMS4S from hlpdata.se, a BMS developed for use in boats. --- etc/dbus-serialbattery/hlpdatabms4s.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index b543d651..42f5d986 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -23,7 +23,7 @@ def test_connection(self): try: result = self.read_test_data() except Exception as e: - logger.info(e, exc_info=True) +# logger.info(e, exc_info=True) pass return result From 6d8a1ff5d8a59b54c7638d51c4e678c4bbec9721 Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Wed, 22 Mar 2023 08:44:11 +0100 Subject: [PATCH 03/13] Update hlpdatabms4s.py Alarms added and 3 read attempts --- etc/dbus-serialbattery/hlpdatabms4s.py | 115 ++++++++++++++----------- 1 file changed, 66 insertions(+), 49 deletions(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index 42f5d986..5df4c202 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -2,7 +2,6 @@ from battery import Protection, Battery, Cell from utils import * from struct import * -import locale import serial from time import sleep @@ -18,14 +17,12 @@ def test_connection(self): # call a function that will connect to the battery, send a command and retrieve the result. # The result or call should be unique to this BMS. Battery name or version, etc. # Return True if success, False for failure - locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') result = False try: result = self.read_test_data() except Exception as e: -# logger.info(e, exc_info=True) +# logger.error(e, exc_info=True) pass - return result def get_settings(self): @@ -36,7 +33,7 @@ def get_settings(self): try: result = self.read_settings_data() except Exception as e: -# logger.info(e, exc_info=True) +# logger.error(e, exc_info=True) pass return result @@ -48,45 +45,41 @@ def refresh_data(self): try: result = self.read_status_data() except Exception as e: -# logger.info(e, exc_info=True) +# logger.error(e, exc_info=True) pass - return result def read_test_data(self): - test_data = self.read_serial_data_HLPdataBMS4S(b"pv\n", 0.08) + test_data = self.read_serial_data_HLPdataBMS4S(b"pv\n", 1, 15) if test_data is False: return False - if len(test_data) >= 15: - s1 = str(test_data) - ix = s1.find("BMS4S") - if ix > 0: - self.hardware_version = s1[ix:len(s1)-1] - self.version = self.hardware_version - return True + s1 = str(test_data) + ix = s1.find("BMS4S") + if ix > 0: + self.hardware_version = s1[ix:len(s1)-1] + self.version = self.hardware_version + self.max_battery_charge_current = MAX_BATTERY_CHARGE_CURRENT + self.max_battery_discharge_current = MAX_BATTERY_DISCHARGE_CURRENT + self.poll_interval = 10000 + self.control_discharge_current = 1000 + self.control_charge_current = 1000 + self.soc = 50 + self.voltage = 13.2 + self.current = 0 + self.min_battery_voltage = 12.0 + self.max_battery_voltage = 14.4 + + if self.cell_count is None: + self.cell_count = 4 + for c in range(self.cell_count): + self.cells.append(Cell(False)) + return True return False def read_settings_data(self): - self.max_battery_charge_current = MAX_BATTERY_CHARGE_CURRENT - self.max_battery_discharge_current = MAX_BATTERY_DISCHARGE_CURRENT - self.poll_interval = 5000 - self.control_discharge_current = 1000 - self.control_charge_current = 1000 - self.soc = 50 - self.voltage = 13.2 - self.current = 0 - self.min_battery_voltage = 12.0 - self.max_battery_voltage = 14.4 - - if self.cell_count is None: - self.cell_count = 4 - for c in range(self.cell_count): - self.cells.append(Cell(False)) - test_data = self.read_serial_data_HLPdataBMS4S(b"ps\n", 1.0) + test_data = self.read_serial_data_HLPdataBMS4S(b"ps\n", 3, 700) if test_data is False: return False - if len(test_data) < 500: - return False s = str(test_data) s = s.replace(",", ".") par = get_par("BatterySize= ", s) @@ -106,7 +99,7 @@ def read_settings_data(self): def read_status_data(self): - status_data = self.read_serial_data_HLPdataBMS4S(b"m1\n", 0.1) + status_data = self.read_serial_data_HLPdataBMS4S(b"m1\n", 0.2, 40) if status_data is False: return False par1 = str(status_data) @@ -143,6 +136,25 @@ def read_status_data(self): self.control_allow_discharge = False else: self.control_allow_discharge = True + + beep = int(par[11]) + if beep == 2: + self.protection.temp_low_charge = 1 + else: + self.protection.temp_low_charge = 0 + if beep == 3: + self.protection.temp_high_charge = 1 + else: + self.protection.temp_high_charge = 0 + if beep == 4: + self.protection.voltage_low = 2 + else: + self.protection.voltage_low = 0 + if beep == 5: + self.protection.voltage_high = 2 + else: + self.protection.voltage_high = 0 + return True def manage_charge_voltage(self): @@ -153,33 +165,38 @@ def manage_charge_current(self): self.control_charge_current = 1000 self.control_discharge_current = 1000 - def read_serial_data_HLPdataBMS4S(self, command, time): - data = read_serial_data2(command, self.port, self.baud_rate, time) + def read_serial_data_HLPdataBMS4S(self, command, time, min_len): + data = read_serial_data2(command, self.port, self.baud_rate, time, min_len) if data is False: return False return data -def read_serial_data2(command, port, baud, time): +def read_serial_data2(command, port, baud, time, min_len): try: - with serial.Serial(port, baudrate=baud, timeout=0.1) as ser: - return read_serialport_data2(ser, command, time) + with serial.Serial(port, baudrate=baud, timeout=0.5) as ser: + ret = read_serialport_data2(ser, command, time, min_len) + if not ret is False: + return ret + return False except serial.SerialException as e: logger.error(e) return False -def read_serialport_data2(ser, command, time): +def read_serialport_data2(ser, command, time, min_len): try: - ser.flushOutput() - ser.flushInput() - ser.write(command) - - sleep(time) - res = ser.read(1000) - if len(res) > 0: - return res + cnt = 0 + while cnt < 3: + cnt += 1 + ser.flushOutput() + ser.flushInput() + ser.write(command) + sleep(time) + res = ser.read(1000) + if len(res) >= min_len: + return res return False except serial.SerialException as e: @@ -191,7 +208,7 @@ def get_par(p, s): if ix > 0: ix += len(p) for i in range(ix, len(s)): - if s[i] == ' ' or s[i] == 10: + if s[i] == ' ' or s[i] == 10 or s[i] == 13: ret = s[ix:i] return ret return False From edafdcf85e3d0bf9079cc57ccfd2f030bd2e74fa Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Wed, 22 Mar 2023 16:06:14 +0100 Subject: [PATCH 04/13] Terminal to manage HLPdataBMS4S Crude hack to make management possible while being monitored --- .../hlpdatabms4s_miniterm.py | 1071 +++++++++++++++++ 1 file changed, 1071 insertions(+) create mode 100644 etc/dbus-serialbattery/hlpdatabms4s_miniterm.py diff --git a/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py new file mode 100644 index 00000000..b0387c94 --- /dev/null +++ b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py @@ -0,0 +1,1071 @@ +#!/usr/bin/env python +# +# Very simple serial terminal +# +# This file is part of pySerial. https://github.com/pyserial/pyserial +# (C)2002-2020 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import absolute_import + +import codecs +import os +import sys +import threading + +import serial +from serial.tools.list_ports import comports +from serial.tools import hexlify_codec +from time import sleep + +# pylint: disable=wrong-import-order,wrong-import-position + +codecs.register(lambda c: hexlify_codec.getregentry() if c == 'hexlify' else None) + +try: + raw_input +except NameError: + # pylint: disable=redefined-builtin,invalid-name + raw_input = input # in python3 it's "raw" + unichr = chr + + +def key_description(character): + """generate a readable description for a key""" + ascii_code = ord(character) + if ascii_code < 32: + return 'Ctrl+{:c}'.format(ord('@') + ascii_code) + else: + return repr(character) + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +class ConsoleBase(object): + """OS abstraction for console (input/output codec, no echo)""" + + def __init__(self, miniterm): + self.miniterm = miniterm + if sys.version_info >= (3, 0): + self.byte_output = sys.stdout.buffer + else: + self.byte_output = sys.stdout + self.output = sys.stdout + + def setup(self): + """Set console to read single characters, no echo""" + + def cleanup(self): + """Restore default console settings""" + + def getkey(self): + """Read a single key from the console""" + return None + + def write_bytes(self, byte_string): + """Write bytes (already encoded)""" + self.byte_output.write(byte_string) + self.byte_output.flush() + + def write(self, text): + """Write string""" + self.output.write(text) + self.output.flush() + + def cancel(self): + """Cancel getkey operation""" + + # - - - - - - - - - - - - - - - - - - - - - - - - + # context manager: + # switch terminal temporary to normal mode (e.g. to get user input) + + def __enter__(self): + self.cleanup() + return self + + def __exit__(self, *args, **kwargs): + self.setup() + + +if os.name == 'nt': # noqa + import msvcrt + import ctypes + import platform + + class Out(object): + """file-like wrapper that uses os.write""" + + def __init__(self, fd): + self.fd = fd + + def flush(self): + pass + + def write(self, s): + os.write(self.fd, s) + + class Console(ConsoleBase): + fncodes = { + ';': '\x1bOP', # F1 + '<': '\x1bOQ', # F2 + '=': '\x1bOR', # F3 + '>': '\x1bOS', # F4 + '?': '\x1b[15~', # F5 + '@': '\x1b[17~', # F6 + 'A': '\x1b[18~', # F7 + 'B': '\x1b[19~', # F8 + 'C': '\x1b[20~', # F9 + 'D': '\x1b[21~', # F10 + } + navcodes = { + 'H': '\x1b[A', # UP + 'P': '\x1b[B', # DOWN + 'K': '\x1b[D', # LEFT + 'M': '\x1b[C', # RIGHT + 'G': '\x1b[H', # HOME + 'O': '\x1b[F', # END + 'R': '\x1b[2~', # INSERT + 'S': '\x1b[3~', # DELETE + 'I': '\x1b[5~', # PAGE UP + 'Q': '\x1b[6~', # PAGE DOWN + } + + def __init__(self, miniterm): + super(Console, self).__init__(miniterm) + self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP() + self._saved_icp = ctypes.windll.kernel32.GetConsoleCP() + ctypes.windll.kernel32.SetConsoleOutputCP(65001) + ctypes.windll.kernel32.SetConsoleCP(65001) + # ANSI handling available through SetConsoleMode since Windows 10 v1511 + # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1 + if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586: + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + import ctypes.wintypes as wintypes + if not hasattr(wintypes, 'LPDWORD'): # PY2 + wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) + SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode + GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode + GetStdHandle = ctypes.windll.kernel32.GetStdHandle + mode = wintypes.DWORD() + GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode)) + if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0: + SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) + self._saved_cm = mode + self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace') + # the change of the code page is not propagated to Python, manually fix it + sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace') + sys.stdout = self.output + self.output.encoding = 'UTF-8' # needed for input + + def __del__(self): + ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp) + ctypes.windll.kernel32.SetConsoleCP(self._saved_icp) + try: + ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm) + except AttributeError: # in case no _saved_cm + pass + + def getkey(self): + while True: + z = msvcrt.getwch() + if z == unichr(13): + return unichr(10) + elif z is unichr(0) or z is unichr(0xe0): + try: + code = msvcrt.getwch() + if z is unichr(0): + return self.fncodes[code] + else: + return self.navcodes[code] + except KeyError: + pass + else: + return z + + def cancel(self): + # CancelIo, CancelSynchronousIo do not seem to work when using + # getwch, so instead, send a key to the window with the console + hwnd = ctypes.windll.kernel32.GetConsoleWindow() + ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0) + +elif os.name == 'posix': + import atexit + import termios + import fcntl + import signal + + class Console(ConsoleBase): + def __init__(self, miniterm): + super(Console, self).__init__(miniterm) + self.fd = sys.stdin.fileno() + self.old = termios.tcgetattr(self.fd) + atexit.register(self.cleanup) + signal.signal(signal.SIGINT, self.sigint) + if sys.version_info < (3, 0): + self.enc_stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin) + else: + self.enc_stdin = sys.stdin + + def setup(self): + new = termios.tcgetattr(self.fd) + new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG + new[6][termios.VMIN] = 1 + new[6][termios.VTIME] = 0 + termios.tcsetattr(self.fd, termios.TCSANOW, new) + + def getkey(self): + c = self.enc_stdin.read(1) + if c == unichr(0x7f): + c = unichr(8) # map the BS key (which yields DEL) to backspace + return c + + def cancel(self): + fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0') + + def cleanup(self): + termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) + + def sigint(self, sig, frame): + """signal handler for a clean exit on SIGINT""" + self.miniterm.stop() + self.cancel() + +else: + raise NotImplementedError( + 'Sorry no implementation for your platform ({}) available.'.format(sys.platform)) + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +class Transform(object): + """do-nothing: forward all data unchanged""" + def rx(self, text): + """text received from serial port""" + return text + + def tx(self, text): + """text to be sent to serial port""" + return text + + def echo(self, text): + """text to be sent but displayed on console""" + return text + + +class CRLF(Transform): + """ENTER sends CR+LF""" + + def tx(self, text): + return text.replace('\n', '\r\n') + + +class CR(Transform): + """ENTER sends CR""" + + def rx(self, text): + return text.replace('\r', '\n') + + def tx(self, text): + return text.replace('\n', '\r') + + +class LF(Transform): + """ENTER sends LF""" + + +class NoTerminal(Transform): + """remove typical terminal control codes from input""" + + REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32) if unichr(x) not in '\r\n\b\t') + REPLACEMENT_MAP.update( + { + 0x7F: 0x2421, # DEL + 0x9B: 0x2425, # CSI + }) + + def rx(self, text): + return text.translate(self.REPLACEMENT_MAP) + + echo = rx + + +class NoControls(NoTerminal): + """Remove all control codes, incl. CR+LF""" + + REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32)) + REPLACEMENT_MAP.update( + { + 0x20: 0x2423, # visual space + 0x7F: 0x2421, # DEL + 0x9B: 0x2425, # CSI + }) + + +class Printable(Transform): + """Show decimal code for all non-ASCII characters and replace most control codes""" + + def rx(self, text): + r = [] + for c in text: + if ' ' <= c < '\x7f' or c in '\r\n\b\t': + r.append(c) + elif c < ' ': + r.append(unichr(0x2400 + ord(c))) + else: + r.extend(unichr(0x2080 + ord(d) - 48) for d in '{:d}'.format(ord(c))) + r.append(' ') + return ''.join(r) + + echo = rx + + +class Colorize(Transform): + """Apply different colors for received and echo""" + + def __init__(self): + # XXX make it configurable, use colorama? + self.input_color = '\x1b[37m' + self.echo_color = '\x1b[31m' + + def rx(self, text): + return self.input_color + text + + def echo(self, text): + return self.echo_color + text + + +class DebugIO(Transform): + """Print what is sent and received""" + + def rx(self, text): + sys.stderr.write(' [RX:{!r}] '.format(text)) + sys.stderr.flush() + return text + + def tx(self, text): + sys.stderr.write(' [TX:{!r}] '.format(text)) + sys.stderr.flush() + return text + + +# other ideas: +# - add date/time for each newline +# - insert newline after: a) timeout b) packet end character + +EOL_TRANSFORMATIONS = { + 'crlf': CRLF, + 'cr': CR, + 'lf': LF, +} + +TRANSFORMATIONS = { + 'direct': Transform, # no transformation + 'default': NoTerminal, + 'nocontrol': NoControls, + 'printable': Printable, + 'colorize': Colorize, + 'debug': DebugIO, +} + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +def ask_for_port(): + """\ + Show a list of ports and ask the user for a choice. To make selection + easier on systems with long device names, also allow the input of an + index. + """ + sys.stderr.write('\n--- Available ports:\n') + ports = [] + for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): + sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc)) + ports.append(port) + while True: + sys.stderr.write('--- Enter port index or full name: ') + port = raw_input('') + try: + index = int(port) - 1 + if not 0 <= index < len(ports): + sys.stderr.write('--- Invalid index!\n') + continue + except ValueError: + pass + else: + port = ports[index] + return port + + +class Miniterm(object): + """\ + Terminal application. Copy data from serial port to console and vice versa. + Handle special keys from the console to show menu etc. + """ + + def __init__(self, serial_instance, echo=False, eol='crlf', filters=()): + self.console = Console(self) + self.serial = serial_instance + self.echo = echo + self.raw = False + self.input_encoding = 'UTF-8' + self.output_encoding = 'UTF-8' + self.eol = eol + self.filters = filters + self.update_transformations() + self.exit_character = unichr(0x1d) # GS/CTRL+] + self.menu_character = unichr(0x14) # Menu: CTRL+T + self.alive = None + self._reader_alive = None + self.receiver_thread = None + self.rx_decoder = None + self.tx_decoder = None + self.tx_encoder = None + self.no_write = False + self.remove_no_write = False + + def _start_reader(self): + """Start reader thread""" + self._reader_alive = True + # start serial->console thread + self.receiver_thread = threading.Thread(target=self.reader, name='rx') + self.receiver_thread.daemon = True + self.receiver_thread.start() + + def _stop_reader(self): + """Stop reader thread only, wait for clean exit of thread""" + self._reader_alive = False + if hasattr(self.serial, 'cancel_read'): + self.serial.cancel_read() + self.receiver_thread.join() + + def start(self): + """start worker threads""" + self.alive = True + self._start_reader() + # enter console->serial loop + self.transmitter_thread = threading.Thread(target=self.writer, name='tx') + self.transmitter_thread.daemon = True + self.transmitter_thread.start() + self.console.setup() + + def stop(self): + """set flag to stop worker threads""" + self.alive = False + + def join(self, transmit_only=False): + """wait for worker threads to terminate""" + self.transmitter_thread.join() + if not transmit_only: + if hasattr(self.serial, 'cancel_read'): + self.serial.cancel_read() + self.receiver_thread.join() + + def close(self): + self.serial.close() + + def update_transformations(self): + """take list of transformation classes and instantiate them for rx and tx""" + transformations = [EOL_TRANSFORMATIONS[self.eol]] + [TRANSFORMATIONS[f] + for f in self.filters] + self.tx_transformations = [t() for t in transformations] + self.rx_transformations = list(reversed(self.tx_transformations)) + + def set_rx_encoding(self, encoding, errors='replace'): + """set encoding for received data""" + self.input_encoding = encoding + self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors) + + def set_tx_encoding(self, encoding, errors='replace'): + """set encoding for transmitted data""" + self.output_encoding = encoding + self.tx_encoder = codecs.getincrementalencoder(encoding)(errors) + + def dump_port_settings(self): + """Write current settings to sys.stderr""" + sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format( + p=self.serial)) + sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format( + ('active' if self.serial.rts else 'inactive'), + ('active' if self.serial.dtr else 'inactive'), + ('active' if self.serial.break_condition else 'inactive'))) + try: + sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format( + ('active' if self.serial.cts else 'inactive'), + ('active' if self.serial.dsr else 'inactive'), + ('active' if self.serial.ri else 'inactive'), + ('active' if self.serial.cd else 'inactive'))) + except serial.SerialException: + # on RFC 2217 ports, it can happen if no modem state notification was + # yet received. ignore this error. + pass + sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive')) + sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive')) + sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) + sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper())) + sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + + def reader(self): + """loop and copy serial->console""" + data = b"" + while self.alive and self._reader_alive: + # read all that is there or wait for one byte + try: + data2 = self.serial.read(self.serial.in_waiting or 1) + except: + data2 = b"" + if data2: + if self.remove_no_write == True: + self.remove_no_write = False + self.no_write = False + data = data2 + else: + data += data2 + if b"\n" in data: + if self.raw: + self.console.write_bytes(data) + else: + text = self.rx_decoder.decode(data) + for transformation in self.rx_transformations: + text = transformation.rx(text) + if b"m1\n" in data: + self.no_write = True + if self.no_write == False: + self.console.write(text) + data = b"" + + def writer(self): + """\ + Loop and copy console->serial until self.exit_character character is + found. When self.menu_character is found, interpret the next key + locally. + """ + menu_active = False + try: + while self.alive: + try: + c = self.console.getkey() + self.remove_no_write = True + except KeyboardInterrupt: + c = '\x03' + if not self.alive: + break + if menu_active: + self.handle_menu_key(c) + menu_active = False + elif c == self.menu_character: + menu_active = True # next char will be for menu + elif c == self.exit_character: + self.stop() # exit app + break + else: + #~ if self.raw: + text = c + for transformation in self.tx_transformations: + text = transformation.tx(text) + self.serial.write(self.tx_encoder.encode(text)) + if self.echo: + echo_text = c + for transformation in self.tx_transformations: + echo_text = transformation.echo(echo_text) + self.console.write(echo_text) + except: + self.alive = False + raise + + def handle_menu_key(self, c): + """Implement a simple menu / settings""" + if c == self.menu_character or c == self.exit_character: + # Menu/exit character again -> send itself + self.serial.write(self.tx_encoder.encode(c)) + if self.echo: + self.console.write(c) + elif c == '\x15': # CTRL+U -> upload file + self.upload_file() + elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help + sys.stderr.write(self.get_help_text()) + elif c == '\x12': # CTRL+R -> Toggle RTS + self.serial.rts = not self.serial.rts + sys.stderr.write('--- RTS {} ---\n'.format('active' if self.serial.rts else 'inactive')) + elif c == '\x04': # CTRL+D -> Toggle DTR + self.serial.dtr = not self.serial.dtr + sys.stderr.write('--- DTR {} ---\n'.format('active' if self.serial.dtr else 'inactive')) + elif c == '\x02': # CTRL+B -> toggle BREAK condition + self.serial.break_condition = not self.serial.break_condition + sys.stderr.write('--- BREAK {} ---\n'.format('active' if self.serial.break_condition else 'inactive')) + elif c == '\x05': # CTRL+E -> toggle local echo + self.echo = not self.echo + sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive')) + elif c == '\x06': # CTRL+F -> edit filters + self.change_filter() + elif c == '\x0c': # CTRL+L -> EOL mode + modes = list(EOL_TRANSFORMATIONS) # keys + eol = modes.index(self.eol) + 1 + if eol >= len(modes): + eol = 0 + self.eol = modes[eol] + sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper())) + self.update_transformations() + elif c == '\x01': # CTRL+A -> set encoding + self.change_encoding() + elif c == '\x09': # CTRL+I -> info + self.dump_port_settings() + #~ elif c == '\x01': # CTRL+A -> cycle escape mode + #~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode + elif c in 'pP': # P -> change port + self.change_port() + elif c in 'zZ': # S -> suspend / open port temporarily + self.suspend_port() + elif c in 'bB': # B -> change baudrate + self.change_baudrate() + elif c == '8': # 8 -> change to 8 bits + self.serial.bytesize = serial.EIGHTBITS + self.dump_port_settings() + elif c == '7': # 7 -> change to 8 bits + self.serial.bytesize = serial.SEVENBITS + self.dump_port_settings() + elif c in 'eE': # E -> change to even parity + self.serial.parity = serial.PARITY_EVEN + self.dump_port_settings() + elif c in 'oO': # O -> change to odd parity + self.serial.parity = serial.PARITY_ODD + self.dump_port_settings() + elif c in 'mM': # M -> change to mark parity + self.serial.parity = serial.PARITY_MARK + self.dump_port_settings() + elif c in 'sS': # S -> change to space parity + self.serial.parity = serial.PARITY_SPACE + self.dump_port_settings() + elif c in 'nN': # N -> change to no parity + self.serial.parity = serial.PARITY_NONE + self.dump_port_settings() + elif c == '1': # 1 -> change to 1 stop bits + self.serial.stopbits = serial.STOPBITS_ONE + self.dump_port_settings() + elif c == '2': # 2 -> change to 2 stop bits + self.serial.stopbits = serial.STOPBITS_TWO + self.dump_port_settings() + elif c == '3': # 3 -> change to 1.5 stop bits + self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE + self.dump_port_settings() + elif c in 'xX': # X -> change software flow control + self.serial.xonxoff = (c == 'X') + self.dump_port_settings() + elif c in 'rR': # R -> change hardware flow control + self.serial.rtscts = (c == 'R') + self.dump_port_settings() + elif c in 'qQ': + self.stop() # Q -> exit app + else: + sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c))) + + def upload_file(self): + """Ask user for filename and send its contents""" + sys.stderr.write('\n--- File to upload: ') + sys.stderr.flush() + with self.console: + filename = sys.stdin.readline().rstrip('\r\n') + if filename: + try: + with open(filename, 'rb') as f: + sys.stderr.write('--- Sending file {} ---\n'.format(filename)) + while True: + block = f.read(1024) + if not block: + break + self.serial.write(block) + # Wait for output buffer to drain. + self.serial.flush() + sys.stderr.write('.') # Progress indicator. + sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) + except IOError as e: + sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + + def change_filter(self): + """change the i/o transformations""" + sys.stderr.write('\n--- Available Filters:\n') + sys.stderr.write('\n'.join( + '--- {:<10} = {.__doc__}'.format(k, v) + for k, v in sorted(TRANSFORMATIONS.items()))) + sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) + with self.console: + new_filters = sys.stdin.readline().lower().split() + if new_filters: + for f in new_filters: + if f not in TRANSFORMATIONS: + sys.stderr.write('--- unknown filter: {!r}\n'.format(f)) + break + else: + self.filters = new_filters + self.update_transformations() + sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + + def change_encoding(self): + """change encoding on the serial port""" + sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) + with self.console: + new_encoding = sys.stdin.readline().strip() + if new_encoding: + try: + codecs.lookup(new_encoding) + except LookupError: + sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) + else: + self.set_rx_encoding(new_encoding) + self.set_tx_encoding(new_encoding) + sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) + sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + + def change_baudrate(self): + """change the baudrate""" + sys.stderr.write('\n--- Baudrate: ') + sys.stderr.flush() + with self.console: + backup = self.serial.baudrate + try: + self.serial.baudrate = int(sys.stdin.readline().strip()) + except ValueError as e: + sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) + self.serial.baudrate = backup + else: + self.dump_port_settings() + + def change_port(self): + """Have a conversation with the user to change the serial port""" + with self.console: + try: + port = ask_for_port() + except KeyboardInterrupt: + port = None + if port and port != self.serial.port: + # reader thread needs to be shut down + self._stop_reader() + # save settings + settings = self.serial.getSettingsDict() + try: + new_serial = serial.serial_for_url(port, do_not_open=True) + # restore settings and open + new_serial.applySettingsDict(settings) + new_serial.rts = self.serial.rts + new_serial.dtr = self.serial.dtr + new_serial.open() + new_serial.break_condition = self.serial.break_condition + except Exception as e: + sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) + new_serial.close() + else: + self.serial.close() + self.serial = new_serial + sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) + # and restart the reader thread + self._start_reader() + + def suspend_port(self): + """\ + open port temporarily, allow reconnect, exit and port change to get + out of the loop + """ + # reader thread needs to be shut down + self._stop_reader() + self.serial.close() + sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port)) + do_change_port = False + while not self.serial.is_open: + sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format( + exit=key_description(self.exit_character))) + k = self.console.getkey() + if k == self.exit_character: + self.stop() # exit app + break + elif k in 'pP': + do_change_port = True + break + try: + self.serial.open() + except Exception as e: + sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e)) + if do_change_port: + self.change_port() + else: + # and restart the reader thread + self._start_reader() + sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port)) + + def get_help_text(self): + """return the help text""" + # help text, starts with blank line! + return """ +--- pySerial ({version}) - miniterm - help +--- +--- {exit:8} Exit program (alias {menu} Q) +--- {menu:8} Menu escape key, followed by: +--- Menu keys: +--- {menu:7} Send the menu character itself to remote +--- {exit:7} Send the exit character itself to remote +--- {info:7} Show info +--- {upload:7} Upload file (prompt will be shown) +--- {repr:7} encoding +--- {filter:7} edit filters +--- Toggles: +--- {rts:7} RTS {dtr:7} DTR {brk:7} BREAK +--- {echo:7} echo {eol:7} EOL +--- +--- Port settings ({menu} followed by the following): +--- p change port +--- 7 8 set data bits +--- N E O S M change parity (None, Even, Odd, Space, Mark) +--- 1 2 3 set stop bits (1, 2, 1.5) +--- b change baud rate +--- x X disable/enable software flow control +--- r R disable/enable hardware flow control +""".format(version=getattr(serial, 'VERSION', 'unknown version'), + exit=key_description(self.exit_character), + menu=key_description(self.menu_character), + rts=key_description('\x12'), + dtr=key_description('\x04'), + brk=key_description('\x02'), + echo=key_description('\x05'), + info=key_description('\x09'), + upload=key_description('\x15'), + repr=key_description('\x01'), + filter=key_description('\x06'), + eol=key_description('\x0c')) + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +# default args can be used to override when calling main() from an other script +# e.g to create a miniterm-my-device.py +def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None, serial_instance=None): + """Command line tool, entry point""" + + import argparse + + parser = argparse.ArgumentParser( + description='Miniterm - A simple terminal program for the serial port.') + + parser.add_argument( + 'port', + nargs='?', + help='serial port name ("-" to show port list)', + default=default_port) + + parser.add_argument( + 'baudrate', + nargs='?', + type=int, + help='set baud rate, default: %(default)s', + default=default_baudrate) + + group = parser.add_argument_group('port settings') + + group.add_argument( + '--parity', + choices=['N', 'E', 'O', 'S', 'M'], + type=lambda c: c.upper(), + help='set parity, one of {N E O S M}, default: N', + default='N') + + group.add_argument( + '--rtscts', + action='store_true', + help='enable RTS/CTS flow control (default off)', + default=False) + + group.add_argument( + '--xonxoff', + action='store_true', + help='enable software flow control (default off)', + default=False) + + group.add_argument( + '--rts', + type=int, + help='set initial RTS line state (possible values: 0, 1)', + default=default_rts) + + group.add_argument( + '--dtr', + type=int, + help='set initial DTR line state (possible values: 0, 1)', + default=default_dtr) + + group.add_argument( + '--non-exclusive', + dest='exclusive', + action='store_false', + help='disable locking for native ports', + default=True) + + group.add_argument( + '--ask', + action='store_true', + help='ask again for port when open fails', + default=False) + + group = parser.add_argument_group('data handling') + + group.add_argument( + '-e', '--echo', + action='store_true', + help='enable local echo (default off)', + default=False) + + group.add_argument( + '--encoding', + dest='serial_port_encoding', + metavar='CODEC', + help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s', + default='UTF-8') + + group.add_argument( + '-f', '--filter', + action='append', + metavar='NAME', + help='add text transformation', + default=[]) + + group.add_argument( + '--eol', + choices=['CR', 'LF', 'CRLF'], + type=lambda c: c.upper(), + help='end of line mode', + default='CRLF') + + group.add_argument( + '--raw', + action='store_true', + help='Do no apply any encodings/transformations', + default=False) + + group = parser.add_argument_group('hotkeys') + + group.add_argument( + '--exit-char', + type=int, + metavar='NUM', + help='Unicode of special character that is used to exit the application, default: %(default)s', + default=0x1d) # GS/CTRL+] + + group.add_argument( + '--menu-char', + type=int, + metavar='NUM', + help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s', + default=0x14) # Menu: CTRL+T + + group = parser.add_argument_group('diagnostics') + + group.add_argument( + '-q', '--quiet', + action='store_true', + help='suppress non-error messages', + default=False) + + group.add_argument( + '--develop', + action='store_true', + help='show Python traceback on error', + default=False) + + args = parser.parse_args() + + if args.menu_char == args.exit_char: + parser.error('--exit-char can not be the same as --menu-char') + + if args.filter: + if 'help' in args.filter: + sys.stderr.write('Available filters:\n') + sys.stderr.write('\n'.join( + '{:<10} = {.__doc__}'.format(k, v) + for k, v in sorted(TRANSFORMATIONS.items()))) + sys.stderr.write('\n') + sys.exit(1) + filters = args.filter + else: + filters = ['default'] + + while serial_instance is None: + # no port given on command line -> ask user now + if args.port is None or args.port == '-': + try: + args.port = ask_for_port() + except KeyboardInterrupt: + sys.stderr.write('\n') + parser.error('user aborted and port is not given') + else: + if not args.port: + parser.error('port is not given') + try: + serial_instance = serial.serial_for_url( + args.port, + args.baudrate, + parity=args.parity, + rtscts=args.rtscts, + xonxoff=args.xonxoff, + do_not_open=True) + + if not hasattr(serial_instance, 'cancel_read'): + # enable timeout for alive flag polling if cancel_read is not available + serial_instance.timeout = 1 + + if args.dtr is not None: + if not args.quiet: + sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive')) + serial_instance.dtr = args.dtr + if args.rts is not None: + if not args.quiet: + sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive')) + serial_instance.rts = args.rts + + if isinstance(serial_instance, serial.Serial): + serial_instance.exclusive = args.exclusive + + serial_instance.open() + except serial.SerialException as e: + sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e)) + if args.develop: + raise + if not args.ask: + sys.exit(1) + else: + args.port = '-' + else: + break + + miniterm = Miniterm( + serial_instance, + echo=args.echo, + eol=args.eol.lower(), + filters=filters) + miniterm.exit_character = unichr(args.exit_char) + miniterm.menu_character = unichr(args.menu_char) + miniterm.raw = args.raw + miniterm.set_rx_encoding(args.serial_port_encoding) + miniterm.set_tx_encoding(args.serial_port_encoding) + + if not args.quiet: + sys.stderr.write('--- Miniterm on {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format( + p=miniterm.serial)) + sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format( + key_description(miniterm.exit_character), + key_description(miniterm.menu_character), + key_description(miniterm.menu_character), + key_description('\x08'))) + sys.stderr.write('--- Specifically modified for managing HLPdataBMS4S in Venus OS ---\n'.format( + p=miniterm.serial)) + sys.stderr.write('--- Quit: Ctrl-t q | Local echo: Ctrl-t Ctrl-e ---\n'.format( + p=miniterm.serial)) + + miniterm.start() + try: + miniterm.join(True) + except KeyboardInterrupt: + pass + if not args.quiet: + sys.stderr.write('\n--- exit ---\n') + miniterm.join() + miniterm.close() + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +if __name__ == '__main__': + main() \ No newline at end of file From 7f5c0ec73e81f51618b26c52ae63a6875bc30974 Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Wed, 29 Mar 2023 14:56:16 +0200 Subject: [PATCH 05/13] Initial comment added --- etc/dbus-serialbattery/hlpdatabms4s_miniterm.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py index b0387c94..b4749d83 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py +++ b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py @@ -6,6 +6,8 @@ # (C)2002-2020 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause +# +# Modified to enable managing HLPdataBMS4S in Venus OS from __future__ import absolute_import @@ -1068,4 +1070,4 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if __name__ == '__main__': - main() \ No newline at end of file + main() From bb538322c4a56c7dfad80a23125d2e930bc60d21 Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Wed, 29 Mar 2023 14:58:36 +0200 Subject: [PATCH 06/13] Temperture reporting added --- etc/dbus-serialbattery/hlpdatabms4s.py | 41 +++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index 5df4c202..7be1d53e 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -49,6 +49,18 @@ def refresh_data(self): pass return result + #def log_settings(self): + # logger.info(f'Battery {self.type} connected to dbus from {self.port}') + # logger.info(f'=== Settings ===') + # cell_counter = len(self.cells) + # logger.info(f'> Connection voltage {self.voltage}V | current {self.current}A | SOC {self.soc}%') + # logger.info(f'> Cell count {self.cell_count} | cells populated {cell_counter}') + # logger.info(f'> CCCM SOC {CCCM_SOC_ENABLE} | DCCM SOC {DCCM_SOC_ENABLE}') + # logger.info(f'> CCCM CV {CCCM_CV_ENABLE} | DCCM CV {DCCM_CV_ENABLE}') + # logger.info(f'> CCCM T {CCCM_T_ENABLE} | DCCM T {DCCM_T_ENABLE}') + # logger.info(f'> MIN_CELL_VOLTAGE {MIN_CELL_VOLTAGE}V | MAX_CELL_VOLTAGE {MAX_CELL_VOLTAGE}V') + + return def read_test_data(self): test_data = self.read_serial_data_HLPdataBMS4S(b"pv\n", 1, 15) if test_data is False: @@ -154,7 +166,32 @@ def read_status_data(self): self.protection.voltage_high = 2 else: self.protection.voltage_high = 0 - + + if len(par) > 13: + nb = 0 + min = int(1000) + max = int(-1000) + ix = 13 + while ix < len(par): + tmp = par[ix].split(" ") + ix += 1 + if len(tmp) == 2: + name = tmp[0] + temp = tmp[1] + temp = temp[:-1] + temp = int(temp) + if name[0] == "b": + nb += 1 + if temp > max: + max = temp + if temp < min: + min = temp + if nb == 1: + self.temp1 = max + if nb > 1: + self.temp1 = max + self.temp2 = min + return True def manage_charge_voltage(self): @@ -190,6 +227,8 @@ def read_serialport_data2(ser, command, time, min_len): cnt = 0 while cnt < 3: cnt += 1 + if cnt > 1: + pass ser.flushOutput() ser.flushInput() ser.write(command) From c32656e7ce2d70a6fd3549a787e19f7daae136a2 Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Wed, 29 Mar 2023 20:48:56 +0200 Subject: [PATCH 07/13] Temp reading corrected --- etc/dbus-serialbattery/hlpdatabms4s.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index 7be1d53e..203ec9d4 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -129,13 +129,9 @@ def read_status_data(self): self.voltage = float(par0) + float(par[1]) + float(par[2]) + float(par[3]) self.cells[0].voltage = float(par0) -# self.cells[0].temp = 20 self.cells[1].voltage = float(par[1]) -# self.cells[0].temp = 20 self.cells[2].voltage = float(par[2]) -# self.cells[0].temp = 20 self.cells[3].voltage = float(par[3]) -# self.cells[0].temp = 20 self.current = float(par[4]) self.soc = int(par[5]) @@ -177,9 +173,7 @@ def read_status_data(self): ix += 1 if len(tmp) == 2: name = tmp[0] - temp = tmp[1] - temp = temp[:-1] - temp = int(temp) + temp = int("".join(filter(str.isdigit, tmp[1]))) if name[0] == "b": nb += 1 if temp > max: From 2f17876d91c5946f41150cafab7d3f5cce8f9da4 Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Sat, 8 Apr 2023 15:15:51 +0200 Subject: [PATCH 08/13] Charge/discharge_fet added --- etc/dbus-serialbattery/hlpdatabms4s.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index 203ec9d4..e7e6ab14 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -61,6 +61,7 @@ def refresh_data(self): # logger.info(f'> MIN_CELL_VOLTAGE {MIN_CELL_VOLTAGE}V | MAX_CELL_VOLTAGE {MAX_CELL_VOLTAGE}V') return + def read_test_data(self): test_data = self.read_serial_data_HLPdataBMS4S(b"pv\n", 1, 15) if test_data is False: @@ -132,19 +133,13 @@ def read_status_data(self): self.cells[1].voltage = float(par[1]) self.cells[2].voltage = float(par[2]) self.cells[3].voltage = float(par[3]) - self.current = float(par[4]) self.soc = int(par[5]) - - if par[6] == "0": - self.control_allow_charge = False - else: - self.control_allow_charge = True - if par[7] == "0": - self.control_allow_discharge = False - else: - self.control_allow_discharge = True - + self.control_allow_charge = par[6] + self.charge_fet = par[6] + self.control_allow_discharge = par[7] + self.discharge_fet = par[7] + beep = int(par[11]) if beep == 2: self.protection.temp_low_charge = 1 @@ -221,8 +216,6 @@ def read_serialport_data2(ser, command, time, min_len): cnt = 0 while cnt < 3: cnt += 1 - if cnt > 1: - pass ser.flushOutput() ser.flushInput() ser.write(command) From 3b90aa53f41439bcb4e29e88f35939631142d78d Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Thu, 20 Apr 2023 18:20:13 +0200 Subject: [PATCH 09/13] Starts with local echo on --- etc/dbus-serialbattery/hlpdatabms4s_miniterm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py index b4749d83..8ac84165 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py +++ b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py @@ -406,7 +406,7 @@ class Miniterm(object): def __init__(self, serial_instance, echo=False, eol='crlf', filters=()): self.console = Console(self) self.serial = serial_instance - self.echo = echo + self.echo = True self.raw = False self.input_encoding = 'UTF-8' self.output_encoding = 'UTF-8' From b4592b5705614f65591345efd30bb13e47c603d7 Mon Sep 17 00:00:00 2001 From: peterohman <psub@fieber.se> Date: Thu, 27 Apr 2023 15:47:05 +0200 Subject: [PATCH 10/13] wildcard imports removed --- etc/dbus-serialbattery/hlpdatabms4s.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index e7e6ab14..9d9c4ad3 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- from battery import Protection, Battery, Cell -from utils import * -from struct import * +from utils import logger import serial from time import sleep @@ -71,8 +70,8 @@ def read_test_data(self): if ix > 0: self.hardware_version = s1[ix:len(s1)-1] self.version = self.hardware_version - self.max_battery_charge_current = MAX_BATTERY_CHARGE_CURRENT - self.max_battery_discharge_current = MAX_BATTERY_DISCHARGE_CURRENT + self.max_battery_charge_current = 1000 + self.max_battery_discharge_current = 1000 self.poll_interval = 10000 self.control_discharge_current = 1000 self.control_charge_current = 1000 From 2005e4f62803f0faf64b24cf1fd27d5b3b772845 Mon Sep 17 00:00:00 2001 From: Manuel <mr-manuel@outlook.it> Date: Thu, 27 Apr 2023 17:23:04 +0200 Subject: [PATCH 11/13] fixed import and black lint --- etc/dbus-serialbattery/hlpdatabms4s.py | 68 +++++++++++++------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index 9d9c4ad3..4b3ef278 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- -from battery import Protection, Battery, Cell +from battery import Battery, Cell from utils import logger +import utils import serial from time import sleep @@ -19,9 +20,10 @@ def test_connection(self): result = False try: result = self.read_test_data() - except Exception as e: -# logger.error(e, exc_info=True) - pass + except Exception as err: + logger.error(f"Unexpected {err=}, {type(err)=}") + result = False + return result def get_settings(self): @@ -32,7 +34,7 @@ def get_settings(self): try: result = self.read_settings_data() except Exception as e: -# logger.error(e, exc_info=True) + logger.error(e, exc_info=True) pass return result @@ -44,23 +46,23 @@ def refresh_data(self): try: result = self.read_status_data() except Exception as e: -# logger.error(e, exc_info=True) + logger.error(e, exc_info=True) pass return result - #def log_settings(self): - # logger.info(f'Battery {self.type} connected to dbus from {self.port}') - # logger.info(f'=== Settings ===') - # cell_counter = len(self.cells) - # logger.info(f'> Connection voltage {self.voltage}V | current {self.current}A | SOC {self.soc}%') - # logger.info(f'> Cell count {self.cell_count} | cells populated {cell_counter}') - # logger.info(f'> CCCM SOC {CCCM_SOC_ENABLE} | DCCM SOC {DCCM_SOC_ENABLE}') - # logger.info(f'> CCCM CV {CCCM_CV_ENABLE} | DCCM CV {DCCM_CV_ENABLE}') - # logger.info(f'> CCCM T {CCCM_T_ENABLE} | DCCM T {DCCM_T_ENABLE}') - # logger.info(f'> MIN_CELL_VOLTAGE {MIN_CELL_VOLTAGE}V | MAX_CELL_VOLTAGE {MAX_CELL_VOLTAGE}V') + # def log_settings(self): + # logger.info(f'Battery {self.type} connected to dbus from {self.port}') + # logger.info(f'=== Settings ===') + # cell_counter = len(self.cells) + # logger.info(f'> Connection voltage {self.voltage}V | current {self.current}A | SOC {self.soc}%') + # logger.info(f'> Cell count {self.cell_count} | cells populated {cell_counter}') + # logger.info(f'> CCCM SOC {CCCM_SOC_ENABLE} | DCCM SOC {DCCM_SOC_ENABLE}') + # logger.info(f'> CCCM CV {CCCM_CV_ENABLE} | DCCM CV {DCCM_CV_ENABLE}') + # logger.info(f'> CCCM T {CCCM_T_ENABLE} | DCCM T {DCCM_T_ENABLE}') + # logger.info(f'> MIN_CELL_VOLTAGE {MIN_CELL_VOLTAGE}V | MAX_CELL_VOLTAGE {MAX_CELL_VOLTAGE}V') return - + def read_test_data(self): test_data = self.read_serial_data_HLPdataBMS4S(b"pv\n", 1, 15) if test_data is False: @@ -68,10 +70,10 @@ def read_test_data(self): s1 = str(test_data) ix = s1.find("BMS4S") if ix > 0: - self.hardware_version = s1[ix:len(s1)-1] + self.hardware_version = s1[ix : len(s1) - 1] self.version = self.hardware_version - self.max_battery_charge_current = 1000 - self.max_battery_discharge_current = 1000 + self.max_battery_charge_current = utils.MAX_BATTERY_CHARGE_CURRENT + self.max_battery_discharge_current = utils.MAX_BATTERY_DISCHARGE_CURRENT self.poll_interval = 10000 self.control_discharge_current = 1000 self.control_charge_current = 1000 @@ -101,15 +103,14 @@ def read_settings_data(self): v = get_par("VoltHigh= ", s) if v is False: return False - self.max_battery_voltage = float(v)*float(4) + self.max_battery_voltage = float(v) * float(4) v = get_par("VoltLow= ", s) if v is False: return False - self.min_battery_voltage = float(v)*float(4) + self.min_battery_voltage = float(v) * float(4) return True - def read_status_data(self): status_data = self.read_serial_data_HLPdataBMS4S(b"m1\n", 0.2, 40) if status_data is False: @@ -121,11 +122,11 @@ def read_status_data(self): if len(par[0]) < 7: return False p0 = str(par[0]) - ix = p0.find('.') - par0 = p0[ix-1:len(p0)] + ix = p0.find(".") + par0 = p0[ix - 1 : len(p0)] -# v1,v2,v3,v4,current,soc,chargeoff,loadoff,vbat2,socnow,adj,beep,led,temp1,temp2... -# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14... + # v1,v2,v3,v4,current,soc,chargeoff,loadoff,vbat2,socnow,adj,beep,led,temp1,temp2... + # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14... self.voltage = float(par0) + float(par[1]) + float(par[2]) + float(par[3]) self.cells[0].voltage = float(par0) @@ -138,7 +139,7 @@ def read_status_data(self): self.charge_fet = par[6] self.control_allow_discharge = par[7] self.discharge_fet = par[7] - + beep = int(par[11]) if beep == 2: self.protection.temp_low_charge = 1 @@ -147,7 +148,7 @@ def read_status_data(self): if beep == 3: self.protection.temp_high_charge = 1 else: - self.protection.temp_high_charge = 0 + self.protection.temp_high_charge = 0 if beep == 4: self.protection.voltage_low = 2 else: @@ -156,7 +157,7 @@ def read_status_data(self): self.protection.voltage_high = 2 else: self.protection.voltage_high = 0 - + if len(par) > 13: nb = 0 min = int(1000) @@ -179,7 +180,7 @@ def read_status_data(self): if nb > 1: self.temp1 = max self.temp2 = min - + return True def manage_charge_voltage(self): @@ -197,7 +198,6 @@ def read_serial_data_HLPdataBMS4S(self, command, time, min_len): return data - def read_serial_data2(command, port, baud, time, min_len): try: with serial.Serial(port, baudrate=baud, timeout=0.5) as ser: @@ -210,6 +210,7 @@ def read_serial_data2(command, port, baud, time, min_len): logger.error(e) return False + def read_serialport_data2(ser, command, time, min_len): try: cnt = 0 @@ -228,12 +229,13 @@ def read_serialport_data2(ser, command, time, min_len): logger.error(e) return False + def get_par(p, s): ix = s.find(p) if ix > 0: ix += len(p) for i in range(ix, len(s)): - if s[i] == ' ' or s[i] == 10 or s[i] == 13: + if s[i] == " " or s[i] == 10 or s[i] == 13: ret = s[ix:i] return ret return False From 49c43cb9cc295fc9c2bd6bce913b793c078dce5d Mon Sep 17 00:00:00 2001 From: Manuel <mr-manuel@outlook.it> Date: Thu, 27 Apr 2023 17:27:43 +0200 Subject: [PATCH 12/13] fix black lint errors --- .../hlpdatabms4s_miniterm.py | 715 ++++++++++-------- 1 file changed, 416 insertions(+), 299 deletions(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py index 8ac84165..80cb5414 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py +++ b/etc/dbus-serialbattery/hlpdatabms4s_miniterm.py @@ -19,17 +19,18 @@ import serial from serial.tools.list_ports import comports from serial.tools import hexlify_codec -from time import sleep + +# from time import sleep # pylint: disable=wrong-import-order,wrong-import-position -codecs.register(lambda c: hexlify_codec.getregentry() if c == 'hexlify' else None) +codecs.register(lambda c: hexlify_codec.getregentry() if c == "hexlify" else None) try: raw_input except NameError: # pylint: disable=redefined-builtin,invalid-name - raw_input = input # in python3 it's "raw" + raw_input = input # in python3 it's "raw" unichr = chr @@ -37,7 +38,7 @@ def key_description(character): """generate a readable description for a key""" ascii_code = ord(character) if ascii_code < 32: - return 'Ctrl+{:c}'.format(ord('@') + ascii_code) + return "Ctrl+{:c}".format(ord("@") + ascii_code) else: return repr(character) @@ -89,7 +90,7 @@ def __exit__(self, *args, **kwargs): self.setup() -if os.name == 'nt': # noqa +if os.name == "nt": # noqa import msvcrt import ctypes import platform @@ -108,28 +109,28 @@ def write(self, s): class Console(ConsoleBase): fncodes = { - ';': '\x1bOP', # F1 - '<': '\x1bOQ', # F2 - '=': '\x1bOR', # F3 - '>': '\x1bOS', # F4 - '?': '\x1b[15~', # F5 - '@': '\x1b[17~', # F6 - 'A': '\x1b[18~', # F7 - 'B': '\x1b[19~', # F8 - 'C': '\x1b[20~', # F9 - 'D': '\x1b[21~', # F10 + ";": "\x1bOP", # F1 + "<": "\x1bOQ", # F2 + "=": "\x1bOR", # F3 + ">": "\x1bOS", # F4 + "?": "\x1b[15~", # F5 + "@": "\x1b[17~", # F6 + "A": "\x1b[18~", # F7 + "B": "\x1b[19~", # F8 + "C": "\x1b[20~", # F9 + "D": "\x1b[21~", # F10 } navcodes = { - 'H': '\x1b[A', # UP - 'P': '\x1b[B', # DOWN - 'K': '\x1b[D', # LEFT - 'M': '\x1b[C', # RIGHT - 'G': '\x1b[H', # HOME - 'O': '\x1b[F', # END - 'R': '\x1b[2~', # INSERT - 'S': '\x1b[3~', # DELETE - 'I': '\x1b[5~', # PAGE UP - 'Q': '\x1b[6~', # PAGE DOWN + "H": "\x1b[A", # UP + "P": "\x1b[B", # DOWN + "K": "\x1b[D", # LEFT + "M": "\x1b[C", # RIGHT + "G": "\x1b[H", # HOME + "O": "\x1b[F", # END + "R": "\x1b[2~", # INSERT + "S": "\x1b[3~", # DELETE + "I": "\x1b[5~", # PAGE UP + "Q": "\x1b[6~", # PAGE DOWN } def __init__(self, miniterm): @@ -138,12 +139,16 @@ def __init__(self, miniterm): self._saved_icp = ctypes.windll.kernel32.GetConsoleCP() ctypes.windll.kernel32.SetConsoleOutputCP(65001) ctypes.windll.kernel32.SetConsoleCP(65001) - # ANSI handling available through SetConsoleMode since Windows 10 v1511 + # ANSI handling available through SetConsoleMode since Windows 10 v1511 # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1 - if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586: + if ( + platform.release() == "10" + and int(platform.version().split(".")[2]) > 10586 + ): ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 import ctypes.wintypes as wintypes - if not hasattr(wintypes, 'LPDWORD'): # PY2 + + if not hasattr(wintypes, "LPDWORD"): # PY2 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode @@ -151,19 +156,24 @@ def __init__(self, miniterm): mode = wintypes.DWORD() GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode)) if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0: - SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) + SetConsoleMode( + GetStdHandle(-11), + mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING, + ) self._saved_cm = mode - self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace') + self.output = codecs.getwriter("UTF-8")(Out(sys.stdout.fileno()), "replace") # the change of the code page is not propagated to Python, manually fix it - sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace') + sys.stderr = codecs.getwriter("UTF-8")(Out(sys.stderr.fileno()), "replace") sys.stdout = self.output - self.output.encoding = 'UTF-8' # needed for input + self.output.encoding = "UTF-8" # needed for input def __del__(self): ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp) ctypes.windll.kernel32.SetConsoleCP(self._saved_icp) try: - ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm) + ctypes.windll.kernel32.SetConsoleMode( + ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm + ) except AttributeError: # in case no _saved_cm pass @@ -172,7 +182,7 @@ def getkey(self): z = msvcrt.getwch() if z == unichr(13): return unichr(10) - elif z is unichr(0) or z is unichr(0xe0): + elif z is unichr(0) or z is unichr(0xE0): try: code = msvcrt.getwch() if z is unichr(0): @@ -188,9 +198,9 @@ def cancel(self): # CancelIo, CancelSynchronousIo do not seem to work when using # getwch, so instead, send a key to the window with the console hwnd = ctypes.windll.kernel32.GetConsoleWindow() - ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0) + ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0D, 0) -elif os.name == 'posix': +elif os.name == "posix": import atexit import termios import fcntl @@ -217,12 +227,12 @@ def setup(self): def getkey(self): c = self.enc_stdin.read(1) - if c == unichr(0x7f): - c = unichr(8) # map the BS key (which yields DEL) to backspace + if c == unichr(0x7F): + c = unichr(8) # map the BS key (which yields DEL) to backspace return c def cancel(self): - fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0') + fcntl.ioctl(self.fd, termios.TIOCSTI, b"\0") def cleanup(self): termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) @@ -234,13 +244,16 @@ def sigint(self, sig, frame): else: raise NotImplementedError( - 'Sorry no implementation for your platform ({}) available.'.format(sys.platform)) + "Sorry no implementation for your platform ({}) available.".format(sys.platform) + ) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + class Transform(object): """do-nothing: forward all data unchanged""" + def rx(self, text): """text received from serial port""" return text @@ -258,17 +271,17 @@ class CRLF(Transform): """ENTER sends CR+LF""" def tx(self, text): - return text.replace('\n', '\r\n') + return text.replace("\n", "\r\n") class CR(Transform): """ENTER sends CR""" def rx(self, text): - return text.replace('\r', '\n') + return text.replace("\r", "\n") def tx(self, text): - return text.replace('\n', '\r') + return text.replace("\n", "\r") class LF(Transform): @@ -278,12 +291,15 @@ class LF(Transform): class NoTerminal(Transform): """remove typical terminal control codes from input""" - REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32) if unichr(x) not in '\r\n\b\t') + REPLACEMENT_MAP = dict( + (x, 0x2400 + x) for x in range(32) if unichr(x) not in "\r\n\b\t" + ) REPLACEMENT_MAP.update( { 0x7F: 0x2421, # DEL 0x9B: 0x2425, # CSI - }) + } + ) def rx(self, text): return text.translate(self.REPLACEMENT_MAP) @@ -300,7 +316,8 @@ class NoControls(NoTerminal): 0x20: 0x2423, # visual space 0x7F: 0x2421, # DEL 0x9B: 0x2425, # CSI - }) + } + ) class Printable(Transform): @@ -309,14 +326,14 @@ class Printable(Transform): def rx(self, text): r = [] for c in text: - if ' ' <= c < '\x7f' or c in '\r\n\b\t': + if " " <= c < "\x7f" or c in "\r\n\b\t": r.append(c) - elif c < ' ': + elif c < " ": r.append(unichr(0x2400 + ord(c))) else: - r.extend(unichr(0x2080 + ord(d) - 48) for d in '{:d}'.format(ord(c))) - r.append(' ') - return ''.join(r) + r.extend(unichr(0x2080 + ord(d) - 48) for d in "{:d}".format(ord(c))) + r.append(" ") + return "".join(r) echo = rx @@ -326,8 +343,8 @@ class Colorize(Transform): def __init__(self): # XXX make it configurable, use colorama? - self.input_color = '\x1b[37m' - self.echo_color = '\x1b[31m' + self.input_color = "\x1b[37m" + self.echo_color = "\x1b[31m" def rx(self, text): return self.input_color + text @@ -340,12 +357,12 @@ class DebugIO(Transform): """Print what is sent and received""" def rx(self, text): - sys.stderr.write(' [RX:{!r}] '.format(text)) + sys.stderr.write(" [RX:{!r}] ".format(text)) sys.stderr.flush() return text def tx(self, text): - sys.stderr.write(' [TX:{!r}] '.format(text)) + sys.stderr.write(" [TX:{!r}] ".format(text)) sys.stderr.flush() return text @@ -355,18 +372,18 @@ def tx(self, text): # - insert newline after: a) timeout b) packet end character EOL_TRANSFORMATIONS = { - 'crlf': CRLF, - 'cr': CR, - 'lf': LF, + "crlf": CRLF, + "cr": CR, + "lf": LF, } TRANSFORMATIONS = { - 'direct': Transform, # no transformation - 'default': NoTerminal, - 'nocontrol': NoControls, - 'printable': Printable, - 'colorize': Colorize, - 'debug': DebugIO, + "direct": Transform, # no transformation + "default": NoTerminal, + "nocontrol": NoControls, + "printable": Printable, + "colorize": Colorize, + "debug": DebugIO, } @@ -377,18 +394,18 @@ def ask_for_port(): easier on systems with long device names, also allow the input of an index. """ - sys.stderr.write('\n--- Available ports:\n') + sys.stderr.write("\n--- Available ports:\n") ports = [] for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): - sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc)) + sys.stderr.write("--- {:2}: {:20} {!r}\n".format(n, port, desc)) ports.append(port) while True: - sys.stderr.write('--- Enter port index or full name: ') - port = raw_input('') + sys.stderr.write("--- Enter port index or full name: ") + port = raw_input("") try: index = int(port) - 1 if not 0 <= index < len(ports): - sys.stderr.write('--- Invalid index!\n') + sys.stderr.write("--- Invalid index!\n") continue except ValueError: pass @@ -403,17 +420,17 @@ class Miniterm(object): Handle special keys from the console to show menu etc. """ - def __init__(self, serial_instance, echo=False, eol='crlf', filters=()): + def __init__(self, serial_instance, echo=False, eol="crlf", filters=()): self.console = Console(self) self.serial = serial_instance self.echo = True self.raw = False - self.input_encoding = 'UTF-8' - self.output_encoding = 'UTF-8' + self.input_encoding = "UTF-8" + self.output_encoding = "UTF-8" self.eol = eol self.filters = filters self.update_transformations() - self.exit_character = unichr(0x1d) # GS/CTRL+] + self.exit_character = unichr(0x1D) # GS/CTRL+] self.menu_character = unichr(0x14) # Menu: CTRL+T self.alive = None self._reader_alive = None @@ -428,14 +445,14 @@ def _start_reader(self): """Start reader thread""" self._reader_alive = True # start serial->console thread - self.receiver_thread = threading.Thread(target=self.reader, name='rx') + self.receiver_thread = threading.Thread(target=self.reader, name="rx") self.receiver_thread.daemon = True self.receiver_thread.start() def _stop_reader(self): """Stop reader thread only, wait for clean exit of thread""" self._reader_alive = False - if hasattr(self.serial, 'cancel_read'): + if hasattr(self.serial, "cancel_read"): self.serial.cancel_read() self.receiver_thread.join() @@ -444,7 +461,7 @@ def start(self): self.alive = True self._start_reader() # enter console->serial loop - self.transmitter_thread = threading.Thread(target=self.writer, name='tx') + self.transmitter_thread = threading.Thread(target=self.writer, name="tx") self.transmitter_thread.daemon = True self.transmitter_thread.start() self.console.setup() @@ -457,7 +474,7 @@ def join(self, transmit_only=False): """wait for worker threads to terminate""" self.transmitter_thread.join() if not transmit_only: - if hasattr(self.serial, 'cancel_read'): + if hasattr(self.serial, "cancel_read"): self.serial.cancel_read() self.receiver_thread.join() @@ -466,45 +483,65 @@ def close(self): def update_transformations(self): """take list of transformation classes and instantiate them for rx and tx""" - transformations = [EOL_TRANSFORMATIONS[self.eol]] + [TRANSFORMATIONS[f] - for f in self.filters] + transformations = [EOL_TRANSFORMATIONS[self.eol]] + [ + TRANSFORMATIONS[f] for f in self.filters + ] self.tx_transformations = [t() for t in transformations] self.rx_transformations = list(reversed(self.tx_transformations)) - def set_rx_encoding(self, encoding, errors='replace'): + def set_rx_encoding(self, encoding, errors="replace"): """set encoding for received data""" self.input_encoding = encoding self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors) - def set_tx_encoding(self, encoding, errors='replace'): + def set_tx_encoding(self, encoding, errors="replace"): """set encoding for transmitted data""" self.output_encoding = encoding self.tx_encoder = codecs.getincrementalencoder(encoding)(errors) def dump_port_settings(self): """Write current settings to sys.stderr""" - sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format( - p=self.serial)) - sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format( - ('active' if self.serial.rts else 'inactive'), - ('active' if self.serial.dtr else 'inactive'), - ('active' if self.serial.break_condition else 'inactive'))) + sys.stderr.write( + "\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format( + p=self.serial + ) + ) + sys.stderr.write( + "--- RTS: {:8} DTR: {:8} BREAK: {:8}\n".format( + ("active" if self.serial.rts else "inactive"), + ("active" if self.serial.dtr else "inactive"), + ("active" if self.serial.break_condition else "inactive"), + ) + ) try: - sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format( - ('active' if self.serial.cts else 'inactive'), - ('active' if self.serial.dsr else 'inactive'), - ('active' if self.serial.ri else 'inactive'), - ('active' if self.serial.cd else 'inactive'))) + sys.stderr.write( + "--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n".format( + ("active" if self.serial.cts else "inactive"), + ("active" if self.serial.dsr else "inactive"), + ("active" if self.serial.ri else "inactive"), + ("active" if self.serial.cd else "inactive"), + ) + ) except serial.SerialException: # on RFC 2217 ports, it can happen if no modem state notification was # yet received. ignore this error. pass - sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive')) - sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive')) - sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) - sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) - sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper())) - sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + sys.stderr.write( + "--- software flow control: {}\n".format( + "active" if self.serial.xonxoff else "inactive" + ) + ) + sys.stderr.write( + "--- hardware flow control: {}\n".format( + "active" if self.serial.rtscts else "inactive" + ) + ) + sys.stderr.write("--- serial input encoding: {}\n".format(self.input_encoding)) + sys.stderr.write( + "--- serial output encoding: {}\n".format(self.output_encoding) + ) + sys.stderr.write("--- EOL: {}\n".format(self.eol.upper())) + sys.stderr.write("--- filters: {}\n".format(" ".join(self.filters))) def reader(self): """loop and copy serial->console""" @@ -513,16 +550,16 @@ def reader(self): # read all that is there or wait for one byte try: data2 = self.serial.read(self.serial.in_waiting or 1) - except: + except Exception: data2 = b"" if data2: - if self.remove_no_write == True: + if self.remove_no_write is True: self.remove_no_write = False self.no_write = False data = data2 else: data += data2 - if b"\n" in data: + if b"\n" in data: if self.raw: self.console.write_bytes(data) else: @@ -531,7 +568,7 @@ def reader(self): text = transformation.rx(text) if b"m1\n" in data: self.no_write = True - if self.no_write == False: + if self.no_write is False: self.console.write(text) data = b"" @@ -548,19 +585,19 @@ def writer(self): c = self.console.getkey() self.remove_no_write = True except KeyboardInterrupt: - c = '\x03' + c = "\x03" if not self.alive: break if menu_active: self.handle_menu_key(c) menu_active = False elif c == self.menu_character: - menu_active = True # next char will be for menu + menu_active = True # next char will be for menu elif c == self.exit_character: - self.stop() # exit app + self.stop() # exit app break else: - #~ if self.raw: + # ~ if self.raw: text = c for transformation in self.tx_transformations: text = transformation.tx(text) @@ -570,7 +607,7 @@ def writer(self): for transformation in self.tx_transformations: echo_text = transformation.echo(echo_text) self.console.write(echo_text) - except: + except Exception: self.alive = False raise @@ -581,95 +618,107 @@ def handle_menu_key(self, c): self.serial.write(self.tx_encoder.encode(c)) if self.echo: self.console.write(c) - elif c == '\x15': # CTRL+U -> upload file + elif c == "\x15": # CTRL+U -> upload file self.upload_file() - elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help + elif c in "\x08hH?": # CTRL+H, h, H, ? -> Show help sys.stderr.write(self.get_help_text()) - elif c == '\x12': # CTRL+R -> Toggle RTS + elif c == "\x12": # CTRL+R -> Toggle RTS self.serial.rts = not self.serial.rts - sys.stderr.write('--- RTS {} ---\n'.format('active' if self.serial.rts else 'inactive')) - elif c == '\x04': # CTRL+D -> Toggle DTR + sys.stderr.write( + "--- RTS {} ---\n".format("active" if self.serial.rts else "inactive") + ) + elif c == "\x04": # CTRL+D -> Toggle DTR self.serial.dtr = not self.serial.dtr - sys.stderr.write('--- DTR {} ---\n'.format('active' if self.serial.dtr else 'inactive')) - elif c == '\x02': # CTRL+B -> toggle BREAK condition + sys.stderr.write( + "--- DTR {} ---\n".format("active" if self.serial.dtr else "inactive") + ) + elif c == "\x02": # CTRL+B -> toggle BREAK condition self.serial.break_condition = not self.serial.break_condition - sys.stderr.write('--- BREAK {} ---\n'.format('active' if self.serial.break_condition else 'inactive')) - elif c == '\x05': # CTRL+E -> toggle local echo + sys.stderr.write( + "--- BREAK {} ---\n".format( + "active" if self.serial.break_condition else "inactive" + ) + ) + elif c == "\x05": # CTRL+E -> toggle local echo self.echo = not self.echo - sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive')) - elif c == '\x06': # CTRL+F -> edit filters + sys.stderr.write( + "--- local echo {} ---\n".format("active" if self.echo else "inactive") + ) + elif c == "\x06": # CTRL+F -> edit filters self.change_filter() - elif c == '\x0c': # CTRL+L -> EOL mode - modes = list(EOL_TRANSFORMATIONS) # keys + elif c == "\x0c": # CTRL+L -> EOL mode + modes = list(EOL_TRANSFORMATIONS) # keys eol = modes.index(self.eol) + 1 if eol >= len(modes): eol = 0 self.eol = modes[eol] - sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper())) + sys.stderr.write("--- EOL: {} ---\n".format(self.eol.upper())) self.update_transformations() - elif c == '\x01': # CTRL+A -> set encoding + elif c == "\x01": # CTRL+A -> set encoding self.change_encoding() - elif c == '\x09': # CTRL+I -> info + elif c == "\x09": # CTRL+I -> info self.dump_port_settings() - #~ elif c == '\x01': # CTRL+A -> cycle escape mode - #~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode - elif c in 'pP': # P -> change port + # ~ elif c == '\x01': # CTRL+A -> cycle escape mode + # ~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode + elif c in "pP": # P -> change port self.change_port() - elif c in 'zZ': # S -> suspend / open port temporarily + elif c in "zZ": # S -> suspend / open port temporarily self.suspend_port() - elif c in 'bB': # B -> change baudrate + elif c in "bB": # B -> change baudrate self.change_baudrate() - elif c == '8': # 8 -> change to 8 bits + elif c == "8": # 8 -> change to 8 bits self.serial.bytesize = serial.EIGHTBITS self.dump_port_settings() - elif c == '7': # 7 -> change to 8 bits + elif c == "7": # 7 -> change to 8 bits self.serial.bytesize = serial.SEVENBITS self.dump_port_settings() - elif c in 'eE': # E -> change to even parity + elif c in "eE": # E -> change to even parity self.serial.parity = serial.PARITY_EVEN self.dump_port_settings() - elif c in 'oO': # O -> change to odd parity + elif c in "oO": # O -> change to odd parity self.serial.parity = serial.PARITY_ODD self.dump_port_settings() - elif c in 'mM': # M -> change to mark parity + elif c in "mM": # M -> change to mark parity self.serial.parity = serial.PARITY_MARK self.dump_port_settings() - elif c in 'sS': # S -> change to space parity + elif c in "sS": # S -> change to space parity self.serial.parity = serial.PARITY_SPACE self.dump_port_settings() - elif c in 'nN': # N -> change to no parity + elif c in "nN": # N -> change to no parity self.serial.parity = serial.PARITY_NONE self.dump_port_settings() - elif c == '1': # 1 -> change to 1 stop bits + elif c == "1": # 1 -> change to 1 stop bits self.serial.stopbits = serial.STOPBITS_ONE self.dump_port_settings() - elif c == '2': # 2 -> change to 2 stop bits + elif c == "2": # 2 -> change to 2 stop bits self.serial.stopbits = serial.STOPBITS_TWO self.dump_port_settings() - elif c == '3': # 3 -> change to 1.5 stop bits + elif c == "3": # 3 -> change to 1.5 stop bits self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE self.dump_port_settings() - elif c in 'xX': # X -> change software flow control - self.serial.xonxoff = (c == 'X') + elif c in "xX": # X -> change software flow control + self.serial.xonxoff = c == "X" self.dump_port_settings() - elif c in 'rR': # R -> change hardware flow control - self.serial.rtscts = (c == 'R') + elif c in "rR": # R -> change hardware flow control + self.serial.rtscts = c == "R" self.dump_port_settings() - elif c in 'qQ': - self.stop() # Q -> exit app + elif c in "qQ": + self.stop() # Q -> exit app else: - sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c))) + sys.stderr.write( + "--- unknown menu character {} --\n".format(key_description(c)) + ) def upload_file(self): """Ask user for filename and send its contents""" - sys.stderr.write('\n--- File to upload: ') + sys.stderr.write("\n--- File to upload: ") sys.stderr.flush() with self.console: - filename = sys.stdin.readline().rstrip('\r\n') + filename = sys.stdin.readline().rstrip("\r\n") if filename: try: - with open(filename, 'rb') as f: - sys.stderr.write('--- Sending file {} ---\n'.format(filename)) + with open(filename, "rb") as f: + sys.stderr.write("--- Sending file {} ---\n".format(filename)) while True: block = f.read(1024) if not block: @@ -677,56 +726,67 @@ def upload_file(self): self.serial.write(block) # Wait for output buffer to drain. self.serial.flush() - sys.stderr.write('.') # Progress indicator. - sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) + sys.stderr.write(".") # Progress indicator. + sys.stderr.write("\n--- File {} sent ---\n".format(filename)) except IOError as e: - sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + sys.stderr.write( + "--- ERROR opening file {}: {} ---\n".format(filename, e) + ) def change_filter(self): """change the i/o transformations""" - sys.stderr.write('\n--- Available Filters:\n') - sys.stderr.write('\n'.join( - '--- {:<10} = {.__doc__}'.format(k, v) - for k, v in sorted(TRANSFORMATIONS.items()))) - sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) + sys.stderr.write("\n--- Available Filters:\n") + sys.stderr.write( + "\n".join( + "--- {:<10} = {.__doc__}".format(k, v) + for k, v in sorted(TRANSFORMATIONS.items()) + ) + ) + sys.stderr.write( + "\n--- Enter new filter name(s) [{}]: ".format(" ".join(self.filters)) + ) with self.console: new_filters = sys.stdin.readline().lower().split() if new_filters: for f in new_filters: if f not in TRANSFORMATIONS: - sys.stderr.write('--- unknown filter: {!r}\n'.format(f)) + sys.stderr.write("--- unknown filter: {!r}\n".format(f)) break else: self.filters = new_filters self.update_transformations() - sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + sys.stderr.write("--- filters: {}\n".format(" ".join(self.filters))) def change_encoding(self): """change encoding on the serial port""" - sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) + sys.stderr.write( + "\n--- Enter new encoding name [{}]: ".format(self.input_encoding) + ) with self.console: new_encoding = sys.stdin.readline().strip() if new_encoding: try: codecs.lookup(new_encoding) except LookupError: - sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) + sys.stderr.write("--- invalid encoding name: {}\n".format(new_encoding)) else: self.set_rx_encoding(new_encoding) self.set_tx_encoding(new_encoding) - sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) - sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + sys.stderr.write("--- serial input encoding: {}\n".format(self.input_encoding)) + sys.stderr.write( + "--- serial output encoding: {}\n".format(self.output_encoding) + ) def change_baudrate(self): """change the baudrate""" - sys.stderr.write('\n--- Baudrate: ') + sys.stderr.write("\n--- Baudrate: ") sys.stderr.flush() with self.console: backup = self.serial.baudrate try: self.serial.baudrate = int(sys.stdin.readline().strip()) except ValueError as e: - sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) + sys.stderr.write("--- ERROR setting baudrate: {} ---\n".format(e)) self.serial.baudrate = backup else: self.dump_port_settings() @@ -752,12 +812,14 @@ def change_port(self): new_serial.open() new_serial.break_condition = self.serial.break_condition except Exception as e: - sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) + sys.stderr.write("--- ERROR opening new port: {} ---\n".format(e)) new_serial.close() else: self.serial.close() self.serial = new_serial - sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) + sys.stderr.write( + "--- Port changed to: {} ---\n".format(self.serial.port) + ) # and restart the reader thread self._start_reader() @@ -769,28 +831,31 @@ def suspend_port(self): # reader thread needs to be shut down self._stop_reader() self.serial.close() - sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port)) + sys.stderr.write("\n--- Port closed: {} ---\n".format(self.serial.port)) do_change_port = False while not self.serial.is_open: - sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format( - exit=key_description(self.exit_character))) + sys.stderr.write( + "--- Quit: {exit} | p: port change | any other key to reconnect ---\n".format( + exit=key_description(self.exit_character) + ) + ) k = self.console.getkey() if k == self.exit_character: - self.stop() # exit app + self.stop() # exit app break - elif k in 'pP': + elif k in "pP": do_change_port = True break try: self.serial.open() except Exception as e: - sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e)) + sys.stderr.write("--- ERROR opening port: {} ---\n".format(e)) if do_change_port: self.change_port() else: # and restart the reader thread self._start_reader() - sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port)) + sys.stderr.write("--- Port opened: {} ---\n".format(self.serial.port)) def get_help_text(self): """return the help text""" @@ -819,183 +884,216 @@ def get_help_text(self): --- b change baud rate --- x X disable/enable software flow control --- r R disable/enable hardware flow control -""".format(version=getattr(serial, 'VERSION', 'unknown version'), - exit=key_description(self.exit_character), - menu=key_description(self.menu_character), - rts=key_description('\x12'), - dtr=key_description('\x04'), - brk=key_description('\x02'), - echo=key_description('\x05'), - info=key_description('\x09'), - upload=key_description('\x15'), - repr=key_description('\x01'), - filter=key_description('\x06'), - eol=key_description('\x0c')) +""".format( + version=getattr(serial, "VERSION", "unknown version"), + exit=key_description(self.exit_character), + menu=key_description(self.menu_character), + rts=key_description("\x12"), + dtr=key_description("\x04"), + brk=key_description("\x02"), + echo=key_description("\x05"), + info=key_description("\x09"), + upload=key_description("\x15"), + repr=key_description("\x01"), + filter=key_description("\x06"), + eol=key_description("\x0c"), + ) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # default args can be used to override when calling main() from an other script # e.g to create a miniterm-my-device.py -def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None, serial_instance=None): +def main( + default_port=None, + default_baudrate=9600, + default_rts=None, + default_dtr=None, + serial_instance=None, +): """Command line tool, entry point""" import argparse parser = argparse.ArgumentParser( - description='Miniterm - A simple terminal program for the serial port.') + description="Miniterm - A simple terminal program for the serial port." + ) parser.add_argument( - 'port', - nargs='?', + "port", + nargs="?", help='serial port name ("-" to show port list)', - default=default_port) + default=default_port, + ) parser.add_argument( - 'baudrate', - nargs='?', + "baudrate", + nargs="?", type=int, - help='set baud rate, default: %(default)s', - default=default_baudrate) + help="set baud rate, default: %(default)s", + default=default_baudrate, + ) - group = parser.add_argument_group('port settings') + group = parser.add_argument_group("port settings") group.add_argument( - '--parity', - choices=['N', 'E', 'O', 'S', 'M'], + "--parity", + choices=["N", "E", "O", "S", "M"], type=lambda c: c.upper(), - help='set parity, one of {N E O S M}, default: N', - default='N') + help="set parity, one of {N E O S M}, default: N", + default="N", + ) group.add_argument( - '--rtscts', - action='store_true', - help='enable RTS/CTS flow control (default off)', - default=False) + "--rtscts", + action="store_true", + help="enable RTS/CTS flow control (default off)", + default=False, + ) group.add_argument( - '--xonxoff', - action='store_true', - help='enable software flow control (default off)', - default=False) + "--xonxoff", + action="store_true", + help="enable software flow control (default off)", + default=False, + ) group.add_argument( - '--rts', + "--rts", type=int, - help='set initial RTS line state (possible values: 0, 1)', - default=default_rts) + help="set initial RTS line state (possible values: 0, 1)", + default=default_rts, + ) group.add_argument( - '--dtr', + "--dtr", type=int, - help='set initial DTR line state (possible values: 0, 1)', - default=default_dtr) + help="set initial DTR line state (possible values: 0, 1)", + default=default_dtr, + ) group.add_argument( - '--non-exclusive', - dest='exclusive', - action='store_false', - help='disable locking for native ports', - default=True) + "--non-exclusive", + dest="exclusive", + action="store_false", + help="disable locking for native ports", + default=True, + ) group.add_argument( - '--ask', - action='store_true', - help='ask again for port when open fails', - default=False) + "--ask", + action="store_true", + help="ask again for port when open fails", + default=False, + ) - group = parser.add_argument_group('data handling') + group = parser.add_argument_group("data handling") group.add_argument( - '-e', '--echo', - action='store_true', - help='enable local echo (default off)', - default=False) + "-e", + "--echo", + action="store_true", + help="enable local echo (default off)", + default=False, + ) group.add_argument( - '--encoding', - dest='serial_port_encoding', - metavar='CODEC', - help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s', - default='UTF-8') + "--encoding", + dest="serial_port_encoding", + metavar="CODEC", + help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s", + default="UTF-8", + ) group.add_argument( - '-f', '--filter', - action='append', - metavar='NAME', - help='add text transformation', - default=[]) + "-f", + "--filter", + action="append", + metavar="NAME", + help="add text transformation", + default=[], + ) group.add_argument( - '--eol', - choices=['CR', 'LF', 'CRLF'], + "--eol", + choices=["CR", "LF", "CRLF"], type=lambda c: c.upper(), - help='end of line mode', - default='CRLF') + help="end of line mode", + default="CRLF", + ) group.add_argument( - '--raw', - action='store_true', - help='Do no apply any encodings/transformations', - default=False) + "--raw", + action="store_true", + help="Do no apply any encodings/transformations", + default=False, + ) - group = parser.add_argument_group('hotkeys') + group = parser.add_argument_group("hotkeys") group.add_argument( - '--exit-char', + "--exit-char", type=int, - metavar='NUM', - help='Unicode of special character that is used to exit the application, default: %(default)s', - default=0x1d) # GS/CTRL+] + metavar="NUM", + help="Unicode of special character that is used to exit the application, default: %(default)s", + default=0x1D, + ) # GS/CTRL+] group.add_argument( - '--menu-char', + "--menu-char", type=int, - metavar='NUM', - help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s', - default=0x14) # Menu: CTRL+T + metavar="NUM", + help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s", + default=0x14, + ) # Menu: CTRL+T - group = parser.add_argument_group('diagnostics') + group = parser.add_argument_group("diagnostics") group.add_argument( - '-q', '--quiet', - action='store_true', - help='suppress non-error messages', - default=False) + "-q", + "--quiet", + action="store_true", + help="suppress non-error messages", + default=False, + ) group.add_argument( - '--develop', - action='store_true', - help='show Python traceback on error', - default=False) + "--develop", + action="store_true", + help="show Python traceback on error", + default=False, + ) args = parser.parse_args() if args.menu_char == args.exit_char: - parser.error('--exit-char can not be the same as --menu-char') + parser.error("--exit-char can not be the same as --menu-char") if args.filter: - if 'help' in args.filter: - sys.stderr.write('Available filters:\n') - sys.stderr.write('\n'.join( - '{:<10} = {.__doc__}'.format(k, v) - for k, v in sorted(TRANSFORMATIONS.items()))) - sys.stderr.write('\n') + if "help" in args.filter: + sys.stderr.write("Available filters:\n") + sys.stderr.write( + "\n".join( + "{:<10} = {.__doc__}".format(k, v) + for k, v in sorted(TRANSFORMATIONS.items()) + ) + ) + sys.stderr.write("\n") sys.exit(1) filters = args.filter else: - filters = ['default'] + filters = ["default"] while serial_instance is None: # no port given on command line -> ask user now - if args.port is None or args.port == '-': + if args.port is None or args.port == "-": try: args.port = ask_for_port() except KeyboardInterrupt: - sys.stderr.write('\n') - parser.error('user aborted and port is not given') + sys.stderr.write("\n") + parser.error("user aborted and port is not given") else: if not args.port: - parser.error('port is not given') + parser.error("port is not given") try: serial_instance = serial.serial_for_url( args.port, @@ -1003,19 +1101,28 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr parity=args.parity, rtscts=args.rtscts, xonxoff=args.xonxoff, - do_not_open=True) + do_not_open=True, + ) - if not hasattr(serial_instance, 'cancel_read'): + if not hasattr(serial_instance, "cancel_read"): # enable timeout for alive flag polling if cancel_read is not available serial_instance.timeout = 1 if args.dtr is not None: if not args.quiet: - sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive')) + sys.stderr.write( + "--- forcing DTR {}\n".format( + "active" if args.dtr else "inactive" + ) + ) serial_instance.dtr = args.dtr if args.rts is not None: if not args.quiet: - sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive')) + sys.stderr.write( + "--- forcing RTS {}\n".format( + "active" if args.rts else "inactive" + ) + ) serial_instance.rts = args.rts if isinstance(serial_instance, serial.Serial): @@ -1023,21 +1130,19 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr serial_instance.open() except serial.SerialException as e: - sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e)) + sys.stderr.write("could not open port {!r}: {}\n".format(args.port, e)) if args.develop: raise if not args.ask: sys.exit(1) else: - args.port = '-' + args.port = "-" else: break miniterm = Miniterm( - serial_instance, - echo=args.echo, - eol=args.eol.lower(), - filters=filters) + serial_instance, echo=args.echo, eol=args.eol.lower(), filters=filters + ) miniterm.exit_character = unichr(args.exit_char) miniterm.menu_character = unichr(args.menu_char) miniterm.raw = args.raw @@ -1045,17 +1150,29 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr miniterm.set_tx_encoding(args.serial_port_encoding) if not args.quiet: - sys.stderr.write('--- Miniterm on {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format( - p=miniterm.serial)) - sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format( - key_description(miniterm.exit_character), - key_description(miniterm.menu_character), - key_description(miniterm.menu_character), - key_description('\x08'))) - sys.stderr.write('--- Specifically modified for managing HLPdataBMS4S in Venus OS ---\n'.format( - p=miniterm.serial)) - sys.stderr.write('--- Quit: Ctrl-t q | Local echo: Ctrl-t Ctrl-e ---\n'.format( - p=miniterm.serial)) + sys.stderr.write( + "--- Miniterm on {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n".format( + p=miniterm.serial + ) + ) + sys.stderr.write( + "--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n".format( + key_description(miniterm.exit_character), + key_description(miniterm.menu_character), + key_description(miniterm.menu_character), + key_description("\x08"), + ) + ) + sys.stderr.write( + "--- Specifically modified for managing HLPdataBMS4S in Venus OS ---\n".format( # noqa: F522 + p=miniterm.serial + ) + ) + sys.stderr.write( + "--- Quit: Ctrl-t q | Local echo: Ctrl-t Ctrl-e ---\n".format( # noqa: F522 + p=miniterm.serial + ) + ) miniterm.start() try: @@ -1063,11 +1180,11 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr except KeyboardInterrupt: pass if not args.quiet: - sys.stderr.write('\n--- exit ---\n') + sys.stderr.write("\n--- exit ---\n") miniterm.join() miniterm.close() # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -if __name__ == '__main__': +if __name__ == "__main__": main() From 25cd931ef9b1a6d9e1202bb3133c1d40762b6f6c Mon Sep 17 00:00:00 2001 From: Manuel <mr-manuel@outlook.it> Date: Mon, 1 May 2023 18:28:20 +0200 Subject: [PATCH 13/13] fix error 2023-05-01 16:26:56.144343500 INFO:SerialBattery:Testing HLPdataBMS4S 2023-05-01 16:26:56.146290500 Traceback (most recent call last): 2023-05-01 16:26:56.146294500 File "/opt/victronenergy/dbus-serialbattery/dbus-serialbattery.py", line 151, in <module> 2023-05-01 16:26:56.146298500 main() 2023-05-01 16:26:56.146300500 File "/opt/victronenergy/dbus-serialbattery/dbus-serialbattery.py", line 120, in main 2023-05-01 16:26:56.146304500 battery = get_battery(port) 2023-05-01 16:26:56.146307500 File "/opt/victronenergy/dbus-serialbattery/dbus-serialbattery.py", line 77, in get_battery 2023-05-01 16:26:56.146310500 battery: Battery = batteryClass( 2023-05-01 16:26:56.146347500 TypeError: __init__() got an unexpected keyword argument 'address' --- etc/dbus-serialbattery/hlpdatabms4s.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/dbus-serialbattery/hlpdatabms4s.py b/etc/dbus-serialbattery/hlpdatabms4s.py index 4b3ef278..1bacf395 100644 --- a/etc/dbus-serialbattery/hlpdatabms4s.py +++ b/etc/dbus-serialbattery/hlpdatabms4s.py @@ -7,8 +7,8 @@ class HLPdataBMS4S(Battery): - def __init__(self, port, baud): - super(HLPdataBMS4S, self).__init__(port, baud) + def __init__(self, port, baud, address): + super(HLPdataBMS4S, self).__init__(port, baud, address) self.type = self.BATTERYTYPE BATTERYTYPE = "HLPdataBMS4S"