Real-Time MCL Exceedance Detection: Streaming Architecture & Deterministic Python Logic

Municipal water utilities are moving from post-hoc laboratory reconciliation to deterministic streaming evaluation engines that flag instantaneous breaches, compute regulatory running averages, and trigger automated compliance workflows in real time. Bridging high-frequency SCADA telemetry to the EPA 40 CFR Part 141 framework requires strict data normalization, stateful windowing logic, and fault-tolerant routing. The architecture below is a production-ready blueprint for mapping raw sensor payloads to regulatory thresholds while controlling latency, false-positive drift, and audit-trail fragmentation.

Telemetry Ingestion & Strict Normalization

The ingestion layer typically interfaces with OPC-UA gateways, MQTT brokers, or historian APIs (for example, OSIsoft PI or AVEVA Historian). Python’s asyncio runtime, paired with paho-mqtt or an OPC-UA client, handles concurrent stream subscriptions on non-blocking I/O. Before any compliance evaluation runs, incoming telemetry must pass through deterministic normalization:

  1. Timestamp coercion. All timestamps are anchored to UTC at the edge. Local UTC offsets and Daylight Saving Time transitions are resolved during ingestion so that hourly aggregations are neither duplicated nor skipped.
  2. Quality-flag filtering. SCADA quality codes are evaluated against a strict allowlist. In OPC-UA, the two most significant bits (bits 30 and 31) of the 32-bit status code carry the severity (GOOD, UNCERTAIN, or BAD); any reading whose severity is not GOOD is routed to a quarantine queue and excluded from MCL calculations.
  3. Unit standardization. Sensor outputs are converted to EPA reporting units (mg/L or µg/L) through a deterministic conversion table. Calibration events are tagged with a calibration_flag so that maintenance-induced spikes do not trigger compliance violations.
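The three normalization steps above can be sketched as a single pure function. This is a minimal illustration, not a reference implementation: the payload keys (`ts`, `utc_offset_minutes`, `status_code`, `calibrating`), the `NormalizedReading` type, and the `TO_MG_PER_L` table are all hypothetical names chosen for the example. It assumes the edge device reports its UTC offset alongside each naive local timestamp, which sidesteps the ambiguity of repeated wall-clock hours during a Daylight Saving Time fall-back.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

# Severity field: top two bits of a 32-bit OPC-UA status code (0 means GOOD).
SEVERITY_MASK = 0xC0000000

# Hypothetical conversion table: multiplier from the sensor's unit to mg/L.
TO_MG_PER_L = {"mg/L": 1.0, "ug/L": 1e-3, "µg/L": 1e-3, "ng/L": 1e-6}


@dataclass(frozen=True)
class NormalizedReading:
    timestamp: datetime          # always timezone-aware UTC
    value_mg_per_l: float
    calibration_flag: bool


def normalize(raw: Dict[str, Any]) -> Optional[NormalizedReading]:
    """Return a UTC, mg/L reading, or None if the point should be quarantined."""
    # Step 2: reject anything whose severity bits are not GOOD.
    if raw["status_code"] & SEVERITY_MASK:
        return None

    # Step 1: attach the edge-reported offset to naive timestamps, then convert to UTC.
    ts = datetime.fromisoformat(raw["ts"])
    if ts.tzinfo is None:
        offset = timedelta(minutes=raw["utc_offset_minutes"])
        ts = ts.replace(tzinfo=timezone(offset))
    ts = ts.astimezone(timezone.utc)

    # Step 3: table-driven unit conversion; an unknown unit raises KeyError
    # instead of passing through unconverted.
    factor = TO_MG_PER_L[raw["unit"]]
    return NormalizedReading(ts, raw["value"] * factor, bool(raw.get("calibrating", False)))


# 01:30 local time at UTC-5 becomes 06:30 UTC; 15 ug/L becomes 0.015 mg/L.
reading = normalize({
    "ts": "2024-11-03T01:30:00", "utc_offset_minutes": -300,
    "value": 15.0, "unit": "ug/L", "status_code": 0,
})
```

Failing loudly on an unknown unit is a deliberate choice in this sketch: a silently unconverted µg/L value compared against an mg/L threshold would be off by a factor of 1,000.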

Normalized payloads are buffered in a message broker such as Redis Streams or Apache Kafka. Python consumers pull micro-batches over 1-to-5-second windows, keeping evaluation latency low while preserving ordering and at-least-once delivery for downstream audit logging.
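The micro-batching pattern can be illustrated without a broker. The sketch below uses an `asyncio.Queue` as a stand-in for Redis Streams or Kafka; `read_micro_batch`, its parameters, and `demo_batch` are hypothetical names. It collects whatever arrives within a fixed time window, in arrival order, and caps the batch size. Broker acknowledgment, which is what would actually provide at-least-once delivery, is not shown.

```python
import asyncio
from typing import Any, List


async def read_micro_batch(
    queue: asyncio.Queue, window_s: float = 1.0, max_items: int = 500
) -> List[Any]:
    """Collect items from the queue until the window closes or the batch is full."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + window_s
    batch: List[Any] = []
    while len(batch) < max_items:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            # Wait only for the time left in this window.
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch


async def demo_batch() -> List[int]:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    return await read_micro_batch(q, window_s=0.05)


if __name__ == "__main__":
    print(asyncio.run(demo_batch()))
```

Because the queue is FIFO and a single coroutine drains it, the batch preserves arrival order. With a real broker, the consumer would acknowledge a batch only after evaluation and audit logging have completed.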

Core Evaluation Engine: Stateful Rolling Windows

The compliance engine evaluates contaminants using three calculation methods defined by federal regulations: instantaneous limits, running annual averages (RAA), and percentile-based thresholds such as the Lead and Copper Rule 90th percentile. The logic maintains state across streaming windows without reloading the full dataset. The implementation below uses bounded deque structures and explicit type annotations to keep outputs deterministic and memory usage stable.

import logging
import math
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Optional
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class TelemetryPoint:
    timestamp: datetime
    value: float
    quality_code: int
    unit: str
    calibration_flag: bool = False

@dataclass
class ComplianceEvent:
    contaminant: str
    mcl_type: str
    threshold: float
    observed_value: float
    timestamp: datetime
    severity: str  # 'warning', 'exceedance', 'critical'
    routing_status: str = 'pending'

class MCLEvaluator:
    """
    Production-grade evaluator for EPA 40 CFR Part 141 thresholds.
    Maintains stateful rolling windows without full dataset reloads.
    """
    def __init__(self, contaminant: str, mcl_type: str, threshold: float, window_days: int = 365):
        self.contaminant = contaminant
        self.mcl_type = mcl_type  # 'instantaneous', 'running_avg', 'percentile_90'
        self.threshold = threshold
        self.window_days = window_days
        self.history: deque = deque()
        self._last_eval_time = datetime.now(timezone.utc)

    # OPC-UA status codes are 32 bits wide, and the severity occupies the two
    # most significant bits: 0b00 = GOOD, 0b01 = UNCERTAIN, 0b10 = BAD (0b11 is
    # reserved). Mask 0xC0000000 isolates that field so any non-GOOD reading
    # is rejected.
    QUALITY_SEVERITY_MASK = 0xC0000000

    def ingest(self, point: TelemetryPoint) -> Optional[ComplianceEvent]:
        if point.calibration_flag or (point.quality_code & self.QUALITY_SEVERITY_MASK):
            logger.debug("Skipping %s point: calibration or non-GOOD quality", self.contaminant)
            return None

        # UTC anchor enforcement
        if point.timestamp.tzinfo is None:
            point.timestamp = point.timestamp.replace(tzinfo=timezone.utc)

        self.history.append(point)
        self._prune_window()

        return self._evaluate()

    def _prune_window(self) -> None:
        # Anchor the cutoff to the newest buffered timestamp rather than the wall
        # clock, so replayed or delayed telemetry produces the same window as
        # live data. Assumes points arrive in timestamp order.
        if not self.history:
            return
        cutoff = self.history[-1].timestamp - timedelta(days=self.window_days)
        while self.history and self.history[0].timestamp < cutoff:
            self.history.popleft()

    def _evaluate(self) -> Optional[ComplianceEvent]:
        if not self.history:
            return None

        latest = self.history[-1]
        result = None

        if self.mcl_type == 'instantaneous':
            if latest.value > self.threshold:
                result = self._build_event(latest, 'exceedance')
        elif self.mcl_type == 'running_avg':
            avg = sum(p.value for p in self.history) / len(self.history)
            if avg > self.threshold:
                result = self._build_event(latest, 'exceedance', metric_value=avg)
        elif self.mcl_type == 'percentile_90':
            if len(self.history) >= 5:  # Minimum sample size for statistical validity
                sorted_vals = sorted(p.value for p in self.history)
                idx = math.ceil(0.9 * len(sorted_vals)) - 1
                p90 = sorted_vals[idx]
                if p90 > self.threshold:
                    result = self._build_event(latest, 'exceedance', metric_value=p90)

        return result

    def _build_event(self, point: TelemetryPoint, severity: str, metric_value: Optional[float] = None) -> ComplianceEvent:
        return ComplianceEvent(
            contaminant=self.contaminant,
            mcl_type=self.mcl_type,
            threshold=self.threshold,
            observed_value=metric_value if metric_value is not None else point.value,
            timestamp=point.timestamp,
            severity=severity
        )

The _evaluate method branches on mcl_type, applying a different comparison for each regulatory calculation method.

%% caption: MCLEvaluator dispatches each point by mcl_type into the matching threshold comparison.
flowchart TD
    A["ingest(point)"] --> B{"calibration flag or non-GOOD quality?"}
    B -->|"yes"| C["Skip: return None"]
    B -->|"no"| D["Append to window, prune by window_days"]
    D --> E{"mcl_type?"}
    E -->|"instantaneous"| F["latest value > threshold?"]
    E -->|"running_avg"| G["window mean > threshold?"]
    E -->|"percentile_90"| H["90th percentile > threshold? (n >= 5)"]
    F -->|"yes"| I["Build ComplianceEvent"]
    G -->|"yes"| I
    H -->|"yes"| I
    F -->|"no"| J["Return None"]
    G -->|"no"| J
    H -->|"no"| J

This stateful evaluator plugs directly into the broader Violation Detection & Rule Engine Logic framework, keeping threshold evaluations deterministic regardless of stream velocity or network jitter.
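The index arithmetic in the percentile_90 branch is the nearest-rank method, and a small worked example makes it easier to check by hand. The helper name `nearest_rank_percentile` and the sample values are illustrative only. Whether nearest-rank matches the procedure a given rule prescribes for a particular sample count should be confirmed against the regulation text.

```python
import math
from typing import Sequence


def nearest_rank_percentile(values: Sequence[float], pct: float = 0.9) -> float:
    """Return the value at 1-based rank ceil(pct * n) in the sorted sample."""
    if not values:
        raise ValueError("need at least one sample")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered))   # 1-based rank
    return ordered[max(rank, 1) - 1]       # convert to a 0-based index


# Ten illustrative results: rank = ceil(0.9 * 10) = 9, so the 9th smallest value.
samples = [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 12.0, 18.0]
p90 = nearest_rank_percentile(samples)     # 12.0

# Five results: rank = ceil(0.9 * 5) = 5, so the maximum is returned.
p90_small = nearest_rank_percentile([1.0, 2.0, 3.0, 4.0, 5.0])  # 5.0
```

With small samples, nearest-rank collapses to the maximum, so a single high reading decides the outcome. That sensitivity is worth keeping in mind when choosing the minimum sample count.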

Fallback Routing & Immediate Operational Resolution

Detection is only half of the compliance lifecycle. When an exceedance is flagged, the system must route the event deterministically, degrade gracefully on downstream failures, and give operators an immediate resolution path. The routing layer combines a circuit breaker, a dead-letter queue, and multi-channel dispatch.

import asyncio
import logging
from collections import deque
from datetime import datetime, timezone
from enum import Enum
from typing import Awaitable, Callable, Dict

logger = logging.getLogger(__name__)

class RoutingChannel(Enum):
    SCADA_HMI = "scada_hmi"
    EMAIL_ALERT = "email"
    WEBHOOK_PAGER = "pagerduty_webhook"
    DEAD_LETTER = "dead_letter_queue"

class ExceedanceRouter:
    def __init__(self, primary_handlers: Dict[RoutingChannel, Callable[[ComplianceEvent], Awaitable[None]]]):
        self.handlers = primary_handlers
        self.circuit_open = False
        self.dlq: deque = deque(maxlen=10000)

    async def route_event(self, event: ComplianceEvent) -> None:
        if self.circuit_open:
            self._queue_to_dlq(event)
            return

        # Primary dispatch: SCADA HMI write + PagerDuty webhook. gather with
        # return_exceptions=True never raises, so inspect each result and treat
        # any exception as a failed primary channel.
        results = await asyncio.gather(
            self.handlers[RoutingChannel.SCADA_HMI](event),
            self.handlers[RoutingChannel.WEBHOOK_PAGER](event),
            return_exceptions=True
        )
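        # BaseException also catches a cancelled handler (CancelledError is not
        # an Exception subclass), which must not be counted as a success.
        failures = [r for r in results if isinstance(r, BaseException)]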

        if not failures:
            event.routing_status = 'dispatched'
            logger.info("Routed %s exceedance to primary channels", event.contaminant)
            return

        logger.error("Primary routing failed: %s", failures)
        # Fallback: email alert as a degraded path.
        try:
            await self.handlers[RoutingChannel.EMAIL_ALERT](event)
            event.routing_status = 'fallback_dispatched'
        except Exception as fallback_err:
            logger.critical("All routing channels exhausted: %s", fallback_err)
            event.routing_status = 'dead_lettered'
            self._queue_to_dlq(event)
            self._trip_circuit()

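    def _queue_to_dlq(self, event: ComplianceEvent) -> None:
        # Mark the event here so the circuit-open path is recorded too, and store
        # a copy of its fields so later mutation cannot alter the queued record.
        event.routing_status = 'dead_lettered'
        self.dlq.append({
            "event": dict(vars(event)),
            "queued_at": datetime.now(timezone.utc).isoformat(),
            "retry_count": 0
        })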

    def _trip_circuit(self) -> None:
        self.circuit_open = True
        # Schedule an automatic reset 300s later on the running event loop.
        loop = asyncio.get_running_loop()
        loop.call_later(300, self._reset_circuit)

    def _reset_circuit(self) -> None:
        self.circuit_open = False
        logger.info("Routing circuit breaker reset. Processing DLQ backlog...")
        # Implement DLQ replay logic here for production deployments.

The router attempts primary channels first, degrades to email, and finally dead-letters the event while tripping the circuit breaker.

%% caption: Exceedance routing: circuit breaker guards primary channels, with email fallback and a dead-letter queue of last resort.
flowchart TD
    A["route_event(event)"] --> B{"circuit open?"}
    B -->|"yes"| C["Queue to dead-letter queue"]
    B -->|"no"| D["Primary: SCADA HMI + PagerDuty webhook"]
    D --> E{"any failures?"}
    E -->|"no"| F["status = dispatched"]
    E -->|"yes"| G["Fallback: email alert"]
    G --> H{"email succeeded?"}
    H -->|"yes"| I["status = fallback_dispatched"]
    H -->|"no"| J["Dead-letter + trip circuit (300s reset)"]

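This routing architecture keeps compliance events from being silently dropped during broker outages or webhook failures, provided the dead-letter queue is durable. The in-memory deque shown here is bounded at 10,000 entries, evicts its oldest entry on overflow, and does not survive a process restart, so production deployments should back it with persistent storage. Operators receive HMI tag updates and mobile alerts, while the dead-letter queue preserves audit continuity for later replay. For teams building custom MCL Exceedance Logic Implementation pipelines, this fallback pattern helps prevent regulatory reporting gaps during infrastructure degradation.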
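The replay step that the router leaves as a placeholder could look roughly like the following. This is a sketch under stated assumptions: `replay_dlq`, `max_retries`, and the `send` callable are hypothetical, and the entry layout mirrors the dictionaries the router appends (`event` and `retry_count`). It makes one pass over the current backlog, re-queues failures with an incremented retry count, and returns the entries that have run out of retries so they can be escalated manually.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable, Deque, Dict, List, Tuple

Entry = Dict[str, Any]


async def replay_dlq(
    dlq: Deque[Entry],
    send: Callable[[Dict[str, Any]], Awaitable[None]],
    max_retries: int = 3,
) -> List[Entry]:
    """Replay the current backlog once; return entries that exhausted their retries."""
    exhausted: List[Entry] = []
    for _ in range(len(dlq)):            # bounded: only the backlog present at the start
        entry = dlq.popleft()
        try:
            await send(entry["event"])
        except Exception:
            entry["retry_count"] += 1
            if entry["retry_count"] >= max_retries:
                exhausted.append(entry)  # hand off for manual escalation
            else:
                dlq.append(entry)        # keep for the next replay pass
    return exhausted


async def demo_replay() -> Tuple[List[Entry], List[Entry]]:
    dlq: Deque[Entry] = deque([
        {"event": {"contaminant": "arsenic"}, "retry_count": 0},
        {"event": {"contaminant": "nitrate"}, "retry_count": 2},
        {"event": {"contaminant": "lead"}, "retry_count": 0},
    ])

    async def send(event: Dict[str, Any]) -> None:
        # Simulated downstream that only accepts arsenic events.
        if event["contaminant"] != "arsenic":
            raise ConnectionError("downstream unavailable")

    exhausted = await replay_dlq(dlq, send)
    return exhausted, list(dlq)


if __name__ == "__main__":
    print(asyncio.run(demo_replay()))
```

Returning exhausted entries rather than discarding them keeps the decision about unrecoverable events explicit, which matters when each entry represents a potential reporting obligation.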

Audit Trails & Compliance Validation

Every evaluation and routing decision must be serialized to an immutable audit store. Production deployments typically write JSON or Parquet records to a time-series data lake, capturing the following:

  • Raw telemetry payload (pre-normalization)
  • Quality flag resolution path
  • Window state snapshot at evaluation time
  • Routing channel responses and latency metrics
  • Operator acknowledgment timestamps
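One way to carry the fields listed above is a single record type serialized as one JSON object per line. The `AuditRecord` class, its field names, and `to_json_line` are hypothetical; the sketch only shows that a canonical encoding (sorted keys, fixed separators) gives byte-identical output for identical records, which is useful if the records are hashed later.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class AuditRecord:
    raw_payload: Dict[str, Any]            # telemetry exactly as received
    quality_resolution: str                # e.g. "GOOD" or "quarantined:UNCERTAIN"
    window_snapshot: Dict[str, Any]        # summary of window state at evaluation time
    routing_results: List[Dict[str, Any]]  # per-channel outcome and latency
    operator_ack_at: Optional[str] = None  # ISO 8601 UTC, filled in on acknowledgment


def to_json_line(record: AuditRecord) -> str:
    """Serialize deterministically: sorted keys and compact separators."""
    return json.dumps(asdict(record), sort_keys=True, separators=(",", ":"), ensure_ascii=False)


record = AuditRecord(
    raw_payload={"tag": "NO3", "value": 11.2, "unit": "mg/L"},
    quality_resolution="GOOD",
    window_snapshot={"n": 3, "mean": 9.8},
    routing_results=[{"channel": "scada_hmi", "ok": True, "latency_ms": 12}],
)
line = to_json_line(record)
```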

Regulatory audits require traceable lineage from sensor to compliance decision. Cryptographic hashing of each evaluation batch (for example, chaining a SHA-256 digest across records) makes post-hoc tampering detectable. Validation suites should replay synthetic telemetry against known EPA thresholds, such as nitrate at 10 mg/L and arsenic at 10 µg/L, to verify the window calculations before any production promotion. Continuous monitoring of evaluation latency and queue depth keeps the system within its latency targets during peak SCADA polling intervals.
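The SHA-256 chaining mentioned above can be sketched in a few lines with the standard library. The function names, the all-zero genesis value, and the link layout (`record`, `prev`, `digest`) are assumptions made for the example. Each digest covers the previous digest plus the canonical JSON of the current record, so altering any earlier record invalidates every later link.

```python
import hashlib
import json
from typing import Any, Dict, Iterable, List

GENESIS = "0" * 64  # hypothetical starting value for the first link


def _digest(prev: str, record: Dict[str, Any]) -> str:
    body = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev + body).encode("utf-8")).hexdigest()


def chain_records(records: Iterable[Dict[str, Any]], prev: str = GENESIS) -> List[Dict[str, Any]]:
    """Wrap each record with the previous digest and its own chained digest."""
    chained: List[Dict[str, Any]] = []
    for record in records:
        digest = _digest(prev, record)
        chained.append({"record": record, "prev": prev, "digest": digest})
        prev = digest
    return chained


def verify_chain(chained: List[Dict[str, Any]], genesis: str = GENESIS) -> bool:
    """Recompute every digest from the start and compare it with the stored links."""
    prev = genesis
    for link in chained:
        expected = _digest(prev, link["record"])
        if link["prev"] != prev or link["digest"] != expected:
            return False
        prev = expected
    return True


batch = [
    {"contaminant": "nitrate", "value": 9.4},
    {"contaminant": "nitrate", "value": 10.6},
    {"contaminant": "nitrate", "value": 9.9},
]
chain = chain_records(batch)
intact = verify_chain(chain)          # True

chain[1]["record"]["value"] = 9.6     # simulate post-hoc tampering
tampered_ok = verify_chain(chain)     # False
```

A chain like this only detects tampering if the most recent digest is also stored somewhere the writer of the audit log cannot modify; otherwise the whole chain could simply be recomputed.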

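For authoritative threshold definitions and sampling-frequency requirements, consult the official EPA Drinking Water Standards documentation. Concurrency patterns should follow the official asyncio documentation. Because an asyncio event loop runs its coroutines on a single thread with cooperative scheduling, keeping each evaluator's window state inside one event loop avoids data races without locks. That guarantee does not extend across threads or worker processes, so a multi-worker deployment should partition streams so that each contaminant window has exactly one owner.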