-
-
Notifications
You must be signed in to change notification settings - Fork 147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for HLPdata BMS4S #505
Merged
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
60ed836
Support för BMS4S from hlpdata.se, a BMS developed for use in boats.
peterohman 699d116
log entry removed
peterohman 6d8a1ff
Update hlpdatabms4s.py
peterohman edafdcf
Terminal to manage HLPdataBMS4S
peterohman 7f5c0ec
Initial comment added
peterohman bb53832
Temperture reporting added
peterohman c32656e
Temp reading corrected
peterohman 2f17876
Charge/discharge_fet added
peterohman 3b90aa5
Starts with local echo on
peterohman cc9c7b5
Merge branch 'master' into master
mr-manuel b4592b5
wildcard imports removed
peterohman 2005e4f
fixed import and black lint
mr-manuel 49c43cb
fix black lint errors
mr-manuel 25cd931
fix error
mr-manuel 781c257
Merge branch 'master' into master
Louisvdw File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
# -*- coding: utf-8 -*- | ||
from battery import Battery, Cell | ||
from utils import logger | ||
import utils | ||
import serial | ||
from time import sleep | ||
|
||
|
||
class HLPdataBMS4S(Battery): | ||
def __init__(self, port, baud, address): | ||
super(HLPdataBMS4S, self).__init__(port, baud, address) | ||
self.type = self.BATTERYTYPE | ||
|
||
BATTERYTYPE = "HLPdataBMS4S" | ||
|
||
def test_connection(self): | ||
# call a function that will connect to the battery, send a command and retrieve the result. | ||
# The result or call should be unique to this BMS. Battery name or version, etc. | ||
# Return True if success, False for failure | ||
result = False | ||
try: | ||
result = self.read_test_data() | ||
except Exception as err: | ||
logger.error(f"Unexpected {err=}, {type(err)=}") | ||
result = False | ||
|
||
return result | ||
|
||
def get_settings(self): | ||
# After successful connection get_settings will be call to set up the battery. | ||
# Set the current limits, populate cell count, etc | ||
# Return True if success, False for failure | ||
result = False | ||
try: | ||
result = self.read_settings_data() | ||
except Exception as e: | ||
logger.error(e, exc_info=True) | ||
pass | ||
return result | ||
|
||
def refresh_data(self): | ||
# call all functions that will refresh the battery data. | ||
# This will be called for every iteration (1 second) | ||
# Return True if success, False for failure | ||
result = False | ||
try: | ||
result = self.read_status_data() | ||
except Exception as e: | ||
logger.error(e, exc_info=True) | ||
pass | ||
return result | ||
|
||
# def log_settings(self): | ||
# logger.info(f'Battery {self.type} connected to dbus from {self.port}') | ||
# logger.info(f'=== Settings ===') | ||
# cell_counter = len(self.cells) | ||
# logger.info(f'> Connection voltage {self.voltage}V | current {self.current}A | SOC {self.soc}%') | ||
# logger.info(f'> Cell count {self.cell_count} | cells populated {cell_counter}') | ||
# logger.info(f'> CCCM SOC {CCCM_SOC_ENABLE} | DCCM SOC {DCCM_SOC_ENABLE}') | ||
# logger.info(f'> CCCM CV {CCCM_CV_ENABLE} | DCCM CV {DCCM_CV_ENABLE}') | ||
# logger.info(f'> CCCM T {CCCM_T_ENABLE} | DCCM T {DCCM_T_ENABLE}') | ||
# logger.info(f'> MIN_CELL_VOLTAGE {MIN_CELL_VOLTAGE}V | MAX_CELL_VOLTAGE {MAX_CELL_VOLTAGE}V') | ||
|
||
return | ||
|
||
def read_test_data(self): | ||
test_data = self.read_serial_data_HLPdataBMS4S(b"pv\n", 1, 15) | ||
if test_data is False: | ||
return False | ||
s1 = str(test_data) | ||
ix = s1.find("BMS4S") | ||
if ix > 0: | ||
self.hardware_version = s1[ix : len(s1) - 1] | ||
self.version = self.hardware_version | ||
self.max_battery_charge_current = utils.MAX_BATTERY_CHARGE_CURRENT | ||
self.max_battery_discharge_current = utils.MAX_BATTERY_DISCHARGE_CURRENT | ||
self.poll_interval = 10000 | ||
self.control_discharge_current = 1000 | ||
self.control_charge_current = 1000 | ||
self.soc = 50 | ||
self.voltage = 13.2 | ||
self.current = 0 | ||
self.min_battery_voltage = 12.0 | ||
self.max_battery_voltage = 14.4 | ||
|
||
if self.cell_count is None: | ||
self.cell_count = 4 | ||
for c in range(self.cell_count): | ||
self.cells.append(Cell(False)) | ||
return True | ||
return False | ||
|
||
def read_settings_data(self): | ||
test_data = self.read_serial_data_HLPdataBMS4S(b"ps\n", 3, 700) | ||
if test_data is False: | ||
return False | ||
s = str(test_data) | ||
s = s.replace(",", ".") | ||
par = get_par("BatterySize= ", s) | ||
if par is False: | ||
return False | ||
self.capacity = int(par) | ||
v = get_par("VoltHigh= ", s) | ||
if v is False: | ||
return False | ||
self.max_battery_voltage = float(v) * float(4) | ||
v = get_par("VoltLow= ", s) | ||
if v is False: | ||
return False | ||
self.min_battery_voltage = float(v) * float(4) | ||
|
||
return True | ||
|
||
def read_status_data(self): | ||
status_data = self.read_serial_data_HLPdataBMS4S(b"m1\n", 0.2, 40) | ||
if status_data is False: | ||
return False | ||
par1 = str(status_data) | ||
par = par1.split(",") | ||
if len(par) < 8: | ||
return False | ||
if len(par[0]) < 7: | ||
return False | ||
p0 = str(par[0]) | ||
ix = p0.find(".") | ||
par0 = p0[ix - 1 : len(p0)] | ||
|
||
# v1,v2,v3,v4,current,soc,chargeoff,loadoff,vbat2,socnow,adj,beep,led,temp1,temp2... | ||
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14... | ||
|
||
self.voltage = float(par0) + float(par[1]) + float(par[2]) + float(par[3]) | ||
self.cells[0].voltage = float(par0) | ||
self.cells[1].voltage = float(par[1]) | ||
self.cells[2].voltage = float(par[2]) | ||
self.cells[3].voltage = float(par[3]) | ||
self.current = float(par[4]) | ||
self.soc = int(par[5]) | ||
self.control_allow_charge = par[6] | ||
self.charge_fet = par[6] | ||
self.control_allow_discharge = par[7] | ||
self.discharge_fet = par[7] | ||
|
||
beep = int(par[11]) | ||
if beep == 2: | ||
self.protection.temp_low_charge = 1 | ||
else: | ||
self.protection.temp_low_charge = 0 | ||
if beep == 3: | ||
self.protection.temp_high_charge = 1 | ||
else: | ||
self.protection.temp_high_charge = 0 | ||
if beep == 4: | ||
self.protection.voltage_low = 2 | ||
else: | ||
self.protection.voltage_low = 0 | ||
if beep == 5: | ||
self.protection.voltage_high = 2 | ||
else: | ||
self.protection.voltage_high = 0 | ||
|
||
if len(par) > 13: | ||
nb = 0 | ||
min = int(1000) | ||
max = int(-1000) | ||
ix = 13 | ||
while ix < len(par): | ||
tmp = par[ix].split(" ") | ||
ix += 1 | ||
if len(tmp) == 2: | ||
name = tmp[0] | ||
temp = int("".join(filter(str.isdigit, tmp[1]))) | ||
if name[0] == "b": | ||
nb += 1 | ||
if temp > max: | ||
max = temp | ||
if temp < min: | ||
min = temp | ||
if nb == 1: | ||
self.temp1 = max | ||
if nb > 1: | ||
self.temp1 = max | ||
self.temp2 = min | ||
|
||
return True | ||
|
||
def manage_charge_voltage(self): | ||
self.allow_max_voltage = True | ||
self.control_voltage = self.max_battery_voltage | ||
|
||
def manage_charge_current(self): | ||
self.control_charge_current = 1000 | ||
self.control_discharge_current = 1000 | ||
|
||
def read_serial_data_HLPdataBMS4S(self, command, time, min_len): | ||
data = read_serial_data2(command, self.port, self.baud_rate, time, min_len) | ||
if data is False: | ||
return False | ||
return data | ||
|
||
|
||
def read_serial_data2(command, port, baud, time, min_len): | ||
try: | ||
with serial.Serial(port, baudrate=baud, timeout=0.5) as ser: | ||
ret = read_serialport_data2(ser, command, time, min_len) | ||
if not ret is False: | ||
return ret | ||
return False | ||
|
||
except serial.SerialException as e: | ||
logger.error(e) | ||
return False | ||
|
||
|
||
def read_serialport_data2(ser, command, time, min_len): | ||
try: | ||
cnt = 0 | ||
while cnt < 3: | ||
cnt += 1 | ||
ser.flushOutput() | ||
ser.flushInput() | ||
ser.write(command) | ||
sleep(time) | ||
res = ser.read(1000) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it always 1000? Why are you not using |
||
if len(res) >= min_len: | ||
return res | ||
return False | ||
|
||
except serial.SerialException as e: | ||
logger.error(e) | ||
return False | ||
|
||
|
||
def get_par(p, s): | ||
ix = s.find(p) | ||
if ix > 0: | ||
ix += len(p) | ||
for i in range(ix, len(s)): | ||
if s[i] == " " or s[i] == 10 or s[i] == 13: | ||
ret = s[ix:i] | ||
return ret | ||
return False |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the wait time of 1 second be reduced?