From cfb945408e2e1d1fea8e67cab0f83d976dbc3d48 Mon Sep 17 00:00:00 2001 From: mj Date: Fri, 26 Nov 2021 20:34:32 +0100 Subject: [PATCH] Added script query_device.py --- tests/find_device.py | 4 +- tests/query_device.py | 133 ++++++++++++++++++++++++++++++++++++++++++ tests/sensor.py | 96 +++++++++++++++++++++++++++--- 3 files changed, 223 insertions(+), 10 deletions(-) create mode 100755 tests/query_device.py diff --git a/tests/find_device.py b/tests/find_device.py index dea07f9..9837501 100755 --- a/tests/find_device.py +++ b/tests/find_device.py @@ -67,7 +67,7 @@ print(f'Baudrates: {baud}') print('---- Looking for device ----') total_devices = 0 tried_devices = 0 -CO2_addr = Sensor.input_register_offset['CO2_addr'] +CO2_offset = Sensor.input_register_offset['CO2'] for a in addr: for b in baud: print(f'Address {a : >3} baud {b : >6}: ', end='') @@ -82,7 +82,7 @@ for a in addr: instrument.mode = minimalmodbus.MODE_RTU # rtu or ascii mode instrument.clear_buffers_before_each_transaction = True ## - CO2 = instrument.read_register(CO2_addr, 1, functioncode=4) * 10 + CO2 = instrument.read_register(CO2_offset, 1, functioncode=4) * 10 print('DEVICE RESPONDED') total_devices += 1 except minimalmodbus.NoResponseError: diff --git a/tests/query_device.py b/tests/query_device.py new file mode 100755 index 0000000..9bc1fef --- /dev/null +++ b/tests/query_device.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +from sensor import Sensor +from sys import argv,exit + +# query read input +# read holding +# write input/holding [REG] +# +# +# +# query_device.py ADDR [BAUDRATE] [{read [REGISTER] | write REGISTER}] +# where: +# - ADDR is MODBUS address. Special case is ADDR=0 (broadcast), +# for which you may write, but not read +# - BAUDRATE is baudrate (duh!), default value 19200 +# - REGISTER is either register number (e.g. 30010 for input register CO2), +# or register name (e.g. CO2) +# - list of register names... +# - if only ADDR (and optionally BAUDRATE) is supplied, query all possible information from sensor +# + +def print_help(): + print("HEEEELP") + +# default values +DEFAULT_BAUDRATE = 19200 +DEFAULT_ACTION = 'all' + +# first positional argument (ADDR) is necessary +arg_index = 1 +try: + addr = int(argv[arg_index]) +except IndexError: + print('Argument needed: ADDR') + print_help() + exit(-1) +except ValueError: + print(f'{argv[arg_index]} is invalid form for argument ADDR') + print_help() + exit(-2) +# second argument might be either BAUDRATE, read or write (or nothing) +arg_index += 1 +action = None +register_number = [] +register_name = [] +baudrate = None +value = None +while arg_index < len(argv): + if not baudrate: + try: + baudrate = int(argv[arg_index]) + except ValueError: + baudrate = DEFAULT_BAUDRATE + continue + elif not action: + action = argv[arg_index] + elif action == 'write' and value == None: + try: + value = int(argv[arg_index]) + except ValueError: + print(f'Wrong format for argument "value"') + exit(-3) + else: + # rest of the arguments should be register numbers/names + try: + register_number.append(int(argv[arg_index])) + except ValueError: + register_name.append(argv[arg_index]) + arg_index += 1 + + +# sanity check +if not action: + action = 'all' +if action != 'read' and action != 'write' and action != 'all': + print(f'Unknown action {action}') + exit(-4) +if action == 'write' and value == None: + print(f'Action "write" needs value"') + exit(-5) +if action == 'write' and len(register_number) + len(register_name) != 1: + print(f'Action "write" needs exactly one register to write to') + exit(-6) +if not baudrate: + baudrate = DEFAULT_BAUDRATE +if action != 'write' and len(register_name) + len(register_number) == 0: + input_registers = [ x for x in Sensor.input_register_offset.keys() ] + holding_registers = [ x for x in Sensor.holding_register_offset.keys() ] + register_name = input_registers + holding_registers + +# Query device +print('---- Query device ----') +print(f'Address: {addr}\nBaudrate: {baudrate}\nAction: {action}') +if action == 'write': + print(f'Value to be written: {value}') +# open device +s = Sensor(address=addr, baudrate=baudrate) +if action == 'read' or action == 'all': + print('---- Register readout ----') + for register in register_name + register_number: + if isinstance(register, str): + reg_name = register + if register in Sensor.input_register_offset: + reg_number = Sensor.input_register_start + Sensor.input_register_offset[register] + elif register in Sensor.holding_register_offset: + reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register] + else: + print(f'Register name {register} not known') + exit(-7) + else: + reg_number = register + reg_name = '' + try: + result = s.read_register(reg_number) + except ValueError: + print(f'Register number {reg_number} cannot be read (wrong number?)') + exit(-8) + print(f'{reg_number : <10} {int(result) : <10} {reg_name}') +elif action == 'write': + if len(register_name) > 0: + if register_name[0] not in Sensor.holding_register_offset: + print(f'Register {register_name[0]} does not exist or is not holding register') + exit(-9) + reg_number = Sensor.holding_register_start + Sensor.holding_register_offset[register_name[0]] + elif len(register_number) > 0: + reg_number = register_number[0] + print('---- Register write ----') + try: + s.write_register(reg_number, value) + except ValueError: + print(f'Register number {reg_number} cannot be written') + exit(-9) +exit(0) diff --git a/tests/sensor.py b/tests/sensor.py index 8060f04..b719a2a 100644 --- a/tests/sensor.py +++ b/tests/sensor.py @@ -1,14 +1,21 @@ +import minimalmodbus +import serial +import time + class Sensor(): - baudrate_config = [ 19200,4800,9600,14400,19200,28800,38400,57600,76800,115200 ] + holding_register_start = 40001 + holding_register_end = 49999 + input_register_start = 30001 + input_register_end = 39999 baudrates = [ 4800,9600,14400,19200,28800,38400,57600,76800,115200 ] input_register_offset = { \ - 'CO2_addr': 9, \ - 'T_SHT4x_addr': 10, \ - 'RH_SHT4x_addr': 11, \ - 'T_SCD4x_addr': 12, \ - 'RH_SCD4x_addr': 13, \ - 'T_SHT4x_signed_addr': 14, \ - 'T_SCD4x_signed_addr': 15 } + 'CO2': 9, \ + 'T_SHT4x': 10, \ + 'RH_SHT4x': 11, \ + 'T_SCD4x': 12, \ + 'RH_SCD4x': 13, \ + 'T_SHT4x_signed': 14, \ + 'T_SCD4x_signed': 15 } holding_register_offset = { \ 'LED_on_register': 0, \ 'LED_brightness_register': 1, \ @@ -18,3 +25,76 @@ class Sensor(): 'SCD4x_temperature_offset_register': 5, \ 'MODBUS_address_register': 6, \ 'baudrate_register': 7 } + def __init__(self, dev_file='/dev/rs485', address=247, baudrate=19200): + self.dev_file = dev_file + self.address = address + self.baudrate = baudrate + self.open() + def open(self): + self.serial = minimalmodbus.Instrument(self.dev_file, self.address, close_port_after_each_call=True) + self.serial.serial.baudrate = self.baudrate + self.serial.serial.bytesize = 8 + self.serial.serial.parity = serial.PARITY_EVEN + self.serial.serial.stopbits = 1 + self.serial.serial.timeout = 0.05 # seconds + self.serial.mode = minimalmodbus.MODE_RTU # rtu or ascii mode + self.serial.clear_buffers_before_each_transaction = True + def close(self): + self.serial.serial.close() + self.serial = None + # High level read functions + @property + def CO2(self): + return self.read_input_register(self.input_register_offset['CO2'])*10 + @property + def T(self): + # TODO maybe use rather signed version? + return self.T_SHT4x + @property + def RH(self): + return self.RH_SHT4x + @property + def T_SHT4x(self): + return self.read_input_register(self.input_register_offset['T_SHT4x']) + @property + def T_SHT4x_signed(self): + return self.read_input_register(self.input_register_offset['T_SHT4x_signed'], signed=True) + @property + def RH_SHT4x(self): + return self.read_input_register(self.input_register_offset['RH_SHT4x'])*10 + @property + def T_SCD4x(self): + return self.read_input_register(self.input_register_offset['T_SCD4x']) + @property + def T_SCD4x_signed(self): + return self.read_input_register(self.input_register_offset['T_SCD4x_signed'], signed=True) + @property + def RH_SCD4x(self): + return self.read_input_register(self.input_register_offset['RH_SCD4x'])*10 + def read_input_register(self, register_offset): + retries = 10 + while retries: + try: + return self.serial.read_register(register_offset, 1, functioncode=4) + except (minimalmodbus.NoResponseError, minimalmodbus.InvalidResponseError) as e: + retries -= 1 + continue + raise e + # generic read register function + def read_register(self, register_number): + if self.input_register_start <= register_number <= self.input_register_end: + function_code = 4 + register_offset = register_number - self.input_register_start + elif self.holding_register_start <= register_number <= self.holding_register_end: + function_code = 3 + register_offset = register_number - self.holding_register_start + else: + # wrong register number + raise ValueError + return self.serial.read_register(register_offset, 1, functioncode=function_code) * 10 + def write_register(self, register_number, register_value): + if not self.holding_register_start <= register_number <= self.holding_register_end: + raise ValueError + register_offset = register_number - self.holding_register_start + return self.serial.write_register(register_offset, register_value, functioncode=6) +