diff --git a/.gitignore b/.gitignore index 4decbf5..7b9a2dc 100644 --- a/.gitignore +++ b/.gitignore @@ -151,5 +151,8 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +# VSCode .vscode + +# Testcase for personal use test/ \ No newline at end of file diff --git a/demo.py b/demo_v2.py similarity index 98% rename from demo.py rename to demo_v2.py index b6aa6d9..4c66ca1 100644 --- a/demo.py +++ b/demo_v2.py @@ -29,4 +29,4 @@ async def _(): await dglab_instance.close() -asyncio.run(_()) +asyncio.run(_()) \ No newline at end of file diff --git a/demo_v3.py b/demo_v3.py new file mode 100644 index 0000000..a5def97 --- /dev/null +++ b/demo_v3.py @@ -0,0 +1,35 @@ +# Description: This is a demo script to show how to use the pydglab library to interact with the DGLab device. + +import asyncio +import logging + +import pydglab +from pydglab import model_v3 + +logging.basicConfig( + format="%(module)s [%(levelname)s]: %(message)s", level=logging.DEBUG +) + + +async def _(): + await pydglab.scan() + dglab_instance = pydglab.dglab_v3() + try: + await dglab_instance.create() + except TimeoutError: + logging.error("Timeout, retrying...") + await dglab_instance.create() + await dglab_instance.set_coefficient(100, 100, 100, model_v3.ChannelA) + await dglab_instance.set_coefficient(100, 100, 100, model_v3.ChannelB) + await dglab_instance.get_strength() + await dglab_instance.set_strength_sync(1, 1) + await dglab_instance.set_wave_sync(0, 0, 0, 0, 0, 0) + await dglab_instance.set_wave_set( + model_v3.Wave_set["Going_Faster"], model_v3.ChannelA + ) + await dglab_instance.get_strength() + await asyncio.sleep(2) + await dglab_instance.close() + + +asyncio.run(_()) diff --git a/pydglab/__init__.py b/pydglab/__init__.py index cd6654f..64fcdb8 100644 --- a/pydglab/__init__.py +++ b/pydglab/__init__.py @@ -10,6 +10,8 @@ handler.setFormatter(logging.Formatter(fmt=LOGFORMAT)) _logger.addHandler(handler) -from .service import dglab +from .service import dglab, dglab_v3 from .model import * from .bthandler import scan +from .model_v3 import * +from .bthandler_v3 import scan diff --git a/pydglab/bthandler_v3.py b/pydglab/bthandler_v3.py new file mode 100644 index 0000000..fbc583b --- /dev/null +++ b/pydglab/bthandler_v3.py @@ -0,0 +1,87 @@ +import logging +from bleak import BleakClient, BleakScanner +from typing import Tuple, List +from bitstring import BitArray + +from pydglab.model_v3 import * +from pydglab.uuid import * + +logger = logging.getLogger(__name__) + + +async def scan(): + """ + Scan for DGLAB v3.0 devices and return a list of tuples with the address and the RSSI of the devices found. + + Returns: + List[Tuple[str, int]]: (address, RSSI) + """ + devices = await BleakScanner().discover(return_adv=True) + dglab_v3: List[Tuple[str, int]] = [] + for i, j in devices.values(): + if j.local_name == CoyoteV3.name and i.address is not None: + logger.info(f"Found DGLAB v3.0 {i.address}") + dglab_v3.append((i.address, j.rssi)) + if not dglab_v3: + logger.error("No DGLAB v3.0 found") + return dglab_v3 + + +async def scan_(): + dglab_v3 = await scan() + if not dglab_v3: + raise Exception("No DGLAB v3.0 found") + if len(dglab_v3) > 1: + logger.warning("Multiple DGLAB v3.0 found, chosing the closest one") + elif len(dglab_v3) == 0: + raise Exception("No DGLAB v3.0 found") + return sorted(dglab_v3, key=lambda device: device[1])[0][0] + + +async def notify_(client: BleakClient, characteristics: CoyoteV3, callback: callable): + await client.start_notify(characteristics.characteristicNotify, callback) + + +async def write_strenth_(client: BleakClient, value: Coyote_v3, characteristics: CoyoteV3): + struct = ( + 0xB0, + 0b00010000 + 0b00001111, + value.ChannelA.strength, + value.ChannelB.strength, + value.ChannelA.wave, + value.ChannelA.waveStrenth, + value.ChannelB.wave, + value.ChannelB.waveStrenth, + ) + bytes_ = bytes( + tuple( + item if isinstance(item, int) else subitem + for item in struct + for subitem in (tuple(item) if isinstance(item, list) else (item,)) + ) + ) + logger.debug(f"Sending bytes: {bytes_.hex()} , which is {bytes_}") + await client.write_gatt_char(characteristics.characteristicWrite, bytes_) + + +async def write_coefficient_( + client: BleakClient, value: Coyote_v3, characteristics: CoyoteV3 +): + struct = ( + 0xBF, + value.ChannelA.limit, + value.ChannelB.limit, + value.ChannelA.coefficientFrequency, + value.ChannelB.coefficientFrequency, + value.ChannelA.coefficientStrenth, + value.ChannelB.coefficientStrenth, + ) + bytes_ = bytes( + tuple( + item if isinstance(item, int) else subitem + for item in struct + for subitem in (item if isinstance(item, tuple) else (item,)) + ) + ) + logger.debug(f"Sending bytes: {bytes_.hex()} , which is {bytes_}") + await client.write_gatt_char(characteristics.characteristicWrite, bytes_) diff --git a/pydglab/model.py b/pydglab/model.py index e4ac08c..14360de 100644 --- a/pydglab/model.py +++ b/pydglab/model.py @@ -44,4 +44,4 @@ def __init__(self): (1, 14, 20), (1, 9, 20), ], -} +} \ No newline at end of file diff --git a/pydglab/model_v3.py b/pydglab/model_v3.py new file mode 100644 index 0000000..435a72e --- /dev/null +++ b/pydglab/model_v3.py @@ -0,0 +1,48 @@ +from typing import Optional + + +class ChannelA(object): + def __init__(self): + self.strength: Optional[int] = None + self.wave: Optional[list[int]] = [0, 0, 0, 0] + self.waveStrenth: Optional[list[int]] = [0, 0, 0, 0] + self.coefficientStrenth: Optional[int] = None + self.coefficientFrequency: Optional[int] = None + self.limit: Optional[int] = None + + +class ChannelB(object): + def __init__(self): + self.strength: Optional[int] = None + self.wave: Optional[list[int]] = [0, 0, 0, 0] + self.waveStrenth: Optional[list[int]] = [0, 0, 0, 0] + self.coefficientStrenth: Optional[int] = None + self.coefficientFrequency: Optional[int] = None + self.limit: Optional[int] = None + + +class Coyote_v3(object): + def __init__(self): + self.ChannelA: Optional[ChannelA] = ChannelA() + self.ChannelB: Optional[ChannelB] = ChannelB() + + +Wave_set = { + "Going_Faster": [ + (5, 135, 20), + (5, 125, 20), + (5, 115, 20), + (5, 105, 20), + (5, 95, 20), + (4, 86, 20), + (4, 76, 20), + (4, 66, 20), + (3, 57, 20), + (3, 47, 20), + (3, 37, 20), + (2, 28, 20), + (2, 18, 20), + (1, 14, 20), + (1, 9, 20), + ], +} diff --git a/pydglab/service.py b/pydglab/service.py index 9af8a28..1ac38f7 100644 --- a/pydglab/service.py +++ b/pydglab/service.py @@ -1,14 +1,14 @@ -import logging, asyncio +import logging, asyncio, time from bleak import BleakClient from typing import Tuple from pydglab.model import * +from pydglab.model_v3 import * from pydglab.uuid import * -from pydglab.bthandler import * -import time +import pydglab.bthandler as v2 +import pydglab.bthandler_v3 as v3 logger = logging.getLogger(__name__) - class dglab(object): coyote = Coyote() @@ -30,7 +30,7 @@ async def create(self) -> "dglab": if self.address is None: # If address is not provided, scan for it. - self.address = await scan_() + self.address = await v2.scan_() # Connect to the device. logger.debug(f"Connecting to {self.address}") @@ -111,7 +111,7 @@ async def get_batterylevel(self) -> int: int: The battery level as an integer value. """ - value = await get_batterylevel_(self.client, self.characteristics) + value = await v2.get_batterylevel_(self.client, self.characteristics) value = value[0] logger.debug(f"Received battery level: {value}") self.coyote.Battery = int(value) @@ -125,7 +125,7 @@ async def get_strength(self) -> Tuple[int, int]: Returns: Tuple[int, int]: 通道A强度,通道B强度 """ - value = await get_strength_(self.client, self.characteristics) + value = await v2.get_strength_(self.client, self.characteristics) logger.debug(f"Received strength: A: {value[0]}, B: {value[1]}") self.coyote.ChannelA.strength = int(value[0]) self.coyote.ChannelB.strength = int(value[1]) @@ -149,7 +149,7 @@ async def set_strength(self, strength: int, channel: ChannelA | ChannelB) -> Non self.coyote.ChannelA.strength = strength elif channel is ChannelB: self.coyote.ChannelB.strength = strength - r = await set_strength_(self.client, self.coyote, self.characteristics) + r = await v2.set_strength_(self.client, self.coyote, self.characteristics) logger.debug(f"Set strength response: {r}") return ( self.coyote.ChannelA.strength @@ -172,7 +172,7 @@ async def set_strength_sync(self, strengthA: int, strengthB: int) -> None: """ self.coyote.ChannelA.strength = strengthA self.coyote.ChannelB.strength = strengthB - r = await set_strength_(self.client, self.coyote, self.characteristics) + r = await v2.set_strength_(self.client, self.coyote, self.characteristics) logger.debug(f"Set strength response: {r}") return self.coyote.ChannelA.strength, self.coyote.ChannelB.strength @@ -314,9 +314,9 @@ async def set_wave_sync( """ self.channelA_wave_set = [(waveX_A, waveY_A, waveZ_A)] self.channelB_wave_set = [(waveX_B, waveY_B, waveZ_B)] - r = await set_wave_( + r = await v2.set_wave_( self.client, self.coyote.ChannelA, self.characteristics - ), await set_wave_(self.client, self.coyote.ChannelB, self.characteristics) + ), await v2.set_wave_(self.client, self.coyote.ChannelB, self.characteristics) return (waveX_A, waveY_A, waveZ_A), (waveX_B, waveY_B, waveZ_B) async def _keep_wave(self) -> None: @@ -337,15 +337,15 @@ async def _keep_wave(self) -> None: # Refresh A and B channel by turn if self.coyote.ChannelA.strength == 0: - r = (0,0,0), await set_wave_(self.client, self.coyote.ChannelB, self.characteristics) + r = (0,0,0), await v2.set_wave_(self.client, self.coyote.ChannelB, self.characteristics) elif self.coyote.ChannelB.strength == 0: - r = await set_wave_(self.client, self.coyote.ChannelA, self.characteristics), (0,0,0) + r = await v2.set_wave_(self.client, self.coyote.ChannelA, self.characteristics), (0,0,0) else: if last_refreshing == ChannelA: - r = (0,0,0), await set_wave_(self.client, self.coyote.ChannelB, self.characteristics) + r = (0,0,0), await v2.set_wave_(self.client, self.coyote.ChannelB, self.characteristics) last_refreshing = ChannelB elif last_refreshing == ChannelB: - r = await set_wave_(self.client, self.coyote.ChannelA, self.characteristics), (0,0,0) + r = await v2.set_wave_(self.client, self.coyote.ChannelA, self.characteristics), (0,0,0) last_refreshing = ChannelA logger.debug(f"Set wave response: {r}") logger.debug(f"Time elapsed: {time.time() - last_time_local}") @@ -370,3 +370,391 @@ async def close(self): pass await self.client.disconnect() return None + + +class dglab_v3(object): + coyote = Coyote_v3() + + def __init__(self, address: str = None) -> None: + self.address = address + return None + + async def create(self) -> "dglab_v3": + """ + 建立郊狼连接并初始化。 + Creates a connection to the DGLAB device and initialize. + + Returns: + dglab: The initialized DGLAB object. + + Raises: + Exception: If the device is not supported or if an unknown device is connected. + """ + + if self.address is None: + # If address is not provided, scan for it. + self.address = await v3.scan_() + + # Connect to the device. + logger.debug(f"Connecting to {self.address}") + self.client = BleakClient(self.address, timeout=20.0) + await self.client.connect() + + # Wait for a second to allow service discovery to complete + await asyncio.sleep(1) + + # Check if the device is valid. + services = self.client.services + service = [service.uuid for service in services] + logger.debug(f"Got services: {str(service)}") + if CoyoteV2.serviceBattery in service and CoyoteV2.serviceEStim in service: + raise Exception("Use dglab_v2 instead") + elif CoyoteV3.serviceWrite in service and CoyoteV3.serviceNotify in service: + logger.info("Connected to DGLAB v3.0") + + # Update BleakGATTCharacteristic into characteristics list, to optimize performence. + self.characteristics = CoyoteV3 + logger.debug(f"Got characteristics: {str(self.characteristics)}") + for i in self.client.services.characteristics.values(): + if i.uuid == self.characteristics.characteristicWrite: + self.characteristics.characteristicWrite = i + elif i.uuid == self.characteristics.characteristicNotify: + self.characteristics.characteristicNotify = i + + else: + raise Exception( + "Unknown device (你自己看看你连的是什么jb设备)" + ) # Sorry for my language. + + self.channelA_wave_set: list[tuple[int, int, int]] = [] + self.channelB_wave_set: list[tuple[int, int, int]] = [] + + # Initialize notify + await v3.notify_(self.client, self.characteristics, self.notify_callback) + + # Initialize self.coyote + self.coyote.ChannelA.limit = 200 + self.coyote.ChannelB.limit = 200 + self.coyote.ChannelA.coefficientStrenth = 100 + self.coyote.ChannelB.coefficientStrenth = 100 + self.coyote.ChannelA.coefficientFrequency = 100 + self.coyote.ChannelB.coefficientFrequency = 100 + + await self.set_coefficient(200, 100, 100, ChannelA) + await self.set_coefficient(200, 100, 100, ChannelB) + await self.set_wave_sync(0, 0, 0, 0, 0, 0) + await self.set_strength_sync(0, 0) + + # Start the wave tasks, to keep the device functioning. + self.wave_tasks = asyncio.gather( + self._retainer(), + ) + + return self + + @classmethod + async def from_address(cls, address: str) -> "dglab_v3": + """ + 从指定的地址创建一个新的郊狼实例,在需要同时连接多个设备时格外好用。 + Creates a new instance of the 'dglab' class using the specified address. + + Args: + address (str): The address to connect to. + + Returns: + dglab: An instance of the 'dglab' class. + + """ + + return cls(address) + + async def notify_callback(self, sender: BleakGATTCharacteristic, data: bytearray): + logger.debug(f"{sender}: {data}") + if data[0] == 0xB1: + # self.coyote.ChannelA.strength = int(data[2]) + # self.coyote.ChannelB.strength = int(data[3]) + logger.debug(f"Getting bytes(0xB1): {data.hex()} , which is {data}") + if data[0] == 0xBE: + # self.coyote.ChannelA.limit = int(data[1]) + # self.coyote.ChannelB.limit = int(data[2]) + # self.coyote.ChannelA.coefficientFrequency = int(data[3]) + # self.coyote.ChannelB.coefficientFrequency = int(data[4]) + # self.coyote.ChannelA.coefficientStrenth = int(data[5]) + # self.coyote.ChannelB.coefficientStrenth = int(data[6]) + logger.debug(f"Getting bytes(0xBE): {data.hex()} , which is {data}") + + async def get_strength(self) -> Tuple[int, int]: + """ + 读取郊狼当前强度。 + Retrieves the strength of the device. + + Returns: + Tuple[int, int]: 通道A强度,通道B强度 + """ + return self.coyote.ChannelA.strength, self.coyote.ChannelB.strength + + async def set_strength(self, strength: int, channel: ChannelA | ChannelB) -> None: + """ + 设置电压强度。 + 额外设置这个函数用于单独调整强度只是为了和设置波形的函数保持一致罢了。 + Set the strength of the device. + + Args: + strength (int): 电压强度 + channel (ChannelA | ChannelB): 对手频道 + + Returns: + int: 电压强度 + """ + + if channel is ChannelA: + self.coyote.ChannelA.strength = strength + elif channel is ChannelB: + self.coyote.ChannelB.strength = strength + return ( + self.coyote.ChannelA.strength + if channel is ChannelA + else self.coyote.ChannelB.strength + ) + + async def set_coefficient(self, strength_limit: int, strength_coefficient: int, frequency_coefficient: int, channel: ChannelA | ChannelB) -> None: + """ + 设置强度上线与平衡常数。 + Set the strength limit and coefficient of the device. + + Args: + strength_limit (int): 电压强度上限 + strength_coefficient (int): 强度平衡常数 + frequency_coefficient (int): 频率平衡常数 + channel (ChannelA | ChannelB): 对手频道 + + Returns: + Tuple[int, int, int]: 电压强度上限,强度平衡常数,频率平衡常数 + """ + + if channel is ChannelA: + self.coyote.ChannelA.limit = strength_limit + self.coyote.ChannelA.coefficientStrenth = strength_coefficient + self.coyote.ChannelA.coefficientFrequency = frequency_coefficient + elif channel is ChannelB: + self.coyote.ChannelB.limit = strength_limit + self.coyote.ChannelB.coefficientStrenth = strength_coefficient + self.coyote.ChannelB.coefficientFrequency = frequency_coefficient + + await v3.write_coefficient_(self.client, self.coyote, self.characteristics) + + return ( + (self.coyote.ChannelA.limit, self.coyote.ChannelA.coefficientStrenth, self.coyote.ChannelA.coefficientFrequency) + if channel is ChannelA + else (self.coyote.ChannelB.limit, self.coyote.ChannelB.coefficientStrenth, self.coyote.ChannelB.coefficientFrequency) + ) + + async def set_strength_sync(self, strengthA: int, strengthB: int) -> None: + """ + 同步设置电流强度。 + 这是正道。 + Set the strength of the device synchronously. + + Args: + strengthA (int): 通道A电压强度 + strengthB (int): 通道B电压强度 + + Returns: + (int, int): A通道强度,B通道强度 + """ + self.coyote.ChannelA.strength = strengthA + self.coyote.ChannelB.strength = strengthB + return self.coyote.ChannelA.strength, self.coyote.ChannelB.strength + + """ + How wave set works: + 1. Set the wave set for channel A and channel B. + 2. The wave set will be looped indefinitely by + wave_set_handler, and change the value in + self.coyote.ChannelN.waveN. + """ + + async def set_wave_set( + self, wave_set: list[tuple[int, int, int]], channel: ChannelA | ChannelB + ) -> None: + """ + 设置波形组,也就是所谓“不断变化的波形”。 + Set the wave set for the device. + + Args: + wave_set (list[tuple[int, int, int]]): 波形组 + channel (ChannelA | ChannelB): 对手通道 + + Returns: + None: None + """ + if channel is ChannelA: + self.channelA_wave_set = wave_set + elif channel is ChannelB: + self.channelB_wave_set = wave_set + return None + + async def set_wave_set_sync( + self, + wave_setA: list[tuple[int, int, int]], + wave_setB: list[tuple[int, int, int]], + ) -> None: + """ + 同步设置波形组。 + Set the wave set for the device synchronously. + + Args: + wave_setA (list[tuple[int, int, int]]): 通道A波形组 + wave_setB (list[tuple[int, int, int]]): 通道B波形组 + + Returns: + None: None + """ + self.channelA_wave_set = wave_setA + self.channelB_wave_set = wave_setB + return None + + def waveset_converter( + self, wave_set: list[tuple[int, int, int]] + ) -> tuple[int, int]: + """ + Convert the wave set to the correct format. + """ + freq = int((((wave_set[0] + wave_set[1]) - 10) / 990) * 230 + 10) + strenth = int(wave_set[2] * 5) + + return freq, strenth + + """ + How set_wave works: + Basically, it will generate a wave set with only one wave, + and changes the value in self.hannelN_wave_set. + All the wave changes will be applied to the device by wave_set. + """ + + async def set_wave( + self, waveX: int, waveY: int, waveZ: int, channel: ChannelA | ChannelB + ) -> Tuple[int, int, int]: + """ + 设置波形。 + 枯燥,乏味,感觉不如。。。 + Set the wave for the device. + + Args: + waveX (int): 连续发出X个脉冲,每个脉冲持续1ms + waveY (int): 发出脉冲后停止Y个周期,每个周期持续1ms + waveZ (int): 每个脉冲的宽度为Z*5us + channel (ChannelA | ChannelB): 对手通道 + + Returns: + Tuple[int, int, int]: 波形 + """ + if channel is ChannelA: + self.channelA_wave_set = [(waveX, waveY, waveZ)] + elif channel is ChannelB: + self.channelB_wave_set = [(waveX, waveY, waveZ)] + return waveX, waveY, waveZ + + async def set_wave_sync( + self, + waveX_A: int, + waveY_A: int, + waveZ_A: int, + waveX_B: int, + waveY_B: int, + waveZ_B: int, + ) -> Tuple[int, int, int, int, int, int]: + """ + 同步设置波形。 + Set the wave for the device synchronously. + + Args: + waveX_A (int): 通道A,连续发出X个脉冲,每个脉冲持续1ms + waveY_A (int): 通道A,发出脉冲后停止Y个周期,每个周期持续1ms + waveZ_A (int): 通道A,每个脉冲的宽度为Z + waveX_B (int): 通道B,连续发出X个脉冲,每个脉冲持续1ms + waveY_B (int): 通道B,发出脉冲后停止Y个周期,每个周期持续1ms + waveZ_B (int): 通道B,每个脉冲的宽度为Z + + Returns: + Tuple[Tuple[int, int, int], Tuple[int, int, int]]: A通道波形,B通道波形 + """ + self.channelA_wave_set = [(waveX_A, waveY_A, waveZ_A)] + self.channelB_wave_set = [(waveX_B, waveY_B, waveZ_B)] + return (waveX_A, waveY_A, waveZ_A), (waveX_B, waveY_B, waveZ_B) + + def _channelA_wave_set_handler(self) -> None: + """ + Do not use this function directly. + + Yep this is how wave set works :) + PR if you have a better solution. + """ + try: + while True: + for wave in self.channelA_wave_set: + wave = self.waveset_converter(wave) + self.coyote.ChannelA.wave.insert(0,wave[0]) + self.coyote.ChannelA.wave.pop() + self.coyote.ChannelA.waveStrenth.insert(0,wave[1]) + self.coyote.ChannelA.waveStrenth.pop() + yield(None) + except asyncio.exceptions.CancelledError: + pass + + def _channelB_wave_set_handler(self) -> None: + """ + Do not use this function directly. + + Yep this is how wave set works :) + PR if you have a better solution. + """ + try: + while True: + for wave in self.channelB_wave_set: + wave = self.waveset_converter(wave) + self.coyote.ChannelB.wave.insert(0,wave[0]) + self.coyote.ChannelB.wave.pop() + self.coyote.ChannelB.waveStrenth.insert(0,wave[1]) + self.coyote.ChannelB.waveStrenth.pop() + yield(None) + except asyncio.exceptions.CancelledError: + pass + + async def _retainer(self) -> None: + """ + Don't use this function directly. + """ + ChannelA_keeping = self._channelA_wave_set_handler() + ChannelB_keeping = self._channelB_wave_set_handler() + + last_time = time.time() + + while True: + if time.time() - last_time >= 0.1: + + # Record time for loop + last_time = time.time() + logger.debug(f"Using wave: {self.coyote.ChannelA.wave}, {self.coyote.ChannelA.waveStrenth}, {self.coyote.ChannelB.wave}, {self.coyote.ChannelB.waveStrenth}") + r = await v3.write_strenth_(self.client, self.coyote, self.characteristics) + logger.debug(f"Retainer response: {r}") + next(ChannelA_keeping) + next(ChannelB_keeping) + + return None + + async def close(self) -> None: + """ + 郊狼虽好,可不要贪杯哦。 + Close the connection to the device. + + Returns: + None: None + """ + try: + self.wave_tasks.cancel() + await self.wave_tasks + except asyncio.CancelledError or asyncio.exceptions.InvalidStateError: + pass + await self.client.disconnect() + return None