Initial commit

This commit is contained in:
Jan Mrna 2022-06-12 17:35:40 +02:00
commit d2c6185c47
10 changed files with 263 additions and 0 deletions

7
LICENSE Normal file
View File

@ -0,0 +1,7 @@
Copyright © 2022 VelesLabs
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

0
README.md Normal file
View File

3
pyproject.toml Normal file
View File

@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools>=42"]
build-backend = "setuptools.build_meta"

29
setup.cfg Normal file
View File

@ -0,0 +1,29 @@
[metadata]
name = veles
version = 0.0.9
author = VelesLabs
author_email = info@veleslabs.org
description = Python package for VelesLabs sensors and other hardware
long_description = file : README.md
long_description_content_type = text/markdown
url = https://gitlab.com/VelesLabs
#project_urls =
# Bug Tracker =
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
package_dir =
= src
packages = find:
python_requires = >=3.6
install_requires =
minimalmodbus>=2.0.1
pyserial>=3.5
[options.packages.find]
where = src

0
src/veles/__init__.py Normal file
View File

View File

10
src/veles/devices/find.py Normal file
View File

@ -0,0 +1,10 @@
from typing import Final, Dict
from .generic import Device
from .sensor_wired_IAQ import SensorWiredIAQ
from .sensor_wired_RHT import SensorWiredRHT
# links device identifiers to its class
device_identifiers: Final[Dict[int, Device]] = {
SensorWiredIAQ.IDENTIFIER: SensorWiredIAQ,
SensorWiredRHT.IDENTIFIER: SensorWiredRHT
}

View File

@ -0,0 +1,144 @@
'''Module containing classes for generic wired/wireless devices'''
from dataclasses import dataclass
from typing import Dict, Final
import minimalmodbus
import serial
class Modbus():
'''Class holding Modbus related constants'''
HOLDING_REGISTER_START = 40001
HOLDING_REGISTER_END = 49999
INPUT_REGISTER_START = 30001
INPUT_REGISTER_END = 39999
# ranges for testing if address is in address range
input_register_range = range(INPUT_REGISTER_START,
INPUT_REGISTER_END)
holding_register_range = range(HOLDING_REGISTER_START,
HOLDING_REGISTER_END)
@dataclass
class ReadoutErrorCounter():
'''Class used to track readout errors'''
total_attempts: int = 0
invalid_response: int = 0
no_response: int = 0
class Device():
'''Base class for all devices'''
class ModbusRTUDevice(Device):
'''
Base class for wired device controlled over MODBUS RTU (via RS-485)
RS-485 to USB converter is needed for devices based off this class
'''
# Reflects array fw/Core/Src/config.c:config_baudrates[]
BAUDRATES = [4800, 9600, 14400, 19200, 28800, 38400, 57600, 76800, 115200]
# registers common to all Modbus RTU devices
input_registers: Dict[str, int] = {
'IDENTIFIER': 30100
}
holding_registers: Dict[str, int] = {
'RESET': 40100
}
# magic constant for resetting: common to all Modbus RTU devices
MAGIC_RESET_CONSTANT: Final[int] = 0xABCD
def __comm_device_init(self) -> minimalmodbus.Instrument:
comm_device = minimalmodbus.Instrument(self.dev,
self.modbus_address,
close_port_after_each_call=True)
# RS-485 serial paramater init
comm_device.serial.baudrate = self.baudrate
comm_device.serial.bytesize = 8
comm_device.serial.parity = serial.PARITY_EVEN
comm_device.serial.stopbits = 1
comm_device.serial.timeout = 0.05 # seconds
comm_device.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
comm_device.clear_buffers_before_each_transaction = True
return comm_device
def __init__(self, modbus_address, baudrate=19200, dev='/dev/rs485'):
self.modbus_address: int = modbus_address
self.baudrate: int = baudrate
self.dev: str = dev
self.comm_device: minimalmodbus.Instrument = self.__comm_device_init()
self.readout_errors: ReadoutErrorCounter = ReadoutErrorCounter()
def read_register(self,
register_number: int,
signed: bool = False,
retries: int = 10) -> int:
'''Read Modbus input/holding register via serial device'''
if register_number in Modbus.input_register_range:
function_code = 4
register_offset = register_number - Modbus.INPUT_REGISTER_START
elif register_number in Modbus.holding_register_range:
function_code = 3
register_offset = register_number - Modbus.HOLDING_REGISTER_START
else:
# wrong register number
raise ValueError
for i in range(retries):
try:
self.readout_errors.total_attempts += 1
# minimalmodbus divides received register value by 10
return self.comm_device.read_register(
register_offset,
1,
functioncode=function_code,
signed=signed
) * 10
except minimalmodbus.NoResponseError as e:
last_exception = e
self.readout_errors.no_response += 1
continue
except minimalmodbus.InvalidResponseError as e:
last_exception = e
self.readout_errors.invalid_response += 1
continue
# retries failed, raise last exception to inform user
raise last_exception
def write_register(self,
register_number: int,
register_value: int,
retries: int = 10) -> None:
# only holding registers can be written
if register_number not in Modbus.holding_register_range:
raise ValueError
register_offset = register_number - Modbus.HOLDING_REGISTER_START
for i in range(retries):
try:
return self.comm_device.write_register(register_offset,
register_value,
functioncode=6)
except (minimalmodbus.NoResponseError,
minimalmodbus.InvalidResponseError) as e:
last_exception = e
continue
raise last_exception
def __getitem__(self, key: int) -> int:
return self.read_register(key)
def __setitem__(self, key: int, value: int) -> None:
return self.write_register(key, value)
def reset(self) -> bool:
'''
Soft-reset the device
'''
try:
self.write_register(ModbusRTUDevice.holding_registers['RESET'],
ModbusRTUDevice.MAGIC_RESET_CONSTANT)
return False # got answer => failed to reset
except minimalmodbus.NoResponseError:
return True # no answer => reset successful

