Translating EPA Violation Codes to Internal Alerts: A Production-Ready Pipeline

Operationalizing Safe Drinking Water Act (SDWA) compliance requires the deterministic translation of federal violation codes into actionable internal alerts. Water utility operators, environmental compliance teams, and municipal developers routinely face a gap between regulatory reporting cycles and real-time SCADA telemetry. Closing that gap demands a structured automation pipeline that maps EPA violation codes—Maximum Contaminant Level (MCL) exceedances, monitoring-frequency lapses, treatment-technique (TT) failures, and similar events—directly onto internal escalation matrices. This translation layer must be deterministic, auditable, and resilient to SCADA data anomalies. It is deliberately stateful where it must be: a small, well-bounded state cache is what prevents alert flapping during transient sensor noise.

The foundational logic for this process lives in the Core Architecture & SDWA Compliance Taxonomy, where federal regulatory identifiers are normalized into machine-readable schemas. EPA violation codes follow a structured numeric convention (for example, 51 for MCL violations, 52 for MRDL violations, 55 for monitoring and reporting failures, and the 60 series for public-notification lapses). Each code carries an implicit set of operational thresholds, reporting deadlines, and primacy-agency notification triggers. Translating these codes into internal alerts requires a multi-stage pipeline: telemetry ingestion, code resolution, threshold validation, and deterministic alert routing.

Implementation Pipeline: Python-Based Translation Engine

Municipal developers typically deploy this translation logic as a scheduled Python worker or a containerized microservice wired into the utility’s data historian. The implementation steps below outline a production-ready architecture built for fast operational resolution and regulatory defensibility.

1. Ingest, Normalize, and Stage Telemetry

Raw historian payloads must be normalized before evaluation. Polling over REST or OPC-UA requires strict timestamp alignment to UTC, unit standardization, and drift detection. Persist every raw payload to an immutable staging table to preserve chain of custody for compliance audits. Note that the normalized timestamp remains timezone-aware (UTC), which keeps all downstream window and cooldown comparisons unambiguous.

import datetime
import logging
from dataclasses import dataclass
from typing import Optional

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[logging.FileHandler("sdwa_compliance_audit.log")]
)

@dataclass(frozen=True)
class TelemetryPoint:
    sensor_id: str
    parameter: str
    raw_value: float
    unit: str
    timestamp_utc: datetime.datetime

def normalize_telemetry(raw_payload: dict) -> Optional[TelemetryPoint]:
    """Convert historian payload to standardized TelemetryPoint."""
    try:
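        ts = datetime.datetime.fromisoformat(raw_payload["timestamp"].replace("Z", "+00:00"))
        # Normalize to a timezone-aware UTC datetime so every downstream
        # comparison (cooldowns, evaluation windows) is unambiguous.
        # astimezone() would interpret a naive timestamp as host-local time,
        # so naive values are treated as UTC here (an assumption about the
        # historian; adjust if yours reports local time).
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=datetime.timezone.utc)
        ts_utc = ts.astimezone(datetime.timezone.utc)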

        # Unit conversion: mg/L -> µg/L (multiply by 1000)
        value = float(raw_payload["value"])
        unit = raw_payload["unit"].lower()
        if unit == "mg/l":
            value *= 1000.0
            unit = "µg/l"

        return TelemetryPoint(
            sensor_id=raw_payload["sensor_id"],
            parameter=raw_payload["parameter"],
            raw_value=value,
            unit=unit,
            timestamp_utc=ts_utc
        )
    except Exception as e:
        logging.error(f"Normalization failed: {e}")
        return None
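
The staging requirement described at the start of this step can be sketched with the standard-library sqlite3 module. This is a minimal illustration under stated assumptions, not the pipeline's actual storage layer: the table name raw_telemetry_staging, the helpers open_staging and stage_raw_payload, and the SHA-256 content hash are all introduced here for the example. SQLite does not make a table immutable on its own, so the sketch adds triggers that reject UPDATE and DELETE.

```python
import hashlib
import json
import sqlite3

def open_staging(db_path: str = ":memory:") -> sqlite3.Connection:
    """Create the (hypothetical) append-only staging table if it is missing."""
    conn = sqlite3.connect(db_path)
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS raw_telemetry_staging (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            received_utc TEXT NOT NULL
                DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
            payload_json TEXT NOT NULL,
            payload_sha256 TEXT NOT NULL
        );
        CREATE TRIGGER IF NOT EXISTS staging_no_update
            BEFORE UPDATE ON raw_telemetry_staging
            BEGIN SELECT RAISE(ABORT, 'staging table is append-only'); END;
        CREATE TRIGGER IF NOT EXISTS staging_no_delete
            BEFORE DELETE ON raw_telemetry_staging
            BEGIN SELECT RAISE(ABORT, 'staging table is append-only'); END;
        """
    )
    return conn

def stage_raw_payload(conn: sqlite3.Connection, raw_payload: dict) -> str:
    """Store a canonical JSON copy of the payload and return its SHA-256 hash."""
    body = json.dumps(raw_payload, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO raw_telemetry_staging (payload_json, payload_sha256) "
            "VALUES (?, ?)",
            (body, digest),
        )
    return digest
```

Calling stage_raw_payload before normalize_telemetry means a payload that fails normalization is still on record, and the returned hash can be logged alongside any alert derived from it.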

2. Load the EPA Violation Code Taxonomy

Maintain a version-controlled JSON mapping table that records EPA code definitions, the associated SDWA rules (for example, the Lead and Copper Rule Revisions, Stage 2 DBPR, and the Revised Total Coliform Rule), and the internal alert severity for each code. Cross-reference this table against the Violation Code Classification to keep mappings aligned with regulation and to catch drift introduced by primacy updates. The loader below falls back to an embedded copy whenever the on-disk file is missing or fails to parse, so a corrupt taxonomy never halts evaluation.

import json
import logging
from pathlib import Path

# Representative subset — extend this table to cover your full parameter inventory.
# Note: turbidity violations typically generate monitoring/reporting codes (55 series)
# rather than MCL codes, because turbidity at the distribution tap has no numeric MCL
# under the Surface Water Treatment Rule; the trigger is a treatment technique deviation.
EPA_TAXONOMY = {
    "51": {"name": "MCL Exceedance", "severity": "CRITICAL", "rule": "Various", "direction": "above"},
    "52": {"name": "MRDL Exceedance", "severity": "HIGH", "rule": "Stage 2 DBPR", "direction": "above"},
    "55": {"name": "Monitoring/Reporting Failure", "severity": "MEDIUM", "rule": "RTCR", "direction": "gap"},
    "60": {"name": "Public Notification Lapse", "severity": "HIGH", "rule": "SDWA §1414", "direction": "gap"}
}

def load_taxonomy(path: Path = Path("epa_codes_v2024.json")) -> dict:
    """Load the on-disk taxonomy, falling back to the embedded copy on any
    read or parse failure so evaluation never aborts on a malformed file."""
    if path.exists():
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logging.error("Failed to load taxonomy from %s: %s", path, e)
    logging.warning("Falling back to embedded taxonomy. Verify against primacy agency updates.")
    return EPA_TAXONOMY
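
A file can parse as valid JSON and still be structurally wrong, so it may be worth checking entries before they reach the engine. The following is a minimal sketch, assuming the four keys used in the embedded table (name, severity, rule, direction) are required. The validate_taxonomy helper and the allowed severity and direction vocabularies are assumptions made for this example.

```python
from typing import List

REQUIRED_KEYS = ("name", "severity", "rule", "direction")
ALLOWED_SEVERITIES = ("CRITICAL", "HIGH", "MEDIUM", "LOW")  # assumed vocabulary
ALLOWED_DIRECTIONS = ("above", "below", "gap")              # assumed vocabulary

def validate_taxonomy(taxonomy: object) -> List[str]:
    """Return human-readable problems; an empty list means the table looks sane."""
    if not isinstance(taxonomy, dict):
        return ["taxonomy root must be a JSON object"]
    problems: List[str] = []
    for code, entry in taxonomy.items():
        if not isinstance(entry, dict):
            problems.append(f"{code}: entry must be a JSON object")
            continue
        missing = [key for key in REQUIRED_KEYS if key not in entry]
        if missing:
            problems.append(f"{code}: missing keys {missing}")
        if "severity" in entry and entry["severity"] not in ALLOWED_SEVERITIES:
            problems.append(f"{code}: unknown severity {entry['severity']!r}")
        if "direction" in entry and entry["direction"] not in ALLOWED_DIRECTIONS:
            problems.append(f"{code}: unknown direction {entry['direction']!r}")
    return problems
```

A loader could log these problems and fall back to the embedded copy whenever the list is non-empty, which extends the existing parse-failure fallback to structural failures.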

3. Apply Rule-Based Translation & State Machine Logic

Each parameter maps to an EPA code, a threshold, and a direction. Most MCL violations fire when a value is above a maximum, but some parameters are minimum requirements. Free chlorine residual is one: under the Surface Water Treatment Rule, the residual entering the distribution system must not fall below 0.2 mg/L for more than 4 hours, so an alert fires when the value is below the operational floor, not above it. The MRDL (code 52) is the opposite case: it caps the disinfectant residual in the distribution system and is evaluated as a running annual average against an upper limit. Confusing these directions produces either missed alerts or constant false positives.
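
The running annual average is simple arithmetic, but it behaves differently from a single-sample check. The following is a minimal sketch, assuming compliance is judged on the mean of the four most recent quarterly averages and using the chlorine MRDL of 4.0 mg/L as the upper limit. The function names and sample values are illustrative; the authoritative averaging procedure is the one in your approved monitoring plan.

```python
from statistics import fmean
from typing import Optional, Sequence

CHLORINE_MRDL_MG_L = 4.0  # upper limit, compared against the running annual average

def running_annual_average(quarterly_avgs: Sequence[float]) -> Optional[float]:
    """Mean of the four most recent quarterly averages, or None if fewer exist."""
    if len(quarterly_avgs) < 4:
        return None
    return fmean(quarterly_avgs[-4:])

def mrdl_exceeded(quarterly_avgs: Sequence[float],
                  limit: float = CHLORINE_MRDL_MG_L) -> bool:
    """True only when a full year of data averages above the limit."""
    raa = running_annual_average(quarterly_avgs)
    return raa is not None and raa > limit

# One high quarter does not trigger on its own: (3.0 + 3.0 + 5.0 + 3.0) / 4 = 3.5
print(mrdl_exceeded([3.0, 3.0, 5.0, 3.0]))  # False
# Three high quarters do: (3.0 + 4.5 + 4.5 + 4.5) / 4 = 4.125
print(mrdl_exceeded([3.0, 4.5, 4.5, 4.5]))  # True
```

This is why an MRDL rule should not share the per-reading evaluation path used for acute MCLs: a single reading above 4.0 mg/L is not, by itself, an exceedance of the average.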

A deterministic state machine suppresses duplicate alerts during transient sensor noise: a parameter must breach its threshold on two consecutive evaluations before it escalates from WARNING to VIOLATION, and a per-rule cooldown window prevents re-firing while a violation persists. Because the same input sequence always yields the same alerts, alert volume tracks sustained compliance events rather than sensor noise.

%% caption: Alert state machine: a sustained breach escalates; cooldown suppresses re-firing.
stateDiagram-v2
    [*] --> NOMINAL
    NOMINAL --> WARNING : value breaches threshold (first detection)
    WARNING --> VIOLATION : breach on next evaluation
    WARNING --> NOMINAL : value recovers within threshold
    VIOLATION --> VIOLATION : within cooldown (suppress)
    VIOLATION --> NOMINAL : value recovers within threshold

import datetime
from enum import Enum
from typing import Dict, Optional, Tuple

# TelemetryPoint is defined in the ingestion module (Step 1).
from telemetry import TelemetryPoint

class AlertState(Enum):
    NOMINAL = "nominal"
    WARNING = "warning"
    VIOLATION = "violation"

# Parameter-to-rule mapping.
# "direction" controls which side of the threshold constitutes a breach:
#   "above"  – MCL/MRDL upper limit; alert when value EXCEEDS threshold
#   "below"  – minimum requirement; alert when value FALLS BELOW threshold
PARAMETER_RULES = {
    "free_chlorine_residual_mg_l": {
        "epa_code": "55",          # Monitoring/TT failure when residual is absent
        "threshold_mg_l": 0.2,     # Minimum detectable residual at distribution entry
        "direction": "below",
        "description": "Distribution system free-chlorine residual below minimum"
    },
    "nitrate_mg_l": {
        "epa_code": "51",          # MCL = 10 mg/L as N (acute, single confirmed sample)
        "threshold_mg_l": 10.0,
        "direction": "above",
        "description": "Nitrate MCL exceedance"
    },
    "arsenic_ug_l": {
        "epa_code": "51",          # MCL = 10 µg/L
        "threshold_ug_l": 10.0,
        "direction": "above",
        "description": "Arsenic MCL exceedance"
    },
}

class ComplianceEngine:
    def __init__(self, taxonomy: dict):
        self.taxonomy = taxonomy
        # In-memory state tracker: sensor_id -> (state, last_triggered_utc)
        self._state_cache: Dict[str, Tuple[AlertState, Optional[datetime.datetime]]] = {}

    def evaluate(self, point: TelemetryPoint) -> Optional[dict]:
        """Deterministic threshold evaluation with flapping prevention."""
        state, last_triggered = self._state_cache.get(point.sensor_id, (AlertState.NOMINAL, None))

        rule = PARAMETER_RULES.get(point.parameter)
        if rule is None:
            return None  # Parameter not in compliance inventory

        code = rule["epa_code"]
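        # Rules declare thresholds in mg/L or µg/L. Read the key explicitly so a
        # legitimate 0.0 threshold is not skipped by a falsy `or`.
        if "threshold_mg_l" in rule:
            threshold = rule["threshold_mg_l"]
            # normalize_telemetry (Step 1) rescales mg/L readings to µg/L, so
            # scale the threshold into the unit the reading actually carries.
            if point.unit == "µg/l":
                threshold *= 1000.0
        else:
            threshold = rule.get("threshold_ug_l", 0.0)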
        direction = rule["direction"]
        taxonomy_entry = self.taxonomy.get(code, {})

        # Cooldown window derived from the rule (defaulting to 4 hours).
        cooldown_seconds = float(taxonomy_entry.get("threshold_hours", 4.0)) * 3600.0

        # Direction-aware breach detection
        if direction == "above":
            breached = point.raw_value > threshold
        else:  # "below"
            breached = point.raw_value < threshold

        if breached:
            if state == AlertState.NOMINAL:
                state = AlertState.WARNING
                self._state_cache[point.sensor_id] = (state, None)
                return None  # First breach: wait for a sustained exceedance.

            # Sustained breach: suppress duplicates inside the cooldown window.
            if last_triggered is not None and \
                    (point.timestamp_utc - last_triggered).total_seconds() < cooldown_seconds:
                return None  # Within cooldown window

            state = AlertState.VIOLATION
            self._state_cache[point.sensor_id] = (state, point.timestamp_utc)

            return {
                "epa_code": code,
                "severity": taxonomy_entry.get("severity", "UNKNOWN"),
                "rule": rule["description"],
                "sensor_id": point.sensor_id,
                "value": point.raw_value,
                "unit": point.unit,
                "threshold": threshold,
                "direction": direction,
                "timestamp_utc": point.timestamp_utc.isoformat(),
                "state": state.value
            }
        else:
            # Reset to nominal
            self._state_cache[point.sensor_id] = (AlertState.NOMINAL, None)
            return None
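
To make the debounce and cooldown behavior concrete, here is a condensed, standalone re-implementation of the same transitions (not the ComplianceEngine itself), driven by a short sequence of nitrate readings. The 15-minute polling interval, the sample values, and the step helper are assumptions chosen for illustration.

```python
import datetime
from typing import Optional, Tuple

COOLDOWN = datetime.timedelta(hours=4)
THRESHOLD_MG_L = 10.0  # nitrate MCL; a breach is a value above this

State = Tuple[str, Optional[datetime.datetime]]  # (state name, last alert time)

def step(state: State, value: float, ts: datetime.datetime) -> Tuple[State, bool]:
    """Return (new_state, alert_fired) for one reading."""
    name, last_alert = state
    if value <= THRESHOLD_MG_L:
        return ("NOMINAL", None), False      # recovery resets everything
    if name == "NOMINAL":
        return ("WARNING", None), False      # first breach: wait for confirmation
    if last_alert is not None and ts - last_alert < COOLDOWN:
        return (name, last_alert), False     # suppressed inside the cooldown
    return ("VIOLATION", ts), True           # sustained breach: fire

t0 = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
readings = [9.0, 11.0, 9.5, 11.2, 11.4, 11.1]
state: State = ("NOMINAL", None)
fired = []
for i, value in enumerate(readings):
    state, alert = step(state, value, t0 + datetime.timedelta(minutes=15 * i))
    fired.append(alert)

print(fired)  # [False, False, False, False, True, False]
```

The isolated spike at the second reading never alerts because the third reading recovers. Only the second consecutive breach (11.4) fires, and the reading after it is suppressed by the cooldown.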

4. Deterministic Fallback Routing & Immediate Resolution

Alert routing must survive network partitions, API rate limits, and gateway failures. Implement a cascading fallback chain: primary webhook, then secondary SMS gateway, then tertiary email, and finally a local immutable log. Each dispatcher should be idempotent so that a repeated delivery is harmless, and the chain applies exponential backoff between channels (skipping the wait after the last one, which would only delay the local-log fallback).

%% caption: Cascading alert routing with exponential backoff between channels.
flowchart TD
    A["Alert generated"] --> B["Webhook (primary)"]
    B -->|Success| Z["Delivered"]
    B -->|Fail, backoff| C["SMS gateway (secondary)"]
    C -->|Success| Z
    C -->|Fail, backoff| D["Email (tertiary)"]
    D -->|Success| Z
    D -->|Fail| E["Local immutable audit log"]

import logging
import time
import requests
from typing import Callable, List

def route_alert(alert: dict, fallback_chain: List[Callable]) -> bool:
    """Execute alert routing with an exponential-backoff fallback chain."""
    last_index = len(fallback_chain) - 1
    for i, dispatcher in enumerate(fallback_chain):
        try:
            if dispatcher(alert):
                logging.info("Alert routed successfully via channel %d: %s", i, dispatcher.__name__)
                return True
            logging.warning("Channel %d (%s) reported failure. Attempting fallback.", i, dispatcher.__name__)
        except Exception as e:
            logging.warning("Channel %d (%s) raised: %s. Attempting fallback.", i, dispatcher.__name__, e)
        # Back off before the next channel; skip the wait after the final one.
        if i < last_index:
            time.sleep(2 ** i)  # Exponential backoff
    logging.critical("ALL ROUTING CHANNELS FAILED. ALERT WRITTEN TO LOCAL AUDIT LOG: %s", alert)
    return False

# Example dispatchers
def webhook_dispatch(alert: dict) -> bool:
    resp = requests.post("https://ops.internal/api/v1/alerts", json=alert, timeout=5)
    resp.raise_for_status()
    return True

def sms_dispatch(alert: dict) -> bool:
    # Placeholder for Twilio/ClickSend integration
    logging.info(f"SMS fallback triggered for {alert['sensor_id']}")
    return True

def email_dispatch(alert: dict) -> bool:
    # Placeholder for SMTP integration
    logging.info(f"Email fallback triggered for {alert['sensor_id']}")
    return True

FALLBACK_PIPELINE = [webhook_dispatch, sms_dispatch, email_dispatch]
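
Because the chain can hand the same alert to more than one channel, a receiver may see it twice. One common way to make delivery idempotent is to attach a deterministic key that the receiver can deduplicate on. The following is a minimal sketch, assuming the alert dictionary carries the sensor_id, epa_code, and timestamp_utc fields produced by the engine. The alert_idempotency_key helper and the Idempotency-Key header name are assumptions, and the receiving API must actually honor such a header for this to help.

```python
import hashlib

IDENTITY_FIELDS = ("sensor_id", "epa_code", "timestamp_utc")

def alert_idempotency_key(alert: dict) -> str:
    """Derive a stable key from the fields that identify one alert event."""
    identity = "|".join(str(alert[field]) for field in IDENTITY_FIELDS)
    return hashlib.sha256(identity.encode("utf-8")).hexdigest()

def idempotency_headers(alert: dict) -> dict:
    """Build HTTP headers that a webhook dispatcher could send with the alert."""
    return {"Idempotency-Key": alert_idempotency_key(alert)}
```

A webhook dispatcher could pass the result as the headers argument of requests.post. The same key also works as a primary key in a local table of delivered alerts when the receiver offers no deduplication.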

Production Hardening & Operational Handoff

Deploying this pipeline requires strict adherence to compliance engineering practices:

  1. Direction-aware threshold logic: Always verify whether a parameter is governed by a maximum (MCL/MRDL — alert when value is above) or a minimum requirement (residual, pressure — alert when value is below). Inverting this direction is a silent correctness failure that can mask real compliance events.
  2. Idempotent Evaluation Windows: The state machine tracks the last trigger time per sensor to suppress duplicate alerts during sensor oscillation. Size the cooldown window by SDWA rule—commonly a rolling 4-hour or 24-hour interval—and source it from the taxonomy so policy changes never require a code change.
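  3. Immutable Audit Logging: Write all raw payloads, normalization steps, and routing outcomes to append-only storage. If you use the Python logging module's RotatingFileHandler, set backupCount high enough to cover your retention period, because rotation deletes the oldest file once that count is exceeded; alternatively, forward records to a centralized SIEM.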
  4. Primacy Agency Sync: EPA codes and thresholds shift with rule revisions (LCR revisions, DBPR updates, and the like). Schedule a weekly validation job that diffs your local taxonomy against the EPA SDWA compliance portal and opens a CI/CD update when the two diverge.
  5. Fast Operational Resolution: When a CRITICAL or HIGH alert routes successfully, deliver an automated playbook link to the SCADA control room. Standardize the response matrices: MCL exceedances trigger immediate sampling and public-notification workflows, while monitoring lapses trigger technician dispatch and historian gap-filling.
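
The weekly validation job in item 4 reduces to a dictionary diff. The following is a minimal sketch, assuming both the local taxonomy and a reference copy have already been loaded as dictionaries keyed by code. How the reference copy is obtained is outside this sketch, and diff_taxonomy and has_drift are hypothetical helpers.

```python
def diff_taxonomy(local: dict, reference: dict) -> dict:
    """Report codes that are missing, extra, or changed relative to the reference."""
    local_codes, ref_codes = set(local), set(reference)
    changed = {
        code: {"local": local[code], "reference": reference[code]}
        for code in sorted(local_codes & ref_codes)
        if local[code] != reference[code]
    }
    return {
        "missing_locally": sorted(ref_codes - local_codes),
        "extra_locally": sorted(local_codes - ref_codes),
        "changed": changed,
    }

def has_drift(diff: dict) -> bool:
    """True when any of the three categories is non-empty."""
    return any(diff.values())
```

A scheduled job could serialize a non-empty diff into the body of the update request it opens, which gives the reviewer the exact entries to verify against the primacy agency's publication.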

By decoupling telemetry ingestion from regulatory translation, utilities can retire manual spreadsheet reconciliation and shorten the path from a threshold breach to an internal alert. The cascading routing layer ensures that even when every delivery channel fails, the alert is still written to the local audit log.