Added concentrator script

This commit is contained in:
mj 2021-11-27 18:32:03 +01:00
parent 1de3c05fea
commit 13e1ba0ab1
6 changed files with 80 additions and 16 deletions

View File

@ -0,0 +1,7 @@
# Concentrator
Future software for Central Unit (RPi HAT / standalone embedded Linux computer). Temporary stored here, to be moved to Central Unit repository in the future.
This script should read all sensors, log output (optionally send it to MQTT broker) and visualize current IAQ status (possibly show history graph) in webserver.
For now only Flask is used to serve http content. This is not good solution for production environment; use nginx + uWSGI in the future.

View File

@ -0,0 +1,56 @@
#!/usr/bin/env python3
from sensor import Sensor
from sys import argv,exit
from flask import Flask
from time import sleep
import threading
app = Flask('Sensor central unit')
# list of sensor addresses
address_list = [ 247 ]
baudrate = 19200
sensors = []
modbus_mutex = threading.Lock()
# Flask functions
@app.route('/')
def index():
html = ''
for s in sensors:
html += f'<h2>Address {s.address}</h2><br><h3>Input registers:</h3><br>'
for reg_name in s.input_registers:
reg_number = s.input_registers[reg_name]
with modbus_mutex:
reg_value = s.read_register(reg_number)
html += f'{reg_number : <10}&nbsp&nbsp{int(reg_value) : >10}&nbsp&nbsp{reg_name}<br>'
html += '<h3>Holding registers</h3><br>'
for reg_name in s.holding_registers:
reg_number = s.holding_registers[reg_name]
with modbus_mutex:
reg_value = s.read_register(reg_number)
html += f'{reg_number : <10}&nbsp&nbsp{int(reg_value) : >10}&nbsp&nbsp{reg_name}<br>'
return html
for addr in address_list:
sensors.append(Sensor(addr, baudrate))
# run webserver thread
flask_thread = threading.Thread(target=app.run, kwargs={'host': '0.0.0.0', 'port': 8080})
flask_thread.start()
#app.run(host='0.0.0.0', port=8080)
# measuring
while True:
# logging: for now just writing to csv file (can be anything: write to db, mqtt...)
for s in sensors:
log_string = ''
for reg_name, reg_number in s.input_registers.items():
with modbus_mutex:
log_string += str(int(s.read_register(reg_number))) + ' '
log_string += str(s.readout_total) + ' '
log_string += str(s.readout_error_no_response) + ' '
log_string += str(s.readout_error_invalid_response) + ' '
with open(f'sensor_{s.address}.csv', 'a+') as f:
f.write(log_string + '\n')
sleep(10)

View File

@ -0,0 +1 @@
../sensor.py

View File

@ -72,7 +72,7 @@ for a in addr:
print(f'Address {a : >3} baud {b : >6}: ', end='')
try:
s = Sensor(address=a, baudrate=b)
reg_number = Sensor.input_register['CO2']
reg_number = Sensor.input_registers['CO2']
s.read_register(reg_number, retries=1)
print('DEVICE RESPONDED')
total_devices += 1

View File

@ -90,8 +90,8 @@ if action == 'write' and len(register_number) + len(register_name) != 1:
if not baudrate:
baudrate = DEFAULT_BAUDRATE
if action != 'write' and len(register_name) + len(register_number) == 0:
input_registers = [ x for x in Sensor.input_register.keys() ]
holding_registers = [ x for x in Sensor.holding_register.keys() ]
input_registers = [ x for x in Sensor.input_registers.keys() ]
holding_registers = [ x for x in Sensor.holding_registers.keys() ]
register_name = input_registers + holding_registers
if action != 'write' and addr == 0:
print(f'Cannot broadcast action "{action}"')
@ -109,8 +109,8 @@ if action == 'read' or action == 'all':
for register in register_name + register_number:
if isinstance(register, str):
reg_name = register
all_registers = Sensor.input_register.copy()
all_registers.update(Sensor.holding_register)
all_registers = Sensor.input_registers.copy()
all_registers.update(Sensor.holding_registers)
if reg_name in all_registers:
reg_number = all_registers[reg_name]
else:
@ -127,10 +127,10 @@ if action == 'read' or action == 'all':
print(f'{reg_number : <10} {int(result) : <10} {reg_name}')
elif action == 'write':
if len(register_name) > 0:
if register_name[0] not in Sensor.holding_register:
if register_name[0] not in Sensor.holding_registers:
print(f'Register {register_name[0]} does not exist or is not holding register')
exit(-9)
reg_number = Sensor.holding_register[register_name[0]]
reg_number = Sensor.holding_registers[register_name[0]]
elif len(register_number) > 0:
reg_number = register_number[0]
print('---- Register write ----')

View File

@ -10,7 +10,7 @@ class Sensor():
input_register_end = 39999
# Sensor specific config
baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] # allowed baudrates
input_register = { \
input_registers = { \
'CO2': 30010, \
'T_SHT4x': 30011, \
'RH_SHT4x': 30012, \
@ -18,7 +18,7 @@ class Sensor():
'RH_SCD4x': 30014, \
'T_SHT4x_signed': 30015, \
'T_SCD4x_signed': 30016 }
holding_register = { \
holding_registers = { \
'LED_on': 40001, \
'LED_brightness': 40002, \
'LED_smooth': 40003, \
@ -32,7 +32,7 @@ class Sensor():
readout_error_invalid_response = 0 # checksum error: bus transmission corrupted?
readout_error_no_response = 0 # no response - sensor device was busy
# methods
def __init__(self, dev_file='/dev/rs485', address=247, baudrate=19200):
def __init__(self, address=247, baudrate=19200, dev_file='/dev/rs485'):
self.dev_file = dev_file
self.address = address
self.baudrate = baudrate
@ -52,7 +52,7 @@ class Sensor():
# High level read functions
@property
def CO2(self):
return int(self.read_register(self.input_register['CO2']))
return int(self.read_register(self.input_registers['CO2']))
@property
def T(self):
# TODO maybe use rather signed version?
@ -62,22 +62,22 @@ class Sensor():
return self.RH_SHT4x
@property
def T_SHT4x(self):
return self.read_register(self.input_register['T_SHT4x']) / 10
return self.read_register(self.input_registers['T_SHT4x']) / 10
@property
def T_SHT4x_signed(self):
return self.read_register(self.input_register['T_SHT4x_signed'], signed=True) / 10
return self.read_register(self.input_registers['T_SHT4x_signed'], signed=True) / 10
@property
def RH_SHT4x(self):
return self.read_register(self.input_register['RH_SHT4x'])
return self.read_register(self.input_registers['RH_SHT4x'])
@property
def T_SCD4x(self):
return self.read_register(self.input_register['T_SCD4x']) / 10
return self.read_register(self.input_registers['T_SCD4x']) / 10
@property
def T_SCD4x_signed(self):
return self.read_register(self.input_register['T_SCD4x_signed'], signed=True) / 10
@property
def RH_SCD4x(self):
return self.read_register(self.input_register['RH_SCD4x'])
return self.read_register(self.input_registers['RH_SCD4x'])
# generic read register function
def read_register(self, register_number, retries=10):
if self.input_register_start <= register_number <= self.input_register_end: