-
-
Notifications
You must be signed in to change notification settings - Fork 20
/
__init__.py
executable file
·214 lines (191 loc) · 7.98 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""Luxtronik heatpump interface."""
# -*- coding: utf-8 -*-
# region Imports
from __future__ import annotations
import logging
import socket
import struct
import threading
import time
from luxtronik.calculations import Calculations
from luxtronik.parameters import Parameters
from luxtronik.visibilities import Visibilities
from luxtronik.discover import discover # noqa: F401
from luxtronik.constants import (
LUXTRONIK_DEFAULT_PORT,
LUXTRONIK_PARAMETERS_WRITE,
LUXTRONIK_PARAMETERS_READ,
LUXTRONIK_CALCULATIONS_READ,
LUXTRONIK_VISIBILITIES_READ,
LUXTRONIK_SOCKET_READ_SIZE_PEEK,
LUXTRONIK_SOCKET_READ_SIZE_INTEGER,
LUXTRONIK_SOCKET_READ_SIZE_CHAR,
)
# endregion Imports
LOGGER = logging.getLogger("Luxtronik")
# Wait time (in seconds) after writing parameters to give controller
# some time to re-calculate values, etc.
WAIT_TIME_AFTER_PARAMETER_WRITE = 1
def is_socket_closed(sock: socket.socket) -> bool:
"""Check is socket closed."""
try:
# this will try to read bytes without blocking and also without removing them from buffer
data = sock.recv(
LUXTRONIK_SOCKET_READ_SIZE_PEEK, socket.MSG_DONTWAIT | socket.MSG_PEEK
)
if len(data) == 0:
return True
except BlockingIOError:
return False # socket is open and reading from it would block
except ConnectionResetError: # pylint: disable=broad-except
return True # socket was closed for some other reason
except Exception as err: # pylint: disable=broad-except
LOGGER.exception(
"Unexpected exception when checking if socket is closed", exc_info=err
)
return False
return False
class Luxtronik:
"""Main luxtronik class."""
def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT, safe=True):
self._lock = threading.Lock()
self._host = host
self._port = port
self._safe = safe
self._socket = None
self.read()
def __del__(self):
if self._socket is not None:
if not is_socket_closed(self._socket):
self._socket.close()
self._socket = None
LOGGER.info(
"Disconnected from Luxtronik heatpump %s:%s", self._host, self._port
)
def read(self):
"""Read data from heatpump."""
return self._read_after_write(parameters=None)
def write(self, parameters):
"""Write parameter to heatpump."""
return self._read_after_write(parameters=parameters)
def _read_after_write(self, parameters):
"""
Read and/or write value from and/or to heatpump.
This method is essentially a wrapper for the _read() and _write()
methods.
Locking is being used to ensure that only a single socket operation is
performed at any point in time. This helps to avoid issues with the
Luxtronik controller, which seems unstable otherwise.
If write is true, all parameters will be written to the heat pump
prior to reading back in all data from the heat pump. If write is
false, no data will be written, but all available data will be read
from the heat pump.
:param Parameters() parameters Parameter dictionary to be written
to the heatpump before reading all available data
from the heatpump. At 'None' it is read only.
"""
with self._lock:
is_none = self._socket is None
if is_none:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if is_none or is_socket_closed(self._socket):
self._socket.connect((self._host, self._port))
LOGGER.info(
"Connected to Luxtronik heatpump %s:%s", self._host, self._port
)
if parameters is not None:
return self._write(parameters)
return self._read()
def _read(self):
parameters = self._read_parameters()
calculations = self._read_calculations()
visibilities = self._read_visibilities()
return calculations, parameters, visibilities
def _write(self, parameters):
for index, value in parameters.queue.items():
if not isinstance(index, int) or not isinstance(value, int):
LOGGER.warning(
"%s: Parameter id '%s' or value '%s' invalid!",
self._host,
index,
value,
)
continue
LOGGER.info("%s: Parameter '%d' set to '%s'", self._host, index, value)
self._send_ints(LUXTRONIK_PARAMETERS_WRITE, index, value)
cmd = self._read_int()
LOGGER.debug("%s: Command %s", self._host, cmd)
val = self._read_int()
LOGGER.debug("%s: Value %s", self._host, val)
# Flush queue after writing all values
parameters.queue = {}
# Give the heatpump a short time to handle the value changes/calculations:
time.sleep(WAIT_TIME_AFTER_PARAMETER_WRITE)
# Read the new values based on our parameter changes:
return self._read()
def _read_parameters(self):
data = []
self._send_ints(LUXTRONIK_PARAMETERS_READ, 0)
cmd = self._read_int()
LOGGER.debug("%s: Command %s", self._host, cmd)
length = self._read_int()
LOGGER.debug("%s: Length %s", self._host, length)
for _ in range(0, length):
try:
data.append(self._read_int())
except struct.error as err:
# not logging this as error as it would be logged on every read cycle
LOGGER.debug("%s: %s", self._host, err)
LOGGER.info("%s: Read %d parameters", self._host, length)
parameters = Parameters(safe=self._safe)
parameters.parse(data)
return parameters
def _read_calculations(self):
data = []
self._send_ints(LUXTRONIK_CALCULATIONS_READ, 0)
cmd = self._read_int()
LOGGER.debug("%s: Command %s", self._host, cmd)
stat = self._read_int()
LOGGER.debug("%s: Stat %s", self._host, stat)
length = self._read_int()
LOGGER.debug("%s: Length %s", self._host, length)
for _ in range(0, length):
try:
data.append(self._read_int())
except struct.error as err:
# not logging this as error as it would be logged on every read cycle
LOGGER.debug("%s: %s", self._host, err)
LOGGER.info("%s: Read %d calculations", self._host, length)
calculations = Calculations()
calculations.parse(data)
return calculations
def _read_visibilities(self):
data = []
self._send_ints(LUXTRONIK_VISIBILITIES_READ, 0)
cmd = self._read_int()
LOGGER.debug("%s: Command %s", self._host, cmd)
length = self._read_int()
LOGGER.debug("%s: Length %s", self._host, length)
for _ in range(0, length):
try:
data.append(self._read_char())
except struct.error as err:
# not logging this as error as it would be logged on every read cycle
LOGGER.debug("%s: %s", self._host, err)
LOGGER.info("%s: Read %d visibilities", self._host, length)
visibilities = Visibilities()
visibilities.parse(data)
return visibilities
def _send_ints(self, *ints):
"Low-level helper to send a tuple of ints"
data = struct.pack(">" + "i" * len(ints), *ints)
LOGGER.debug("%s: sending %s", self._host, data)
self._socket.sendall(data)
def _read_int(self):
"Low-level helper to receive an int"
reading = self._socket.recv(LUXTRONIK_SOCKET_READ_SIZE_INTEGER)
return struct.unpack(">i", reading)[0]
def _read_char(self):
"Low-level helper to receive a signed int"
reading = self._socket.recv(LUXTRONIK_SOCKET_READ_SIZE_CHAR)
return struct.unpack(">b", reading)[0]