Real-Time MCL Exceedance Detection: Streaming Architecture & Deterministic Python Logic
Municipal water utilities are moving from post-hoc laboratory reconciliation to deterministic streaming evaluation engines that flag instantaneous breaches, compute regulatory running averages, and trigger automated compliance workflows in real time. Bridging high-frequency SCADA telemetry to the EPA 40 CFR Part 141 framework requires strict data normalization, stateful windowing logic, and fault-tolerant routing. The architecture below is a production-ready blueprint for mapping raw sensor payloads to regulatory thresholds while controlling latency, false-positive drift, and audit-trail fragmentation.
Telemetry Ingestion & Strict Normalization
The ingestion layer typically interfaces with OPC-UA gateways, MQTT brokers, or historian APIs (for example, OSIsoft PI or AVEVA Historian). Python’s asyncio runtime handles concurrent stream subscriptions on non-blocking I/O when it is paired with asyncio-native clients. Examples are aiomqtt, which wraps paho-mqtt because paho-mqtt is not asyncio-native, and asyncua for OPC-UA. Before any compliance evaluation runs, incoming telemetry must pass through deterministic normalization:
- Timestamp coercion. All timestamps are anchored to UTC at the edge. Local UTC offsets and Daylight Saving Time transitions are resolved during ingestion so that hourly aggregations are neither duplicated nor skipped.
- Quality-flag filtering. SCADA quality codes are evaluated against a strict allowlist. In OPC-UA, the two most significant bits (bits 30 and 31) of the 32-bit status code carry the quality severity (GOOD, UNCERTAIN, or BAD). Any reading that is not GOOD is routed to a quarantine queue and excluded from MCL calculations.
- Unit standardization. Sensor outputs are converted to EPA reporting units (mg/L or µg/L) through a deterministic conversion table. Calibration events are tagged with a calibration_flag so that maintenance-induced spikes do not trigger compliance violations.
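The three normalization steps above can be sketched as a single function. This is a minimal illustration, not a reference implementation. `normalize`, `NormalizedReading`, and `UNIT_FACTORS` are hypothetical names. The sketch assumes the input is an ISO 8601 timestamp string with an explicit UTC offset plus a raw 32-bit OPC-UA status code. Calibration tagging is left out for brevity.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# OPC-UA StatusCode severity lives in bits 30-31 of the 32-bit code.
OPC_UA_SEVERITY_MASK = 0xC0000000

# Hypothetical conversion table: (source unit, target unit) -> multiplier.
UNIT_FACTORS = {
    ("mg/L", "mg/L"): 1.0,
    ("µg/L", "µg/L"): 1.0,
    ("µg/L", "mg/L"): 0.001,
    ("mg/L", "µg/L"): 1000.0,
}


@dataclass
class NormalizedReading:
    timestamp: datetime  # always UTC
    value: float
    unit: str


def is_good(status_code: int) -> bool:
    """True only when the severity bits are 0b00 (GOOD)."""
    return (status_code & OPC_UA_SEVERITY_MASK) == 0


def normalize(ts_iso: str, value: float, unit: str, status_code: int,
              target_unit: str) -> Optional[NormalizedReading]:
    """Return a UTC, unit-converted reading, or None if it must be quarantined."""
    if not is_good(status_code):
        return None  # caller routes the raw payload to the quarantine queue
    ts = datetime.fromisoformat(ts_iso)
    if ts.tzinfo is None:
        # Assumption: a naive local time is ambiguous during the DST fall-back
        # hour, so the edge device must send an explicit offset.
        raise ValueError(f"timestamp has no UTC offset: {ts_iso!r}")
    factor = UNIT_FACTORS[(unit, target_unit)]  # KeyError on unknown units
    return NormalizedReading(ts.astimezone(timezone.utc), value * factor, target_unit)
```

On the US fall-back day (2024-11-03), two readings stamped 01:30 local carry offsets -04:00 and -05:00. They map to 05:30 and 06:30 UTC, so the repeated local hour is neither merged nor dropped.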
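Normalized payloads are buffered in a message broker such as Redis Streams or Apache Kafka. Python consumers pull micro-batches over 1-to-5-second windows. This keeps evaluation latency low while preserving ordering (per stream in Redis, per partition in Kafka) and at-least-once delivery for downstream audit logging. Under at-least-once delivery a message can arrive more than once, so consumers should deduplicate on a stable key, such as sensor ID plus timestamp, before evaluation.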
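A consumer loop that honors both the micro-batch window and at-least-once redelivery might look like the sketch below. It uses an in-process `asyncio.Queue` as a stand-in for a Redis Streams or Kafka consumer, and no broker client API is shown. `pull_micro_batch`, `Deduplicator`, and the (sensor ID, timestamp) dedup key are illustrative assumptions.

```python
import asyncio
from collections import OrderedDict
from typing import Hashable, List


async def pull_micro_batch(queue: asyncio.Queue, window_s: float, max_items: int) -> List:
    """Collect messages until window_s elapses or max_items are gathered."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + window_s
    batch: List = []
    while len(batch) < max_items:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # window closed with no further messages
    return batch


class Deduplicator:
    """Drops redelivered messages; remembers the most recent max_keys keys."""

    def __init__(self, max_keys: int = 100_000):
        self._seen: "OrderedDict[Hashable, None]" = OrderedDict()
        self._max_keys = max_keys

    def first_time(self, key: Hashable) -> bool:
        if key in self._seen:
            return False
        self._seen[key] = None
        if len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)  # evict the oldest remembered key
        return True


async def demo() -> List:
    queue: asyncio.Queue = asyncio.Queue()
    # The second message is a broker redelivery of the first.
    for msg in [("NO3-01", "2024-06-01T00:00:00Z", 4.2),
                ("NO3-01", "2024-06-01T00:00:00Z", 4.2),
                ("NO3-01", "2024-06-01T00:00:01Z", 4.3)]:
        queue.put_nowait(msg)
    batch = await pull_micro_batch(queue, window_s=0.05, max_items=100)
    dedup = Deduplicator()
    return [m for m in batch if dedup.first_time((m[0], m[1]))]
```

The batch closes on whichever limit is hit first, so a quiet sensor never stalls evaluation past `window_s`. Because the dedup cache is bounded, a redelivery that arrives after its key has been evicted will pass through. Size `max_keys` to cover the broker's redelivery horizon.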
Core Evaluation Engine: Stateful Rolling Windows
The compliance engine evaluates contaminants using three calculation methods drawn from federal regulations: instantaneous limits, running annual averages (RAA), and percentile-based thresholds. An example of the last is the Lead and Copper Rule 90th percentile, which is compared against an action level rather than an MCL. The logic maintains state across streaming windows without reloading the full dataset. The implementation below keeps each window in a deque pruned by timestamp and uses explicit type annotations, so outputs stay deterministic and memory stays proportional to the window length.
import logging
import math
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class TelemetryPoint:
    timestamp: datetime
    value: float
    quality_code: int  # raw 32-bit OPC-UA StatusCode
    unit: str
    calibration_flag: bool = False


@dataclass
class ComplianceEvent:
    contaminant: str
    mcl_type: str
    threshold: float
    observed_value: float
    timestamp: datetime
    severity: str  # 'warning', 'exceedance', 'critical'
    routing_status: str = 'pending'


class MCLEvaluator:
    """
    Production-grade evaluator for EPA 40 CFR Part 141 thresholds.
    Maintains stateful rolling windows without full dataset reloads.
    """

    # OPC-UA StatusCode is a 32-bit value whose two most significant bits
    # (bits 30-31) carry severity: 0b00 = GOOD, 0b01 = UNCERTAIN, 0b10 = BAD
    # (0b11 is reserved). Mask 0xC0000000 isolates that field so any non-GOOD
    # reading is rejected; a mask of 0xC0 would test bits 6-7 instead.
    QUALITY_SEVERITY_MASK = 0xC0000000

    def __init__(self, contaminant: str, mcl_type: str, threshold: float, window_days: int = 365):
        self.contaminant = contaminant
        self.mcl_type = mcl_type  # 'instantaneous', 'running_avg', 'percentile_90'
        self.threshold = threshold
        self.window_days = window_days
        self.history: deque = deque()

    def ingest(self, point: TelemetryPoint) -> Optional[ComplianceEvent]:
        if point.calibration_flag or (point.quality_code & self.QUALITY_SEVERITY_MASK):
            logger.debug("Skipping %s point: calibration or non-GOOD quality", self.contaminant)
            return None
        # UTC anchor enforcement: naive timestamps are assumed to be UTC already,
        # and aware timestamps in any other offset are converted to UTC.
        if point.timestamp.tzinfo is None:
            point.timestamp = point.timestamp.replace(tzinfo=timezone.utc)
        else:
            point.timestamp = point.timestamp.astimezone(timezone.utc)
        self.history.append(point)
        self._prune_window()
        return self._evaluate()

    def _prune_window(self) -> None:
        if not self.history:
            return
        # Measure the window back from the newest point's event time, not the
        # wall clock, so replaying historical telemetry gives the same result as
        # live evaluation. timedelta handles month and year boundaries.
        # Assumes points arrive in timestamp order.
        cutoff = self.history[-1].timestamp - timedelta(days=self.window_days)
        while self.history and self.history[0].timestamp < cutoff:
            self.history.popleft()

    def _evaluate(self) -> Optional[ComplianceEvent]:
        if not self.history:
            return None
        latest = self.history[-1]
        result = None
        if self.mcl_type == 'instantaneous':
            if latest.value > self.threshold:
                result = self._build_event(latest, 'exceedance')
        elif self.mcl_type == 'running_avg':
            # Plain mean of every reading currently in the window.
            avg = sum(p.value for p in self.history) / len(self.history)
            if avg > self.threshold:
                result = self._build_event(latest, 'exceedance', metric_value=avg)
        elif self.mcl_type == 'percentile_90':
            if len(self.history) >= 5:  # Minimum sample count before reporting a percentile
                sorted_vals = sorted(p.value for p in self.history)
                # Rank-based 90th percentile: the ceil(0.9 * n)-th smallest value (1-indexed).
                idx = math.ceil(0.9 * len(sorted_vals)) - 1
                p90 = sorted_vals[idx]
                if p90 > self.threshold:
                    result = self._build_event(latest, 'exceedance', metric_value=p90)
        return result

    def _build_event(self, point: TelemetryPoint, severity: str, metric_value: Optional[float] = None) -> ComplianceEvent:
        return ComplianceEvent(
            contaminant=self.contaminant,
            mcl_type=self.mcl_type,
            threshold=self.threshold,
            observed_value=metric_value if metric_value is not None else point.value,
            timestamp=point.timestamp,
            severity=severity
        )
The _evaluate method branches on mcl_type, applying a different comparison for each regulatory calculation method.
%% caption: MCLEvaluator dispatches each point by mcl_type into the matching threshold comparison.
flowchart TD
A["ingest(point)"] --> B{"calibration flag or non-GOOD quality?"}
B -->|"yes"| C["Skip: return None"]
B -->|"no"| D["Append to window, prune by window_days"]
D --> E{"mcl_type?"}
E -->|"instantaneous"| F["latest value > threshold?"]
E -->|"running_avg"| G["window mean > threshold?"]
E -->|"percentile_90"| H["90th percentile > threshold? (n >= 5)"]
F --> I["Build ComplianceEvent"]
G --> I
H --> I
This stateful evaluator plugs directly into the broader Violation Detection & Rule Engine Logic framework. It keeps threshold evaluations deterministic regardless of stream velocity, provided upstream buffering absorbs network jitter so that points reach each evaluator in timestamp order.
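The `running_avg` branch above takes a plain mean of every reading in the window. Part 141 running annual averages are generally computed from quarterly results instead, as the mean of four consecutive quarterly averages. This stops a heavily sampled quarter from dominating the result. The sketch below shows that calculation. It assumes calendar quarters in UTC and treats any missing quarter as "not yet computable". `quarterly_raa` and `quarter_key` are hypothetical helpers, and the exact averaging rules for each contaminant should be confirmed against the regulation.

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import fmean
from typing import Dict, Iterable, List, Optional, Tuple


def quarter_key(ts: datetime) -> Tuple[int, int]:
    """(year, quarter) for a timestamp, using calendar quarters in UTC."""
    ts = ts.astimezone(timezone.utc)
    return ts.year, (ts.month - 1) // 3 + 1


def quarterly_raa(points: Iterable[Tuple[datetime, float]]) -> Optional[float]:
    """Mean of the four most recent quarterly means.

    Returns None unless the latest quarter and the three before it all have data.
    """
    by_quarter: Dict[Tuple[int, int], List[float]] = defaultdict(list)
    for ts, value in points:
        by_quarter[quarter_key(ts)].append(value)
    if len(by_quarter) < 4:
        return None
    year, q = max(by_quarter)  # latest quarter with data
    keys = []
    for _ in range(4):  # walk back four consecutive quarters
        keys.append((year, q))
        year, q = (year, q - 1) if q > 1 else (year - 1, 4)
    if any(k not in by_quarter for k in keys):
        return None  # gap in the record: do not guess a value
    return fmean(fmean(by_quarter[k]) for k in keys)
```

Take nine low readings (2.0) in one quarter and one high reading (14.0) in each of the other three. The plain mean is 5.0, while the quarterly RAA is 11.0, so the two methods disagree on whether a threshold of 10 is exceeded.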
Fallback Routing & Immediate Operational Resolution
Detection is only half of the compliance lifecycle. When an exceedance is flagged, the system must route the event deterministically, degrade gracefully on downstream failures, and give operators an immediate resolution path. The routing layer combines a circuit breaker, a dead-letter queue, and multi-channel dispatch.
import asyncio
import logging
from collections import deque
from dataclasses import asdict
from datetime import datetime, timezone
from enum import Enum
from typing import Awaitable, Callable, Dict

# ComplianceEvent is the dataclass defined alongside MCLEvaluator; import it
# from that module (or keep both blocks in one module) before this class.

logger = logging.getLogger(__name__)


class RoutingChannel(Enum):
    SCADA_HMI = "scada_hmi"
    EMAIL_ALERT = "email"
    WEBHOOK_PAGER = "pagerduty_webhook"
    DEAD_LETTER = "dead_letter_queue"


class ExceedanceRouter:
    CIRCUIT_RESET_SECONDS = 300

    def __init__(self, primary_handlers: Dict[RoutingChannel, Callable[[ComplianceEvent], Awaitable[None]]]):
        self.handlers = primary_handlers
        self.circuit_open = False
        # In-memory and bounded: once full, each append evicts the oldest entry,
        # and the contents are lost on restart. Persist it in production.
        self.dlq: deque = deque(maxlen=10000)

    async def route_event(self, event: ComplianceEvent) -> None:
        if self.circuit_open:
            event.routing_status = 'dead_lettered'
            self._queue_to_dlq(event)
            return
        # Primary dispatch: SCADA HMI write + PagerDuty webhook. gather with
        # return_exceptions=True never raises, so inspect each result and treat
        # any exception as a failed primary channel.
        results = await asyncio.gather(
            self.handlers[RoutingChannel.SCADA_HMI](event),
            self.handlers[RoutingChannel.WEBHOOK_PAGER](event),
            return_exceptions=True
        )
        failures = [r for r in results if isinstance(r, Exception)]
        if not failures:
            event.routing_status = 'dispatched'
            logger.info("Routed %s exceedance to primary channels", event.contaminant)
            return
        logger.error("Primary routing failed: %s", failures)
        # Fallback: email alert as a degraded path.
        try:
            await self.handlers[RoutingChannel.EMAIL_ALERT](event)
            event.routing_status = 'fallback_dispatched'
        except Exception as fallback_err:
            logger.critical("All routing channels exhausted: %s", fallback_err)
            event.routing_status = 'dead_lettered'
            self._queue_to_dlq(event)
            self._trip_circuit()

    def _queue_to_dlq(self, event: ComplianceEvent) -> None:
        if len(self.dlq) == self.dlq.maxlen:
            logger.critical("DLQ full: evicting oldest entry to queue %s event", event.contaminant)
        self.dlq.append({
            # asdict() deep-copies the event, so later mutation cannot alter the record.
            "event": asdict(event),
            "queued_at": datetime.now(timezone.utc).isoformat(),
            "retry_count": 0
        })

    def _trip_circuit(self) -> None:
        if self.circuit_open:
            return  # already tripped; a reset is already scheduled
        self.circuit_open = True
        # Schedule an automatic reset on the running event loop.
        loop = asyncio.get_running_loop()
        loop.call_later(self.CIRCUIT_RESET_SECONDS, self._reset_circuit)

    def _reset_circuit(self) -> None:
        self.circuit_open = False
        logger.info("Routing circuit breaker reset. Processing DLQ backlog...")
        # Implement DLQ replay logic here for production deployments.
The router attempts primary channels first, degrades to email, and finally dead-letters the event while tripping the circuit breaker.
%% caption: Exceedance routing: circuit breaker guards primary channels, with email fallback and a dead-letter queue of last resort.
flowchart TD
A["route_event(event)"] --> B{"circuit open?"}
B -->|"yes"| C["Queue to dead-letter queue"]
B -->|"no"| D["Primary: SCADA HMI + PagerDuty webhook"]
D --> E{"any failures?"}
E -->|"no"| F["status = dispatched"]
E -->|"yes"| G["Fallback: email alert"]
G --> H{"email succeeded?"}
H -->|"yes"| I["status = fallback_dispatched"]
H -->|"no"| J["Dead-letter + trip circuit (300s reset)"]
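This routing architecture is designed so that compliance events are not silently dropped when webhook or HMI writes fail. Operators receive HMI tag updates and mobile alerts on the primary path and email on the degraded path. The dead-letter queue preserves audit continuity for later replay. The DLQ shown here is in-memory and bounded, so a production deployment should persist it (for example, to a Kafka topic or Redis Stream) to survive restarts and sustained outages. For teams building custom MCL Exceedance Logic Implementation pipelines, this fallback pattern prevents regulatory reporting gaps during infrastructure degradation.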
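The `_reset_circuit` hook leaves DLQ replay unimplemented. One possible shape is a single bounded pass that retries each entry once and increments `retry_count` on failure. Entries that exhaust their retries are escalated instead of cycling forever. `replay_dlq`, its `send` callable, and `max_retries` are hypothetical. The entry layout matches the dictionaries `_queue_to_dlq` builds.

```python
import asyncio
from collections import deque
from typing import Awaitable, Callable, Dict, List


async def replay_dlq(dlq: deque,
                     send: Callable[[Dict], Awaitable[None]],
                     max_retries: int = 5) -> List[Dict]:
    """Make one pass over the DLQ.

    Delivered entries are removed. Failed entries are re-queued with
    retry_count incremented. Entries that reach max_retries are returned for
    manual escalation instead of being retried forever.
    """
    exhausted: List[Dict] = []
    for _ in range(len(dlq)):  # bound the pass; re-queued entries wait for the next one
        entry = dlq.popleft()
        try:
            await send(entry["event"])
        except Exception:
            entry["retry_count"] += 1
            if entry["retry_count"] >= max_retries:
                exhausted.append(entry)
            else:
                dlq.append(entry)
    return exhausted
```

A timeout can fire after the receiver has already processed a delivery, so `send` targets should be idempotent (for example, keyed on contaminant and event timestamp). From `_reset_circuit`, the pass could be started with `asyncio.get_running_loop().create_task(...)`, since that callback runs on the event loop.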
Audit Trails & Compliance Validation
Every evaluation and routing decision must be serialized to an immutable audit store. Production deployments typically write JSON or Parquet records to a time-series data lake, capturing the following:
- Raw telemetry payload (pre-normalization)
- Quality flag resolution path
- Window state snapshot at evaluation time
- Routing channel responses and latency metrics
- Operator acknowledgment timestamps
Regulatory audits require traceable lineage from sensor to compliance decision. Cryptographic hashing of each evaluation batch (for example, chaining a SHA-256 digest across records) makes post-hoc tampering detectable. Validation suites should replay synthetic telemetry against known EPA thresholds, such as nitrate at 10 mg/L (as N) and arsenic at 10 µg/L (0.010 mg/L), to verify the window calculations before any production promotion. Continuous monitoring of evaluation latency and queue depth shows when the system drifts outside its latency targets during peak SCADA polling intervals.
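Below is a minimal sketch of the SHA-256 chaining described above. Each entry's digest covers the previous digest plus a canonical JSON encoding of the record, so editing any stored record breaks verification from that point forward. `append_record`, `verify_chain`, and the all-zeros genesis value are illustrative choices, not a standard format.

```python
import hashlib
import json
from typing import Dict, List

GENESIS = "0" * 64  # placeholder "previous digest" for the first record


def canonical(record: Dict) -> bytes:
    # Sorted keys and fixed separators make the byte encoding reproducible.
    # default=str serializes datetimes via str(); an assumption, ISO strings work too.
    return json.dumps(record, sort_keys=True, separators=(",", ":"), default=str).encode("utf-8")


def append_record(chain: List[Dict], record: Dict) -> Dict:
    prev = chain[-1]["digest"] if chain else GENESIS
    digest = hashlib.sha256(prev.encode("ascii") + canonical(record)).hexdigest()
    entry = {"record": record, "prev_digest": prev, "digest": digest}
    chain.append(entry)
    return entry


def verify_chain(chain: List[Dict]) -> bool:
    """Recompute every digest; False if any record or link was altered."""
    prev = GENESIS
    for entry in chain:
        expected = hashlib.sha256(prev.encode("ascii") + canonical(entry["record"])).hexdigest()
        if entry["prev_digest"] != prev or entry["digest"] != expected:
            return False
        prev = entry["digest"]
    return True
```

This makes tampering detectable, not impossible. Anyone able to rewrite the entire chain can recompute every digest, so the latest digest would typically be anchored in a store the writer cannot modify.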
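For authoritative threshold definitions and sampling-frequency requirements, consult the official EPA Drinking Water Standards documentation. Concurrency patterns should follow the official asyncio documentation. An asyncio event loop is single-threaded and cooperatively scheduled, so confining each evaluator's window state to one loop avoids thread-level data races. This holds as long as that state is not mutated across an await point; MCLEvaluator.ingest is synchronous, so each call runs to completion. State is not shared between worker processes, so each contaminant's stream should be routed to exactly one worker.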
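When evaluation is spread over several worker processes, each contaminant's window should be owned by exactly one of them. A minimal way to enforce that is stable sharding on the contaminant name, sketched below. `worker_for` and `partition` are hypothetical helpers, and CRC-32 is used only as a fast, process-independent hash, not for security.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def worker_for(contaminant: str, n_workers: int) -> int:
    """Stable shard index, identical in every process and across restarts.

    Python's built-in hash() is salted per process for str (PYTHONHASHSEED),
    so it cannot be used to agree on ownership across workers.
    """
    return zlib.crc32(contaminant.encode("utf-8")) % n_workers


def partition(batch: Iterable[Tuple[str, float]],
              n_workers: int) -> Dict[int, List[Tuple[str, float]]]:
    """Group (contaminant, value) readings by the worker that owns each window."""
    shards: Dict[int, List[Tuple[str, float]]] = defaultdict(list)
    for contaminant, value in batch:
        shards[worker_for(contaminant, n_workers)].append((contaminant, value))
    return dict(shards)
```

Within each shard the readings keep their arrival order, so the timestamp-order assumption made by the evaluator carries through the split.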