#!/usr/bin/env python3 # Needs https://github.com/google/bumble on Windows # See https://github.com/google/bumble/blob/main/docs/mkdocs/src/platforms/windows.md for usage. # You need to associate WinUSB with your Bluetooth interface. Once done, you can roll back to the original driver from Device Manager. import asyncio import argparse import logging import signal import struct import sys import threading from queue import Queue from typing import Any, Callable, Dict, List, Optional from colorama import Fore, Style, init as colorama_init colorama_init(autoreset=True) from PyQt5.QtWidgets import ( QApplication, QWidget, QVBoxLayout, QLabel, QSlider, QCheckBox, QPushButton, QLineEdit, QGridLayout ) from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject handler = logging.StreamHandler() class ColorFormatter(logging.Formatter): COLORS = { logging.DEBUG: Fore.BLUE, logging.INFO: Fore.GREEN, logging.WARNING: Fore.YELLOW, logging.ERROR: Fore.RED, logging.CRITICAL: Fore.MAGENTA, } def format(self, record): color = self.COLORS.get(record.levelno, "") prefix = f"{color}[{record.levelname}:{record.name}]{Style.RESET_ALL}" return f"{prefix} {record.getMessage()}" handler.setFormatter(ColorFormatter()) logging.basicConfig(level=logging.INFO, handlers=[handler]) logger = logging.getLogger("hearing-aid") OPCODE_READ_REQUEST: int = 0x0A OPCODE_READ_RESPONSE: int = 0x0B OPCODE_WRITE_REQUEST: int = 0x12 OPCODE_WRITE_RESPONSE: int = 0x13 OPCODE_HANDLE_VALUE_NTF: int = 0x1B ATT_HANDLES: Dict[str, int] = { 'LOUD_SOUND_REDUCTION': 0x1B, 'HEARING_AID': 0x2A, } ATT_CCCD_HANDLES: Dict[str, int] = { 'LOUD_SOUND_REDUCTION': ATT_HANDLES['LOUD_SOUND_REDUCTION'] + 1, 'HEARING_AID': ATT_HANDLES['HEARING_AID'] + 1, } AACP_HEADER = bytes([0x04, 0x00, 0x04, 0x00]) AACP_HANDSHAKE = bytes([0x00, 0x00, 0x04, 0x00, 0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) class AACPOpcodes: SET_FEATURE_FLAGS = 0x4D REQUEST_NOTIFICATIONS = 0x0F CONTROL_COMMAND = 0x09 class ControlCommandId: HEARING_AID = 0x2C HPS_GAIN_SWIPE = 0x2F HEARING_ASSIST_CONFIG = 0x33 def _make_reader(ch): recv_q: asyncio.Queue = asyncio.Queue() def _sink(sdu): try: recv_q.put_nowait(sdu) except Exception: logger.debug("Dropping SDU in sink fallback") try: ch.sink = _sink except Exception: logger.debug("Failed to set channel.sink fallback") async def _reader_from_sink(): item = await recv_q.get() return item return _reader_from_sink class HearingAidSettings: def __init__(self, left_eq: List[float], right_eq: List[float], left_amp: float, right_amp: float, left_tone: float, right_tone: float, left_conv: bool, right_conv: bool, left_anr: float, right_anr: float, net_amp: float, balance: float, own_voice: float) -> None: self.left_eq = left_eq self.right_eq = right_eq self.left_amplification = left_amp self.right_amplification = right_amp self.left_tone = left_tone self.right_tone = right_tone self.left_conversation_boost = left_conv self.right_conversation_boost = right_conv self.left_ambient_noise_reduction = left_anr self.right_ambient_noise_reduction = right_anr self.net_amplification = net_amp self.balance = balance self.own_voice_amplification = own_voice def parse_hearing_aid_settings(data: bytes) -> Optional[HearingAidSettings]: if len(data) < 104: logger.warning("Data too short for parsing") return None buffer = data offset = 4 left_eq = [] for _ in range(8): val, = struct.unpack(' 0.5 offset += 4 left_anr, = struct.unpack(' 0.5 offset += 4 right_anr, = struct.unpack(' None: if handle not in self.listeners: self.listeners[handle] = [] self.listeners[handle].append(listener) async def enable_notifications(self, handle_name: str) -> None: await self.write_cccd(handle_name, b'\x01\x00') logger.info(f"Enabled notifications for handle {handle_name}") async def read(self, handle_name: str) -> bytes: handle_value = ATT_HANDLES[handle_name] lsb = handle_value & 0xFF msb = (handle_value >> 8) & 0xFF pdu = bytes([OPCODE_READ_REQUEST, lsb, msb]) self.channel.send_pdu(pdu) response = await self._read_response() return response async def write(self, handle_name: str, value: bytes) -> None: handle_value = ATT_HANDLES[handle_name] lsb = handle_value & 0xFF msb = (handle_value >> 8) & 0xFF pdu = bytes([OPCODE_WRITE_REQUEST, lsb, msb]) + value self.channel.send_pdu(pdu) try: await self._read_response(timeout=2.0) except Exception: logger.warning(f"No write response received for handle {handle_name}") async def write_cccd(self, handle_name: str, value: bytes) -> None: handle_value = ATT_CCCD_HANDLES[handle_name] lsb = handle_value & 0xFF msb = (handle_value >> 8) & 0xFF pdu = bytes([OPCODE_WRITE_REQUEST, lsb, msb]) + value self.channel.send_pdu(pdu) try: await self._read_response(timeout=2.0) except Exception: logger.warning(f"No CCCD write response received for handle {handle_name}") async def _read_response(self, timeout: float = 2.0) -> bytes: try: response = await asyncio.wait_for( asyncio.get_event_loop().run_in_executor( None, lambda: self.responses.get(timeout=timeout) ), timeout=timeout + 0.5 ) return response[1:] # Skip opcode except Exception: raise Exception("No response received") async def listen_notifications(self) -> None: logger.info("ATT notification listener started") while self.running: try: pdu = await self._recv_q.get() if not isinstance(pdu, (bytes, bytearray)): pdu = bytes(pdu) if len(pdu) > 0 and pdu[0] == OPCODE_HANDLE_VALUE_NTF: handle = pdu[1] | (pdu[2] << 8) value = pdu[3:] if handle in self.listeners: for listener in self.listeners[handle]: listener(value) else: self.responses.put(pdu) except asyncio.CancelledError: break except Exception as e: logger.debug(f"ATT listen error: {e}") break logger.info("ATT notification listener stopped") def stop(self): self.running = False class SignalEmitter(QObject): update_ui = pyqtSignal(HearingAidSettings) update_hearing_aid_toggle = pyqtSignal(bool) update_swipe_toggle = pyqtSignal(bool) update_loud_sound_reduction_toggle = pyqtSignal(bool) connected = pyqtSignal() class HearingAidApp(QWidget): def __init__(self, att_manager: ATTManager, aacp_manager: AACPManager, loop: asyncio.AbstractEventLoop) -> None: super().__init__() self.att_manager = att_manager self.aacp_manager = aacp_manager self.loop = loop self.emitter = SignalEmitter() self.emitter.update_ui.connect(self.on_update_ui) self.emitter.update_hearing_aid_toggle.connect(self._set_hearing_aid_toggle) self.emitter.update_swipe_toggle.connect(self._set_swipe_toggle) self.emitter.update_loud_sound_reduction_toggle.connect(self._set_loud_sound_reduction_toggle) self.emitter.connected.connect(self.on_connected) self.debounce_timer = QTimer() self.debounce_timer.setSingleShot(True) self.debounce_timer.timeout.connect(self.send_settings) self.init_ui() def init_ui(self) -> None: self.setWindowTitle("LibrePods - Hearing Aid") layout = QVBoxLayout() self.loud_sound_reduction_checkbox = QCheckBox("Loud Sound Reduction") self.loud_sound_reduction_checkbox.stateChanged.connect(self.on_loud_sound_reduction_toggle) layout.addWidget(self.loud_sound_reduction_checkbox) self.hearing_aid_checkbox = QCheckBox("Hearing Aid") self.hearing_aid_checkbox.stateChanged.connect(self.on_hearing_aid_toggle) layout.addWidget(self.hearing_aid_checkbox) # EQ Inputs eq_layout = QGridLayout() self.left_eq_inputs: List[QLineEdit] = [] self.right_eq_inputs: List[QLineEdit] = [] eq_labels = ["250Hz", "500Hz", "1kHz", "2kHz", "3kHz", "4kHz", "6kHz", "8kHz"] eq_layout.addWidget(QLabel("Frequency"), 0, 0) eq_layout.addWidget(QLabel("Left"), 0, 1) eq_layout.addWidget(QLabel("Right"), 0, 2) for i, label in enumerate(eq_labels): eq_layout.addWidget(QLabel(label), i + 1, 0) left_input = QLineEdit() right_input = QLineEdit() left_input.setPlaceholderText("Left") right_input.setPlaceholderText("Right") self.left_eq_inputs.append(left_input) self.right_eq_inputs.append(right_input) eq_layout.addWidget(left_input, i + 1, 1) eq_layout.addWidget(right_input, i + 1, 2) eq_group = QWidget() eq_group.setLayout(eq_layout) layout.addWidget(QLabel("Loss, in dBHL")) layout.addWidget(eq_group) # Amplification self.amp_slider = QSlider(Qt.Horizontal) self.amp_slider.setRange(-100, 100) self.amp_slider.setValue(0) layout.addWidget(QLabel("Amplification")) layout.addWidget(self.amp_slider) self.swipe_checkbox = QCheckBox("Swipe to control amplification") self.swipe_checkbox.stateChanged.connect(self.on_swipe_toggle) layout.addWidget(self.swipe_checkbox) # Balance self.balance_slider = QSlider(Qt.Horizontal) self.balance_slider.setRange(-100, 100) self.balance_slider.setValue(0) layout.addWidget(QLabel("Balance")) layout.addWidget(self.balance_slider) # Tone self.tone_slider = QSlider(Qt.Horizontal) self.tone_slider.setRange(-100, 100) self.tone_slider.setValue(0) layout.addWidget(QLabel("Tone")) layout.addWidget(self.tone_slider) # Ambient Noise Reduction self.anr_slider = QSlider(Qt.Horizontal) self.anr_slider.setRange(0, 100) self.anr_slider.setValue(0) layout.addWidget(QLabel("Ambient Noise Reduction")) layout.addWidget(self.anr_slider) # Conversation Boost self.conv_checkbox = QCheckBox("Conversation Boost") layout.addWidget(self.conv_checkbox) # Own Voice (hidden) self.own_voice_slider = QSlider(Qt.Horizontal) self.own_voice_slider.setRange(0, 100) self.own_voice_slider.setValue(50) # Status label self.status_label = QLabel("Connecting...") layout.addWidget(self.status_label) # Reset button self.reset_button = QPushButton("Reset") layout.addWidget(self.reset_button) # Connect signals for ATT settings for input_box in self.left_eq_inputs + self.right_eq_inputs: input_box.textChanged.connect(self.on_value_changed) self.amp_slider.valueChanged.connect(self.on_value_changed) self.balance_slider.valueChanged.connect(self.on_value_changed) self.tone_slider.valueChanged.connect(self.on_value_changed) self.anr_slider.valueChanged.connect(self.on_value_changed) self.conv_checkbox.stateChanged.connect(self.on_value_changed) self.own_voice_slider.valueChanged.connect(self.on_value_changed) self.reset_button.clicked.connect(self.reset_settings) self.setLayout(layout) def on_connected(self) -> None: self.status_label.setText("Connected") self.att_manager.register_listener(ATT_HANDLES['HEARING_AID'], self.on_att_notification) self.att_manager.register_listener(ATT_HANDLES['LOUD_SOUND_REDUCTION'], self.on_loud_sound_reduction_notification) self.aacp_manager.register_control_cmd_listener(ControlCommandId.HEARING_AID, self._on_hearing_aid_cmd) self.aacp_manager.register_control_cmd_listener(ControlCommandId.HPS_GAIN_SWIPE, self._on_swipe_cmd) asyncio.run_coroutine_threadsafe(self._initial_setup(), self.loop) def on_loud_sound_reduction_notification(self, value: bytes) -> None: enabled = value[0] == 0x01 if value else False self.emitter.update_loud_sound_reduction_toggle.emit(enabled) def _set_loud_sound_reduction_toggle(self, enabled: bool): self.loud_sound_reduction_checkbox.blockSignals(True) self.loud_sound_reduction_checkbox.setChecked(enabled) self.loud_sound_reduction_checkbox.blockSignals(False) def on_loud_sound_reduction_toggle(self, state: int): enabled = state == Qt.Checked asyncio.run_coroutine_threadsafe(self._send_loud_sound_reduction_toggle(enabled), self.loop) async def _send_loud_sound_reduction_toggle(self, enabled: bool): value = bytes([0x01]) if enabled else bytes([0x00]) await self.att_manager.write('LOUD_SOUND_REDUCTION', value) def _on_hearing_aid_cmd(self, value: bytes): enabled = value[0] == 0x01 if value else False self.emitter.update_hearing_aid_toggle.emit(enabled) def _on_swipe_cmd(self, value: bytes): enabled = value[0] == 0x01 if value else False self.emitter.update_swipe_toggle.emit(enabled) def _set_hearing_aid_toggle(self, enabled: bool): self.hearing_aid_checkbox.blockSignals(True) self.hearing_aid_checkbox.setChecked(enabled) self.hearing_aid_checkbox.blockSignals(False) def _set_swipe_toggle(self, enabled: bool): self.swipe_checkbox.blockSignals(True) self.swipe_checkbox.setChecked(enabled) self.swipe_checkbox.blockSignals(False) def on_hearing_aid_toggle(self, state: int): enabled = state == Qt.Checked asyncio.run_coroutine_threadsafe(self._send_hearing_aid_toggle(enabled), self.loop) def on_swipe_toggle(self, state: int): enabled = state == Qt.Checked asyncio.run_coroutine_threadsafe(self._send_swipe_toggle(enabled), self.loop) async def _send_hearing_aid_toggle(self, enabled: bool): if enabled: await self.aacp_manager.send_control_command(ControlCommandId.HEARING_AID, bytes([0x01, 0x01])) await self.aacp_manager.send_control_command(ControlCommandId.HEARING_ASSIST_CONFIG, bytes([0x01])) else: await self.aacp_manager.send_control_command(ControlCommandId.HEARING_AID, bytes([0x02, 0x02])) await self.aacp_manager.send_control_command(ControlCommandId.HEARING_ASSIST_CONFIG, bytes([0x02])) async def _send_swipe_toggle(self, enabled: bool): value = bytes([0x01]) if enabled else bytes([0x02]) await self.aacp_manager.send_control_command(ControlCommandId.HPS_GAIN_SWIPE, value) async def _initial_setup(self): try: await self.att_manager.enable_notifications('HEARING_AID') await self.att_manager.enable_notifications('LOUD_SOUND_REDUCTION') data = await self.att_manager.read('HEARING_AID') settings = parse_hearing_aid_settings(data) if settings: self.emitter.update_ui.emit(settings) logger.info("Initial ATT settings loaded") loud_sound_data = await self.att_manager.read('LOUD_SOUND_REDUCTION') loud_sound_enabled = loud_sound_data[0] == 0x01 if loud_sound_data else False self.emitter.update_loud_sound_reduction_toggle.emit(loud_sound_enabled) logger.info("Initial loud sound reduction setting loaded") except Exception as e: logger.error(f"Initial ATT setup failed: {e}") def on_att_notification(self, value: bytes) -> None: settings = parse_hearing_aid_settings(value) if settings: self.emitter.update_ui.emit(settings) def on_update_ui(self, settings: HearingAidSettings) -> None: self.amp_slider.blockSignals(True) self.balance_slider.blockSignals(True) self.tone_slider.blockSignals(True) self.anr_slider.blockSignals(True) self.conv_checkbox.blockSignals(True) self.own_voice_slider.blockSignals(True) self.amp_slider.setValue(int(settings.net_amplification * 100)) self.balance_slider.setValue(int(settings.balance * 100)) self.tone_slider.setValue(int(settings.left_tone * 100)) self.anr_slider.setValue(int(settings.left_ambient_noise_reduction * 100)) self.conv_checkbox.setChecked(settings.left_conversation_boost) self.own_voice_slider.setValue(int(settings.own_voice_amplification * 100)) for i, value in enumerate(settings.left_eq): self.left_eq_inputs[i].blockSignals(True) self.left_eq_inputs[i].setText(f"{value:.2f}") self.left_eq_inputs[i].blockSignals(False) for i, value in enumerate(settings.right_eq): self.right_eq_inputs[i].blockSignals(True) self.right_eq_inputs[i].setText(f"{value:.2f}") self.right_eq_inputs[i].blockSignals(False) self.amp_slider.blockSignals(False) self.balance_slider.blockSignals(False) self.tone_slider.blockSignals(False) self.anr_slider.blockSignals(False) self.conv_checkbox.blockSignals(False) self.own_voice_slider.blockSignals(False) def on_value_changed(self) -> None: self.debounce_timer.start(100) def send_settings(self) -> None: asyncio.run_coroutine_threadsafe(self._send_settings_async(), self.loop) async def _send_settings_async(self) -> None: try: amp = self.amp_slider.value() / 100.0 balance = self.balance_slider.value() / 100.0 tone = self.tone_slider.value() / 100.0 anr = self.anr_slider.value() / 100.0 conv = self.conv_checkbox.isChecked() own_voice = self.own_voice_slider.value() / 100.0 left_amp = amp + (0.5 - balance) * amp * 2 if balance < 0 else amp right_amp = amp + (balance - 0.5) * amp * 2 if balance > 0 else amp left_eq = [float(input_box.text() or 0) for input_box in self.left_eq_inputs] right_eq = [float(input_box.text() or 0) for input_box in self.right_eq_inputs] settings = HearingAidSettings( left_eq, right_eq, left_amp, right_amp, tone, tone, conv, conv, anr, anr, amp, balance, own_voice ) await self._send_hearing_aid_settings(settings) except Exception as e: logger.error(f"Failed to send settings: {e}") async def _send_hearing_aid_settings(self, settings: HearingAidSettings) -> None: data = await self.att_manager.read('HEARING_AID') if len(data) < 104: logger.error("Read data too short for sending settings") return buffer = bytearray(data) buffer[2] = 0x64 for i in range(8): struct.pack_into(' None: self.att_manager.stop() self.aacp_manager.stop() event.accept() async def run_bumble(bdaddr: str, att_manager: ATTManager, aacp_manager: AACPManager, app_window: HearingAidApp): try: from bumble.l2cap import ClassicChannelSpec, ClassicChannel from bumble.transport import open_transport from bumble.device import Device, Connection from bumble.host import Host from bumble.core import PhysicalTransport, UUID from bumble.pairing import PairingConfig, PairingDelegate from bumble.hci import HCI_Error from bumble.keys import JsonKeyStore from bumble.sdp import ServiceAttribute, DataElement except ImportError: logger.error("Bumble not installed") return 1 async def get_device(): logger.info("Opening transport...") transport = await open_transport("usb:0") device = Device(host=Host(controller_source=transport.source, controller_sink=transport.sink)) device.classic_enabled = True device.le_enabled = False device.keystore = JsonKeyStore.from_device(device, "./keys.json") device.pairing_config_factory = lambda conn: PairingConfig( sc=True, mitm=False, bonding=True, delegate=PairingDelegate(io_capability=PairingDelegate.NO_OUTPUT_NO_INPUT) ) await device.power_on() logger.info("Device powered on") def on_l2cap_connection(channel: ClassicChannel): logger.info("Incoming L2CAP connection on PSM %d", channel.psm) async def handle_data(): try: reader = _make_reader(channel) while True: data = await reader() print(f"Received PDU on PSM {channel.psm}: {data.hex() if data else 'None'}") except Exception as e: logger.info("L2CAP channel on PSM %d closed: %s", channel.psm, e) asyncio.create_task(handle_data()) att_server_spec = ClassicChannelSpec(psm=31, mtu=512) device.create_l2cap_server(att_server_spec, handler=on_l2cap_connection) logger.info("L2CAP server registered on PSM 0x%04X", att_server_spec.psm) device.sdp_service_records = { 0x4f491200: [ ServiceAttribute(0x0000, DataElement.unsigned_integer_32(0x4f491200)), ServiceAttribute(0x0001, DataElement.sequence([DataElement.uuid(UUID.from_16_bits(0x1200))])), ServiceAttribute(0x0002, DataElement.unsigned_integer_32(0x00000000)), ServiceAttribute(0x0005, DataElement.sequence([DataElement.uuid(UUID.from_16_bits(0x1002))])), ServiceAttribute(0x0006, DataElement.sequence([ DataElement.unsigned_integer_16(0x656e), DataElement.unsigned_integer_16(0x006a), DataElement.unsigned_integer_16(0x0100), DataElement.unsigned_integer_16(0x6672), DataElement.unsigned_integer_16(0x006a), DataElement.unsigned_integer_16(0x0110), DataElement.unsigned_integer_16(0x6465), DataElement.unsigned_integer_16(0x006a), DataElement.unsigned_integer_16(0x0120), DataElement.unsigned_integer_16(0x6a61), DataElement.unsigned_integer_16(0x006a), DataElement.unsigned_integer_16(0x0130) ])), ServiceAttribute(0x0008, DataElement.unsigned_integer_8(0xff)), ServiceAttribute(0x0101, DataElement.text_string('PnP Information')), ServiceAttribute(0x0200, DataElement.unsigned_integer_16(0x0102)), ServiceAttribute(0x0201, DataElement.unsigned_integer_16(0x004c)), ServiceAttribute(0x0202, DataElement.unsigned_integer_16(0x0000)), ServiceAttribute(0x0203, DataElement.unsigned_integer_16(0x0f60)), ServiceAttribute(0x0204, DataElement.boolean(True)), ServiceAttribute(0x0205, DataElement.unsigned_integer_16(0x0001)), ServiceAttribute(0xa000, DataElement.unsigned_integer_32(0x00a026c4)), ServiceAttribute(0xafff, DataElement.unsigned_integer_16(0x0001)) ] } logger.info("SDP service records set up") return transport, device async def setup_aacp(conn: Connection): spec = ClassicChannelSpec(psm=4097, mtu=2048) logger.info("Requesting AACP channel on PSM = 0x%04X", spec.psm) if not conn.is_encrypted: await conn.encrypt() await asyncio.sleep(0.05) channel: ClassicChannel = await conn.create_l2cap_channel(spec=spec) aacp_manager.set_channel(channel) logger.info("AACP channel established") await aacp_manager.send_handshake() await asyncio.sleep(0.1) await aacp_manager.send_notification_request() await asyncio.sleep(0.1) await aacp_manager.send_set_feature_flags() return channel async def setup_att(conn: Connection): spec = ClassicChannelSpec(psm=31, mtu=512) logger.info("Requesting ATT channel on PSM = 0x%04X", spec.psm) if not conn.is_encrypted: await conn.encrypt() await asyncio.sleep(0.05) channel: ClassicChannel = await conn.create_l2cap_channel(spec=spec) att_manager.set_channel(channel) logger.info("ATT channel established") return channel transport, device = await get_device() logger.info("Connecting to %s (BR/EDR)...", bdaddr) try: connection = await device.connect(bdaddr, PhysicalTransport.BR_EDR) logger.info("Connected to %s (handle %s)", connection.peer_address, connection.handle) logger.info("Authenticating...") await connection.authenticate() if not connection.is_encrypted: logger.info("Encrypting link...") await connection.encrypt() await setup_aacp(connection) await setup_att(connection) app_window.emitter.connected.emit() att_listen_task = asyncio.create_task(att_manager.listen_notifications()) aacp_listen_task = asyncio.create_task(aacp_manager.listen()) logger.info("Connection established. UI is now active.") try: await asyncio.Event().wait() except asyncio.CancelledError: pass finally: att_listen_task.cancel() aacp_listen_task.cancel() try: await att_listen_task except asyncio.CancelledError: pass try: await aacp_listen_task except asyncio.CancelledError: pass except HCI_Error as e: if "PAIRING_NOT_ALLOWED_ERROR" in str(e): logger.error("Put your device into pairing mode and run the script again") else: logger.error("HCI error: %s", e) except Exception as e: logger.error("Unexpected error: %s", e) finally: if hasattr(transport, "close"): logger.info("Closing transport...") await transport.close() logger.info("Transport closed") return 0 def main(): parser = argparse.ArgumentParser() parser.add_argument("bdaddr", help="Bluetooth address of the hearing aid device") parser.add_argument("--debug", action="store_true", help="Enable debug logging") args = parser.parse_args() logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO) qt_app = QApplication(sys.argv) loop = asyncio.new_event_loop() att_manager = ATTManager() aacp_manager = AACPManager() window = HearingAidApp(att_manager, aacp_manager, loop) window.show() def quit_app(signum, frame): att_manager.stop() aacp_manager.stop() qt_app.quit() signal.signal(signal.SIGINT, quit_app) def run_async(): asyncio.set_event_loop(loop) loop.run_until_complete(run_bumble(args.bdaddr, att_manager, aacp_manager, window)) async_thread = threading.Thread(target=run_async, daemon=True) async_thread.start() timer = QTimer() timer.timeout.connect(lambda: None) timer.start(100) sys.exit(qt_app.exec_()) if __name__ == "__main__": main()