From 9cf3b9af8ec62f70586d03c821c3a9887bd3f04b Mon Sep 17 00:00:00 2001 From: wollew Date: Tue, 2 May 2023 21:45:54 +0200 Subject: [PATCH] Add support for Seplos BMS (#530) * Initial version of Seplos support TODOS (at least): * revisit the stupid timing stuff, still does not read data reliably * validate return code and checksums * use address to support multiple packs * read and populate alarm data * cleanup logging * calculate real checksums instead of hard wired for one command * Fix most open issues: * fixes serial reading from Seplos by simply using readline() * validates checksums, return codes * clean up logging * proper encoding of arbitrary cid2 commands including address, length and checksums Still TODO: * read alarm data * adds reading of alarm data, populates the Protection class. * adds state of charge/discharge switches * remove wildcard imports from seplos * remove wildcard imports from battery template * fix wildcard import and black lint errors * Revert battery_template.py changes The template is already reworked in another PR. * fix typo * remove unused code * add documentation about hardcoded/config max (dis-)charge values (see #4) --------- Co-authored-by: Manuel Co-authored-by: Louis Van Der Walt <26077665+Louisvdw@users.noreply.github.com> --- etc/dbus-serialbattery/battery.py | 3 + etc/dbus-serialbattery/dbus-serialbattery.py | 2 + etc/dbus-serialbattery/seplos.py | 298 +++++++++++++++++++ 3 files changed, 303 insertions(+) create mode 100644 etc/dbus-serialbattery/seplos.py diff --git a/etc/dbus-serialbattery/battery.py b/etc/dbus-serialbattery/battery.py index 24522fe4..15021526 100644 --- a/etc/dbus-serialbattery/battery.py +++ b/etc/dbus-serialbattery/battery.py @@ -15,6 +15,9 @@ class Protection(object): This class holds Warning and alarm states for different types of Checks They are of type integer, 2 represents an Alarm, 1 a Warning, 0 if everything is fine """ + ALARM = 2 + WARNING = 1 + NOALARM = 0 def __init__(self): self.voltage_high: int = None diff --git a/etc/dbus-serialbattery/dbus-serialbattery.py b/etc/dbus-serialbattery/dbus-serialbattery.py index 230ed2ca..5b95a531 100644 --- a/etc/dbus-serialbattery/dbus-serialbattery.py +++ b/etc/dbus-serialbattery/dbus-serialbattery.py @@ -23,6 +23,7 @@ from daly import Daly from ant import Ant from jkbms import Jkbms +from seplos import Seplos # from sinowealth import Sinowealth from renogy import Renogy @@ -42,6 +43,7 @@ {"bms": Renogy, "baud": 9600, "address": b"\x30"}, {"bms": Renogy, "baud": 9600, "address": b"\xF7"}, {"bms": Ecs, "baud": 19200}, + {"bms": Seplos, "baud": 19200}, ] expected_bms_types = [ battery_type diff --git a/etc/dbus-serialbattery/seplos.py b/etc/dbus-serialbattery/seplos.py new file mode 100644 index 00000000..c9519fd0 --- /dev/null +++ b/etc/dbus-serialbattery/seplos.py @@ -0,0 +1,298 @@ +# -*- coding: utf-8 -*- +from battery import Protection, Battery, Cell +from utils import logger +import utils +import serial + + + +class Seplos(Battery): + def __init__(self, port, baud, address=0x00): + super(Seplos, self).__init__(port, baud, address) + self.type = self.BATTERYTYPE + self.poll_interval = 5000 + + BATTERYTYPE = "Seplos" + + COMMAND_STATUS = 0x42 + COMMAND_ALARM = 0x44 + COMMAND_PROTOCOL_VERSION = 0x4F + COMMAND_VENDOR_INFO = 0x51 + + @staticmethod + def int_from_1byte_hex_ascii(data: bytes, offset: int, signed=False): + return int.from_bytes( + bytes.fromhex(data[offset : offset + 2].decode("ascii")), + byteorder="big", + signed=signed, + ) + + @staticmethod + def int_from_2byte_hex_ascii(data: bytes, offset: int, signed=False): + return int.from_bytes( + bytes.fromhex(data[offset : offset + 4].decode("ascii")), + byteorder="big", + signed=signed, + ) + + @staticmethod + def get_checksum(frame: bytes) -> int: + """implements the Seplos checksum algorithm, returns 4 bytes""" + checksum = 0 + for b in frame: + checksum += b + checksum %= 0xFFFF + checksum ^= 0xFFFF + checksum += 1 + return checksum + + @staticmethod + def get_info_length(info: bytes) -> int: + """implements the Seplos checksum for the info length""" + lenid = len(info) + if lenid == 0: + return 0 + + lchksum = (lenid & 0xF) + ((lenid >> 4) & 0xF) + ((lenid >> 8) & 0xF) + lchksum %= 16 + lchksum ^= 0xF + lchksum += 1 + + return (lchksum << 12) + lenid + + @staticmethod + def encode_cmd(address: int, cid2: int, info: bytes = b"") -> bytes: + """encodes a command sent to a battery (cid1=0x46)""" + cid1 = 0x46 + + info_length = Seplos.get_info_length(info) + + frame = "{:02X}{:02X}{:02X}{:02X}{:04X}".format( + 0x20, address, cid1, cid2, info_length + ).encode() + frame += info + + checksum = Seplos.get_checksum(frame) + encoded = b"~" + frame + "{:04X}".format(checksum).encode() + b"\r" + return encoded + + def test_connection(self): + # call a function that will connect to the battery, send a command and retrieve the result. + # The result or call should be unique to this BMS. Battery name or version, etc. + # Return True if success, False for failure + + try: + return self.read_status_data() + except Exception as err: + logger.error(f"Unexpected {err=}, {type(err)=}") + return False + + def get_settings(self): + # After successful connection get_settings will be called to set up the battery. + # Set the current limits, populate cell count, etc. + # Return True if success, False for failure + + # BMS does not provide max charge-/discharge, so we have to use hardcoded/config values + self.max_battery_charge_current = utils.MAX_BATTERY_CHARGE_CURRENT + self.max_battery_discharge_current = utils.MAX_BATTERY_DISCHARGE_CURRENT + + self.max_battery_voltage = utils.MAX_CELL_VOLTAGE * self.cell_count + self.min_battery_voltage = utils.MIN_CELL_VOLTAGE * self.cell_count + + # init the cell array + for _ in range(self.cell_count): + self.cells.append(Cell(False)) + + return True + + def refresh_data(self): + # call all functions that will refresh the battery data. + # This will be called for every iteration (self.poll_interval) + # Return True if success, False for failure + result_status = self.read_status_data() + # sleep(0.5) + result_alarm = self.read_alarm_data() + + return result_status and result_alarm + + @staticmethod + def decode_alarm_byte(data_byte: int, alarm_bit: int, warn_bit: int): + if data_byte & (1 << alarm_bit) != 0: + return Protection.ALARM + if data_byte & (1 << warn_bit) != 0: + return Protection.WARNING + return Protection.OK + + def read_alarm_data(self): + data = self.read_serial_data_seplos( + self.encode_cmd(address=0x00, cid2=self.COMMAND_ALARM, info=b"01") + ) + # check if connection success + if data is False: + return False + + logger.debug("alarm info raw {}".format(data)) + return self.decode_alarm_data(bytes.fromhex(data.decode("ascii"))) + + def decode_alarm_data(self, data: bytes): + logger.debug("alarm info decoded {}".format(data)) + voltage_alarm_byte = data[30] + self.protection.voltage_cell_low = Seplos.decode_alarm_byte( + data_byte=voltage_alarm_byte, alarm_bit=3, warn_bit=2 + ) + # cell high voltage is actually unused because DBUS does not seem to support it, decoding anyway + # c.f. https://github.com/victronenergy/venus/wiki/dbus#battery + self.protection.voltage_cell_high = Seplos.decode_alarm_byte( + data_byte=voltage_alarm_byte, alarm_bit=1, warn_bit=0 + ) + self.protection.voltage_low = Seplos.decode_alarm_byte( + data_byte=voltage_alarm_byte, alarm_bit=7, warn_bit=6 + ) + self.protection.voltage_high = Seplos.decode_alarm_byte( + data_byte=voltage_alarm_byte, alarm_bit=5, warn_bit=4 + ) + + temperature_alarm_byte = data[31] + self.protection.temp_low_charge = Seplos.decode_alarm_byte( + data_byte=temperature_alarm_byte, alarm_bit=3, warn_bit=2 + ) + self.protection.temp_high_charge = Seplos.decode_alarm_byte( + data_byte=temperature_alarm_byte, alarm_bit=1, warn_bit=0 + ) + self.protection.temp_low_discharge = Seplos.decode_alarm_byte( + data_byte=temperature_alarm_byte, alarm_bit=7, warn_bit=6 + ) + self.protection.temp_high_discharge = Seplos.decode_alarm_byte( + data_byte=temperature_alarm_byte, alarm_bit=5, warn_bit=4 + ) + + current_alarm_byte = data[33] + self.protection.current_over = Seplos.decode_alarm_byte( + data_byte=current_alarm_byte, alarm_bit=1, warn_bit=0 + ) + self.protection.current_under = Seplos.decode_alarm_byte( + data_byte=current_alarm_byte, alarm_bit=3, warn_bit=2 + ) + + soc_alarm_byte = data[34] + self.protection.soc_low = Seplos.decode_alarm_byte( + data_byte=soc_alarm_byte, alarm_bit=3, warn_bit=2 + ) + + switch_byte = data[35] + self.discharge_fet = True if switch_byte & 0b01 != 0 else False + self.charge_fet = True if switch_byte & 0b10 != 0 else False + return True + + def read_status_data(self): + logger.debug("read status data") + data = self.read_serial_data_seplos( + self.encode_cmd(address=0x00, cid2=0x42, info=b"01") + ) + + # check if connection success + if data is False: + return False + + cell_count_offset = 4 + voltage_offset = 6 + temps_offset = 72 + self.cell_count = Seplos.int_from_1byte_hex_ascii( + data=data, offset=cell_count_offset + ) + if self.cell_count == len(self.cells): + for i in range(self.cell_count): + voltage = ( + Seplos.int_from_2byte_hex_ascii(data, voltage_offset + i * 4) / 1000 + ) + self.cells[i].voltage = voltage + logger.debug("Voltage cell[{}]={}V".format(i, voltage)) + for i in range(min(4, self.cell_count)): + temp = ( + Seplos.int_from_2byte_hex_ascii(data, temps_offset + i * 4) - 2731 + ) / 10 + self.cells[i].temp = temp + logger.debug("Temp cell[{}]={}°C".format(i, temp)) + + self.temp1 = ( + Seplos.int_from_2byte_hex_ascii(data, temps_offset + 4 * 4) - 2731 + ) / 10 + self.temp2 = ( + Seplos.int_from_2byte_hex_ascii(data, temps_offset + 5 * 4) - 2731 + ) / 10 + self.current = ( + Seplos.int_from_2byte_hex_ascii(data, offset=96, signed=True) / 100 + ) + self.voltage = Seplos.int_from_2byte_hex_ascii(data, offset=100) / 100 + self.capacity_remain = Seplos.int_from_2byte_hex_ascii(data, offset=104) / 100 + self.capacity = Seplos.int_from_2byte_hex_ascii(data, offset=110) / 100 + self.soc = Seplos.int_from_2byte_hex_ascii(data, offset=114) / 10 + self.cycles = Seplos.int_from_2byte_hex_ascii(data, offset=122) + self.hardware_version = "Seplos BMS {} cells".format(self.cell_count) + + logger.debug("Current = {}A , Voltage = {}V".format(self.current, self.voltage)) + logger.debug( + "Capacity = {}/{}Ah , SOC = {}%".format( + self.capacity_remain, self.capacity, self.soc + ) + ) + logger.debug("Cycles = {}".format(self.cycles)) + logger.debug( + "Environment temp = {}°C , Power temp = {}°C".format( + self.temp1, self.temp2 + ) + ) + logger.debug("HW:" + self.hardware_version) + + return True + + @staticmethod + def is_valid_frame(data: bytes) -> bool: + """checks if data contains a valid frame + * minimum length is 18 Byte + * checksum needs to be valid + * also checks for error code as return code in cid2 + * not checked: lchksum + """ + if len(data) < 18: + logger.warning("short read, data={}".format(data)) + return False + + chksum = Seplos.get_checksum(data[1:-5]) + if chksum != Seplos.int_from_2byte_hex_ascii(data, -5): + logger.warning("checksum error") + return False + + cid2 = data[7:9] + if cid2 != b"00": + logger.warning("command returned with error code {}".format(cid2)) + return False + + return True + + def read_serial_data_seplos(self, command): + logger.debug("read serial data seplos") + + with serial.Serial(self.port, baudrate=self.baud_rate, timeout=1) as ser: + ser.flushOutput() + ser.flushInput() + written = ser.write(command) + logger.debug( + "wrote {} bytes to serial port {}, command={}".format( + written, self.port, command + ) + ) + + data = ser.readline() + + if not Seplos.is_valid_frame(data): + return False + + length_pos = 10 + return_data = data[length_pos + 3 : -5] + info_length = Seplos.int_from_2byte_hex_ascii(b"0" + data[length_pos:], 0) + logger.debug( + "return info data of length {} : {}".format(info_length, return_data) + ) + + return return_data