Handling Missing Sensor Readings Without Triggering False Violations

In municipal water SCADA environments, telemetry interruptions are an operational certainty. Communication dropouts, PLC polling failures, and scheduled sensor calibration cycles create data gaps that, if processed naively, cascade into erroneous Maximum Contaminant Level (MCL) exceedances or monitoring frequency violations. The core engineering challenge is to deterministically distinguish genuine compliance failures from telemetry artifacts before automated reporting pipelines generate the compliance submissions that ultimately populate the Safe Drinking Water Information System (SDWIS). This guide details the implementation steps, regulatory logic, and Python automation patterns required to suppress false positives while maintaining strict audit readiness.

Regulatory Substitution Logic & Gap Classification

EPA compliance frameworks, including the Disinfectants and Disinfection Byproducts Rule (DBPR) and the Revised Total Coliform Rule (RTCR), explicitly define how missing monitoring data must be treated. These rules do not permit arbitrary statistical imputation or forward-filling for compliance determination. Instead, they require documented gap classification, adherence to minimum monitoring frequencies, and strict alignment with reporting windows, as outlined in 40 CFR Part 141. When a sensor fails to report within a compliance period, the automation pipeline must evaluate whether the gap constitutes a formal monitoring violation under the applicable rule (for RTCR monitoring, 40 CFR § 141.860) or merely a telemetry interruption.

Automated systems must differentiate a missed sample caused by operational oversight from a missed reading caused by a known SCADA polling timeout. This distinction is foundational to Monitoring Gap Detection Algorithms, which classify interruptions by duration, timestamp continuity, and sensor health flags before routing them to compliance evaluation modules. The rule engine enforces a strict hierarchy: telemetry-loss flags are evaluated first, and only after a gap is confirmed to exceed the allowable tolerance window does the system advance to compliance-state assessment.
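The evaluation order described above can be sketched as a small triage step. This is a minimal sketch, not the pipeline's actual rule engine: `GapCause`, `GapEvidence`, `triage_gap`, and the two evidence flags are hypothetical names, and in practice the flags would have to come from historian quality codes or a maintenance schedule.

```python
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Tuple


class GapCause(Enum):
    POLLING_TIMEOUT = "POLLING_TIMEOUT"  # SCADA reported a polling/communication failure
    CALIBRATION = "CALIBRATION"          # gap overlaps a scheduled calibration window
    UNEXPLAINED = "UNEXPLAINED"          # no telemetry-loss evidence; possible oversight


@dataclass(frozen=True)
class GapEvidence:
    duration: timedelta
    polling_timeout_flag: bool   # hypothetical RTU/historian health flag
    in_calibration_window: bool  # hypothetical maintenance-schedule lookup


def triage_gap(ev: GapEvidence, tolerance: timedelta) -> Tuple[GapCause, bool]:
    """
    Return (cause, advance_to_compliance). Telemetry-loss evidence is read
    first to label the cause; the gap only advances to compliance-state
    assessment once its duration is confirmed to exceed the tolerance.
    """
    # Step 1: evaluate telemetry-loss evidence before any compliance logic.
    if ev.polling_timeout_flag:
        cause = GapCause.POLLING_TIMEOUT
    elif ev.in_calibration_window:
        cause = GapCause.CALIBRATION
    else:
        cause = GapCause.UNEXPLAINED

    # Step 2: only gaps beyond the tolerance reach compliance assessment.
    # The cause is kept either way so the audit trail records why the gap occurred.
    return cause, ev.duration > tolerance
```

With a 15-minute tolerance, a 10-minute gap carrying a polling-timeout flag returns `(GapCause.POLLING_TIMEOUT, False)` and never reaches compliance logic, while a 3-hour gap with no evidence returns `(GapCause.UNEXPLAINED, True)`.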

Production-Ready Python Architecture

Building a robust automation pipeline requires a stateful data-processing layer that separates data-quality validation from regulatory evaluation. The following architecture handles missing data deterministically and provides explicit fallback routing for unresolved telemetry states.

1. Ingestion & Timestamp Normalization

SCADA historians frequently exhibit clock skew across distributed RTUs and PLCs. Before any compliance logic executes, all timestamps must be coerced to UTC, deduplicated, and sorted chronologically. Out-of-sequence records are flagged for manual reconciliation rather than interpolated, and the flag is computed against the original arrival order before the rows are sorted.

import pandas as pd

def normalize_scada_ingestion(raw_df: pd.DataFrame, time_col: str = "timestamp") -> pd.DataFrame:
    """
    Coerce timestamps to UTC, drop duplicate readings, and enforce monotonic ordering.
    """
    df = raw_df.copy()
    df[time_col] = pd.to_datetime(df[time_col], utc=True)

    # Detect out-of-sequence records in the original arrival order, before sorting.
    df["is_out_of_sequence"] = df[time_col].diff() < pd.Timedelta(0)

    # Drop exact-duplicate timestamps, then enforce monotonic ordering.
    df = df.drop_duplicates(subset=time_col).sort_values(time_col).reset_index(drop=True)
    return df
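`pd.to_datetime(..., utc=True)` treats naive timestamps as if they were already UTC. If a historian exports naive site-local wall-clock time (an assumption worth confirming for each historian), the timestamps should be localized before conversion, or readings near daylight-saving transitions will land an hour off. A minimal sketch, where `localize_naive_timestamps` and the `America/Chicago` default are hypothetical choices:

```python
from typing import Tuple

import pandas as pd


def localize_naive_timestamps(
    raw: pd.Series, site_tz: str = "America/Chicago"
) -> Tuple[pd.Series, pd.Series]:
    """
    Interpret naive historian timestamps as site-local wall-clock time and
    convert them to UTC. Ambiguous (DST fall-back) and nonexistent (DST
    spring-forward) wall times become NaT and are reported in a review mask
    rather than silently guessed. Raises TypeError if raw is already tz-aware.
    """
    naive = pd.to_datetime(raw)
    local = naive.dt.tz_localize(site_tz, ambiguous="NaT", nonexistent="NaT")
    # Flag rows that parsed fine but could not be placed on the UTC timeline.
    needs_review = local.isna() & naive.notna()
    return local.dt.tz_convert("UTC"), needs_review
```

Rows flagged in `needs_review` can go to the same manual reconciliation queue as out-of-sequence records, and the UTC series can then be passed through `normalize_scada_ingestion` unchanged.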

2. Deterministic Gap Classification Engine

Once the data is normalized, the engine identifies contiguous null sequences, measures the duration of each gap, and assigns a compliance state. Gaps no longer than the telemetry tolerance are treated as routine polling artifacts; gaps that exceed the tolerance but remain at or below the monitoring-violation threshold are held for review; and gaps beyond that threshold are escalated to a formal violation. The step deliberately avoids DataFrame.interpolate(), since statistical imputation violates primacy-agency audit requirements.

import numpy as np
import pandas as pd
from enum import Enum

class ComplianceState(Enum):
    VALID = "VALID"
    TELEMETRY_GAP = "TELEMETRY_GAP"
    MONITORING_VIOLATION = "MONITORING_VIOLATION"
    PENDING_REVIEW = "PENDING_REVIEW"

def classify_gaps(
    df: pd.DataFrame,
    value_col: str,
    time_col: str = "timestamp",
    telemetry_tolerance_min: int = 15,
    monitoring_violation_threshold_min: int = 1440,
) -> pd.DataFrame:
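    """
    Identify contiguous null sequences and classify each against the telemetry
    tolerance and the regulatory monitoring-violation threshold. Assumes df holds
    a single sensor's readings sorted by time_col, as produced by
    normalize_scada_ingestion().
    """
    df = df.copy()  # work on a copy so the caller's frame is not mutated
    tolerance = pd.Timedelta(minutes=telemetry_tolerance_min)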
    violation_threshold = pd.Timedelta(minutes=monitoring_violation_threshold_min)

    # Boolean mask of missing readings.
    is_missing = df[value_col].isna()

    # Assign a stable group id to every contiguous run of identical missingness.
    group_id = is_missing.ne(is_missing.shift()).cumsum()

    # Gap duration is the span of each contiguous null block (first to last
    # missing timestamp), broadcast back to every row in that block.
    df["gap_duration"] = pd.Timedelta(0)
    df.loc[is_missing, "gap_duration"] = (
        df[is_missing]
        .groupby(group_id[is_missing])[time_col]
        .transform(lambda block: block.max() - block.min())
    )

    # Mutually exclusive classification conditions, evaluated row by row.
    conditions = [
        ~is_missing,
        is_missing & (df["gap_duration"] <= tolerance),
        is_missing & (df["gap_duration"] > tolerance) & (df["gap_duration"] <= violation_threshold),
        is_missing & (df["gap_duration"] > violation_threshold),
    ]
    choices = [
        ComplianceState.VALID,
        ComplianceState.TELEMETRY_GAP,
        ComplianceState.PENDING_REVIEW,
        ComplianceState.MONITORING_VIOLATION,
    ]

    df["compliance_state"] = np.select(conditions, choices, default=ComplianceState.PENDING_REVIEW)
    return df

The classifier maps each reading into one of four mutually exclusive compliance states based on gap duration relative to the telemetry tolerance and the monitoring-violation threshold.

%% caption: Gap classification by duration: within tolerance is a routine artifact; between tolerance and the violation threshold is held for review; beyond the threshold is a formal violation.
stateDiagram-v2
    [*] --> VALID: reading present
    [*] --> Missing: reading is NaN
    Missing --> TELEMETRY_GAP: duration <= tolerance
    Missing --> PENDING_REVIEW: tolerance < duration <= violation threshold
    Missing --> MONITORING_VIOLATION: duration > violation threshold
    VALID --> [*]
    TELEMETRY_GAP --> [*]
    PENDING_REVIEW --> [*]
    MONITORING_VIOLATION --> [*]
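`classify_gaps` measures a gap from its first to its last missing timestamp, so a single missed poll registers as zero minutes, and it only sees gaps that appear as NaN rows; a sensor that stops reporting entirely leaves no rows to flag. The sketch below addresses both under stated assumptions: `reindex_to_polling_grid` and `bounded_gap_duration` are hypothetical helpers, the 15-minute polling interval is an assumption, and timestamps are assumed to be UTC, de-duplicated, and already aligned to the polling grid.

```python
import pandas as pd


def reindex_to_polling_grid(
    df: pd.DataFrame, poll_interval: str = "15min", time_col: str = "timestamp"
) -> pd.DataFrame:
    """
    Expose absent rows as NaN by reindexing onto the expected polling grid.
    Assumes one sensor per frame and grid-aligned, de-duplicated timestamps.
    """
    indexed = df.set_index(time_col)
    grid = pd.date_range(indexed.index.min(), indexed.index.max(), freq=poll_interval)
    # Readings that fall between grid points are dropped by reindex(); a real
    # pipeline should flag misaligned readings instead of discarding them.
    return indexed.reindex(grid).rename_axis(time_col).reset_index()


def bounded_gap_duration(
    df: pd.DataFrame, value_col: str, time_col: str = "timestamp"
) -> pd.Series:
    """
    For each missing row, measure the outage from the last valid reading
    before the gap to the first valid reading after it; valid rows get zero.
    Leading and trailing gaps are bounded by the frame's first/last timestamp.
    """
    is_missing = df[value_col].isna()
    valid_times = df[time_col].where(~is_missing)
    prev_valid = valid_times.ffill().fillna(df[time_col].iloc[0])
    next_valid = valid_times.bfill().fillna(df[time_col].iloc[-1])
    return (next_valid - prev_valid).where(is_missing, pd.Timedelta(0))
```

The result of `bounded_gap_duration` could be assigned to `df["gap_duration"]` in place of the first-to-last computation. Because the bounded measure includes one polling interval on each side of the gap, a single missed 15-minute poll measures 30 minutes, so under this definition the telemetry tolerance must be read as the maximum allowed time between valid readings and set accordingly.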

3. Regulatory Routing & Fallback State Machine

When a gap exceeds the telemetry tolerance but falls below the formal violation threshold, the system suppresses automated violation generation and routes the record to a fallback queue. This prevents false SDWIS submissions while still recording every routing decision for the audit trail.

from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class ComplianceRoute:
    sensor_id: str
    state: ComplianceState
    gap_minutes: float
    action: str
    audit_payload: Dict[str, Any] = field(default_factory=dict)

def route_compliance_records(
    df: pd.DataFrame, sensor_id: str, time_col: str = "timestamp"
) -> List[ComplianceRoute]:
    """
    Suppress false positives and route each non-valid gap to the appropriate
    operational workflow. Rows are grouped into contiguous gaps first, so a long
    outage yields one route instead of one per missing reading. Assumes df is
    the sorted output of classify_gaps().
    """
    routes: List[ComplianceRoute] = []
    not_valid = df["compliance_state"] != ComplianceState.VALID
    gap_id = not_valid.ne(not_valid.shift()).cumsum()

    for _, gap in df[not_valid].groupby(gap_id[not_valid]):
        # Every row in a contiguous gap shares the same state and duration.
        state = gap["compliance_state"].iloc[0]
        gap_min = gap["gap_duration"].iloc[0].total_seconds() / 60
        window = {
            "gap_start": gap[time_col].iloc[0].isoformat(),
            "gap_end": gap[time_col].iloc[-1].isoformat(),
        }

        if state == ComplianceState.TELEMETRY_GAP:
            # Auto-resolve: log as a known polling artifact and suppress the violation.
            action = "SUPPRESS_AND_LOG"
            payload = {"reason": "SCADA_POLLING_TIMEOUT", "resolved": True}
        elif state == ComplianceState.PENDING_REVIEW:
            # Fallback: hold for manual review; downstream consumers surface this on
            # the compliance dashboard and may open a CMMS ticket.
            action = "ROUTE_TO_REVIEW"
            payload = {"requires_manual_substitution": True, "sdwis_blocked": True}
        else:
            # Formal violation: flag the gap for SDWIS-ready violation reporting.
            action = "FLAG_VIOLATION"
            payload = {"requires_primacy_agency_notification": True}

        routes.append(ComplianceRoute(
            sensor_id=sensor_id,
            state=state,
            gap_minutes=gap_min,
            action=action,
            audit_payload={**window, **payload},
        ))

    return routes

Each compliance state maps to a distinct routing action, so false positives are suppressed while genuine violations still reach the primacy agency.

%% caption: route_compliance_records maps each non-valid state to its operational action.
flowchart TD
    A["Classified record"] --> B{"compliance_state?"}
    B -->|"VALID"| C["Skip"]
    B -->|"TELEMETRY_GAP"| D["SUPPRESS_AND_LOG: known polling artifact"]
    B -->|"PENDING_REVIEW"| E["ROUTE_TO_REVIEW: dashboard + CMMS ticket"]
    B -->|"MONITORING_VIOLATION"| F["FLAG_VIOLATION: SDWIS-ready record"]
    E --> G["SDWIS submission blocked pending review"]
    F --> H["Primacy agency notification"]

Operational Resolution & Audit Trail Enforcement

Reliable operational resolution requires decoupling violation generation from data ingestion. When the fallback router flags a PENDING_REVIEW state, the pipeline blocks automated submission of that record, generates a structured JSON payload for the compliance dashboard, and optionally issues a REST call to the municipal CMMS for field verification. This ensures that primacy agencies receive only validated records, while operators retain visibility into unresolved telemetry states.
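A structured dashboard payload for a PENDING_REVIEW gap might look like the following minimal sketch. The function name `build_review_payload` and every field name are hypothetical, not an EPA, SDWIS, or CMMS schema, and the gap boundaries are assumed to be timezone-aware datetimes.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def build_review_payload(
    sensor_id: str,
    gap_start: datetime,
    gap_end: datetime,
    gap_minutes: float,
    tolerance_min: int,
    violation_threshold_min: int,
) -> str:
    """
    Serialize a PENDING_REVIEW gap for the compliance dashboard. Field names
    are a hypothetical schema. gap_start/gap_end must be timezone-aware;
    astimezone() on a naive datetime would assume the host's local time.
    """
    payload: Dict[str, Any] = {
        "sensor_id": sensor_id,
        "state": "PENDING_REVIEW",
        "gap_start_utc": gap_start.astimezone(timezone.utc).isoformat(),
        "gap_end_utc": gap_end.astimezone(timezone.utc).isoformat(),
        "gap_minutes": round(gap_minutes, 1),
        # Record the thresholds in force so the decision can be re-derived later.
        "thresholds_applied": {
            "telemetry_tolerance_min": tolerance_min,
            "monitoring_violation_threshold_min": violation_threshold_min,
        },
        # Submission stays blocked until an operator resolves the record.
        "sdwis_submission_blocked": True,
        "resolution": None,
    }
    # sort_keys gives byte-stable output, which simplifies diffing and hashing.
    return json.dumps(payload, sort_keys=True)
```

Recording the applied thresholds alongside the gap lets a reviewer reproduce the classification even after the configured tolerance changes.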

The rule engine must maintain an immutable ledger of all gap classifications, substitution decisions, and manual overrides. By integrating Violation Detection & Rule Engine Logic directly into the routing layer, utilities can enforce consistent handling across multiple treatment plants and distribution zones. Audit exports should include raw timestamps, gap durations, the regulatory thresholds applied, and the exact substitution rationale, formatted to align with EPA data-validation checklists.
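One way to make such a ledger tamper-evident is hash chaining, where each entry commits to the hash of the one before it. The sketch below is a hypothetical in-memory helper (`AuditLedger` is not an existing library); it assumes records are JSON-serializable (states as strings, timestamps as ISO strings) and leaves durable, write-once storage out of scope.

```python
import hashlib
import json
from typing import Any, Dict, List


class AuditLedger:
    """
    Hypothetical append-only ledger with SHA-256 hash chaining. Altering or
    deleting an earlier entry breaks verification from that point on. This
    provides tamper evidence, not tamper prevention.
    """

    GENESIS = "0" * 64

    def __init__(self) -> None:
        self._entries: List[Dict[str, Any]] = []

    @staticmethod
    def _digest(prev_hash: str, record: Dict[str, Any]) -> str:
        # Canonical JSON (sorted keys) so the same record always hashes identically.
        body = json.dumps({"prev": prev_hash, "record": record}, sort_keys=True)
        return hashlib.sha256(body.encode("utf-8")).hexdigest()

    @property
    def entries(self) -> List[Dict[str, Any]]:
        """Entries for export; any in-place edit will fail verify()."""
        return self._entries

    def append(self, record: Dict[str, Any]) -> str:
        prev_hash = self._entries[-1]["hash"] if self._entries else self.GENESIS
        # Store a JSON round-tripped copy so later edits to the caller's dict
        # cannot silently change the stored record.
        stored = json.loads(json.dumps(record, sort_keys=True))
        entry_hash = self._digest(prev_hash, stored)
        self._entries.append({"prev": prev_hash, "record": stored, "hash": entry_hash})
        return entry_hash

    def verify(self) -> bool:
        prev_hash = self.GENESIS
        for entry in self._entries:
            if entry["prev"] != prev_hash or self._digest(prev_hash, entry["record"]) != entry["hash"]:
                return False
            prev_hash = entry["hash"]
        return True
```

Exporting `entries` together with the final hash gives auditors a way to confirm that no classification, substitution, or override was edited after the fact.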

For production deployments, wrap the pipeline in a scheduled orchestrator such as Prefect or Airflow, with explicit retry logic on historian API failures. Use Python’s datetime and zoneinfo modules for timezone-aware compliance-period boundary calculations, and enforce strict schema validation with pydantic before any record enters the compliance submission queue. This architecture suppresses false violation triggers caused by telemetry artifacts, reduces manual reconciliation overhead, and keeps reporting aligned with federal monitoring requirements.
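A timezone-aware boundary calculation for a calendar-month compliance period might look like the following minimal sketch. It assumes the period is defined in the utility's local time zone, which should be confirmed with the primacy agency; `monthly_period_utc` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Tuple
from zoneinfo import ZoneInfo


def monthly_period_utc(year: int, month: int, site_tz: str) -> Tuple[datetime, datetime]:
    """
    Return the [start, end) UTC bounds of a calendar-month compliance period
    defined in the utility's local time zone (an assumption, see above).
    """
    tz = ZoneInfo(site_tz)
    start_local = datetime(year, month, 1, tzinfo=tz)
    # Roll to the first day of the next month, handling December.
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    end_local = datetime(next_year, next_month, 1, tzinfo=tz)
    return start_local.astimezone(timezone.utc), end_local.astimezone(timezone.utc)
```

For March 2024 in `America/Chicago`, the period runs from 06:00 UTC on March 1 to 05:00 UTC on April 1, a span of 743 hours rather than 744 because of the spring-forward transition. A naive UTC month would assign readings near both boundaries to the wrong period.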