-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathPyViCareDeviceConfig.py
118 lines (91 loc) · 4.25 KB
/
PyViCareDeviceConfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import json
import logging
import re
from PyViCare.PyViCareFuelCell import FuelCell
from PyViCare.PyViCareGazBoiler import GazBoiler
from PyViCare.PyViCareHeatingDevice import HeatingDevice
from PyViCare.PyViCareHeatPump import HeatPump
from PyViCare.PyViCareHybrid import Hybrid
from PyViCare.PyViCareOilBoiler import OilBoiler
from PyViCare.PyViCarePelletsBoiler import PelletsBoiler
from PyViCare.PyViCareRadiatorActuator import RadiatorActuator
from PyViCare.PyViCareRoomSensor import RoomSensor
from PyViCare.PyViCareElectricalEnergySystem import ElectricalEnergySystem
from PyViCare.PyViCareGateway import Gateway
from PyViCare.PyViCareVentilationDevice import VentilationDevice
# Module-wide logger named 'ViCare'. A NullHandler is attached so that this
# module never installs an output handler of its own; emitting log records is
# left to whatever logging configuration the application sets up.
logger = logging.getLogger('ViCare')
logger.addHandler(logging.NullHandler())
class PyViCareDeviceConfig:
    """Describe a single device and build typed wrappers around its service.

    The instance stores the service object together with the device's id,
    model name and status. Every ``as*`` method creates a new wrapper object
    of the corresponding device class, all sharing ``self.service``.
    """

    def __init__(self, service, device_id, device_model, status):
        """Store the service handle and the device's identifying data.

        Args:
            service: Object used to talk to the device; handed unchanged to
                every wrapper created by the ``as*`` methods.
            device_id: Identifier returned by :meth:`getId`.
            device_model: Model name returned by :meth:`getModel`; also the
                string matched against the patterns in
                :meth:`asAutoDetectDevice`.
            status: Status value; compared against ``"Online"`` by
                :meth:`isOnline`.
        """
        self.service = service
        self.device_id = device_id
        self.device_model = device_model
        self.status = status

    def asGeneric(self):
        """Return a ``HeatingDevice`` wrapper bound to this device's service."""
        return HeatingDevice(self.service)

    def asGazBoiler(self):
        """Return a ``GazBoiler`` wrapper bound to this device's service."""
        return GazBoiler(self.service)

    def asFuelCell(self):
        """Return a ``FuelCell`` wrapper bound to this device's service."""
        return FuelCell(self.service)

    def asHeatPump(self):
        """Return a ``HeatPump`` wrapper bound to this device's service."""
        return HeatPump(self.service)

    def asOilBoiler(self):
        """Return an ``OilBoiler`` wrapper bound to this device's service."""
        return OilBoiler(self.service)

    def asPelletsBoiler(self):
        """Return a ``PelletsBoiler`` wrapper bound to this device's service."""
        return PelletsBoiler(self.service)

    def asHybridDevice(self):
        """Return a ``Hybrid`` wrapper bound to this device's service."""
        return Hybrid(self.service)

    def asRadiatorActuator(self):
        """Return a ``RadiatorActuator`` wrapper bound to this device's service."""
        return RadiatorActuator(self.service)

    def asRoomSensor(self):
        """Return a ``RoomSensor`` wrapper bound to this device's service."""
        return RoomSensor(self.service)

    def asElectricalEnergySystem(self):
        """Return an ``ElectricalEnergySystem`` wrapper bound to this device's service."""
        return ElectricalEnergySystem(self.service)

    def asGateway(self):
        """Return a ``Gateway`` wrapper bound to this device's service."""
        return Gateway(self.service)

    def asVentilation(self):
        """Return a ``VentilationDevice`` wrapper bound to this device's service."""
        return VentilationDevice(self.service)

    def getConfig(self):
        """Return the ``accessor`` attribute of the underlying service."""
        return self.service.accessor

    def getId(self):
        """Return the device id given at construction time."""
        return self.device_id

    def getModel(self):
        """Return the device model name given at construction time."""
        return self.device_model

    def isOnline(self) -> bool:
        """Return ``True`` when the stored status equals ``"Online"``."""
        return self.status == "Online"

    # see: https://vitodata300.viessmann.com/vd300/ApplicationHelp/VD300/1031_de_DE/Ger%C3%A4teliste.html
    def asAutoDetectDevice(self):
        """Pick a device wrapper from the model name or the service's roles.

        The table below is scanned top to bottom and the first matching entry
        wins. An entry matches when its regular expression is found anywhere
        in ``self.device_model`` or when ``self.service.hasRoles(roles)`` is
        truthy for the entry's role list. When nothing matches, the generic
        ``HeatingDevice`` wrapper is returned.

        Returns:
            The wrapper object produced by the matching ``as*`` method, or
            the result of :meth:`asGeneric` as fallback.
        """
        # Each entry: (factory method, model-name regex, role list).
        # NOTE(review): entries with an empty role list still call
        # hasRoles([]); this relies on hasRoles returning a falsy value for
        # an empty list - confirm against the service implementation.
        device_types = [
            (self.asFuelCell, r"Vitovalor|Vitocharge|Vitoblo", []),
            (self.asGazBoiler, r"Vitodens|VScotH|Vitocrossal|VDensH|Vitopend|VPendH|OT_Heating_System", ["type:boiler"]),
            (self.asHeatPump, r"Vitocal|VBC70|V200WO1A|CU401B", ["type:heatpump"]),
            (self.asOilBoiler, r"Vitoladens|Vitoradial|Vitorondens|VPlusH|V200KW2_6", []),
            (self.asPelletsBoiler, r"Vitoligno|Ecotronic|VBC550P", []),
            (self.asRadiatorActuator, r"E3_RadiatorActuator", ["type:radiator"]),
            (self.asRoomSensor, r"E3_RoomSensor", ["type:climateSensor"]),
            (self.asElectricalEnergySystem, r"E3_HEMS", ["type:hems"]),
            (self.asElectricalEnergySystem, r"E3_TCU10_x07", ["type:tcu"]),
            (self.asElectricalEnergySystem, r"E3_EEBus", ["type:eebus"]),
            (self.asElectricalEnergySystem, r"E3_VitoCharge_03", ["type:energy_storage"]),
            (self.asVentilation, r"E3_ViAir", ["type:ventilation"]),
            (self.asGateway, r"Heatbox1", ["type:gateway;VitoconnectOpto1"]),
        ]

        for (creator_method, type_name, roles) in device_types:
            if re.search(type_name, self.device_model) or self.service.hasRoles(roles):
                logger.info(f"detected {self.device_model} {creator_method.__name__}")
                return creator_method()

        logger.info(
            f"Could not auto detect {self.device_model}. Use generic device.")
        return self.asGeneric()

    def get_raw_json(self):
        """Return whatever ``self.service.fetch_all_features()`` returns."""
        return self.service.fetch_all_features()

    def dump_secure(self, flat=False) -> str:
        """Serialize the raw feature data to JSON with long numbers masked.

        Every run of six or more digits that is directly enclosed by ``"`` or
        ``/`` characters is replaced by the same number of ``#`` characters.
        Shorter digit runs and bare JSON numbers are left untouched.

        Args:
            flat: When true, each element of the ``'data'`` list is written
                compactly on its own line inside a minimal ``{"data": [...]}``
                wrapper. Otherwise the whole structure is pretty-printed with
                a four-space indent. Keys are sorted in both modes.

        Returns:
            The masked JSON text.
        """
        raw = self.get_raw_json()
        if flat:
            inner = ',\n'.join(json.dumps(x, sort_keys=True) for x in raw['data'])
            # Serialize an empty wrapper first, then splice the per-feature
            # lines in place of the quoted placeholder element.
            outer = json.dumps({'data': ['placeholder']}, indent=0)
            dumpJSON = outer.replace('"placeholder"', inner)
        else:
            dumpJSON = json.dumps(raw, indent=4, sort_keys=True)

        # Lookbehind/lookahead keep the delimiters out of the match. A pattern
        # that consumes the closing delimiter would skip the second number in
        # text such as "/123456/7654321/", because that shared "/" would no
        # longer be available as the opening delimiter of the next match.
        return re.sub(
            r'(?<=["/])\d{6,}(?=["/])',
            lambda m: '#' * len(m.group()),
            dumpJSON,
        )