Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion roborock/data/b01_q10/b01_q10_containers.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,102 @@
from ..containers import RoborockBase
"""Data container classes for Q10 B01 devices.
Many of these classes use the `field(metadata={"dps": ...})` convention to map
dataclass fields to device Data Points (DPS). This metadata is utilized by the
`update_from_dps` helper in `roborock.devices.traits.b01.q10.common` to
automatically update objects from raw device responses.
"""

from dataclasses import dataclass, field

from ..containers import RoborockBase
from .b01_q10_code_mappings import (
B01_Q10_DP,
YXBackType,
YXDeviceCleanTask,
YXDeviceState,
YXDeviceWorkMode,
YXFanLevel,
YXWaterLevel,
)


@dataclass
class dpCleanRecord(RoborockBase):
op: str
result: int
id: str
data: list


@dataclass
class dpMultiMap(RoborockBase):
op: str
result: int
data: list


@dataclass
class dpGetCarpet(RoborockBase):
op: str
result: int
data: str


@dataclass
class dpSelfIdentifyingCarpet(RoborockBase):
op: str
result: int
data: str


@dataclass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed here - but I wonder if there is a way we can avoid having to do dataclass. Pydantic base models don't, but we do which is odd

class dpNetInfo(RoborockBase):
wifiName: str
ipAdress: str
mac: str
signal: int


@dataclass
class dpNotDisturbExpand(RoborockBase):
disturb_dust_enable: int
disturb_light: int
disturb_resume_clean: int
disturb_voice: int


@dataclass
class dpCurrentCleanRoomIds(RoborockBase):
room_id_list: list


@dataclass
class dpVoiceVersion(RoborockBase):
version: int


@dataclass
class dpTimeZone(RoborockBase):
timeZoneCity: str
timeZoneSec: int


@dataclass
class Q10Status(RoborockBase):
"""Status for Q10 devices.
Fields are mapped to DPS values using metadata. Objects of this class can be
automatically updated using the `update_from_dps` helper.
"""

clean_time: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_TIME})
clean_area: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_AREA})
battery: int | None = field(default=None, metadata={"dps": B01_Q10_DP.BATTERY})
status: YXDeviceState | None = field(default=None, metadata={"dps": B01_Q10_DP.STATUS})
fan_level: YXFanLevel | None = field(default=None, metadata={"dps": B01_Q10_DP.FAN_LEVEL})
water_level: YXWaterLevel | None = field(default=None, metadata={"dps": B01_Q10_DP.WATER_LEVEL})
clean_count: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_COUNT})
clean_mode: YXDeviceWorkMode | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_MODE})
clean_task_type: YXDeviceCleanTask | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_TASK_TYPE})
back_type: YXBackType | None = field(default=None, metadata={"dps": B01_Q10_DP.BACK_TYPE})
cleaning_progress: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEANING_PROGRESS})
23 changes: 20 additions & 3 deletions roborock/data/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ def from_dict(cls, data: dict[str, Any]):
if not isinstance(data, dict):
return None
field_types = {field.name: field.type for field in dataclasses.fields(cls)}
result: dict[str, Any] = {}
normalized_data: dict[str, Any] = {}
for orig_key, value in data.items():
key = _decamelize(orig_key)
if (field_type := field_types.get(key)) is None:
if field_types.get(key) is None:
if (log_key := f"{cls.__name__}.{key}") not in RoborockBase._missing_logged:
_LOGGER.debug(
"Key '%s' (decamelized: '%s') not found in %s fields, skipping",
Expand All @@ -104,6 +104,23 @@ def from_dict(cls, data: dict[str, Any]):
)
RoborockBase._missing_logged.add(log_key)
continue
normalized_data[key] = value

result = RoborockBase.convert_dict(field_types, normalized_data)
return cls(**result)

@staticmethod
def convert_dict(types_map: dict[Any, type], data: dict[Any, Any]) -> dict[Any, Any]:
"""Generic helper to convert a dictionary of values based on a schema map of types.
This is meant to be used by traits that use dataclass reflection similar to
`Roborock.from_dict` to merge in new data updates.
"""
result: dict[Any, Any] = {}
for key, value in data.items():
if key not in types_map:
continue
field_type = types_map[key]
if value == "None" or value is None:
result[key] = None
continue
Expand All @@ -124,7 +141,7 @@ def from_dict(cls, data: dict[str, Any]):
_LOGGER.exception(f"Failed to convert {key} with value {value} to type {field_type}")
continue

return cls(**result)
return result

def as_dict(self) -> dict:
return asdict(
Expand Down
14 changes: 9 additions & 5 deletions roborock/devices/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,14 @@ async def connect(self) -> None:
if self._unsub:
raise ValueError("Already connected to the device")
unsub = await self._channel.subscribe(self._on_message)
if self.v1_properties is not None:
try:
try:
if self.v1_properties is not None:
await self.v1_properties.discover_features()
except RoborockException:
unsub()
raise
elif self.b01_q10_properties is not None:
await self.b01_q10_properties.start()
except RoborockException:
unsub()
raise
self._logger.info("Connected to device")
self._unsub = unsub

Expand All @@ -214,6 +216,8 @@ async def close(self) -> None:
await self._connect_task
except asyncio.CancelledError:
pass
if self.b01_q10_properties is not None:
await self.b01_q10_properties.close()
if self._unsub:
self._unsub()
self._unsub = None
Expand Down
21 changes: 21 additions & 0 deletions roborock/devices/rpc/b01_q10_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,39 @@
from __future__ import annotations

import logging
from collections.abc import AsyncGenerator
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q10_protocol import (
ParamsType,
decode_rpc_response,
encode_mqtt_payload,
)

_LOGGER = logging.getLogger(__name__)


async def stream_decoded_responses(
mqtt_channel: MqttChannel,
) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]:
"""Stream decoded DPS messages received via MQTT."""

