move everything to linux folder

This commit is contained in:
Kavish Devar
2024-10-07 21:40:11 +05:30
parent a677f2f01b
commit 1163854907
30 changed files with 101 additions and 0 deletions

21
linux/LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Kavish Devar
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

101
linux/README.md Normal file
View File

@@ -0,0 +1,101 @@
# ALN - AirPods like Normal (Linux Only)
### Check out the packet definitions at [AAP Definitions](/AAP%20Definitions.md)
## Currently supported device(s)
- AirPods Pro 2
### 1. Install the required packages
```bash
sudo apt install python3 python3-pip
pip3 install pybluez
```
If you want to run it as a daemon (Refer to the [Daemon Version](#as-a-daemon-using-a-unix-socket) section), you will need to install the `python-daemon` package.
```bash
pip3 install python-daemon
```
### 2. Clone the repository
```bash
git clone https://github.com/kavishdevar/aln.git
cd aln
```
### 3. Preprare
Pair your AirPods with your machine before running this script!
> **Note:** DO NOT FORGET TO EDIT THE `AIRPODS_MAC` VARIABLE IN EXAMPLE SCRIPTS WITH YOUR AIRPODS MAC ADDRESS BEFORE RUNNING THEM!
# Versions
## Non-Daemon based
This version is the most polished version of the script. It can do the following:
- fetch the battery percentage,
- fetch in-ear status (but not actually controlling the media with that information).
- control ANC modes
```bash
python3 examples/logger-and-anc.py
```
## As a daemon (using a UNIX socket)
![Daemon Log Screenshot](imgs/daemon-log.png)
If you want to run a deamon for multiple programs to read/write airpods data, you can use the `airpods_daemon.py` script.
- This creates a standard UNIX socket at `/tmp/airpods_daemon.sock` and listens for commands
- and sends battery/in-ear info
You can run it as follows:
```bash
python3 airpods_daemon.py
```
## Interacting with the daemon
![Set ANC Screenshot](imgs/set-anc.png)
- Sending data to the daemon
You can send data to the daemon using the `set-anc.py` script. Since it's a standard UNIX socket, you can send data to it using any programming language that supports UNIX sockets.
This package includes a demo script that sends a command to turn off the ANC. You can run it as follows:
```bash
python3 examples/daemon/set-anc.py
```
- Reading data from the daemon
![Read Data Screenshot](imgs/read-data.png)
You can listen to the daemon's output by running the `read-data.py` script. This script listens to the UNIX socket and prints the data it receives. Currenty, it recognizes the battery percentage and the in-ear status and dumps the rest of the data to the terminal.
```bash
python3 examples/daemon/read-data.py
```
- Controlling the media with the in-ear status (and get battery status)
![Ear Detection Screenshot](imgs/ear-detection.png)
This script is basically the standalone script, but interacts with the UNIX socket created by the daemon instead. It can control the media with the in-ear status and remove the device as an audio sink when the AirPods are not in your ears.
```bash
python3 examples/daemon/ear-detection.py
```
- App Indicator/Tray Icon
![Tray Icon Hover Screenshot](imgs/tray-icon-hover.png)
![Tray Icon Menu Screenshot](imgs/tray-icon-menu.png)
This script is a simple tray icon that shows the battery percentage and set ANC modes. It can also control the media with the in-ear status.
> Note: This script uses QT.
```bash
python3 examples/daemon/tray.py
```
## Standalone version (without module dependency, mainly for testing, and reverse engineering purposes)
- Controlling the media with the in-ear status.
- Remove the device as an audio sink when the AirPods are not in your ears.
- Try to connect with the AirPods if media is playing and the AirPods are not connected.
- Control ANC modes.
```bash
python3 examples/standalone.py
```

14
linux/aln/AirPods/Pro2.py Normal file
View File

@@ -0,0 +1,14 @@
from ..Capabilites import Capabilites
class Pro2:
def __init__(self):
self.name = 'AirPods Pro 2'
self.capabilites = {
Capabilites.NOISE_CANCELLATION: [
Capabilites.NoiseCancellation.OFF,
Capabilites.NoiseCancellation.ON,
Capabilites.NoiseCancellation.TRANSPARENCY,
Capabilites.NoiseCancellation.ADAPTIVE,
],
Capabilites.CONVERSATION_AWARENESS: True,
Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY: True
}

View File

@@ -0,0 +1,3 @@
from . import Pro2
Pro2 = Pro2.Pro2

View File

@@ -0,0 +1,18 @@
class NoiseCancellation:
OFF = b"\x01"
ON = b"\x02"
TRANSPARENCY = b"\x03"
ADAPTIVE = b"\x04"
class ConversationAwareness:
OFF = b"\x02"
ON = b"\x01"
class Capabilites:
NOISE_CANCELLATION = b"\x0d"
CONVERSATION_AWARENESS = b"\x28"
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = b"\x01\x02"
EAR_DETECTION = b"\x06"
NoiseCancellation = NoiseCancellation
ConversationAwareness = ConversationAwareness

View File

@@ -0,0 +1,25 @@
from ..enums import enums
from ..Capabilites import Capabilites
class ANCNotification:
NOTIFICATION_PREFIX = enums.NOISE_CANCELLATION_PREFIX
OFF = Capabilites.NoiseCancellation.OFF
ON = Capabilites.NoiseCancellation.ON
TRANSPARENCY = Capabilites.NoiseCancellation.TRANSPARENCY
ADAPTIVE = Capabilites.NoiseCancellation.ADAPTIVE
def __init__(self):
pass
def isANCData(self, data: bytes):
# 04 00 04 00 09 00 0D 01 00 00 00
if len(data) != 11:
return False
if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()):
return True
else:
return False
def setData(self, data: bytes):
self.status = data[7]
pass

View File

@@ -0,0 +1,64 @@
class BatteryComponent:
LEFT = 4
RIGHT = 2
CASE = 8
pass
class BatteryStatus:
CHARGING = 1
NOT_CHARGING = 2
DISCONNECTED = 4
pass
class Battery:
def get_name(self, cls, value):
for key, val in cls.__dict__.items():
if val == value:
return key
return None
def get_component(self):
return self.get_name(BatteryComponent, self.component)
def get_level(self):
return self.level
def get_status(self):
return self.get_name(BatteryStatus, self.status)
def __init__(self, component: int, level: int, status: int):
self.component = component
self.level = level
self.status = status
pass
class BatteryNotification:
def __init__(self):
self.first = Battery(BatteryComponent.LEFT, 0, BatteryStatus.DISCONNECTED)
self.second = Battery(BatteryComponent.RIGHT, 0, BatteryStatus.DISCONNECTED)
self.case = Battery(BatteryComponent.CASE, 0, BatteryStatus.DISCONNECTED)
pass
def isBatteryData(self, data):
if len(data) != 22:
return False
if data[0] == 0x04 and data[1] == 0x00 and data[2] == 0x04 and data[3] == 0x00 and data[4] == 0x04 and data[5] == 0x00:
return True
else:
return False
def setBattery(self, data):
self.count = data[6]
self.first = Battery(data[7], data[9], data[10])
self.second = Battery(data[12], data[14], data[15])
self.case = Battery(data[17], data[19], data[20])
pass
def getBattery(self):
if self.first.component == BatteryComponent.LEFT:
self.left = self.first
self.right = self.second
else:
self.left = self.second
self.right = self.first
self.case = self.case
return [self.left, self.right, self.case]

View File

@@ -0,0 +1,19 @@
# 04 00 04 00 4b 00 02 00 01 [level]
from ..enums import enums
class ConversationalAwarenessNotification:
NOTIFICATION_PREFIX = enums.CONVERSATION_AWARENESS_RECEIVE_PREFIX
def __init__(self):
pass
def isConversationalAwarenessData(self, data: bytes):
if len(data) != 10:
return False
if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()):
return True
def setData(self, data: bytes):
self.status = data[9]
pass

