diff --git a/src/veles/device/__init__.py b/src/veles/device/__init__.py index a19b375..d98d887 100644 --- a/src/veles/device/__init__.py +++ b/src/veles/device/__init__.py @@ -1,2 +1,2 @@ from .sensor_wired_IAQ import SensorWiredIAQ -from .sensor_wired_RHT import SensorWiredRHT \ No newline at end of file +from .sensor_wired_RHT import SensorWiredRHT diff --git a/src/veles/device/find.py b/src/veles/device/find.py index 5041344..6946ae6 100644 --- a/src/veles/device/find.py +++ b/src/veles/device/find.py @@ -1,10 +1,30 @@ -from typing import Final, Dict -from .generic import Device +from typing import Final, Dict, Any, TypeVar, Type +from .generic import Device, NoResponseError from .sensor_wired_IAQ import SensorWiredIAQ from .sensor_wired_RHT import SensorWiredRHT # links device identifiers to its class -device_identifiers: Final[Dict[int, Device]] = { +DEVICE_IDENTIFIERS: Final[Dict[int, Device]] = { SensorWiredIAQ.DEVICE_CODE: SensorWiredIAQ, SensorWiredRHT.DEVICE_CODE: SensorWiredRHT, } + +T = TypeVar("T", bound=Device) + +def find_devices(device_cls: Type[T], address_space: list[Any]) -> list[T]: + """ + Look for devices in given address space + """ + # found_devices = [] + # for address in address_space: + # try: + # found_devices.append(device_cls(address)) + # except NoResponseError: + # pass + # return found_devices + + return list(filter(device_cls.probe, address_space)) + + # TODO add device args + # TODO return devices themselves, not addresses + # TODO add .address to Device diff --git a/src/veles/device/generic.py b/src/veles/device/generic.py index 465e85f..7800c44 100644 --- a/src/veles/device/generic.py +++ b/src/veles/device/generic.py @@ -1,7 +1,7 @@ """Module containing classes for generic wired/wireless devices""" from dataclasses import dataclass from typing import Dict, Final, Any -from abc import ABC, abstractmethod +from abc import ABC, abstractclassmethod, abstractmethod, abstractstaticmethod import minimalmodbus import serial @@ -27,7 +27,7 @@ class ReadoutErrorCounter: no_response: int = 0 -class NoResponseException(Exception): +class NoResponseError(Exception): """ Raised when device fails to respond """ @@ -59,6 +59,20 @@ class Device(ABC): Return serial number, unique for each device """ + @classmethod + def probe(cls, address) -> bool: + """ + Probe given address, return True if device detected, + False otherwise + """ + # try instantiating; this raises NoResponseError + # if device not detected + try: + _ = cls(address) + except NoResponseError: + return False + return True + class ModbusRTUDevice(Device): """ @@ -103,7 +117,7 @@ class ModbusRTUDevice(Device): try: self.read_register(self.input_registers["SERIAL_NUMBER_1"]) except minimalmodbus.NoResponseError as exc: - raise NoResponseException from exc + raise NoResponseError from exc def read_register( self, register_number: int, signed: bool = False, retries: int = 10