-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathhpm.py
103 lines (86 loc) · 4.08 KB
/
hpm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import dataclasses
from typing import Dict, Any
from ovos_bus_client import MessageBusClient
from ovos_bus_client.message import Message
from ovos_config import Configuration
from ovos_utils.fakebus import FakeBus
from ovos_utils.log import LOG
from pyee import EventEmitter
from hivemind_bus_client.message import HiveMessage, HiveMessageType
from hivemind_core.protocol import AgentProtocol
@dataclasses.dataclass()
class OVOSProtocol(AgentProtocol):
    """Agent protocol that relays traffic between an OVOS messagebus and HiveMind clients.

    On construction the instance makes sure it holds a live messagebus
    connection and subscribes to two events:

    * ``hive.send.downstream`` - an OVOS component asks for a HiveMessage to
      be sent to connected clients (see :meth:`handle_send`).
    * ``message`` - catch-all for raw bus traffic; messages are forwarded only
      to the clients listed in their ``destination`` context
      (see :meth:`handle_internal_mycroft`).

    NOTE(review): ``self.clients`` is not defined in this class; it is assumed
    to be provided by ``AgentProtocol`` as a mapping of peer id to a connection
    object exposing ``send()`` - confirm against the base class.
    """

    # Messagebus connection. A FakeBus (the default) is treated as a
    # placeholder and replaced by a real client in __post_init__.
    bus: MessageBusClient = dataclasses.field(default_factory=FakeBus)
    # Connection settings; defaults to the "websocket" section of the OVOS
    # configuration. Only the "host" and "port" keys are read here.
    config: Dict[str, Any] = dataclasses.field(
        default_factory=lambda: Configuration().get("websocket", {})
    )

    def __post_init__(self):
        """Connect to the OVOS messagebus if needed and register the bus handlers."""
        if not self.bus or isinstance(self.bus, FakeBus):
            ovos_bus_address = self.config.get("host") or "127.0.0.1"
            ovos_bus_port = self.config.get("port") or 8181
            self.bus = MessageBusClient(
                host=ovos_bus_address,
                port=ovos_bus_port,
                emitter=EventEmitter(),
            )
            self.bus.run_in_thread()
            # Blocks (no timeout) until the client reports it is connected.
            self.bus.connected_event.wait()
        self.register_bus_handlers()

    def register_bus_handlers(self):
        """Subscribe the handlers of this protocol to the OVOS messagebus."""
        LOG.debug("registering internal OVOS bus handlers")
        self.bus.on("hive.send.downstream", self.handle_send)
        self.bus.on("message", self.handle_internal_mycroft)  # catch all

    # mycroft handlers - from master -> slave
    def handle_send(self, message: Message):
        """Send a HiveMessage downstream on behalf of an OVOS component.

        A device can be both a master and a slave; downstream messages are
        handled here, HiveMindSlaveInternalProtocol will handle requests meant
        to go upstream.

        Args:
            message: bus message whose ``data`` carries ``msg_type``
                (required), ``payload`` and ``peer`` (both optional).

        Behaviour by ``msg_type``:

        * PROPAGATE / BROADCAST - sent to every connected client.
        * ESCALATE - ignored, only slaves can escalate.
        * anything else - sent to ``peer`` if it is connected; otherwise a
          ``hive.client.send.error`` message is emitted on the bus.
        """
        payload = message.data.get("payload")
        peer = message.data.get("peer")
        msg_type = message.data.get("msg_type")
        if msg_type is None:
            # Malformed request: without a type no HiveMessage can be built.
            LOG.error("hive.send.downstream message is missing 'msg_type', ignoring")
            return

        # Only advertise a target when one was actually requested, instead of
        # sending a [None] target list with broadcasts.
        hmessage = HiveMessage(
            msg_type,
            payload=payload,
            target_peers=[peer] if peer else [],
        )

        if msg_type in (HiveMessageType.PROPAGATE, HiveMessageType.BROADCAST):
            # this message is meant to be sent to all slave nodes
            for client in self.clients.values():
                client.send(hmessage)
        elif msg_type == HiveMessageType.ESCALATE:
            # only slaves can escalate, ignore silently
            # if this device is also a slave to something,
            # HiveMindSlaveInternalProtocol will handle the request
            pass
        elif peer:
            # NOT a protocol specific message, send directly to requested peer
            # ovos component is asking explicitly to send a message to a peer
            client = self.clients.get(peer)
            if client is not None:
                client.send(hmessage)
            else:
                LOG.error("That client is not connected")
                self.bus.emit(
                    message.forward(
                        "hive.client.send.error",
                        {"error": "That client is not connected", "peer": peer},
                    )
                )
        else:
            # Neither a broadcast nor addressed to anyone: nothing can be sent.
            LOG.debug(f"ignoring {msg_type} send request without a target peer")

    def handle_internal_mycroft(self, message: str):
        """Forward internal bus messages to the clients they are addressed to.

        Here is where the client isolation happens: a message is only sent to
        connected clients whose peer id appears in its ``destination`` context,
        so clients only get responses to their own messages.

        Args:
            message: serialized bus message; the "message" event is a special
                case in ovos-bus-client that is not deserialized.
        """
        bus_message = Message.deserialize(message)

        target_peers = bus_message.context.get("destination") or []
        if not isinstance(target_peers, list):
            target_peers = [target_peers]
        if not target_peers:
            return

        for peer, client in self.clients.items():
            if peer not in target_peers:
                continue
            # forward internal messages to clients if they are the target
            LOG.debug(f"{bus_message.msg_type} - destination: {peer}")
            bus_message.context["source"] = "hive"
            client.send(
                HiveMessage(
                    HiveMessageType.BUS,
                    source_peer=peer,
                    target_peers=target_peers,
                    payload=bus_message,
                )
            )