View File

@@ -0,0 +1,26 @@
from ..Capabilites import Capabilites
from ..enums import enums
from typing import Literal
class EarDetectionNotification:
NOTIFICATION_BIT = Capabilites.EAR_DETECTION
NOTIFICATION_PREFIX = enums.PREFIX + NOTIFICATION_BIT
IN_EAR = 0x00
OUT_OF_EAR = 0x01
def __init__(self):
pass
def isEarDetectionData(self, data: bytes):
if len(data) != 8:
return False
if data.hex().startswith(self.NOTIFICATION_PREFIX.hex()):
return True
def setEarDetection(self, data: bytes):
self.first = data[6]
self.second = data[7]
def getEarDetection(self):
return [self.first, self.second]
pass

View File

@@ -0,0 +1,58 @@
from bluetooth import BluetoothSocket
import threading
from .Battery import BatteryNotification
from .EarDetection import EarDetectionNotification
from .ConversationalAwareness import ConversationalAwarenessNotification
from .ANC import ANCNotification
import logging
logging = logging.getLogger(__name__)
class NotificationListener:
BATTERY_UPDATED = 0x01
ANC_UPDATED = 0x02
EAR_DETECTION_UPDATED = 0x03
CA_UPDATED = 0x04
UNKNOWN = 0x00
def __init__(self, socket: BluetoothSocket, callback: callable):
self.socket = socket
self.BatteryNotification = BatteryNotification()
self.EarDetectionNotification = EarDetectionNotification()
self.ANCNotification = ANCNotification()
self.ConversationalAwarenessNotification = ConversationalAwarenessNotification()
self.callback = callback
pass
def __start(self):
while True:
data = self.socket.recv(1024)
if len(data) == 0:
break
if self.BatteryNotification.isBatteryData(data):
self.BatteryNotification.setBattery(data)
self.callback(self.BATTERY_UPDATED, data)
pass
if self.EarDetectionNotification.isEarDetectionData(data):
self.EarDetectionNotification.setEarDetection(data)
self.callback(self.EAR_DETECTION_UPDATED, data)
if self.ANCNotification.isANCData(data):
self.ANCNotification.setData(data)
self.callback(self.ANC_UPDATED, data)
if self.ConversationalAwarenessNotification.isConversationalAwarenessData(data):
self.ConversationalAwarenessNotification.setData(data)
self.callback(self.CA_UPDATED, data)
else:
self.callback(self.UNKNOWN, data)
pass
pass
pass
def start(self):
threading.Thread(target=self.__start).start()
pass
def stop(self):
self.socket.close()
pass

View File

@@ -0,0 +1,40 @@
from .Listener import NotificationListener
from ..enums import enums
import bluetooth
import logging
logging = logging.getLogger(__name__)
enums = enums()
class Notifications:
BATTERY_UPDATED = NotificationListener.BATTERY_UPDATED
ANC_UPDATED = NotificationListener.ANC_UPDATED
CA_UPDATED = NotificationListener.CA_UPDATED
EAR_DETECTION_UPDATED = NotificationListener.EAR_DETECTION_UPDATED
UNKNOWN = NotificationListener.UNKNOWN
def __init__(self, socket: bluetooth.BluetoothSocket, callback: callable):
self.socket = socket
self.notificationListener = NotificationListener(self.socket, callback)
self.BatteryNotification = self.notificationListener.BatteryNotification
self.EarDetectionNotification = self.notificationListener.EarDetectionNotification
self.ANCNotification = self.notificationListener.ANCNotification
self.ConversationalAwarenessNotification = self.notificationListener.ConversationalAwarenessNotification
pass
def initialize(self):
try:
self.socket.send(enums.SET_SPECIFIC_FEATURES)
self.socket.send(enums.REQUEST_NOTIFICATIONS)
self.notificationListener.start()
except bluetooth.btcommon.BluetoothError as e:
logging.error(f'Failed to send data to {self.mac_address}: {e}')
return False
return True
def __del__(self):
self.notificationListener.stop()
self.socket.close()
pass
pass

70
linux/aln/__init__.py Normal file
View File

@@ -0,0 +1,70 @@
from .Notifications import Notifications
import bluetooth
import logging
logging = logging.getLogger("Connection Handler")
class Connection:
def __init__(self, mac_address: str):
self.mac_address = mac_address
self.socket = bluetooth.BluetoothSocket(bluetooth.L2CAP)
def connect(self):
try:
self.socket.connect((self.mac_address, 0x1001))
except bluetooth.btcommon.BluetoothError as e:
logging.error(f'Failed to connect to {self.mac_address}: {e}')
return False
return True
def initialize_notifications(self, callback: callable = None):
if callback is None:
callback = self.notification_callback
self.notifications = Notifications(self.socket, callback)
self.notificationListener = self.notifications.notificationListener
self.BatteryNotification = self.notifications.BatteryNotification
self.ANCNotification = self.notifications.ANCNotification
self.EarDetectionNotification = self.notifications.EarDetectionNotification
self.ConversationalAwarenessNotification = self.notifications.ConversationalAwarenessNotification
self.notifications.initialize()
def send(self, data: bytes):
try:
self.socket.send(data)
except bluetooth.btcommon.BluetoothError as e:
logging.error(f'Failed to send data to {self.mac_address}: {e}')
return False
return True
def notification_callback(self, notification_type: int, data: bytes):
import logging
if notification_type == Notifications.BATTERY_UPDATED:
logging = logging.getLogger("Battery Status")
for i in self.notificationListener.BatteryNotification.getBattery():
logging.debug(f'{i.get_component()} - {i.get_status()}: {i.get_level()}')
pass
elif notification_type == Notifications.EAR_DETECTION_UPDATED:
logging = logging.getLogger("In-Ear Status")
logging.debug(f'{self.notificationListener.EarDetectionNotification.getEarDetection()}')
pass
elif notification_type == Notifications.ANC_UPDATED:
logging = logging.getLogger("ANC Status")
logging.debug(f'{self.notificationListener.ANCNotification.status}')
pass
elif notification_type == Notifications.CA_UPDATED:
logging = logging.getLogger("Conversational Awareness Status")
logging.debug(f'{self.notificationListener.ConversationalAwarenessNotification.status}')
pass
elif notification_type == Notifications.UNKNOWN:
logging = logging.getLogger("Unknown Notification")
hex_data = ' '.join(f'{byte:02x}' for byte in data)
logging.debug(f'{hex_data}')
pass
pass
def disconnect(self):
self.socket.close()
pass
def __del__(self):
self.socket.close()
pass

