Parsing Modbus Registers for Turbidity Sensors
Continuous turbidity monitoring is a regulatory and operational imperative for drinking water treatment and distribution systems. Under 40 CFR Part 141 (Subpart T) and EPA Method 180.1, utilities must capture, validate, and report nephelometric turbidity units (NTU) with strict auditability, gap-filling protocols, and percentile-based compliance thresholds. Inline optical transmitters typically expose real-time measurements through Modbus TCP or RTU holding registers, yet raw register reads rarely map cleanly onto compliance-ready datasets. Accurate parsing requires deterministic handling of register mapping, IEEE 754 floating-point conversion, byte-order normalization, and regulatory validation before data enters the broader SCADA Data Ingestion & Time-Series Sync architecture.
Register Architecture & IEEE 754 Decoding
Turbidity transmitters generally allocate two consecutive 16-bit holding registers to represent a single 32-bit IEEE 754 floating-point NTU value. Manufacturers diverge in addressing conventions (0-based vs. 1-based, 40001 offset notation), scaling multipliers (for example, 0.01 NTU/LSB), and byte ordering (ABCD, CDAB, BADC, DCBA). Many devices also embed diagnostic flags in adjacent registers to signal optical fouling, lamp degradation, or calibration mode.
A compliant parsing routine must first isolate the exact register block documented in the transmitter’s communication map. For example, if registers 30010 and 30011 return the raw hexadecimal values 0x439A and 0x0000, the concatenated 32-bit word must be interpreted according to the sensor’s specified endianness before conversion. Misaligned byte or word swapping is the leading cause of phantom NTU spikes that trigger false exceedance alerts on compliance dashboards. Always confirm whether the device uses big-endian (network order) or little-endian (Intel order) byte and word sequencing, and whether the manufacturer applies a fixed scaling factor or offset after conversion. Python’s struct module handles low-level unpacking, but production deployments typically rely on a higher-level payload decoder to abstract away the endianness permutations.
%% caption: Turbidity decode pipeline with NaN/Inf and NTU-range guards feeding a quality flag.
flowchart TD
RB["Read 2 holding registers"] --> DEC["Decode IEEE-754 float (byte/word order)"]
DEC --> NAN{"NaN / Inf?"}
NAN -->|yes| BAD["Quality = BAD"]
NAN -->|no| SC["Apply scale & offset"]
SC --> RNG{"Within NTU range?"}
RNG -->|no| SUS["Quality = SUSPECT"]
RNG -->|yes| GOOD["Quality = GOOD"]
BAD --> RT["Fallback router"]
SUS --> RT
GOOD --> RT
Production-Ready Python Implementation
Municipal developers and automation engineers deploy asynchronous Modbus clients for high-frequency polling, paired with deterministic payload unpacking so that every reading lands in the historian with an unambiguous quality flag. The following implementation reads the register block, validates response integrity, applies IEEE 754 conversion, and enforces scaling and physical bounds in a single function. It targets the pymodbus v3.x async client and includes explicit NaN/Inf guards.
import logging
import math
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
logger = logging.getLogger("turbidity_parser")
# Maps a device's declared register layout to (byteorder, wordorder) as
# expected by BinaryPayloadDecoder.fromRegisters. The label names the on-wire
# byte sequence, where A is the most significant byte of the 32-bit float:
# ABCD = no swap -> byteorder BIG, wordorder BIG
# BADC = bytes swapped -> byteorder LITTLE, wordorder BIG
# CDAB = words swapped -> byteorder BIG, wordorder LITTLE
# DCBA = bytes and words -> byteorder LITTLE, wordorder LITTLE
# pymodbus 3.x exposes the uppercase members Endian.BIG / Endian.LITTLE.
BYTE_ORDER_MAP = {
"ABCD": (Endian.BIG, Endian.BIG),
"BADC": (Endian.LITTLE, Endian.BIG),
"CDAB": (Endian.BIG, Endian.LITTLE),
"DCBA": (Endian.LITTLE, Endian.LITTLE),
}
async def parse_turbidity_ntu(
client: AsyncModbusTcpClient,
register_address: int,
unit_id: int,
byte_order: str = "ABCD",
scale_factor: float = 1.0,
offset: float = 0.0
) -> Dict[str, Any]:
"""
Reads two consecutive holding registers, decodes IEEE 754 float,
applies scaling/offset, and returns compliance-ready payload.
"""
if byte_order not in BYTE_ORDER_MAP:
raise ValueError(f"Unsupported byte order: {byte_order}. Use {list(BYTE_ORDER_MAP.keys())}")
byteorder, wordorder = BYTE_ORDER_MAP[byte_order]
try:
response = await client.read_holding_registers(
address=register_address, count=2, slave=unit_id
)
if response.isError():
logger.error(f"Modbus read error at register {register_address}: {response}")
return {"value": None, "quality": "BAD", "timestamp": datetime.now(timezone.utc)}
decoder = BinaryPayloadDecoder.fromRegisters(
response.registers, byteorder=byteorder, wordorder=wordorder
)
raw_ntu = decoder.decode_32bit_float()
# IEEE 754 validity check
if math.isnan(raw_ntu) or math.isinf(raw_ntu):
logger.warning(f"Invalid IEEE 754 value detected: {raw_ntu}")
return {"value": None, "quality": "BAD", "timestamp": datetime.now(timezone.utc)}
calibrated_ntu = (raw_ntu * scale_factor) + offset
# Physical bounds validation (typical optical range: 0.00 - 4000 NTU)
if not (0.0 <= calibrated_ntu <= 4000.0):
logger.warning(f"Out-of-range NTU value: {calibrated_ntu}")
return {"value": calibrated_ntu, "quality": "SUSPECT", "timestamp": datetime.now(timezone.utc)}
return {
"value": round(calibrated_ntu, 4),
"quality": "GOOD",
"timestamp": datetime.now(timezone.utc)
}
except Exception as e:
logger.exception(f"Unhandled parsing exception: {e}")
return {"value": None, "quality": "BAD", "timestamp": datetime.now(timezone.utc)}
Fallback Routing & Operational Resolution
Network instability, PLC reboots, and sensor maintenance windows inevitably interrupt Modbus polling, and compliance reporting cannot tolerate unhandled None values or silent data gaps. Production systems therefore need deterministic fallback routing that preserves audit continuity while raising immediate operational alerts. The router below serves the last known-good reading within a bounded staleness window, then escalates to an explicitly invalid hold value once that window expires.
%% caption: Fallback router quality states: serve last-good within the window, else escalate to OFFLINE.
stateDiagram-v2
[*] --> GOOD
GOOD --> INTERPOLATED: read fails (within staleness window)
INTERPOLATED --> GOOD: fresh good read
INTERPOLATED --> OFFLINE: staleness window expires
OFFLINE --> GOOD: sustained valid reads
class TurbidityFallbackRouter:
def __init__(self, max_stale_seconds: int = 120):
self._last_good: Optional[Dict[str, Any]] = None
self._stale_threshold = max_stale_seconds
self._alert_triggered = False
def resolve(self, current: Dict[str, Any]) -> Dict[str, Any]:
if current["quality"] == "GOOD":
self._last_good = current
self._alert_triggered = False
return current
now = datetime.now(timezone.utc)
# Fallback 1: Last-known-good interpolation
if self._last_good and (now - self._last_good["timestamp"]).total_seconds() < self._stale_threshold:
fallback = self._last_good.copy()
fallback["quality"] = "INTERPOLATED"
fallback["timestamp"] = now
return fallback
# Fallback 2: Compliance-safe default with hard alert
if not self._alert_triggered:
logger.critical("Turbidity sensor offline. Defaulting to compliance-safe hold. Verify field diagnostics.")
self._alert_triggered = True
return {
"value": -1.0, # Explicitly invalid to force historian exclusion
"quality": "OFFLINE",
"timestamp": now
}
This routing pattern keeps downstream time-series databases from ingesting unvalidated floats and escalates to operators as soon as polling degrades beyond an acceptable threshold. Tie SNMP traps or MQTT alerts to the OFFLINE quality flag so that field technicians are dispatched before any regulatory reporting window closes.
Compliance Validation & Audit Trails
After decoding and fallback resolution, NTU values must undergo regulatory validation. Under 40 CFR Part 141, utilities flag values that exceed 0.3 NTU for filtered systems or 5.0 NTU for unfiltered systems, and apply rolling 95th-percentile calculations for monthly reporting. The parser attaches a data-quality flag (GOOD, SUSPECT, BAD, INTERPOLATED, or OFFLINE) derived from IEEE 754 validity, sensor diagnostics, and the out-of-range check. This structured output feeds directly into Modbus TCP Parsing Workflows for downstream historian synchronization and automated exceedance reporting.
For audit readiness, every parsed record must include:
- Source metadata: Device ID, register address, polling interval
- Quality provenance: Raw vs. interpolated vs. offline
- Regulatory flags: Threshold breaches, percentile impact
- Immutable timestamps: UTC-synced with millisecond precision
Deterministic register parsing, explicit IEEE 754 guards, and structured fallback routing together eliminate phantom spikes, close compliance gaps, and give environmental compliance teams defensible, audit-ready datasets.