Troubleshooting MODBUS ASCII Device Monitor Connections

How to Build a Simple MODBUS ASCII Device Monitor with Python

Overview

A MODBUS ASCII device monitor reads device registers over MODBUS ASCII (serial) and logs/alerts on changes. We’ll build a simple monitor that:

  • Connects to a serial port using MODBUS ASCII framing
  • Polls specified addresses periodically
  • Logs values and reports changes

Requirements

  • Python 3.8+
  • pip packages: pyserial, pymodbus, schedule (or use asyncio) Install:

    Code

    pip install pyserial pymodbus schedule

Design decisions (reasonable defaults)

  • Communication: MODBUS ASCII over RS-232/USB serial (9600, 8, E, 1)
  • Poll interval: 5 seconds
  • Read type: Holding registers (function 03)
  • Logging: Console + CSV file
  • Change detection: Compare last read value per register and log if changed

Code (synchronous, simple)

python

import csv import time from pymodbus.client.sync import ModbusSerialClient as ModbusClient SERIAL_PORT = ’/dev/ttyUSB0’# change for Windows like ‘COM3’ BAUDRATE = 9600 STOPBITS = 1 BYTESIZE = 8 PARITY = ‘E’ UNIT = 1 # slave id START_ADDR = 0 # first register COUNT = 10 # number of registers to read POLL_INTERVAL = 5 # seconds CSV_FILE = ‘modbus_ascii_log.csv’ def connect_client(): client = ModbusClient(method=‘ascii’, port=SERIAL_PORT, baudrate=BAUDRATE, stopbits=STOPBITS, bytesize=BYTESIZE, parity=PARITY, timeout=2) if not client.connect(): raise ConnectionError(f’Unable to open serial port {SERIAL_PORT}) return client def read_registers(client): rr = client.read_holding_registers(address=START_ADDR, count=COUNT, unit=UNIT) if rr.isError(): return None return rr.registers def write_csv_header_if_needed(): try: with open(CSVFILE, ‘x’, newline=) as f: writer = csv.writer(f) header = [‘timestamp’] + [f’reg{START_ADDR + i} for i in range(COUNT)] writer.writerow(header) except FileExistsError: pass def log_row(values): ts = time.strftime(’%Y-%m-%d %H:%M:%S’) with open(CSV_FILE, ‘a’, newline=) as f: writer = csv.writer(f) writer.writerow([ts] + values) def main(): write_csv_header_if_needed() client = connect_client() last_values = [None] * COUNT try: while True: regs = read_registers(client) if regs is None: print(‘Read error or timeout’) else: # Log on every poll log_row(regs) # Detect changes for i, val in enumerate(regs): if last_values[i] is None: last_values[i] = val elif val != last_values[i]: print(f’Change detected at reg {START_ADDR + i}: {last_values[i]} -> {val}) last_values[i] = val time.sleep(POLL_INTERVAL) except KeyboardInterrupt: print(‘Stopping monitor’) finally: client.close() if name == main: main()

Notes and extensions

  • For larger deployments use asynchronous pymodbus client or thread pool.
  • Add retries, exponential backoff, and better error handling for robustness.
  • Convert raw register values to engineering units (e.g., combine two 16-bit registers for 32-bit floats) using struct.unpack.
  • Add alerting (email, webhook) when changes exceed thresholds.
  • Secure serial access and validate UNIT IDs to avoid interfering with other devices.

Quick checklist before running

  • Confirm device uses MODBUS ASCII (not RTU).
  • Set correct SERIAL_PORT and BAUDRATE/PARITY.
  • Verify START_ADDR, COUNT, and UNIT match the device map.
  • Run with appropriate permissions to access serial port.

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *