Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing my JK BMS RS485 issue. #508

Merged
merged 1 commit into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions mppsolar/inout/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class PortType(Enum):
USB = auto()
ESP32 = auto()
SERIAL = auto()
JKSERIAL = auto()
JKBLE = auto()
MQTT = auto()
VSERIAL = auto()
Expand Down Expand Up @@ -62,6 +63,9 @@ def get_port_type(port):
elif "vserial" in port:
log.debug("port matches vserial")
return PortType.VSERIAL
elif "jkserial" in port:
log.debug("port matches jkserial")
return PortType.JKSERIAL
elif "serial" in port:
log.debug("port matches serial")
return PortType.SERIAL
Expand Down Expand Up @@ -113,6 +117,12 @@ def get_port(*args, **kwargs):

_port = SerialIO(device_path=port, serial_baud=baud)

elif port_type == PortType.JKSERIAL:
log.info("Using jkserialio for communications")
from mppsolar.inout.jkserialio import JKSerialIO

_port = JKSerialIO(device_path=port, serial_baud=baud)

elif port_type == PortType.DALYSERIAL:
log.info("Using dalyserialio for communications")
from mppsolar.inout.dalyserialio import DalySerialIO
Expand Down
61 changes: 61 additions & 0 deletions mppsolar/inout/jkserialio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging
import serial
import time

from .baseio import BaseIO
from ..helpers import get_kwargs

log = logging.getLogger("JKSerialIO")


class JKSerialIO(BaseIO):
def __init__(self, *args, **kwargs) -> None:
self._serial_port = get_kwargs(kwargs, "device_path")
self._serial_baud = get_kwargs(kwargs, "serial_baud")
self.no_data_counter = 0

def pattern_matched(self, data):
if len(data) >= 5:
return (
data[-5] == 0x68 and
data[-4] == 0x00 and
data[-3] == 0x00
# data[-2] and data[-1] can be anything, so no specific check needed
)
return False

def send_and_receive(self, *args, **kwargs) -> dict:
full_command = get_kwargs(kwargs, "full_command")
response_line = None
log.debug(f"port {self._serial_port}, baudrate {self._serial_baud}")
try:
with serial.serial_for_url(self._serial_port, self._serial_baud) as s:
log.debug("Executing command via jkserialio...")
s.timeout = 1
s.write_timeout = 1
s.flushInput()
s.flushOutput()
s.write(full_command)
time.sleep(0.1)

while self.no_data_counter < 5: # Try up to 5 times with no new data before exiting
if s.in_waiting > 0:
if response_line is None:
response_line = bytearray()
response_line.extend(s.read(s.in_waiting))
self.no_data_counter = 0 # Reset counter if data was received

# Check if the last 5 bytes match the pattern
if self.pattern_matched(response_line):
log.debug("JK serial end frame pattern matched.")
break # Exit the loop if the pattern is matched
else:
self.no_data_counter += 1
time.sleep(0.01)

log.debug("serial response was: %s", response_line)
return response_line
except Exception as e:
log.warning(f"Serial read error: {e}")
log.info("Command execution failed")
return {"ERROR": ["Serial command execution failed", ""]}