From 6e0c9b700c46a9a6801f4c4e99644c9000fbd6ec Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 11:04:09 +0200 Subject: [PATCH 01/10] fix(STT): whisper call was blocking entire huri's loop --- src/modules/speech_to_text/speech_to_text.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/modules/speech_to_text/speech_to_text.py b/src/modules/speech_to_text/speech_to_text.py index 1300dd3..fdc68f0 100644 --- a/src/modules/speech_to_text/speech_to_text.py +++ b/src/modules/speech_to_text/speech_to_text.py @@ -41,7 +41,7 @@ def __init__( ): super().__init__() - self.model_faster = WhisperModel(model) + self.model_faster = WhisperModel(model, cpu_threads=2) self.language = language self.sample_rate = sample_rate @@ -83,13 +83,15 @@ async def process(self, voice: Voice) -> Optional[Transcript]: self.pending_silence = False processing_audio = np.concatenate(processing_chunks, axis=0) - segments, _ = self.model_faster.transcribe( - processing_audio, - language=self.language, - beam_size=1, # faster for realtime - ) + def transcribe_text(): + segments, _ = self.model_faster.transcribe( + processing_audio, + language=self.language, + beam_size=1, + ) + return " ".join(seg.text for seg in segments).strip() - current_text = " ".join([seg.text for seg in segments]).strip() + current_text = await asyncio.to_thread(transcribe_text) processed_size = self.window_size - self.step_size async with self.lock: From d72b1c369b3ff8729d701435edfd789e77f36255 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 17:19:19 +0200 Subject: [PATCH 02/10] feat(client): added ClientHook abstract class --- src/core/client.py | 119 ++++++++++++++++++++++++++++++++----- src/core/client_senders.py | 102 ------------------------------- 2 files changed, 105 insertions(+), 116 deletions(-) delete mode 100644 src/core/client_senders.py diff --git a/src/core/client.py b/src/core/client.py index 085a0b8..4170e70 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -1,14 +1,79 @@ import asyncio +import importlib import json import os +import struct +from collections import defaultdict from dataclasses import asdict -from typing import Dict, List, Optional, Type +from typing import Any, Dict, List, Optional, Type import websockets from src.core.dataclasses.config import ClientConfig +from src.core.events import EventData -from .client_senders import ClientSender, get_senders +from .interface import Interface + + +class ClientSender: + """This class abstract sending data to HuRI. + + output_type: is the event data structure that the ClientSender will send. + It can be EventData or bytes, and must match event topic it send. + + Class derived from ClientSender must implement input_loop, + and use ClientSender.send to send data to HuRI. + """ + + output_type: Type[EventData] | bytes + + def __init__(self, topic: str, **_): + self.topic = topic + if issubclass(self.output_type, EventData): + self.send_function = self._send_event_data + elif issubclass(self.output_type, bytes): + self.send_function = self._send_bytes + else: + raise RuntimeError(f"{self.output_type} should be inherited from \ +EventData or bytes") + + async def input_loop(self, ws: websockets.ClientConnection): + raise NotImplementedError + + async def _send_bytes(self, ws: websockets.ClientConnection, data: bytes): + topic_bytes = self.topic.encode() + packet = struct.pack("!H", len(topic_bytes)) + topic_bytes + data + + await ws.send(packet) + + async def _send_event_data(self, ws: websockets.ClientConnection, data: EventData): + packet = json.dumps({"topic": self.topic, "data": asdict(data)}) + + await ws.send(packet) + + async def send(self, ws: websockets.ClientConnection, data: EventData | bytes): + await self.send_function(ws, data) + + +class ClientHook: + """This class abstract processing data from HuRI. + + input_type: is the event data structure that the ClientHook will process. + It can be EventData or bytes, and must match event topic it react to. + + Class derived from ClientHook must implement hook. + + `singletton` allow hooks to modifies shared ressources, + and comes from the used interface. + """ + + input_type: Type[EventData] | bytes + + def __init__(self, **_): + pass + + async def hook(self, singletton: Any, data: EventData | bytes): + raise NotImplementedError class Client: @@ -18,11 +83,29 @@ def __init__( self, config: ClientConfig, user_id_file: str = os.path.expanduser("~/.huri_user_id"), - senders_dict: Dict[str, Type[ClientSender]] = get_senders(), ): self.config = config + + module_path, object_name = self.config.interface_path.split(":", 1) + + module = importlib.import_module(module_path) + interface: Interface = getattr(module, object_name) + + self.singletton = interface.singletton + + available_senders = interface.get_senders() + self.senders: List[ClientSender] = [ + available_senders[sender.name](topic=sender.topic, **sender.args) + for sender in self.config.senders.values() + ] + + available_hooks = interface.get_hooks() + self.hooks: Dict[str, List[ClientHook]] = defaultdict(list) + for hook in self.config.hooks.values(): + for topic in hook.topics: + self.hooks[topic].append(available_hooks[hook.name](**hook.args)) + self.user_id_file = user_id_file - self.senders_dict = senders_dict def _load_user_id(self) -> Optional[str]: if os.path.exists(self.user_id_file): @@ -37,9 +120,22 @@ def _save_user_id(self, _user_id: str): async def _receive_loop(self, ws: websockets.ClientConnection): try: while True: - text = await ws.recv() - print("<<", text) - await asyncio.sleep(0.1) + msg = await ws.recv() + + if isinstance(msg, bytes): + topic_len = struct.unpack("!H", msg[:2])[0] + + topic = msg[2 : 2 + topic_len].decode() + data = msg[2 + topic_len :] + else: + event = json.loads(msg) + topic = event["topic"] + data = event["data"] + + for hook in self.hooks[topic]: + if not issubclass(hook.input_type, bytes): + data = hook.input_type(**data) + asyncio.create_task(hook.hook(self.singletton, data)) except (asyncio.CancelledError, websockets.ConnectionClosedOK): pass @@ -50,11 +146,6 @@ async def run(self): self.config.user_id = self._load_user_id() - senders: List[ClientSender] = [ - self.senders_dict[config.name](ws=ws, **config.args) - for config in self.config.senders.values() - ] - await ws.send(json.dumps(asdict(self.config))) init_msg = json.loads(await ws.recv()) @@ -63,9 +154,9 @@ async def run(self): self._save_user_id(user_id) print(f"Session started with _user_id: {user_id}") - receive_task = asyncio.create_task(self._receive_loop(ws)) + receive_task = asyncio.create_task(self._receive_loop(ws=ws)) await asyncio.gather( - *(sender.input_loop() for sender in senders), + *(sender.input_loop(ws=ws) for sender in self.senders), ) receive_task.cancel() diff --git a/src/core/client_senders.py b/src/core/client_senders.py deleted file mode 100644 index 03301a6..0000000 --- a/src/core/client_senders.py +++ /dev/null @@ -1,102 +0,0 @@ -import asyncio -import json -import struct -from dataclasses import asdict -from typing import Dict, Type - -import numpy as np -import sounddevice as sd -import websockets -from prompt_toolkit import PromptSession -from prompt_toolkit.patch_stdout import patch_stdout - -from src.core.events import EventData -from src.modules.speech_to_text.events import Sentence - - -class ClientSender: - """This class abstract sending data to HuRI. - - output_type: is the topic that the ClientSender will send. - Data structure must match event topic. - - Class derived from ClientSender must implement input_loop, - and use ClientSender.send to send data to HuRI. It can be EventData or bytes - """ - - output_type: str - - def __init__(self, ws: websockets.ClientConnection): - self.ws = ws - - async def input_loop(self): - raise NotImplementedError - - async def send(self, topic: str, data: EventData | bytes): - packet: str | bytes - if isinstance(data, EventData): - packet = json.dumps({"topic": topic, "data": asdict(data)}) - else: - topic_bytes = topic.encode() - - packet = struct.pack("!H", len(topic_bytes)) + topic_bytes + data - - await self.ws.send(packet) - - -class AudioSender(ClientSender): - output_type = "audio" - - def __init__( - self, sample_rate: int = 16000, frame_duration: float = 0.030, **kwargs - ): - super().__init__(**kwargs) - - self.sample_rate = sample_rate - self.frame_size = int(sample_rate * frame_duration) - - async def input_loop(self): - loop = asyncio.get_running_loop() - - queue: asyncio.Queue[np.ndarray] = asyncio.Queue() - - def callback(indata: np.ndarray, frames, time, status): - loop.call_soon_threadsafe(queue.put_nowait, indata.copy()) - - with sd.InputStream( - samplerate=self.sample_rate, - channels=1, - dtype="int16", - callback=callback, - blocksize=self.frame_size, - ): - while True: - chunk = await queue.get() - await self.send(self.output_type, chunk.tobytes()) - - -class TextSender(ClientSender): - output_type = "question" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - - async def input_loop(self): - print("'\\exit' or CTRL+D/C to exit.") - session: PromptSession = PromptSession() - try: - while True: - with patch_stdout(): - text = await session.prompt_async(">> ") - if text == "\\exit": - return - await self.send(self.output_type, Sentence(text)) - - except (EOFError, KeyboardInterrupt): - pass - finally: - print("TextSender Exited...") - - -def get_senders() -> Dict[str, Type[ClientSender]]: - return {"audio": AudioSender, "text": TextSender} From eafd64d51503f6100bbfb8f5368c1aa354d12c15 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 18:07:57 +0200 Subject: [PATCH 03/10] evol(client): better typing for event --- src/core/client.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/src/core/client.py b/src/core/client.py index 4170e70..cae9dfa 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -5,17 +5,17 @@ import struct from collections import defaultdict from dataclasses import asdict -from typing import Any, Dict, List, Optional, Type +from typing import Any, Dict, Generic, List, Optional, Type, TypeVar import websockets from src.core.dataclasses.config import ClientConfig from src.core.events import EventData -from .interface import Interface +T = TypeVar("T", bound=EventData | bytes) -class ClientSender: +class ClientSender(Generic[T]): """This class abstract sending data to HuRI. output_type: is the event data structure that the ClientSender will send. @@ -25,17 +25,10 @@ class ClientSender: and use ClientSender.send to send data to HuRI. """ - output_type: Type[EventData] | bytes + output_type: Type[T] def __init__(self, topic: str, **_): self.topic = topic - if issubclass(self.output_type, EventData): - self.send_function = self._send_event_data - elif issubclass(self.output_type, bytes): - self.send_function = self._send_bytes - else: - raise RuntimeError(f"{self.output_type} should be inherited from \ -EventData or bytes") async def input_loop(self, ws: websockets.ClientConnection): raise NotImplementedError @@ -51,11 +44,14 @@ async def _send_event_data(self, ws: websockets.ClientConnection, data: EventDat await ws.send(packet) - async def send(self, ws: websockets.ClientConnection, data: EventData | bytes): - await self.send_function(ws, data) + async def send(self, ws: websockets.ClientConnection, data: T): + if isinstance(data, bytes): + await self._send_bytes(ws, data) + else: + await self._send_event_data(ws, data) -class ClientHook: +class ClientHook(Generic[T]): """This class abstract processing data from HuRI. input_type: is the event data structure that the ClientHook will process. @@ -67,12 +63,12 @@ class ClientHook: and comes from the used interface. """ - input_type: Type[EventData] | bytes + input_type: Type[T] def __init__(self, **_): pass - async def hook(self, singletton: Any, data: EventData | bytes): + async def hook(self, singletton: Any, data: T): raise NotImplementedError @@ -89,7 +85,7 @@ def __init__( module_path, object_name = self.config.interface_path.split(":", 1) module = importlib.import_module(module_path) - interface: Interface = getattr(module, object_name) + interface = getattr(module, object_name) self.singletton = interface.singletton @@ -133,7 +129,7 @@ async def _receive_loop(self, ws: websockets.ClientConnection): data = event["data"] for hook in self.hooks[topic]: - if not issubclass(hook.input_type, bytes): + if not isinstance(data, bytes): data = hook.input_type(**data) asyncio.create_task(hook.hook(self.singletton, data)) From 32f990faeb4b328e41b4d29462d6164de605aaff Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 18:09:18 +0200 Subject: [PATCH 04/10] evol(Sender): send topic and data --- src/modules/utils/sender.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/modules/utils/sender.py b/src/modules/utils/sender.py index f09b0ba..a9fc2fa 100644 --- a/src/modules/utils/sender.py +++ b/src/modules/utils/sender.py @@ -1,3 +1,4 @@ +import struct from dataclasses import asdict from fastapi import WebSocket @@ -23,8 +24,8 @@ def __init__(self, ws: WebSocket, type: str): async def process(self, data: EventData | bytes): if isinstance(data, bytes): - await self.ws.send_bytes(data) - elif isinstance(data, EventData): - await self.ws.send_json(asdict(data)) + topic_bytes = self.input_type.encode() + packet = struct.pack("!H", len(topic_bytes)) + topic_bytes + data + await self.ws.send_bytes(packet) else: - await self.ws.send_text(data) + await self.ws.send_json({"topic": self.input_type, "data": asdict(data)}) From d078a628459d747acd0a84dd9adbc8b9364de500 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 18:10:09 +0200 Subject: [PATCH 05/10] feat(Interface): abstract class to define specific Client sender and hooks --- src/core/interface.py | 22 ++++++++++++++++++++++ src/interfaces/__init__.py | 0 2 files changed, 22 insertions(+) create mode 100644 src/core/interface.py create mode 100644 src/interfaces/__init__.py diff --git a/src/core/interface.py b/src/core/interface.py new file mode 100644 index 0000000..fb2b7ac --- /dev/null +++ b/src/core/interface.py @@ -0,0 +1,22 @@ +from typing import Any, Dict, Type + +from .client import ClientHook, ClientSender + + +class Interface: + """This class abstract defining specific Client senders and hooks. + + `self.singletton`: allow hooks to modifies shared ressources, + and comes from the used interface. + + Class derived from Interface must implement get_senders and get_hooks. + """ + + def __init__(self, singletton: Any): + self.singletton = singletton + + def get_senders(self) -> Dict[str, Type[ClientSender]]: + raise NotImplementedError + + def get_hooks(self) -> Dict[str, Type[ClientHook]]: + raise NotImplementedError diff --git a/src/interfaces/__init__.py b/src/interfaces/__init__.py new file mode 100644 index 0000000..e69de29 From 349372d32ff18cd59e8b544b0ddf1cbf7b5495a7 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 18:11:17 +0200 Subject: [PATCH 06/10] feat(config): ClientHookConfig + Interface path + modified topic_list --- src/core/dataclasses/config.py | 31 +++++++++++++++++++++++++++---- src/core/huri.py | 11 +++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/core/dataclasses/config.py b/src/core/dataclasses/config.py index aea111f..f515026 100644 --- a/src/core/dataclasses/config.py +++ b/src/core/dataclasses/config.py @@ -15,15 +15,32 @@ def from_dict(self, raw: dict) -> "ModuleConfig": ) +@dataclass +class ClientHookConfig: + name: str + topics: List[str] + args: Mapping[str, Any] + + @classmethod + def from_dict(self, raw: dict) -> "ClientHookConfig": + return self( + name=raw["name"], + topics=raw["topics"], + args=raw.get("args", {}), + ) + + @dataclass class ClientSenderConfig: name: str + topic: str args: Mapping[str, Any] @classmethod def from_dict(self, raw: dict) -> "ClientSenderConfig": return self( name=raw["name"], + topic=raw["topic"], args=raw.get("args", {}), ) @@ -32,15 +49,20 @@ def from_dict(self, raw: dict) -> "ClientSenderConfig": class ClientConfig: user_id: Optional[str] huri_url: str - topic_list: List[str] + interface_path: str + hooks: Dict[str, ClientHookConfig] senders: Dict[str, ClientSenderConfig] modules: Dict[str, ModuleConfig] @classmethod def from_dict(cls, raw: Dict) -> "ClientConfig": + hooks = { + hook_id: ClientHookConfig.from_dict(hok_raw) + for hook_id, hok_raw in raw.get("hooks", {}).items() + } senders = { - sender_id: ClientSenderConfig.from_dict(mod_raw) - for sender_id, mod_raw in raw.get("senders", {}).items() + sender_id: ClientSenderConfig.from_dict(snd_raw) + for sender_id, snd_raw in raw.get("senders", {}).items() } modules = { module_id: ModuleConfig.from_dict(mod_raw) @@ -49,7 +71,8 @@ def from_dict(cls, raw: Dict) -> "ClientConfig": return cls( user_id=None, huri_url=raw["huri_url"], - topic_list=raw["topic_list"], + interface_path=raw["interface_path"], + hooks=hooks, senders=senders, modules=modules, ) diff --git a/src/core/huri.py b/src/core/huri.py index 5fa8038..f5e4eeb 100644 --- a/src/core/huri.py +++ b/src/core/huri.py @@ -32,7 +32,7 @@ def __init__( self, modules: Dict[str, Type[Module]], handles: Dict[str, handle.DeploymentHandle], - events: Dict[str, Type[EventData]], + events: Dict[str, Type[EventData | bytes]], ) -> None: self.module_factory = ModuleFactory(handles) self.event_factory = EventDataFactory() @@ -80,9 +80,12 @@ async def run_session(self, ws: WebSocket): user_id = client_config_raw.get("user_id") or str(uuid.uuid4()) - senders: List[Module] = [ - Sender(ws, topic) for topic in client_config.topic_list + topic_list = [ + topic + for hook_config in client_config.hooks.values() + for topic in hook_config.topics ] + senders: List[Module] = [Sender(ws, topic) for topic in topic_list] modules: List[Module] = ( self.module_factory.create_from_config(user_id, client_config.modules) + senders @@ -112,7 +115,7 @@ async def receive_loop(session: Session, ws: WebSocket): msg_text = msg["text"] event = json.loads(msg_text) topic = event["topic"] - data = event["data"] + data = event["data"] # TODO client/server one function data = self.event_factory.create(topic, data) From f1d112d9ff4b50a39ce213627ad3b8a64ed11d00 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 18:11:46 +0200 Subject: [PATCH 07/10] feat(interface): added cli_interface for cli use --- src/interfaces/cli_interface.py | 123 +++++++++++++++++++ src/modules/speech_to_text/speech_to_text.py | 2 +- 2 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 src/interfaces/cli_interface.py diff --git a/src/interfaces/cli_interface.py b/src/interfaces/cli_interface.py new file mode 100644 index 0000000..c07469f --- /dev/null +++ b/src/interfaces/cli_interface.py @@ -0,0 +1,123 @@ +import asyncio +from typing import Dict, Type + +import numpy as np +import sounddevice as sd +from prompt_toolkit import PromptSession +from prompt_toolkit.patch_stdout import patch_stdout +from scipy.signal import resample + +from src.core.client import ClientHook, ClientSender +from src.core.interface import Interface +from src.modules.speech_to_text.events import Sentence + + +class AudioSender(ClientSender[bytes]): + def __init__( + self, sample_rate: int = 16000, frame_duration: float = 0.030, **kwargs + ): + super().__init__(**kwargs) + + self.sample_rate = sample_rate + self.frame_size = int(sample_rate * frame_duration) + + async def input_loop(self, ws): + loop = asyncio.get_running_loop() + + queue: asyncio.Queue[np.ndarray] = asyncio.Queue() + + def callback(indata: np.ndarray, frames, time, status): + loop.call_soon_threadsafe(queue.put_nowait, indata.copy()) + + with sd.InputStream( + samplerate=self.sample_rate, + channels=1, + dtype="int16", + callback=callback, + blocksize=self.frame_size, + ): + while True: + chunk = await queue.get() + await self.send(ws, chunk.tobytes()) + + +class TextSender(ClientSender[Sentence]): + output_type = Sentence + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + async def input_loop(self, ws): + print("'\\exit' or CTRL+D/C to exit.") + session: PromptSession = PromptSession() + try: + while True: + with patch_stdout(): + text = await session.prompt_async(">> ") + if text == "\\exit": + return + await self.send(ws, Sentence(text)) + + except (EOFError, KeyboardInterrupt): + pass + finally: + print("TextSender Exited...") + + +class AudioHook(ClientHook[bytes]): + input_type = bytes + + def __init__(self, sample_rate=48000, incoming_sample_rate=16000, **kwargs): + super().__init__(**kwargs) + + print("Speaker:", sd.query_devices(kind="output")) + + self.incoming_sample_rate = incoming_sample_rate + self.sample_rate = sample_rate + self.stream = sd.OutputStream( + samplerate=sample_rate, + channels=1, + dtype="int16", + ) + self.stream.start() + + self.resample_function = ( + self._resample if sample_rate != incoming_sample_rate else lambda x: x + ) + + def _resample(self, audio: np.ndarray): + return resample( + audio, + int(len(audio) * self.sample_rate / self.incoming_sample_rate), + ).astype(np.int16) + + async def hook(self, singletton: None, data: bytes): + audio = np.frombuffer(data, dtype=np.int16) + + audio = self.resample_function(audio) + + self.stream.write(audio.reshape(-1, 1)) + + +class TextHook(ClientHook[Sentence]): + input_type = Sentence + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + async def hook(self, singletton: None, data: Sentence): + print("<<", data.text) + + +class CLIInterface(Interface): + def __init__(self): + super().__init__(singletton=None) + + def get_senders(self) -> Dict[str, Type[ClientSender]]: + return {"audio": AudioSender, "text": TextSender} + + def get_hooks(self) -> Dict[str, Type[ClientHook]]: + return {"audio": AudioHook, "text": TextHook} + + +cli_interface = CLIInterface() diff --git a/src/modules/speech_to_text/speech_to_text.py b/src/modules/speech_to_text/speech_to_text.py index fdc68f0..63bf060 100644 --- a/src/modules/speech_to_text/speech_to_text.py +++ b/src/modules/speech_to_text/speech_to_text.py @@ -41,7 +41,7 @@ def __init__( ): super().__init__() - self.model_faster = WhisperModel(model, cpu_threads=2) + self.model_faster = WhisperModel(model) self.language = language self.sample_rate = sample_rate From 88119f62ce11512c3ee01f6830aacc765ac42087 Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 18:32:07 +0200 Subject: [PATCH 08/10] evol(interface): cli TextHook is for RAGResult event type --- src/interfaces/cli_interface.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/interfaces/cli_interface.py b/src/interfaces/cli_interface.py index c07469f..03cacbf 100644 --- a/src/interfaces/cli_interface.py +++ b/src/interfaces/cli_interface.py @@ -9,6 +9,7 @@ from src.core.client import ClientHook, ClientSender from src.core.interface import Interface +from src.modules.rag.events import RAGResult from src.modules.speech_to_text.events import Sentence @@ -99,14 +100,14 @@ async def hook(self, singletton: None, data: bytes): self.stream.write(audio.reshape(-1, 1)) -class TextHook(ClientHook[Sentence]): - input_type = Sentence +class TextHook(ClientHook[RAGResult]): + input_type = RAGResult def __init__(self, **kwargs): super().__init__(**kwargs) - async def hook(self, singletton: None, data: Sentence): - print("<<", data.text) + async def hook(self, singletton: None, data: RAGResult): + print("<<", data.answer) class CLIInterface(Interface): From ac292b680b5d897d776b7c80e5975a5f30986f0a Mon Sep 17 00:00:00 2001 From: Popochounet Date: Mon, 1 Jun 2026 18:32:31 +0200 Subject: [PATCH 09/10] evol(config): yaml files with new config --- config/{client_aux2.yaml => client_aux .yaml} | 22 +++++++++++++-- config/client_aux.yaml | 28 ------------------- config/client_auxio.yaml | 25 ----------------- config/client_template.yaml | 21 ++++++++++++-- config/client_text.yaml | 8 +++++- 5 files changed, 44 insertions(+), 60 deletions(-) rename config/{client_aux2.yaml => client_aux .yaml} (55%) delete mode 100644 config/client_aux.yaml delete mode 100644 config/client_auxio.yaml diff --git a/config/client_aux2.yaml b/config/client_aux .yaml similarity index 55% rename from config/client_aux2.yaml rename to config/client_aux .yaml index 7d7b601..d1f5195 100644 --- a/config/client_aux2.yaml +++ b/config/client_aux .yaml @@ -1,13 +1,31 @@ huri_url: ws://localhost:8000/session -topic_list: [transcript, question, rag_response] +interface_path: src.interfaces.cli_interface:cli_interface senders: audio: name: audio + topic: audio args: sample_rate: 16000 frame_duration: 0.030 + text: + name: text + topic: question + args: + sample_rate: 16000 + frame_duration: 0.030 + +hooks: + text: + name: text + topics: [question, answer] + audio: + name: audio + topics: [audio] + args: + incoming_sample_rate: ${senders.audio.args.sample_rate} + sample_rate: 44100 modules: mic: @@ -21,10 +39,8 @@ modules: args: language: en block_duration: ${senders.audio.args.frame_duration} - logging: INFO tag: name: tag - logging: INFO rag: name: rag args: diff --git a/config/client_aux.yaml b/config/client_aux.yaml deleted file mode 100644 index fe3e332..0000000 --- a/config/client_aux.yaml +++ /dev/null @@ -1,28 +0,0 @@ -huri_url: ws://localhost:8000/session - -topic_list: [question] - -senders: - audio: - name: audio - args: - sample_rate: 16000 - frame_duration: 0.030 - -modules: - mic: - name: mic - args: - vad_agressiveness: 3 - silence_duration: 1.5 - block_duration: ${inputs.audio.args.frame_duration} - logging: INFO - stt: - name: stt - args: - language: "en" - block_duration: ${inputs.audio.args.frame_duration} - logging: INFO - tag: - name: tag - logging: INFO diff --git a/config/client_auxio.yaml b/config/client_auxio.yaml deleted file mode 100644 index 8fa2a91..0000000 --- a/config/client_auxio.yaml +++ /dev/null @@ -1,25 +0,0 @@ -huri_url: ws://localhost:8000/session - -topic_list: [question] - -senders: - text: - name: text - -modules: - mic: - name: mic - args: - vad_agressiveness: 3 - silence_duration: 1.5 - block_duration: ${senders.audio.args.frame_duration} - logging: INFO - stt: - name: stt - args: - language: en - block_duration: ${senders.audio.args.frame_duration} - logging: INFO - tag: - name: tag - logging: INFO diff --git a/config/client_template.yaml b/config/client_template.yaml index cf1627d..441f3c5 100644 --- a/config/client_template.yaml +++ b/config/client_template.yaml @@ -1,19 +1,34 @@ # HuRI websocket server url huri_url: ws://localhost:8000/session -# List of event topic the client will receive -topic_list: [topic1, topic2] +# Define interface to be used's import path +interface_path: src.interfaces.cli_interface:cli_interface # Define senders to be used and their custom args senders: # sender tag can be anything example: - # sender name must be in the list of available ClientSender in Client instance (src.client_sender:get_senders) + # sender name must be in the list of available ClientSender in chosen Interface (Interface.get_senders) name: my_sender + # topic the sender will send to HuRI, it must match output_type event data structure + topic: my_event # if my_sender init with "model", "sample_rate" and "refresh_rate" params, they can be customized here args: refresh_rate: infinite +# Define hooks to be used and their custom args +hooks: + # hook tag can be anything + example: + # hook name must be in the list of available ClientHook in chosen Interface (Interface.get_senders) + name: my_hook + # topics the hook will process from HuRI, it must match input_type event data structure + topics: [my_event, llm_response] + # if my_hook init with "model", "sample_rate" and "refresh_rate" params, they can be customized here + args: + sample_rate: 0 + no: beat + # Define module to be used and their custom args modules: # module tag can be anything diff --git a/config/client_text.yaml b/config/client_text.yaml index 8ddcaab..d2fb26f 100644 --- a/config/client_text.yaml +++ b/config/client_text.yaml @@ -1,10 +1,16 @@ huri_url: ws://localhost:8000/session -topic_list: [question, rag_response] +interface_path: src.interfaces.cli_interface:cli_interface senders: text: name: text + topic: question + +hooks: + text: + name: text + topics: [rag_response] modules: rag: From 87210a74979cc0e3d875e6e32386be4b43ce936e Mon Sep 17 00:00:00 2001 From: Popochounet Date: Tue, 2 Jun 2026 06:27:50 +0200 Subject: [PATCH 10/10] evol(client): move singletton to __init__ for senders and hooks --- src/core/client.py | 31 ++++++++++++++++++++----------- src/interfaces/cli_interface.py | 4 ++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/core/client.py b/src/core/client.py index cae9dfa..6927565 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -23,12 +23,18 @@ class ClientSender(Generic[T]): Class derived from ClientSender must implement input_loop, and use ClientSender.send to send data to HuRI. + + `singletton` is available to access shared ressources. """ output_type: Type[T] - def __init__(self, topic: str, **_): + def __init__(self, topic: str, singletton: Any, **_): + """ + :topic: topic sent to HuRI + :singletton: allow to get shared ressources""" self.topic = topic + self.singletton = singletton async def input_loop(self, ws: websockets.ClientConnection): raise NotImplementedError @@ -59,16 +65,15 @@ class ClientHook(Generic[T]): Class derived from ClientHook must implement hook. - `singletton` allow hooks to modifies shared ressources, - and comes from the used interface. + `singletton` is available to access and modifies shared ressources. """ input_type: Type[T] - def __init__(self, **_): - pass + def __init__(self, singletton: Any, **_): + self.singletton = singletton - async def hook(self, singletton: Any, data: T): + async def hook(self, data: T): raise NotImplementedError @@ -87,11 +92,11 @@ def __init__( module = importlib.import_module(module_path) interface = getattr(module, object_name) - self.singletton = interface.singletton - available_senders = interface.get_senders() self.senders: List[ClientSender] = [ - available_senders[sender.name](topic=sender.topic, **sender.args) + available_senders[sender.name]( + topic=sender.topic, singletton=interface.singletton, **sender.args + ) for sender in self.config.senders.values() ] @@ -99,7 +104,11 @@ def __init__( self.hooks: Dict[str, List[ClientHook]] = defaultdict(list) for hook in self.config.hooks.values(): for topic in hook.topics: - self.hooks[topic].append(available_hooks[hook.name](**hook.args)) + self.hooks[topic].append( + available_hooks[hook.name]( + singletton=interface.singletton, **hook.args + ) + ) self.user_id_file = user_id_file @@ -131,7 +140,7 @@ async def _receive_loop(self, ws: websockets.ClientConnection): for hook in self.hooks[topic]: if not isinstance(data, bytes): data = hook.input_type(**data) - asyncio.create_task(hook.hook(self.singletton, data)) + asyncio.create_task(hook.hook(data)) except (asyncio.CancelledError, websockets.ConnectionClosedOK): pass diff --git a/src/interfaces/cli_interface.py b/src/interfaces/cli_interface.py index 03cacbf..2634d07 100644 --- a/src/interfaces/cli_interface.py +++ b/src/interfaces/cli_interface.py @@ -92,7 +92,7 @@ def _resample(self, audio: np.ndarray): int(len(audio) * self.sample_rate / self.incoming_sample_rate), ).astype(np.int16) - async def hook(self, singletton: None, data: bytes): + async def hook(self, data: bytes): audio = np.frombuffer(data, dtype=np.int16) audio = self.resample_function(audio) @@ -106,7 +106,7 @@ class TextHook(ClientHook[RAGResult]): def __init__(self, **kwargs): super().__init__(**kwargs) - async def hook(self, singletton: None, data: RAGResult): + async def hook(self, data: RAGResult): print("<<", data.answer)