24
linux/aln/enums.py Normal file
View File

@@ -0,0 +1,24 @@
from .Capabilites import Capabilites
class enums:
NOISE_CANCELLATION = Capabilites.NOISE_CANCELLATION
CONVERSATION_AWARENESS = Capabilites.CONVERSATION_AWARENESS
CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY = Capabilites.CUSTOMIZABLE_ADAPTIVE_TRANSPARENCY
PREFIX = b'\x04\x00\x04\x00'
SETTINGS = b"\x09\x00"
SUFFIX = b'\x00\x00\x00'
NOTIFICATION_FILTER = b'\x0f'
SPECIFIC_FEATURES = b'\x4d'
SET_SPECIFIC_FEATURES = PREFIX + SPECIFIC_FEATURES + b"\x00\xff\x00\x00\x00\x00\x00\x00\x00"
REQUEST_NOTIFICATIONS = PREFIX + NOTIFICATION_FILTER + b"\x00\xff\xff\xff\xff"
HANDSHAKE = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
NOISE_CANCELLATION_PREFIX = PREFIX + SETTINGS + NOISE_CANCELLATION
NOISE_CANCELLATION_OFF = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.OFF + SUFFIX
NOISE_CANCELLATION_ON = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ON + SUFFIX
NOISE_CANCELLATION_TRANSPARENCY = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.TRANSPARENCY + SUFFIX
NOISE_CANCELLATION_ADAPTIVE = NOISE_CANCELLATION_PREFIX + Capabilites.NoiseCancellation.ADAPTIVE + SUFFIX
SET_CONVERSATION_AWARENESS_OFF = PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.OFF + SUFFIX
SET_CONVERSATION_AWARENESS_ON = PREFIX + SETTINGS + CONVERSATION_AWARENESS + Capabilites.ConversationAwareness.ON + SUFFIX
CONVERSATION_AWARENESS_RECEIVE_PREFIX = PREFIX + b"\x4b\x00\x02\00"

View File

@@ -0,0 +1,164 @@
import socket
import json
import subprocess
from aln.Notifications import Battery
import threading
import time
import os
import logging
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
logging.DEBUG: "\033[48;5;240;38;5;15m%s\033[1;0m", # Grey background, white bold text
logging.INFO: "\033[48;5;34;38;5;15m%s\033[1;0m", # Green background, white bold text
logging.WARNING: "\033[1;48;5;214;38;5;0m%s\033[1;0m", # Orange background, black bold text
logging.ERROR: "\033[1;48;5;202;38;5;15m%s\033[1;0m", # Orange-red background, white bold text
logging.CRITICAL: "\033[1;48;5;196;38;5;15m%s\033[1;0m", # Pure red background, white bold text
}
def format(self, record):
# Apply color to the level name
levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8)
record.levelname = levelname
# Format the message
formatted_message = super().format(record)
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
SOCKET_PATH = "/tmp/airpods_daemon.sock"
class MediaController:
def __init__(self):
self.earStatus = "Both out"
self.wasMusicPlayingInSingle = False
self.wasMusicPlayingInBoth = False
self.firstEarOutTime = 0
self.stop_thread_event = threading.Event()
def playMusic(self):
logging.info("Playing music")
subprocess.call(("playerctl", "play"))
def pauseMusic(self):
logging.info("Pausing music")
subprocess.call(("playerctl", "--all-players", "pause"))
def isPlaying(self):
return "Playing" in subprocess.getoutput("playerctl --all-players status").strip()
def handlePlayPause(self, data):
primary_status = data[0]
secondary_status = data[1]
logging.debug(f"Handling play/pause with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}")
def delayed_action(s):
if not self.stop_thread_event.is_set():
if self.wasMusicPlayingInSingle:
self.playMusic()
self.wasMusicPlayingInBoth = False
elif self.wasMusicPlayingInBoth or s:
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
if primary_status and secondary_status:
if self.earStatus != "Both out":
s = self.isPlaying()
if s:
self.pauseMusic()
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B off")
logging.info("Setting profile to off")
if self.earStatus == "Only one in":
if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3:
self.wasMusicPlayingInSingle = True
self.wasMusicPlayingInBoth = True
self.stop_thread_event.set()
else:
if s:
self.wasMusicPlayingInSingle = True
else:
self.wasMusicPlayingInSingle = False
elif self.earStatus == "Both in":
s = self.isPlaying()
if s:
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
else:
self.wasMusicPlayingInSingle = False
self.earStatus = "Both out"
return "Both out"
elif not primary_status and not secondary_status:
if self.earStatus != "Both in":
if self.earStatus == "Both out":
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp-sink")
logging.info("Setting profile to a2dp-sink")
elif self.earStatus == "Only one in":
self.stop_thread_event.set()
s = self.isPlaying()
if s:
self.wasMusicPlayingInBoth = True
if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth:
self.playMusic()
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
self.earStatus = "Both in"
return "Both in"
elif (primary_status and not secondary_status) or (not primary_status and secondary_status):
if self.earStatus != "Only one in":
self.stop_thread_event.clear()
s = self.isPlaying()
if s:
self.pauseMusic()
delayed_thread = threading.Timer(0.3, delayed_action, args=[s])
delayed_thread.start()
self.firstEarOutTime = time.time()
if self.earStatus == "Both out":
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp-sink")
logging.info("Setting profile to a2dp-sink")
self.earStatus = "Only one in"
return "Only one in"
def read():
"""Send a command to the daemon via UNIX domain socket."""
client_socket = None
try:
# Create a socket connection to the daemon
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
logging.info("Connecting to daemon...")
client_socket.connect(SOCKET_PATH)
media_controller = MediaController()
# Receive data
while True:
d = client_socket.recv(1024)
if d:
try:
data: dict = json.loads(d.decode('utf-8'))
if data["type"] == "ear_detection":
media_controller.handlePlayPause([data['primary'], data['secondary']])
except json.JSONDecodeError as e:
# logging.error(f"Error deserializing data: {e}")
pass
else:
break
except Exception as e:
logging.error(f"Error communicating with daemon: {e}")
finally:
if client_socket:
client_socket.close()
logging.warning("Socket closed")
if __name__ == "__main__":
read()

View File

