forked from veles_labs/pyveles
Making the device selectable for the find_device function
This commit is contained in:
parent
8960349d97
commit
fb95c7061d
@ -1,5 +1,5 @@
|
|||||||
from typing import Final, Dict, Any, TypeVar, Type, Iterable
|
from typing import Final, Dict, Any, TypeVar, Type, Iterable
|
||||||
from .generic import Device, NoResponseError
|
from .generic import Device
|
||||||
from .sensor_wired import SensorWiredIAQ, SensorWiredRHT
|
from .sensor_wired import SensorWiredIAQ, SensorWiredRHT
|
||||||
|
|
||||||
# links device identifiers to its class
|
# links device identifiers to its class
|
||||||
@ -11,10 +11,10 @@ DEVICE_IDENTIFIERS: Final[Dict[int, Device]] = {
|
|||||||
T = TypeVar("T", bound=Device)
|
T = TypeVar("T", bound=Device)
|
||||||
|
|
||||||
|
|
||||||
def find_devices(device_cls: Type[T], address_space: Iterable[Any]) -> list[T]:
|
def find_devices(device_cls: Type[T], address_space: Iterable[Any], dev="/dev/rs485", baudrate=19200) -> list[T]:
|
||||||
"""
|
"""
|
||||||
Look for devices in given address space
|
Look for devices in given address space
|
||||||
"""
|
"""
|
||||||
return list(filter(device_cls.probe, address_space))
|
return list(filter(lambda x: device_cls.probe(x,dev,baudrate), address_space))
|
||||||
|
|
||||||
# TODO add device args
|
# TODO add device args
|
||||||
|
@ -3,8 +3,10 @@ from dataclasses import dataclass
|
|||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
import sys
|
||||||
|
py310 = sys.version_info.minor >= 10 or sys.version_info.major > 3
|
||||||
|
|
||||||
@dataclass(slots=True)
|
@dataclass(**({"slots": True} if py310 else {}))
|
||||||
class ReadoutErrorCounter:
|
class ReadoutErrorCounter:
|
||||||
"""Class used to track readout errors"""
|
"""Class used to track readout errors"""
|
||||||
|
|
||||||
@ -51,15 +53,16 @@ class Device(ABC):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def probe(cls, address) -> bool:
|
def probe(cls, address, dev="/dev/rs485", baudrate=19200) -> bool:
|
||||||
"""
|
"""
|
||||||
Probe given address, return True if device detected,
|
Probe given address, return True if device detected,
|
||||||
False otherwise
|
False otherwise
|
||||||
"""
|
"""
|
||||||
# try instantiating; this raises NoResponseError
|
# try instantiating; this raises NoResponseError
|
||||||
# if device not detected
|
# if device not detected
|
||||||
|
print(f'Probing address {address}')
|
||||||
try:
|
try:
|
||||||
_ = cls(address)
|
_ = cls(address,dev,baudrate)
|
||||||
except NoResponseError:
|
except NoResponseError:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
@ -83,7 +83,7 @@ class SensorWiredIAQ(ModbusRTUDevice):
|
|||||||
deleted_registers = list(filter(lambda x: sensor in x, self.input_registers))
|
deleted_registers = list(filter(lambda x: sensor in x, self.input_registers))
|
||||||
_ = list((self.input_registers.pop(x) for x in deleted_registers))
|
_ = list((self.input_registers.pop(x) for x in deleted_registers))
|
||||||
|
|
||||||
def __init__(self, modbus_address, baudrate=19200, dev="/dev/rs485"):
|
def __init__(self, modbus_address, dev="/dev/rs485", baudrate=19200):
|
||||||
super().__init__(modbus_address, baudrate, dev)
|
super().__init__(modbus_address, baudrate, dev)
|
||||||
# detect sensor configuration and modify input_registers accordingly
|
# detect sensor configuration and modify input_registers accordingly
|
||||||
# Check if VOC sensor present
|
# Check if VOC sensor present
|
||||||
|
Loading…
Reference in New Issue
Block a user