From 0c2d126fe9464f3a6bd3a3399e50ecb627de720d Mon Sep 17 00:00:00 2001 From: JarbasAi Date: Thu, 7 Sep 2023 18:14:41 +0100 Subject: [PATCH] testing... --- hivemind_core/__main__.py | 2 +- hivemind_core/protocol.py | 29 ++++++++++++----------------- hivemind_core/service.py | 32 ++++++++++++++------------------ 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/hivemind_core/__main__.py b/hivemind_core/__main__.py index 622e5bd..13b2fc0 100644 --- a/hivemind_core/__main__.py +++ b/hivemind_core/__main__.py @@ -3,7 +3,7 @@ def main(): service = HiveMindService() - service.start() + service.run() if __name__ == "__main__": diff --git a/hivemind_core/protocol.py b/hivemind_core/protocol.py index 4d4d1bb..8fdcb9e 100644 --- a/hivemind_core/protocol.py +++ b/hivemind_core/protocol.py @@ -64,7 +64,7 @@ def peer(self) -> str: # this is how ovos refers to connected nodes in message.context return f"{self.name}:{self.ip}::{self.sess.session_id}" - def send(self, message: HiveMessage): + async def send(self, message: HiveMessage): # TODO some cleaning around HiveMessage if isinstance(message.payload, dict): _msg_type = message.payload.get("type") @@ -90,8 +90,7 @@ def send(self, message: HiveMessage): else: LOG.debug(f"sent unencrypted!") - self.loop.install() - self.socket.write_message(payload, is_bin) + await self.socket.write_message(payload, is_bin) def decode(self, payload: str) -> HiveMessage: if self.crypto_key: @@ -178,7 +177,7 @@ def handle_send(self, message: Message): "hive.client.send.error", {"error": "That client is not connected", "peer": peer})) - def handle_internal_mycroft(self, message: str): + async def handle_internal_mycroft(self, message: str): """ forward internal messages to clients if they are the target here is where the client isolation happens, clients only get responses to their own messages""" @@ -205,7 +204,7 @@ def handle_internal_mycroft(self, message: str): source_peer=peer, target_peers=target_peers, payload=message) - client.send(msg) + await client.send(msg) @dataclass() @@ -226,16 +225,12 @@ class HiveMindListenerProtocol: mycroft_bus_callback = None # slave asked to inject payload into mycroft bus shared_bus_callback = None # passive sharing of slave device bus (info) - def bind(self, websocket, bus=None): + def bind(self, websocket, bus): websocket.protocol = self - if bus is None: - bus = MessageBusClient() - bus.run_in_thread() - bus.connected_event.wait() self.internal_protocol = HiveMindListenerInternalProtocol(bus) self.internal_protocol.register_bus_handlers() - def handle_new_client(self, client: HiveMindClientConnection): + async def handle_new_client(self, client: HiveMindClientConnection): LOG.debug(f"new client: {client.peer}") self.clients[client.peer] = client message = Message("hive.client.connect", @@ -253,7 +248,7 @@ def handle_new_client(self, client: HiveMindClientConnection): "peer": client.peer, # this identifies the connected client in ovos message.context "node_id": self.peer}) LOG.debug(f"saying HELLO to: {client.peer}") - client.send(msg) + await client.send(msg) needs_handshake = not client.crypto_key and self.handshake_enabled @@ -269,7 +264,7 @@ def handle_new_client(self, client: HiveMindClientConnection): } msg = HiveMessage(HiveMessageType.HANDSHAKE, payload) LOG.debug(f"starting {client.peer} HANDSHAKE: {payload}") - client.send(msg) + await client.send(msg) # if client is in protocol V1 -> self.handle_handshake_message # clients can rotate their pubkey or session_key by sending a new handshake @@ -296,7 +291,7 @@ def handle_invalid_protocol_version(self, client: HiveMindClientConnection): {"source": client.peer}) self.internal_protocol.bus.emit(message) - def handle_message(self, message: HiveMessage, client: HiveMindClientConnection): + async def handle_message(self, message: HiveMessage, client: HiveMindClientConnection): """ message (HiveMessage): HiveMind message object @@ -309,7 +304,7 @@ def handle_message(self, message: HiveMessage, client: HiveMindClientConnection) message.update_hop_data() if message.msg_type == HiveMessageType.HANDSHAKE: - self.handle_handshake_message(message, client) + await self.handle_handshake_message(message, client) # mycroft Message handlers elif message.msg_type == HiveMessageType.BUS: @@ -340,7 +335,7 @@ def handle_unknown_message(self, message: HiveMessage, client: HiveMindClientCon def handle_binary_message(self, message: HiveMessage, client: HiveMindClientConnection): assert message.msg_type == HiveMessageType.BINARY - def handle_handshake_message(self, message: HiveMessage, + async def handle_handshake_message(self, message: HiveMessage, client: HiveMindClientConnection): LOG.debug("handshake received, generating session key") payload = message.payload @@ -386,7 +381,7 @@ def handle_handshake_message(self, message: HiveMessage, return msg = HiveMessage(HiveMessageType.HANDSHAKE, payload) - client.send(msg) # client can recreate crypto_key on his side now + await client.send(msg) # client can recreate crypto_key on his side now def handle_bus_message(self, message: HiveMessage, client: HiveMindClientConnection): diff --git a/hivemind_core/service.py b/hivemind_core/service.py index 4fa4f0e..3f82c4e 100644 --- a/hivemind_core/service.py +++ b/hivemind_core/service.py @@ -16,7 +16,7 @@ from ovos_bus_client.session import Session from ovos_utils.xdg_utils import xdg_data_home from poorman_handshake import HandShake, PasswordHandShake -from pyee import EventEmitter +from pyee import EventEmitter, ExecutorEventEmitter, AsyncIOEventEmitter from tornado import web, ioloop from tornado.platform.asyncio import AnyThreadEventLoopPolicy from tornado.websocket import WebSocketHandler @@ -95,10 +95,6 @@ def on_stopping(): class MessageBusEventHandler(WebSocketHandler): protocol: Optional[HiveMindListenerProtocol] = None - def __init__(self, application, request, **kwargs): - super().__init__(application, request, **kwargs) - self.emitter = EventEmitter() - @staticmethod def decode_auth(auth) -> Tuple[str, str]: userpass_encoded = bytes(auth, encoding="utf-8") @@ -106,15 +102,12 @@ def decode_auth(auth) -> Tuple[str, str]: name, key = userpass_decoded.split(":") return name, key - def on(self, event_name, handler): - self.emitter.on(event_name, handler) - - def on_message(self, message): + async def on_message(self, message): message = self.client.decode(message) LOG.info(f"received {self.client.peer} message: {message}") - self.protocol.handle_message(message, self.client) + await self.protocol.handle_message(message, self.client) - def open(self): + async def open(self): auth = self.request.uri.split("/?authorization=")[-1] name, key = self.decode_auth(auth) LOG.info(f"authorizing client: {name}") @@ -152,7 +145,7 @@ def open(self): self.close() return - self.protocol.handle_new_client(self.client) + await self.protocol.handle_new_client(self.client) # self.write_message(Message("connected").serialize()) def on_close(self): @@ -163,7 +156,7 @@ def check_origin(self, origin) -> bool: return True -class HiveMindService(Thread): +class HiveMindService: identity = NodeIdentity() def __init__(self, @@ -174,7 +167,6 @@ def __init__(self, stopping_hook: Callable = on_stopping, websocket_config: Optional[Dict[str, Any]] = None): - super().__init__() websocket_config = websocket_config or \ Configuration().get('hivemind_websocket', {}) callbacks = StatusCallbackMap(on_started=started_hook, @@ -183,10 +175,6 @@ def __init__(self, on_error=error_hook, on_stopping=stopping_hook) - self.bus = MessageBusClient(emitter=EventEmitter()) - self.bus.run_in_thread() - self.bus.connected_event.wait() - self.status = ProcessStatus('HiveMind', callback_map=callbacks) self.host = websocket_config.get('host') or "0.0.0.0" self.port = websocket_config.get('port') or 5678 @@ -200,11 +188,19 @@ def __init__(self, port=self.port, zeroconf=websocket_config.get('zeroconf', False)) + def connect_to_mycroft(self): + self.bus = MessageBusClient(emitter=AsyncIOEventEmitter()) + self.bus.run_in_thread() + self.bus.connected_event.wait() + def run(self): self.status.set_alive() asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) + loop = ioloop.IOLoop.current() + self.connect_to_mycroft() + self.protocol = HiveMindListenerProtocol(loop=loop) self.protocol.bind(MessageBusEventHandler, self.bus) self.status.bind(self.bus)