Skip to content

Commit

Permalink
feat: init parser (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Aug 25, 2022
1 parent 5f5b6eb commit a4b521a
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 738 deletions.
140 changes: 47 additions & 93 deletions src/thermobeacon_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@ class ThermoBeaconDevice:


DEVICE_TYPES = {
0x01: ThermoBeaconDevice("CGG1", ""),
0x04: ThermoBeaconDevice("CGH1", "Door/Window Sensor"), # Door/Window Sensor
0x07: ThermoBeaconDevice("CGG1", ""),
0x09: ThermoBeaconDevice("CGP1W", ""),
0x16: ThermoBeaconDevice("CGG1", "ThermoBeacon Temp RH M"),
0x12: ThermoBeaconDevice("CGPR1", "Motion & Light"),
0x1E: ThermoBeaconDevice("CGC1", "BT Clock Lite"),
0x0C: ThermoBeaconDevice("CGD1", "Alarm Clock"),
0x0E: ThermoBeaconDevice("CGDN1", "Air Monitor Lite"),
0x0F: ThermoBeaconDevice("CGM1", "Lee Guitars Thermo-Hygrometer"),
0x10: ThermoBeaconDevice("16", "Lanyard/mini hygrometer"),
0x11: ThermoBeaconDevice("17", "Smart hygrometer"),
0x15: ThermoBeaconDevice("21", "Smart hygrometer"),
}
MFR_IDS = set(DEVICE_TYPES)


SERVICE_DATA_UUID = "0000fdcd-0000-1000-8000-00805f9b34fb"
SERVICE_UUID = "0000fff0-0000-1000-8000-00805f9b34fb"


class ThermoBeaconBluetoothDeviceData(BluetoothData):
Expand All @@ -49,22 +42,25 @@ class ThermoBeaconBluetoothDeviceData(BluetoothData):
def _start_update(self, service_info: BluetoothServiceInfo) -> None:
"""Update from BLE advertisement data."""
_LOGGER.debug("Parsing thermobeacon BLE advertisement data: %s", service_info)
lower_name = service_info.name.lower()
if SERVICE_DATA_UUID not in service_info.service_data:
if SERVICE_UUID not in service_info.service_uuids:
return
unpadded_data = service_info.service_data[SERVICE_DATA_UUID]
data = b"\x00\x00\x00\x00" + unpadded_data
device_id = data[5]
if not (device := DEVICE_TYPES.get(device_id)):
_LOGGER.debug("Device type %s is not supported", device_id)
if not MFR_IDS.intersection(service_info.manufacturer_data):
return
if device.name:
name = device.name
elif lower_name.startswith("thermobeacon "):
name = service_info.name[9:]
else:
name = service_info.name
self.set_device_type(device.model)
changed_manufacturer_data = self.changed_manufacturer_data(service_info)
if changed_manufacturer_data is None:
return
last_id = list(changed_manufacturer_data)[-1]
data = (
int(last_id).to_bytes(2, byteorder="little")
+ changed_manufacturer_data[last_id]
)
msg_length = len(data)
if msg_length not in (20, 22):
return
device_id = data[0]
device_type = DEVICE_TYPES[device_id]
name = device_type.name
self.set_device_type(device_id)
self.set_title(f"{name} {short_address(service_info.address)}")
self.set_device_name(f"{name} {short_address(service_info.address)}")
self.set_device_manufacturer("ThermoBeacon")
Expand All @@ -73,72 +69,30 @@ def _start_update(self, service_info: BluetoothServiceInfo) -> None:
def _process_update(self, data: bytes) -> None:
"""Update from BLE advertisement data."""
_LOGGER.debug("Parsing ThermoBeacon BLE advertisement data: %s", data)
msg_length = len(data)
if msg_length < 12:
if len(data) != 22:
return
xdata_point = 14
while xdata_point < msg_length:
xdata_id = data[xdata_point - 2]
xdata_size = data[xdata_point - 1]
if xdata_point + xdata_size <= msg_length:
self._process_xdata(
xdata_id,
xdata_size,
data[xdata_point : xdata_point + xdata_size], # noqa: E203
)
xdata_point += xdata_size + 2

def _process_xdata(self, xdata_id: int, xdata_size: int, xdata: bytes) -> None:
if xdata_id == 0x01 and xdata_size == 4:
(temp, humi) = unpack("<hH", xdata)
self.update_predefined_sensor(SensorLibrary.TEMPERATURE__CELSIUS, temp / 10)
self.update_predefined_sensor(SensorLibrary.HUMIDITY__PERCENTAGE, humi / 10)
elif xdata_id == 0x02 and xdata_size == 1:
batt = unpack("B", xdata)[0]
self.update_predefined_sensor(SensorLibrary.BATTERY__PERCENTAGE, batt)
elif xdata_id == 0x07 and xdata_size == 2:
pressure = unpack("<H", xdata)[0]
self.update_predefined_sensor(SensorLibrary.PRESSURE__MBAR, pressure / 10)
elif xdata_id == 0x08 and xdata_size == 4:
(motion, illuminance_1, illuminance_2) = unpack("<BHB", xdata)
self.update_predefined_binary_sensor(
BinarySensorDeviceClass.MOTION, bool(motion)
)
self.update_predefined_sensor(
SensorLibrary.LIGHT__LIGHT_LUX, illuminance_1 + illuminance_2
)
elif xdata_id == 0x04 and xdata_size == 1:
closed = unpack("B", xdata)[0]
self.update_predefined_binary_sensor(
BinarySensorDeviceClass.DOOR, not bool(closed)
)
elif xdata_id == 0x09 and xdata_size == 4:
illuminance = unpack("<I", xdata)[0]
self.update_predefined_sensor(SensorLibrary.LIGHT__LIGHT_LUX, illuminance)
elif xdata_id == 0x11 and xdata_size == 1:
light = unpack("B", xdata)[0]
self.update_predefined_binary_sensor(
BinarySensorDeviceClass.LIGHT, bool(light)
)
elif xdata_id == 0x12 and xdata_size == 4:
(pm2_5, pm10) = unpack("<HH", xdata)
self.update_predefined_sensor(
SensorLibrary.PM25__CONCENTRATION_MICROGRAMS_PER_CUBIC_METER, pm2_5
)
self.update_predefined_sensor(
SensorLibrary.PM10__CONCENTRATION_MICROGRAMS_PER_CUBIC_METER, pm10
)
elif xdata_id == 0x13 and xdata_size == 2:
co2 = unpack("<H", xdata)[0] # noqa: F841
self.update_predefined_sensor(
SensorLibrary.CO2__CONCENTRATION_PARTS_PER_MILLION, co2
)
elif xdata_id == 0x0F and xdata_size == 1:
pass
# packet_id = unpack("B", xdata)[0] # noqa: F841
# result.update({"packet": packet_id})

button_pushed = data[3] & 0x80
xvalue = data[12:18]
(volt, temp, humi) = unpack("<HhH", xvalue)

if volt >= 3000:
batt = 100
elif volt >= 2600:
batt = 60 + (volt - 2600) * 0.1
elif volt >= 2500:
batt = 40 + (volt - 2500) * 0.2
elif volt >= 2450:
batt = 20 + (volt - 2450) * 0.4
else:
_LOGGER.debug(
"Unknown data received from ThermoBeacon device: %s",
xdata.hex(),
)
batt = 0

self.update_predefined_sensor(SensorLibrary.BATTERY__PERCENTAGE, batt)
self.update_predefined_sensor(SensorLibrary.TEMPERATURE__CELSIUS, temp / 16)
self.update_predefined_sensor(SensorLibrary.HUMIDITY__PERCENTAGE, humi / 16)
self.update_predefined_sensor(
SensorLibrary.VOLTAGE__ELECTRIC_POTENTIAL_VOLT, volt / 1000
)
self.update_predefined_binary_sensor(
BinarySensorDeviceClass.OCCUPANCY, bool(button_pushed)
)
Loading

0 comments on commit a4b521a

Please sign in to comment.