View File

@ -0,0 +1,61 @@
from typing import Dict, Final
from minimalmodbus import NoResponseError
from .generic import ModbusRTUDevice
class SensorWiredIAQ(ModbusRTUDevice):
'''
Wired sensor measuring temperature, relative humidity,
carbon dioxide and VOC, optionally particulate matter
'''
IDENTIFIER: int = 0xbeef
input_registers: Dict[str, int] = {
'T': 30010, # from SHT4x
'T_F': 30011,
'RH': 30012, # from SHT4x
'CO2': 30013, # from SCD4x
'VOC_index': 30014,
'VOC_ticks': 30015,
'NOx_index': 30016,
'NOx_ticks': 30017,
'PM_mass_concentration_1.0': 30018,
'PM_mass_concentration_2.5': 30019,
'PM_mass_concentration_4.0': 30020,
'PM_mass_concentration_10.0': 30021,
'PM_number_concentration_0.5': 30022,
'PM_number_concentration_1.0': 30023,
'PM_number_concentration_2.5': 30024,
'PM_number_concentration_4.0': 30025,
'PM_number_concentration_10.0': 30026,
'PM_typical_particle_size': 30027,
'T_SCD4x': 30028,
'T_SCF4x_F': 30029,
'RH_SCD4x': 30030
} | ModbusRTUDevice.input_registers
# TODO use super, but __class__ not defined
holding_registers: Dict[str, int] = {
'LED_on': 40001,
'LED_brightness': 40002,
'LED_smooth': 40003,
'CO2_alert_limit_1': 40004,
'CO2_alert_limit_2': 40005,
'SCD4x_temperature_offset': 40006,
'MODBUS_address': 40007,
'baudrate': 40008
} | ModbusRTUDevice.holding_registers
RESET_MAGIC_NUMBER: Final[int] = 0xABCD
@property
def CO2(self):
return int(self.read_register(self.input_registers['CO2']))
@property
def T(self):
# TODO maybe use signed version?
return self.read_register(self.input_registers['T'],
signed=True
) / 10
@property
def RH(self):
return self.read_register(self.input_registers['RH'])

View File

@ -0,0 +1,9 @@
from .generic import ModbusRTUDevice
class SensorWiredRHT(ModbusRTUDevice):
'''
Wired sensor measuring temperature, relative humidity,
carbon dioxide and VOC, optionally particulate matter
'''
IDENTIFIER: int = 0xbb01