Added script query_device.py
This commit is contained in:
parent
1bcbae877b
commit
cfb945408e
@ -67,7 +67,7 @@ print(f'Baudrates: {baud}')
|
|||||||
print('---- Looking for device ----')
|
print('---- Looking for device ----')
|
||||||
total_devices = 0
|
total_devices = 0
|
||||||
tried_devices = 0
|
tried_devices = 0
|
||||||
CO2_addr = Sensor.input_register_offset['CO2_addr']
|
CO2_offset = Sensor.input_register_offset['CO2']
|
||||||
for a in addr:
|
for a in addr:
|
||||||
for b in baud:
|
for b in baud:
|
||||||
print(f'Address {a : >3} baud {b : >6}: ', end='')
|
print(f'Address {a : >3} baud {b : >6}: ', end='')
|
||||||
@ -82,7 +82,7 @@ for a in addr:
|
|||||||
instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
|
instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
|
||||||
instrument.clear_buffers_before_each_transaction = True
|
instrument.clear_buffers_before_each_transaction = True
|
||||||
##
|
##
|
||||||
CO2 = instrument.read_register(CO2_addr, 1, functioncode=4) * 10
|
CO2 = instrument.read_register(CO2_offset, 1, functioncode=4) * 10
|
||||||
print('DEVICE RESPONDED')
|
print('DEVICE RESPONDED')
|
||||||
total_devices += 1
|
total_devices += 1
|
||||||
except minimalmodbus.NoResponseError:
|
except minimalmodbus.NoResponseError:
|
||||||
|
133
tests/query_device.py
Executable file
133
tests/query_device.py
Executable file
@ -0,0 +1,133 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
from sensor import Sensor
|
||||||
|
from sys import argv,exit
|
||||||
|
|
||||||
|
# query read input
|
||||||
|
# read holding
|
||||||
|
# write input/holding [REG]
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# query_device.py ADDR [BAUDRATE] [{read [REGISTER] | write REGISTER}]
|
||||||
|
# where:
|
||||||
|
# - ADDR is MODBUS address. Special case is ADDR=0 (broadcast),
|
||||||
|
# for which you may write, but not read
|
||||||
|
# - BAUDRATE is baudrate (duh!), default value 19200
|
||||||
|
# - REGISTER is either register number (e.g. 30010 for input register CO2),
|
||||||
|
# or register name (e.g. CO2)
|
||||||
|
# - list of register names...
|
||||||
|
# - if only ADDR (and optionally BAUDRATE) is supplied, query all possible information from sensor
|
||||||
|
#
|
||||||
|
|
||||||
|
def print_help():
|
||||||
|
print("HEEEELP")
|
||||||
|
|
||||||
|
# default values
|
||||||
|
DEFAULT_BAUDRATE = 19200
|
||||||
|
DEFAULT_ACTION = 'all'
|
||||||
|
|
||||||
|
# first positional argument (ADDR) is necessary
|
||||||
|
arg_index = 1
|
||||||
|
try:
|
||||||
|
addr = int(argv[arg_index])
|
||||||
|
except IndexError:
|
||||||
|
print('Argument needed: ADDR')
|
||||||
|
print_help()
|
||||||
|
exit(-1)
|
||||||
|
except ValueError:
|
||||||
|
print(f'{argv[arg_index]} is invalid form for argument ADDR')
|
||||||
|
print_help()
|
||||||
|
exit(-2)
|
||||||
|
# second argument might be either BAUDRATE, read or write (or nothing)
|
||||||
|
arg_index += 1
|
||||||
|
action = None
|
||||||
|
register_number = []
|
||||||
|
register_name = []
|
||||||
|
baudrate = None
|
||||||
|
value = None
|
||||||
|
while arg_index < len(argv):
|
||||||
|
if not baudrate:
|
||||||
|
try:
|
||||||
|
baudrate = int(argv[arg_index])
|
||||||
|
except ValueError:
|
||||||
|
baudrate = DEFAULT_BAUDRATE
|
||||||
|
continue
|
||||||
|
elif not action:
|
||||||
|
action = argv[arg_index]
|
||||||
|
elif action == 'write' and value == None:
|
||||||
|
try:
|
||||||
|
value = int(argv[arg_index])
|
||||||
|
except ValueError:
|
||||||
|
print(f'Wrong format for argument "value"')
|
||||||
|
exit(-3)
|
||||||
|
else:
|
||||||
|
# rest of the arguments should be register numbers/names
|
||||||
|
try:
|
||||||
|
register_number.append(int(argv[arg_index]))
|
||||||
|
except ValueError:
|
||||||
|
register_name.append(argv[arg_index])
|
||||||
|
arg_index += 1
|
||||||
|
|
||||||
|
|
||||||
|
# sanity check
|
||||||
|
if not action:
|
||||||
|
action = 'all'
|
||||||
|
if action != 'read' and action != 'write' and action != 'all':
|
||||||
|
print(f'Unknown action {action}')
|
||||||
|
exit(-4)
|
||||||
|
if action == 'write' and value == None:
|
||||||
|
print(f'Action "write" needs value"')
|
||||||
|
exit(-5)
|
||||||
|
if action == 'write' and len(register_number) + len(register_name) != 1:
|
||||||
|
print(f'Action "write" needs exactly one register to write to')
|
||||||
|
exit(-6)
|
||||||
|
if not baudrate:
|
||||||
|
baudrate = DEFAULT_BAUDRATE
|
||||||
|
if action != 'write' and len(register_name) + len(register_number) == 0:
|
||||||
|
input_registers = [ x for x in Sensor.input_register_offset.keys() ]
|
||||||
|
holding_registers = [ x for x in Sensor.holding_register_offset.keys() ]
|
||||||
|
register_name = input_registers + holding_registers
|
||||||
|
|
||||||
|
# Query device
|
||||||
|
print('---- Query device ----')
|
||||||
|
print(f'Address: {addr}\nBaudrate: {baudrate}\nAction: {action}')
|
||||||
|
if action == 'write':
|
||||||
|
print(f'Value to be written: {value}')
|
||||||
|
# open device
|
||||||
|
s = Sensor(address=addr, baudrate=baudrate)
|
||||||
|
if action == 'read' or action == 'all':
|
||||||
|
print('---- Register readout ----')
|
||||||
|
for register in register_name + register_number:
|
||||||
|
if isinstance(register, str):
|
||||||
|
reg_name = register
|
||||||
|
if register in Sensor.input_register_offset:
|
||||||
|
reg_number = Sensor.input_register_start + Sensor.input_register_offset[register]
|
||||||
|
elif register in Sensor.holding_register_offset:
|
||||||
|
reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register]
|
||||||
|
else:
|
||||||
|
print(f'Register name {register} not known')
|
||||||
|
exit(-7)
|
||||||
|
else:
|
||||||
|
reg_number = register
|
||||||
|
reg_name = ''
|
||||||
|
try:
|
||||||
|
result = s.read_register(reg_number)
|
||||||
|
except ValueError:
|
||||||
|
print(f'Register number {reg_number} cannot be read (wrong number?)')
|
||||||
|
exit(-8)
|
||||||
|
print(f'{reg_number : <10} {int(result) : <10} {reg_name}')
|
||||||
|
elif action == 'write':
|
||||||
|
if len(register_name) > 0:
|
||||||
|
if register_name[0] not in Sensor.holding_register_offset:
|
||||||
|
print(f'Register {register_name[0]} does not exist or is not holding register')
|
||||||
|
exit(-9)
|
||||||
|
reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register_name[0]]
|
||||||
|
elif len(register_number) > 0:
|
||||||
|
reg_number = register_number[0]
|
||||||
|
print('---- Register write ----')
|
||||||
|
try:
|
||||||
|
s.write_register(reg_number, value)
|
||||||
|
except ValueError:
|
||||||
|
print(f'Register number {reg_number} cannot be written')
|
||||||
|
exit(-9)
|
||||||
|
exit(0)
|
@ -1,14 +1,21 @@
|
|||||||
|
import minimalmodbus
|
||||||
|
import serial
|
||||||
|
import time
|
||||||
|
|
||||||
class Sensor():
|
class Sensor():
|
||||||
baudrate_config = [ 19200,4800,9600,14400,19200,28800,38400,57600,76800,115200 ]
|
holding_register_start = 40001
|
||||||
|
holding_register_end = 49999
|
||||||
|
input_register_start = 30001
|
||||||
|
input_register_end = 39999
|
||||||
baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ]
|
baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ]
|
||||||
input_register_offset = { \
|
input_register_offset = { \
|
||||||
'CO2_addr': 9, \
|
'CO2': 9, \
|
||||||
'T_SHT4x_addr': 10, \
|
'T_SHT4x': 10, \
|
||||||
'RH_SHT4x_addr': 11, \
|
'RH_SHT4x': 11, \
|
||||||
'T_SCD4x_addr': 12, \
|
'T_SCD4x': 12, \
|
||||||
'RH_SCD4x_addr': 13, \
|
'RH_SCD4x': 13, \
|
||||||
'T_SHT4x_signed_addr': 14, \
|
'T_SHT4x_signed': 14, \
|
||||||
'T_SCD4x_signed_addr': 15 }
|
'T_SCD4x_signed': 15 }
|
||||||
holding_register_offset = { \
|
holding_register_offset = { \
|
||||||
'LED_on_register': 0, \
|
'LED_on_register': 0, \
|
||||||
'LED_brightness_register': 1, \
|
'LED_brightness_register': 1, \
|
||||||
@ -18,3 +25,76 @@ class Sensor():
|
|||||||
'SCD4x_temperature_offset_register': 5, \
|
'SCD4x_temperature_offset_register': 5, \
|
||||||
'MODBUS_address_register': 6, \
|
'MODBUS_address_register': 6, \
|
||||||
'baudrate_register': 7 }
|
'baudrate_register': 7 }
|
||||||
|
def __init__(self, dev_file='/dev/rs485', address=247, baudrate=19200):
|
||||||
|
self.dev_file = dev_file
|
||||||
|
self.address = address
|
||||||
|
self.baudrate = baudrate
|
||||||
|
self.open()
|
||||||
|
def open(self):
|
||||||
|
self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True)
|
||||||
|
self.serial.serial.baudrate = self.baudrate
|
||||||
|
self.serial.serial.bytesize = 8
|
||||||
|
self.serial.serial.parity = serial.PARITY_EVEN
|
||||||
|
self.serial.serial.stopbits = 1
|
||||||
|
self.serial.serial.timeout = 0.05 # seconds
|
||||||
|
self.serial.mode = minimalmodbus.MODE_RTU # rtu or ascii mode
|
||||||
|
self.serial.clear_buffers_before_each_transaction = True
|
||||||
|
def close(self):
|
||||||
|
self.serial.serial.close()
|
||||||
|
self.serial = None
|
||||||
|
# High level read functions
|
||||||
|
@property
|
||||||
|
def CO2(self):
|
||||||
|
return self.read_input_register(self.input_register_offset['CO2'])*10
|
||||||
|
@property
|
||||||
|
def T(self):
|
||||||
|
# TODO maybe use rather signed version?
|
||||||
|
return self.T_SHT4x
|
||||||
|
@property
|
||||||
|
def RH(self):
|
||||||
|
return self.RH_SHT4x
|
||||||
|
@property
|
||||||
|
def T_SHT4x(self):
|
||||||
|
return self.read_input_register(self.input_register_offset['T_SHT4x'])
|
||||||
|
@property
|
||||||
|
def T_SHT4x_signed(self):
|
||||||
|
return self.read_input_register(self.input_register_offset['T_SHT4x_signed'], signed=True)
|
||||||
|
@property
|
||||||
|
def RH_SHT4x(self):
|
||||||
|
return self.read_input_register(self.input_register_offset['RH_SHT4x'])*10
|
||||||
|
@property
|
||||||
|
def T_SCD4x(self):
|
||||||
|
return self.read_input_register(self.input_register_offset['T_SCD4x'])
|
||||||
|
@property
|
||||||
|
def T_SCD4x_signed(self):
|
||||||
|
return self.read_input_register(self.input_register_offset['T_SCD4x_signed'], signed=True)
|
||||||
|
@property
|
||||||
|
def RH_SCD4x(self):
|
||||||
|
return self.read_input_register(self.input_register_offset['RH_SCD4x'])*10
|
||||||
|
def read_input_register(self, register_offset):
|
||||||
|
retries = 10
|
||||||
|
while retries:
|
||||||
|
try:
|
||||||
|
return self.serial.read_register(register_offset, 1, functioncode=4)
|
||||||
|
except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e:
|
||||||
|
retries -= 1
|
||||||
|
continue
|
||||||
|
raise e
|
||||||
|
# generic read register function
|
||||||
|
def read_register(self, register_number):
|
||||||
|
if self.input_register_start <= register_number <= self.input_register_end:
|
||||||
|
function_code = 4
|
||||||
|
register_offset = register_number - self.input_register_start
|
||||||
|
elif self.holding_register_start <= register_number <= self.holding_register_end:
|
||||||
|
function_code = 3
|
||||||
|
register_offset = register_number - self.holding_register_start
|
||||||
|
else:
|
||||||
|
# wrong register number
|
||||||
|
raise ValueError
|
||||||
|
return self.serial.read_register(register_offset, 1, functioncode=function_code) * 10
|
||||||
|
def write_register(self, register_number, register_value):
|
||||||
|
if not self.holding_register_start <= register_number <= self.holding_register_end:
|
||||||
|
raise ValueError
|
||||||
|
register_offset = register_number - self.holding_register_start
|
||||||
|
return self.serial.write_register(register_offset, register_value, functioncode=6)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user