@@ -0,0 +1,88 @@
import socket
import json
import logging
from aln.Notifications.ANC import ANCNotification
SOCKET_PATH = "/tmp/airpods_daemon.sock"
import logging
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
logging.DEBUG: "\033[48;5;240;38;5;15m%s\033[1;0m", # Grey background, white bold text
logging.INFO: "\033[48;5;34;38;5;15m%s\033[1;0m", # Green background, white bold text
logging.WARNING: "\033[1;48;5;214;38;5;0m%s\033[1;0m", # Orange background, black bold text
logging.ERROR: "\033[1;48;5;202;38;5;15m%s\033[1;0m", # Orange-red background, white bold text
logging.CRITICAL: "\033[1;48;5;196;38;5;15m%s\033[1;0m", # Pure red background, white bold text
}
def format(self, record):
# Apply color to the level name
levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8)
record.levelname = levelname
# Format the message
formatted_message = super().format(record)
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
def read():
"""Send a command to the daemon via UNIX domain socket."""
client_socket = None
try:
# Create a socket connection to the daemon
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
logging.info("Connecting to daemon...")
client_socket.connect(SOCKET_PATH)
# Receive data
while True:
d = client_socket.recv(1024)
if d:
try:
data = json.loads(d.decode('utf-8'))
if isinstance(data, dict):
if data["type"] == "battery":
for b, battery_data in data.items():
if b != "type": # Skip the "type" key
logging.info(f"\033[1;33mReceived battery status: {b} - {battery_data['status']} - {battery_data['level']}\033[1;0m")
elif data["type"] == "ear_detection":
logging.info(f"\033[1;33mReceived ear detection status: {data['primary']} - {data['secondary']}\033[1;0m")
elif data["type"] == "anc":
logging.info(f"\033[1;33mReceived ANC status: {data['mode']}\033[1;0m")
elif data["type"] == "ca":
logging.info(f"\033[1;33mReceived Conversational Awareness status: {data['status']}\033[1;0m")
elif data["type"] == "unknown":
logging.info(f"Received data: {data['data']}")
else:
logging.info(f"Received data: {data}")
else:
logging.error("Received data is not a dictionary")
except json.JSONDecodeError as e:
# logging.warning(f"Error deserializing data: {e}")
# logging.warning(f"raw data: {d}")
pass
except KeyError as e:
logging.error(f"KeyError: {e} in data: {data}")
except TypeError as e:
logging.error(f"TypeError: {e} in data: {data}")
else:
break
except Exception as e:
logging.critical(f"Error communicating with daemon: {e}")
finally:
if client_socket:
client_socket.close()
logging.warning("Socket closed")
if __name__ == "__main__":
read()

View File

@@ -0,0 +1,73 @@
import socket
import argparse
from aln import enums
import logging
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
logging.DEBUG: "\033[48;5;240;38;5;15m%s\033[1;0m", # Grey background, white bold text
logging.INFO: "\033[48;5;34;38;5;15m%s\033[1;0m", # Green background, white bold text
logging.WARNING: "\033[1;48;5;214;38;5;0m%s\033[1;0m", # Orange background, black bold text
logging.ERROR: "\033[1;48;5;202;38;5;15m%s\033[1;0m", # Orange-red background, white bold text
logging.CRITICAL: "\033[1;48;5;196;38;5;15m%s\033[1;0m", # Pure red background, white bold text
}
def format(self, record):
# Apply color to the level name
levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8)
record.levelname = levelname
# Format the message
formatted_message = super().format(record)
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
SOCKET_PATH = "/tmp/airpods_daemon.sock"
def send_command(command):
"""Send a command to the daemon via UNIX domain socket."""
try:
# Create a socket connection to the daemon
logging.info("Connecting to daemon...")
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client_socket.connect(SOCKET_PATH)
logging.info("Connected to daemon")
# Send the command
client_socket.sendall(command)
logging.info(f"Sent command: {command}")
# Close the connection
client_socket.close()
logging.info("Socket closed")
except Exception as e:
logging.error(f"Error communicating with daemon: {e}")
def parse_arguments():
parser = argparse.ArgumentParser(description="Set AirPods ANC mode.")
parser.add_argument("mode", choices=["off", "on", "transparency", "adaptive", "1", "2", "3", "4"], help="ANC mode to set")
return parser.parse_args()
if __name__ == "__main__":
args = parse_arguments()
if args.mode == "off" or args.mode == "1":
command = enums.NOISE_CANCELLATION_OFF
elif args.mode == "on" or args.mode == "2":
command = enums.NOISE_CANCELLATION_ON
elif args.mode == "transparency" or args.mode == "3":
command = enums.NOISE_CANCELLATION_TRANSPARENCY
elif args.mode == "adaptive" or args.mode == "4":
command = enums.NOISE_CANCELLATION_ADAPTIVE
send_command(command)

View File

@@ -0,0 +1 @@
start-daemon.py

View File

@@ -0,0 +1,312 @@
import sys
import socket
import json
import signal
import threading
from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMenu, QAction, QMessageBox
from PyQt5.QtGui import QIcon
from PyQt5.QtCore import QObject, pyqtSignal
import logging
import subprocess
import time
import os
class CustomFormatter(logging.Formatter):
# Define color codes for different log levels
COLORS = {
logging.DEBUG: "\033[48;5;240;38;5;15m%s\033[1;0m", # Grey background, white bold text
logging.INFO: "\033[48;5;34;38;5;15m%s\033[1;0m", # Green background, white bold text
logging.WARNING: "\033[1;48;5;214;38;5;0m%s\033[1;0m", # Orange background, black bold text
logging.ERROR: "\033[1;48;5;202;38;5;15m%s\033[1;0m", # Orange-red background, white bold text
logging.CRITICAL: "\033[1;48;5;196;38;5;15m%s\033[1;0m", # Pure red background, white bold text
}
def format(self, record):
# Apply color to the level name
levelname = self.COLORS.get(record.levelno, "%s") % record.levelname.ljust(8)
record.levelname = levelname
# Format the message
formatted_message = super().format(record)
return formatted_message
# Custom formatter with fixed width for level name
formatter = CustomFormatter('\033[2;37m%(asctime)s\033[1;0m - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG)
# Set the custom formatter for the root logger
logging.getLogger().handlers[0].setFormatter(formatter)
SOCKET_PATH = "/tmp/airpods_daemon.sock"
# Initialize battery_status at the module level
battery_status = {
"LEFT": {"status": "Unknown", "level": 0},
"RIGHT": {"status": "Unknown", "level": 0},
"CASE": {"status": "Unknown", "level": 0}
}
anc_mode = 0
# Define a lock
battery_status_lock = threading.Lock()
class MediaController:
def __init__(self):
self.earStatus = "Both out"
self.wasMusicPlayingInSingle = False
self.wasMusicPlayingInBoth = False
self.firstEarOutTime = 0
self.stop_thread_event = threading.Event()
def playMusic(self):
logging.info("Playing music")
subprocess.call(("playerctl", "play"))
def pauseMusic(self):
logging.info("Pausing music")
subprocess.call(("playerctl", "--all-players", "pause"))
def isPlaying(self):
return "Playing" in subprocess.getoutput("playerctl --all-players status").strip()
def handlePlayPause(self, data):
primary_status = data[0]
secondary_status = data[1]
logging.debug(f"Handling play/pause with data: {data}, previousStatus: {self.earStatus}, wasMusicPlaying: {self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth}")
def delayed_action(s):
if not self.stop_thread_event.is_set():
if self.wasMusicPlayingInSingle:
self.playMusic()
self.wasMusicPlayingInBoth = False
elif self.wasMusicPlayingInBoth or s:
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
if primary_status and secondary_status:
if self.earStatus != "Both out":
s = self.isPlaying()
if s:
self.pauseMusic()
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B off")
logging.info("Setting profile to off")
if self.earStatus == "Only one in":
if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3:
self.wasMusicPlayingInSingle = True
self.wasMusicPlayingInBoth = True
self.stop_thread_event.set()
else:
if s:
self.wasMusicPlayingInSingle = True
else:
self.wasMusicPlayingInSingle = False
elif self.earStatus == "Both in":
s = self.isPlaying()
if s:
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
else:
self.wasMusicPlayingInSingle = False
self.earStatus = "Both out"
return "Both out"
elif not primary_status and not secondary_status:
if self.earStatus != "Both in":
if self.earStatus == "Both out":
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp-sink")
logging.info("Setting profile to a2dp-sink")
elif self.earStatus == "Only one in":
self.stop_thread_event.set()
s = self.isPlaying()
if s:
self.wasMusicPlayingInBoth = True
if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth:
self.playMusic()
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
self.earStatus = "Both in"
return "Both in"
elif (primary_status and not secondary_status) or (not primary_status and secondary_status):
if self.earStatus != "Only one in":
self.stop_thread_event.clear()
s = self.isPlaying()
if s:
self.pauseMusic()
delayed_thread = threading.Timer(0.3, delayed_action, args=[s])
delayed_thread.start()
self.firstEarOutTime = time.time()
if self.earStatus == "Both out":
os.system("pactl set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp-sink")
logging.info("Setting profile to a2dp-sink")
self.earStatus = "Only one in"
return "Only one in"
# Function to get current sink volume
def get_current_volume():
result = subprocess.run(["pactl", "get-sink-volume", "@DEFAULT_SINK@"], capture_output=True, text=True)
volume_line = result.stdout.splitlines()[0]
volume_percent = int(volume_line.split()[4].strip('%'))
return volume_percent
# Function to set sink volume
def set_volume(percent):
subprocess.run(["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{percent}%"])
initial_volume = get_current_volume()
# Handle conversational awareness
def handle_conversational_awareness(status):
if status < 1 or status > 9:
logging.error(f"Invalid status: {status}")
pass
global initial_volume
# Volume adjustment logic
if status == 1 or status == 2:
globals()["initial_volume"] = get_current_volume()
new_volume = max(0, min(int(initial_volume * 0.1), 100)) # Reduce to 10% for initial speaking
elif status == 3:
new_volume = max(0, min(int(initial_volume * 0.4), 100)) # Slightly increase to 40%
elif status == 6:
new_volume = max(0, min(int(initial_volume * 0.5), 100)) # Set volume to 50%
elif status >= 8:
new_volume = initial_volume # Fully restore volume
set_volume(new_volume)
logging.getLogger("Conversational Awareness").info(f"Volume set to {new_volume}% based on conversational awareness status: {status}")
# If status is 9, print conversation end message
if status == 9:
logging.getLogger("Conversational Awareness").info("Conversation ended. Restored volume to original level.")
class BatteryStatusUpdater(QObject):
battery_status_updated = pyqtSignal()
anc_mode_updated = pyqtSignal()
def __init__(self):
super().__init__()
self.media_controller = MediaController()
def listen_to_socket(self):
global battery_status
global anc_mode
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(SOCKET_PATH)
while True:
data = client.recv(1024)
if data:
try:
response = json.loads(data.decode('utf-8'))
if response["type"] == "battery":
with battery_status_lock:
battery_status = response
logging.debug(f"Received battery status: {response}")
self.battery_status_updated.emit()
elif response["type"] == "ear_detection":
self.media_controller.handlePlayPause([response['primary'], response['secondary']])
logging.debug(f"Received ear detection status: {response}")
elif response["type"] == "anc":
anc_mode = response["mode"]
self.anc_mode_updated.emit()
logging.debug(f"Received ANC status: {anc_mode}")
elif response["type"] == "ca":
ca_status = response["status"]
handle_conversational_awareness(ca_status)
logging.debug(f"Received CA status: {ca_status}")
except json.JSONDecodeError as e:
logging.warning(f"Error deserializing data: {e}")
except KeyError as e:
logging.error(f"KeyError: {e} in data: {response}")
def get_battery_status():
global battery_status
with battery_status_lock:
logging.info(f"Getting battery status: {battery_status}")
left = battery_status["LEFT"]
right = battery_status["RIGHT"]
case = battery_status["CASE"]
left_status = (left['status'] or 'Unknown').title().replace('_', ' ')
right_status = (right['status'] or 'Unknown').title().replace('_', ' ')
case_status = (case['status'] or 'Unknown').title().replace('_', ' ')
return f"Left: {left['level']}% - {left_status} | Right: {right['level']}% - {right_status} | Case: {case['level']}% - {case_status}"
from aln import enums
def set_anc_mode(mode):
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(SOCKET_PATH)
command = enums.NOISE_CANCELLATION_OFF
if mode == "on":
command = enums.NOISE_CANCELLATION_ON
elif mode == "off":
command = enums.NOISE_CANCELLATION_OFF
elif mode == "transparency":
command = enums.NOISE_CANCELLATION_TRANSPARENCY
elif mode == "adaptive":
command = enums.NOISE_CANCELLATION_ADAPTIVE
client.sendall(command)
response = client.recv(1024)
return json.loads(response.decode())
def control_anc(action):
response = set_anc_mode(action)
logging.info(f"ANC action: {action}, Response: {response}")
def signal_handler(sig, frame):
logging.info("Exiting...")
QApplication.quit()
sys.exit(0)
# Register the signal handler for SIGINT
signal.signal(signal.SIGINT, signal_handler)
app = QApplication(sys.argv)
# Create the system tray icon
tray_icon = QSystemTrayIcon(QIcon("icon.png"), app)
tray_icon.setToolTip(get_battery_status())
# Create the menu
menu = QMenu()
# Add ANC control actions
anc_on_action = QAction("ANC On")
anc_on_action.triggered.connect(lambda: control_anc("on"))
menu.addAction(anc_on_action)
anc_off_action = QAction("ANC Off")
anc_off_action.triggered.connect(lambda: control_anc("off"))
menu.addAction(anc_off_action)
anc_transparency_action = QAction("Transparency Mode")
anc_transparency_action.triggered.connect(lambda: control_anc("transparency"))
menu.addAction(anc_transparency_action)
anc_adaptive_action = QAction("Adaptive Mode")
anc_adaptive_action.triggered.connect(lambda: control_anc("adaptive"))
menu.addAction(anc_adaptive_action)
quit = QAction("Quit")
quit.triggered.connect(app.quit)
menu.addAction(quit)
# Add the menu to the tray icon
tray_icon.setContextMenu(menu)
# Show the tray icon
tray_icon.show()
# Create an instance of BatteryStatusUpdater
battery_status_updater = BatteryStatusUpdater()
# Connect the signal to the slot
battery_status_updater.battery_status_updated.connect(lambda: tray_icon.setToolTip(get_battery_status()))
# Start the battery status listener thread
listener_thread = threading.Thread(target=battery_status_updater.listen_to_socket, daemon=True)
listener_thread.start()
# Run the application
sys.exit(app.exec_())

View File

@@ -0,0 +1,105 @@
from aln import Connection
from aln import enums
import logging
import threading
import time
import sys
import shutil
AIRPODS_MAC = '28:2D:7F:C2:05:5B'
class CustomFormatter(logging.Formatter):
def format(self, record):
# Format the log message with spaces around colons without altering the original message
formatted_message = record.getMessage().replace(':', ': ')
record.message = formatted_message
return super().format(record)
class ConsoleHandler(logging.StreamHandler):
def __init__(self, stream=None):
super().__init__(stream)
self.terminator = '\n'
self.log_lines = []
def emit(self, record):
try:
msg = self.format(record)
self.log_lines.append(msg)
self.display_logs()
except Exception:
self.handleError(record)
def display_logs(self):
sys.stdout.write('\033[H\033[J') # Clear the screen
terminal_height, _ = shutil.get_terminal_size()
log_display_lines = self.log_lines[-(terminal_height - 5):] # Display the last terminal_height - 5 log lines
empty_lines = terminal_height - 5 - len(log_display_lines)
for _ in range(empty_lines):
sys.stdout.write('\n') # Fill empty space with new lines
for line in log_display_lines:
sys.stdout.write(line + self.terminator)
sys.stdout.write('1: ANC Off\n')
sys.stdout.write('2: Transparency\n')
sys.stdout.write('3: Adaptive Transparency\n')
sys.stdout.write('4: ANC On\n')
sys.stdout.write('Select ANC Mode: ')
sys.stdout.flush()
def input_thread(connection: Connection):
while True:
anc_mode = input()
if anc_mode == '1':
connection.send(enums.NOISE_CANCELLATION_OFF)
logging.info('ANC Off')
elif anc_mode == '2':
connection.send(enums.NOISE_CANCELLATION_TRANSPARENCY)
logging.info('Transparency On')
elif anc_mode == '3':
connection.send(enums.NOISE_CANCELLATION_ADAPTIVE)
logging.info('Adaptive Transparency On')
elif anc_mode == '4':
connection.send(enums.NOISE_CANCELLATION_ON)
logging.info('ANC On')
else:
logging.error('Invalid ANC Mode')
def main():
# Set up logging
handler = ConsoleHandler()
logging.addLevelName(logging.DEBUG, "\033[1;34m%s\033[1;0m" % logging.getLevelName(logging.DEBUG))
logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO))
logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING))
logging.addLevelName(logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
logging.addLevelName(logging.CRITICAL, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.CRITICAL))
formatter = CustomFormatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
handler.setFormatter(formatter)
logging.basicConfig(level=logging.DEBUG, handlers=[handler])
connection = Connection(AIRPODS_MAC)
connection.connect()
logging.info('Sending Handshake')
connection.send(enums.HANDSHAKE)
logging.info('Initializing Notifications')
connection.initialize_notifications()
logging.info('Initialized Notifications')
# Start the input thread
thread = threading.Thread(target=input_thread, args=(connection,))
thread.daemon = True
thread.start()
try:
# Keep the main thread alive to handle logging
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info('Program interrupted. Exiting...')
connection.disconnect() # Ensure the connection is properly closed
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,323 @@
import threading
import bluetooth
import subprocess
import time
import threading
import os
# Bluetooth MAC address of AirPods
AIRPODS_MAC = "28:2D:7F:C2:05:5B"
class initL2CAP():
lastEarStatus = ""
earStatus = ""
wasMusicPlayingInBoth = False
wasMusicPlayingInSingle = False
def pauseMusic(self):
subprocess.call(("playerctl", "pause", "--ignore-player", "OnePlus_7"))
def playMusic(self):
subprocess.call(("playerctl", "play", "--ignore-player", "OnePlus_7"))
def getMusicStatus(self):
return subprocess.getoutput("playerctl status --ignore-player OnePlus_7").strip()
# Change to MAC address of your AirPods
connected = False
cmd_off = b"\x04\x00\x04\x00\x09\x00\x0d\x01\x00\x00\x00"
cmd_on = b"\x04\x00\x04\x00\x09\x00\x0d\x02\x00\x00\x00"
cmd_transparency = b"\x04\x00\x04\x00\x09\x00\x0d\x03\x00\x00\x00"
cmd_adaptive = b"\x04\x00\x04\x00\x09\x00\x0d\x04\x00\x00\x00"
cmd_ca_off = b"\x04\x00\x04\x00\x09\x00\x28\x02\x00\x00\x00"
cmd_ca_on = b"\x04\x00\x04\x00\x09\x00\x28\x01\x00\x00\x00"
def start(self):
cmd_handshake = b"\x00\x00\x04\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
# cmd_smth0 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xfe\xff"
cmd_smth1 = b"\x04\x00\x04\x00\x0f\x00\xff\xff\xff\xff"
address = "28:2D:7F:C2:05:5B"
aap_service = "74EC2172-0BAD-4D01-8F77-997B2BE0722A"
aap_port = 0x1001
services = bluetooth.find_service(address=address)
service = [s for s in services if s["service-classes"] == [aap_service]]
if not service:
print("Device does not have AAP service")
exit()
self.sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
sock = self.sock
sock.connect((address, aap_port))
print("Connected to AirPods")
self.connected = True
print("Sending handshake...")
print(sock.type)
sock.send(cmd_handshake)
# sock.send(cmd_smth0)
sock.send(cmd_smth1)
threading.Thread(target=self.listen).start()
# battery info: 04 00 04 00 04 00 03 02 01 64 01 01 04 01 64 01 01 08 01 34 02 01
def parse_battery_status(self, data):
if len(data) != 22:
return
self.left_bud_level = data[9]
self.left_bud_status = data[10]
self.right_bud_level = data[14]
self.right_bud_status = data[15]
self.case_level = data[19]
self.case_status = data[20]
# Interpret the status
def interpret_status(status):
if status == 1:
return "Charging"
elif status == 2:
return "Not charging"
elif status == 4:
return "Disconnected"
else:
return "Unknown"
# Print the results
print(f"Left Bud: {self.left_bud_level}% - {interpret_status(self.left_bud_status)}")
print(f"Right Bud: {self.right_bud_level}% - {interpret_status(self.right_bud_status)}")
print(f"Case: {self.case_level}% - {interpret_status(self.case_status)}")
def parse_anc_status(self, data):
# 04 00 04 00 09 00 0d 03 00 00 00
if len(data) != 11 and data.hex().startswith("040004000600"):
return
if data[7] == 1:
return "Off"
elif data[7] == 2:
return "On"
elif data[7] == 3:
return "Transparency"
elif data[7] == 4:
return "Adaptive"
firstEarOutTime = 0
stop_thread_event = threading.Event()
def parse_inear_status(self, data):
if len(data) != 8:
return
second_status = data[6]
first_status = data[7]
def delayed_action(self, s):
print(s)
if not self.stop_thread_event.is_set():
print("Delayed action")
if self.wasMusicPlayingInSingle:
self.playMusic()
self.wasMusicPlayingInBoth = False
elif self.wasMusicPlayingInBoth or s == "Playing":
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
if first_status and second_status:
if self.earStatus != "Both out":
s = self.getMusicStatus()
self.pauseMusic()
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B off")
if self.earStatus == "Only one in":
if self.firstEarOutTime != 0 and time.time() - self.firstEarOutTime < 0.3:
print("Only one in called with both out")
self.wasMusicPlayingInSingle = True
self.wasMusicPlayingInBoth = True
self.stop_thread_event.set()
else:
if s == "Playing":
self.wasMusicPlayingInSingle = True
else:
self.wasMusicPlayingInSingle = False
# wasMusicPlayingInSingle = True
elif self.earStatus == "Both in":
# should be unreachable
s = self.getMusicStatus()
if s == "Playing":
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
else:
self.wasMusicPlayingInSingle = False
self.earStatus = "Both out"
return "Both out"
elif not first_status and not second_status:
if self.earStatus != "Both in":
if self.earStatus == "Both out":
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink")
elif self.earStatus == "Only one in":
self.stop_thread_event.set()
s = self.getMusicStatus()
if s == "Playing":
self.wasMusicPlayingInBoth = True
if self.wasMusicPlayingInSingle or self.wasMusicPlayingInBoth:
self.playMusic()
self.wasMusicPlayingInBoth = True
self.wasMusicPlayingInSingle = False
self.earStatus = "Both in"
return "Both in"
elif (first_status and not second_status) or (not first_status and second_status):
if self.earStatus != "Only one in":
self.stop_thread_event.clear()
s = self.getMusicStatus()
self.pauseMusic()
delayed_thread = threading.Timer(0.3, delayed_action, args=[self, s])
delayed_thread.start()
self.firstEarOutTime = time.time()
if self.earStatus == "Both out":
os.system("pacmd set-card-profile bluez_card.28_2D_7F_C2_05_5B a2dp_sink")
self.earStatus = "Only one in"
return "Only one in"
def listen(self):
while True:
res = self.sock.recv(1024)
print(f"Response: {res.hex()}")
self.battery_status = self.parse_battery_status(res)
self.inear_status = self.parse_inear_status(res)
# anc_status = parse_anc_status(res)
# if anc_status:
# print("ANC: ", anc_status)
if self.battery_status:
print(self.battery_status)
if self.inear_status:
print(self.inear_status)
# while True:
# print("Select command:")
# print("1. Turn off")
# print("2. Turn on")
# print("3. Toggle transparency")
# print("4. Toggle Adaptive")
# print("5. Conversational Awareness On")
# print("6. Conversational Awareness Off")
# print("0. Exit")
# cmd = input("Enter command: ")
# if cmd == "0":
# break
# elif cmd == "1":
# self.sock.send(cmd_off)
# elif cmd == "2":
# self.sock.send(cmd_on)
# elif cmd == "3":
# self.sock.send(cmd_transparency)
# elif cmd == "4":
# self.sock.send(cmd_adaptive)
# elif cmd == "5":
# self.sock.send(cmd_ca_on)
# elif cmd == "6":
# self.sock.send(cmd_ca_off)
def stop(self):
self.connected = False
self.sock.close()
def is_bluetooth_connected():
try:
result = subprocess.run(["bluetoothctl", "info", AIRPODS_MAC], capture_output=True, text=True)
return "Connected: yes" in result.stdout
except Exception as e:
print(f"Error checking Bluetooth connection status: {e}")
return False
# Connect to Bluetooth device using bluetoothctl if not already connected
def connect_bluetooth_device():
if is_bluetooth_connected():
print("AirPods are already connected.")
return
print("Checking if AirPods are available...")
result = subprocess.run(["bluetoothctl", "devices"], capture_output=True, text=True)
if AIRPODS_MAC in result.stdout:
print("AirPods are available. Connecting...")
subprocess.run(["bluetoothctl", "connect", AIRPODS_MAC])
else:
print("AirPods are not available.")
time.sleep(2) # Wait for the connection to establish
# Switch audio output to AirPods (PulseAudio)
try:
result = subprocess.run(["pactl", "list", "short", "sinks"], capture_output=True, text=True)
sink_name = next((line.split()[1] for line in result.stdout.splitlines() if "bluez_sink" in line), None)
if sink_name:
subprocess.run(["pactl", "set-default-sink", sink_name])
print(f"Switched audio to AirPods: {sink_name}")
else:
print("Failed to switch audio to AirPods.")
except Exception as e:
print(f"Error switching audio: {e}")
# Disconnect from Bluetooth device if connected
def disconnect_bluetooth_device():
if not is_bluetooth_connected():
print("AirPods are already disconnected.")
return
print("Disconnecting from AirPods...")
subprocess.run(["bluetoothctl", "disconnect", AIRPODS_MAC])
l2cap = initL2CAP()
# Function to listen to `playerctl --follow` and react to status changes
def mediaListener():
try:
# Run playerctl --follow in a subprocess
process = subprocess.Popen(
["playerctl", "--follow", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# Continuously read from the subprocess stdout
for line in process.stdout:
if line: # Make sure the line is not empty
line = line.strip() # Remove any extraneous whitespace
print(f"Received event from playerctl: {line}")
if "Playing" in line:
print("Media started playing")
connect_bluetooth_device()
if not l2cap.connected:
l2cap.start()
elif "Paused" in line or "Stopped" in line:
print("Media paused or stopped")
# disconnect_bluetooth_device()
# Check for any errors in stderr
stderr = process.stderr.read()
if stderr:
print(f"Error: {stderr}")
except Exception as e:
print(f"An error occurred in mediaListener: {e}")
mediaListener()
# thread = threading.Thread(target=mediaListener)
# thread.start()
# thread.stop()

BIN
linux/icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.3 KiB

BIN
linux/imgs/daemon-log.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 648 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 216 KiB

BIN
linux/imgs/read-data.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 330 KiB

BIN
linux/imgs/set-anc.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

25
linux/pyproject.toml Normal file
View File

@@ -0,0 +1,25 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["aln"]
[project]
name = "aln"
version = "0.0.1"
authors = [
{ name="Kavish Devar", email="mail@kavishdevar.me" },
]
description = "Use your AirPods the way they were intended."
readme = "README.md"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.urls]
Homepage = "https://github.com/kavishdevar/aln"
Issues = "https://github.com/kavishdevar/aln/issues"

248
linux/start-daemon.py Normal file
View File

@@ -0,0 +1,248 @@
import logging.handlers
import socket
import threading
import signal
import sys
import logging
from aln import Connection, enums
from aln.Notifications import Notifications
from aln.Notifications.Battery import Battery
import os
import bluetooth
from aln.enums import enums
connection = None
AIRPODS_MAC = '28:2D:7F:C2:05:5B'
SOCKET_PATH = '/tmp/airpods_daemon.sock'
LOG_FOLDER = '.'
LOG_FILE = os.path.join(LOG_FOLDER, 'airpods_daemon.log')
# Global flag to control the server loop
running = True
# Configure logging to write to a file
# logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format='%(asctime)s %(levelname)s : %(message)s')
# RotatingFileHandler
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=2**20)
handler.setFormatter(formatter)
handler.setLevel(logging.DEBUG)
l = logging.getLogger()
l.setLevel(logging.DEBUG)
l.addHandler(handler)
from json import JSONEncoder
def handle_client(connection, client_socket):
"""Handle client requests by forwarding all received data to aln.Connection, send data back to the client."""
def send_status():
while running:
try:
for notif_key in list(globals().keys()):
if notif_key.startswith("notif_"):
data = globals().get(notif_key)
if data:
if notif_key == "notif_battery":
data: list[Battery] = data
batteryJSON = {"type": "battery"}
for i in data:
batteryJSON[i.get_component()] = {
"status": i.get_status(),
"level": i.get_level()
}
data: str = JSONEncoder().encode(batteryJSON)
elif notif_key == "notif_ear_detection":
# noinspection PyTypeChecker
data: list[int] = data
earDetectionJSON = {
"type": "ear_detection",
"primary": data[0],
"secondary": data[1]
}
data: str = JSONEncoder().encode(earDetectionJSON)
elif notif_key == "notif_anc":
data: int = data
ancJSON = {
"type": "anc",
"mode": data,
}
data: str = JSONEncoder().encode(ancJSON)
elif notif_key == "notif_ca":
data: int = data
caJSON = {
"type": "ca",
"status": data,
}
data: str = JSONEncoder().encode(caJSON)
elif notif_key == "notif_unknown":
logging.debug(f"Unhandled notification type: {notif_key}")
logging.debug(f"Data: {data}")
data: str = JSONEncoder().encode({"type": "unknown", "data": data})
if not client_socket or not isinstance(client_socket, socket.socket):
logging.error("Invalid client socket")
break
logging.info(f'Sending {notif_key} status: {data}')
client_socket.sendall(data.encode('utf-8'))
logging.info(f'Sent {notif_key} status: {data}')
globals()[notif_key] = None
except socket.error as e:
logging.error(f"Socket error sending status: {e}")
break
except Exception as e:
logging.error(f"Error sending status: {e}")
break
def receive_commands():
while running:
try:
data = client_socket.recv(1024)
if not data:
break
logging.info(f'Received command: {data}')
connection.send(data)
except Exception as e:
logging.error(f"Error receiving command: {e}")
break
# Start two threads to handle sending and receiving data
send_thread = threading.Thread(target=send_status)
send_thread.start()
receive_thread = threading.Thread(target=receive_commands)
receive_thread.start()
send_thread.join()
receive_thread.join()
client_socket.close()
logging.info("Client socket closed")
def start_socket_server(connection):
"""Start a UNIX domain socket server."""
global running
# Set up the socket
server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
server_socket.bind(SOCKET_PATH)
except OSError:
logging.error(f"Socket already in use or unavailable: {SOCKET_PATH}")
sys.exit(1)
server_socket.listen(1)
logging.info(f"Socket server listening on {SOCKET_PATH}")
while running:
try:
client_socket, _ = server_socket.accept()
logging.info("Client connected")
# Handle the client connection in a separate thread
client_thread = threading.Thread(target=handle_client, args=(connection, client_socket))
client_thread.start()
except Exception as e:
logging.error(f"Error accepting connection: {e}")
# Close the server socket when stopped
server_socket.close()
logging.info("Socket server stopped")
def stop_daemon(_, __):
"""Signal handler to stop the daemon."""
global running
logging.info("Received termination signal. Stopping daemon...")
running = False # Set running flag to False to stop the loop
# Close the socket gracefully by removing the file path
try:
socket.socket(socket.AF_UNIX, socket.SOCK_STREAM).connect(SOCKET_PATH)
except socket.error:
pass
finally:
# Remove the socket file
if os.path.exists(SOCKET_PATH):
os.remove(SOCKET_PATH)
sys.exit(0)
def notification_handler(notification_type: int, data: bytes):
global connection
logging.debug(f"Received notification: {notification_type}")
if notification_type == Notifications.BATTERY_UPDATED:
logger = logging.getLogger("Battery Status")
battery = connection.notificationListener.BatteryNotification.getBattery()
globals()["notif_battery"] = battery
for i in battery:
logger.debug(f'{i.get_component()} - {i.get_status()}: {i.get_level()}')
elif notification_type == Notifications.EAR_DETECTION_UPDATED:
logger = logging.getLogger("In-Ear Status")
earDetection = connection.notificationListener.EarDetectionNotification.getEarDetection()
globals()["notif_ear_detection"] = earDetection
logger.debug(earDetection)
elif notification_type == Notifications.ANC_UPDATED:
logger = logging.getLogger("ANC Status")
anc = connection.notificationListener.ANCNotification.status
globals()["notif_anc"] = anc
logger.debug(anc)
elif notification_type == Notifications.CA_UPDATED:
logger = logging.getLogger("Conversational Awareness Status")
ca = connection.notificationListener.ConversationalAwarenessNotification.status
globals()["notif_ca"] = ca
logger.debug(ca)
elif notification_type == Notifications.UNKNOWN:
logger = logging.getLogger("Unknown Notification")
hex_data = ' '.join(f'{byte:02x}' for byte in data)
globals()["notif_unknown"] = hex_data
logger.debug(hex_data)
def main():
global running
logging.info("Starting AirPods daemon")
connection = Connection(AIRPODS_MAC)
globals()['connection'] = connection
# Connect to the AirPods and send the handshake
try:
connection.connect()
except bluetooth.btcommon.BluetoothError as e:
logging.error(f"Failed to connect to {AIRPODS_MAC}: {e}")
sys.exit(1)
connection.send(enums.HANDSHAKE)
logging.info("Handshake sent")
connection.initialize_notifications(notification_handler)
# Start the socket server to listen for client connections
start_socket_server(connection)
# Set up signal handlers to handle termination signals
signal.signal(signal.SIGINT, stop_daemon) # Handle Ctrl+C
signal.signal(signal.SIGTERM, stop_daemon) # Handle kill signal
if __name__ == "__main__":
# Daemonize the process
if os.fork():
sys.exit()
os.setsid()
if os.fork():
sys.exit()
sys.stdout.flush()
sys.stderr.flush()
with open('/dev/null', 'r') as devnull:
os.dup2(devnull.fileno(), sys.stdin.fileno())
with open(LOG_FILE, 'a+') as logfile:
os.dup2(logfile.fileno(), sys.stdout.fileno())
os.dup2(logfile.fileno(), sys.stderr.fileno())
main()

38
linux/test-head.py Normal file
View File

@@ -0,0 +1,38 @@
import bluetooth
import threading
sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
bt_addr = "28:2D:7F:C2:05:5B" # sys.argv[1]
psm = 0x1001 # AAP
print(f"Trying to connect to {bt_addr} on PSM 0x{psm:04X}...")
sock.connect((bt_addr, psm))
running = threading.Event()
def listen():
global running
while not running.is_set():
res = sock.recv(1024)
print(f"Response: {res.hex()}")
t = threading.Thread(target=listen)
t.start()
print("Connected. Type something...")
try:
byts = bytes(int(b, 16) for b in "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00".split(" "))
sock.send(byts)
byts = bytes(int(b, 16) for b in "04 00 04 00 0F 00 FF FF FE FF".split(" "))
sock.send(byts)
byts = bytes(int(b, 16) for b in "04 00 04 00 17 00 00 00 10 00 11 00 08 7C 10 02 42 0B 08 4E 10 02 1A 05 01 40 9C 00 00".split(" "))
sock.send(byts)
import time
time.sleep(5)
byts = bytes(int(b, 16) for b in "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00".split(" "))
sock.send(byts)
except:
...
running.set()
sock.close()