async for response_message in mqtt_channel.subscribe_stream():
try:
decoded_dps = decode_rpc_response(response_message)
except RoborockException as ex:
_LOGGER.debug(
"Failed to decode B01 Q10 RPC response: %s: %s",
response_message,
ex,
)
continue
yield decoded_dps


async def send_command(
mqtt_channel: MqttChannel,
command: B01_Q10_DP,
Expand Down
45 changes: 45 additions & 0 deletions roborock/devices/traits/b01/q10/__init__.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,74 @@
"""Traits for Q10 B01 devices."""

import asyncio
import logging
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel

from .command import CommandTrait
from .status import StatusTrait
from .vacuum import VacuumTrait

__all__ = [
"Q10PropertiesApi",
]

_LOGGER = logging.getLogger(__name__)


class Q10PropertiesApi(Trait):
"""API for interacting with B01 devices."""

command: CommandTrait
"""Trait for sending commands to Q10 devices."""

status: StatusTrait
"""Trait for managing the status of Q10 devices."""

vacuum: VacuumTrait
"""Trait for sending vacuum related commands to Q10 devices."""

def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self._channel = channel
self.command = CommandTrait(channel)
self.vacuum = VacuumTrait(self.command)
self.status = StatusTrait()
self._subscribe_task: asyncio.Task[None] | None = None

async def start(self) -> None:
"""Start any necessary subscriptions for the trait."""
self._subscribe_task = asyncio.create_task(self._subscribe_loop())

async def close(self) -> None:
"""Close any resources held by the trait."""
if self._subscribe_task is not None:
self._subscribe_task.cancel()
try:
await self._subscribe_task
except asyncio.CancelledError:
pass # ignore cancellation errors
self._subscribe_task = None

async def refresh(self) -> None:
"""Refresh all traits."""
# Sending the REQUEST_DPS will cause the device to send all DPS values
# to the device. Updates will be received by the subscribe loop below.
await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})

async def _subscribe_loop(self) -> None:
"""Persistent loop to listen for status updates."""
async for decoded_dps in stream_decoded_responses(self._channel):
_LOGGER.debug("Received Q10 status update: %s", decoded_dps)

# Notify all traits about a new message and each trait will
# only update what fields that it is responsible for.
# More traits can be added here below.
self.status.update_from_dps(decoded_dps)


def create(channel: MqttChannel) -> Q10PropertiesApi:
Expand Down
82 changes: 82 additions & 0 deletions roborock/devices/traits/b01/q10/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Common utilities for Q10 traits.
This module provides infrastructure for mapping Roborock Data Points (DPS) to
Python dataclass fields and handling the lifecycle of data updates from the
device.
### DPS Metadata Annotation
Classes extending `RoborockBase` can annotate their fields with DPS IDs using
the `field(metadata={"dps": ...})` convention. This creates a declarative
mapping that `DpsDataConverter` uses to automatically route incoming device
data to the correct attribute.
Example:
```python
@dataclass
class MyStatus(RoborockBase):
battery: int = field(metadata={"dps": B01_Q10_DP.BATTERY})
```
### Update Lifecycle
1. **Raw Data**: The device sends encoded DPS updates over MQTT.
2. **Decoding**: The transport layer decodes these into a dictionary (e.g., `{"101": 80}`).
3. **Conversion**: `DpsDataConverter` uses `RoborockBase.convert_dict` to transform
raw values into appropriate Python types (e.g., Enums, ints) based on the
dataclass field types.
4. **Update**: `update_from_dps` maps these converted values to field names and
updates the target object using `setattr`.
### Usage
Typically, a trait will instantiate a single `DpsDataConverter` for its status class
and call `update_from_dps` whenever new data is received from the device stream.
"""

import dataclasses
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.data.containers import RoborockBase


class DpsDataConverter:
"""Utility to handle the transformation and merging of DPS data into models.
This class pre-calculates the mapping between Data Point IDs and dataclass fields
to optimize repeated updates from device streams.
"""

def __init__(self, dps_type_map: dict[B01_Q10_DP, type], dps_field_map: dict[B01_Q10_DP, str]):
"""Initialize the converter for a specific RoborockBase-derived class."""
self._dps_type_map = dps_type_map
self._dps_field_map = dps_field_map

@classmethod
def from_dataclass(cls, dataclass_type: type[RoborockBase]):
"""Initialize the converter for a specific RoborockBase-derived class."""
dps_type_map: dict[B01_Q10_DP, type] = {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing wrong here - just stylistically(the naming), B01_Q10_DP feels off. I think it was me who originally added it, but maybe we should change as a follow up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think just like DataPoint could work given its in a module, but yeah, happy to address in a followup.

dps_field_map: dict[B01_Q10_DP, str] = {}
for field_obj in dataclasses.fields(dataclass_type):
if field_obj.metadata and "dps" in field_obj.metadata:
dps_id = field_obj.metadata["dps"]
dps_type_map[dps_id] = field_obj.type
dps_field_map[dps_id] = field_obj.name
return cls(dps_type_map, dps_field_map)

def update_from_dps(self, target: RoborockBase, decoded_dps: dict[B01_Q10_DP, Any]) -> None:
"""Convert and merge raw DPS data into the target object.
Uses the pre-calculated type mapping to ensure values are converted to the
correct Python types before being updated on the target.
Args:
target: The target object to update.
decoded_dps: The decoded DPS data to convert.
"""
conversions = RoborockBase.convert_dict(self._dps_type_map, decoded_dps)
for dps_id, value in conversions.items():
field_name = self._dps_field_map[dps_id]
setattr(target, field_name, value)
Loading