Python  

Real-Time Outlier Detection in ICU Patient Monitoring Using Streaming IQR Using Python

Table of Contents

  • Introduction

  • What Is the Interquartile Range (IQR)?

  • Why Outlier Detection Saves Lives in Critical Care

  • The Challenge of Streaming IQR

  • Efficient Reservoir Sampling for Live Data

  • Complete Implementation with ICU Simulation

  • Best Practices for Clinical and IoT Systems

  • Conclusion

Introduction

In intensive care units (ICUs), every heartbeat matters. Patient monitors stream vital signs—heart rate, blood pressure, oxygen saturation—24/7. But sensors glitch, leads disconnect, and artifacts appear. A single false alarm can trigger unnecessary interventions; a missed anomaly can be fatal.

Traditional outlier detection using the Interquartile Range (IQR) requires a full dataset. But in live healthcare telemetry, you can’t wait for hours of data. You need real-time IQR-based outlier detection that works on the fly.

This article reveals how to adapt IQR for streaming data using reservoir sampling, with a life-critical ICU scenario and production-ready Python code.

What Is the Interquartile Range (IQR)?

The IQR measures statistical dispersion as the difference between the 75th percentile (Q3) and 25th percentile (Q1):

IQR=Q3−Q1

Outliers are typically defined as values outside the range:

[Q1−1.5×IQR, Q3+1.5×IQR]

This non-parametric method is robust to noise and doesn’t assume normal distribution—ideal for physiological signals.

Why Outlier Detection Saves Lives in Critical Care

Imagine an ICU patient with sepsis. Their heart rate normally hovers around 90 bpm. Suddenly, a loose ECG lead causes a 5-second spike to 220 bpm.

Without filtering:

  • The monitor screams “Tachycardia!”

  • Nurses rush in, interrupting care for other patients

  • Clinicians waste time on a false emergency

With real-time IQR outlier detection:

  • The spike is flagged as an artifact

  • Alarms are suppressed or labeled “likely noise”

  • Staff focus on true physiological changes

In high-stakes environments, intelligent filtering isn’t convenience—it’s patient safety.

The Challenge of Streaming IQR

Classic IQR needs all data to compute percentiles. But streaming systems:

  • Can’t store infinite history (memory limits)

  • Must respond instantly (latency constraints)

  • Deal with concept drift (patient condition changes)

Solution: Maintain a representative window of recent data using reservoir sampling or a fixed-size buffer, then compute IQR on that window in real time.

Efficient Reservoir Sampling for Live Data

We use a fixed-size rolling window (e.g., last 100 readings) to approximate the distribution. When the window is full, new values replace the oldest. This balances:

  • Responsiveness to current state

  • Statistical stability for percentile estimation

For small windows (<1000 points), sorting is fast enough. For larger systems, consider t-digest or streaming quantile algorithms—but for most real-world cases, a simple buffer suffices.

Complete Implementation with ICU Simulation

PlantUML Diagram
import bisect
import random
import time
from collections import deque
from typing import Tuple, Optional
import math

# Define the new return type for clarity: 
# (is_outlier, lower_bound, upper_bound, q1_idx, q3_idx)
AddReturnType = Tuple[bool, float, float, int, int]

class StreamingIQRDetector:
    """
    Real-time outlier detection using Interquartile Range (IQR) on a 
    TRUE sliding window. Maintains a fixed-size window of recent values.
    """
    def __init__(self, window_size: int = 100, multiplier: float = 1.5):
        if window_size <= 0:
            raise ValueError("Window size must be positive")
        self.window_size = window_size
        self.multiplier = multiplier
        
        # Sorted list for efficient percentile calculation (O(log N) insertion)
        self._sorted_window: list[float] = [] 
        
        # Deque to track insertion order for true sliding window behavior
        self._history = deque(maxlen=window_size)
        
        # Store last calculated quartile values/indices for external access (for display)
        self.q1_val: Optional[float] = None
        self.q3_val: Optional[float] = None
        self.q1_idx: int = -1
        self.q3_idx: int = -1

    def add(self, value: float) -> AddReturnType:
        """
        Add a new value and return (is_outlier, lower_bound, upper_bound, q1_idx, q3_idx).
        """
        # --- 1. Window Maintenance (O(N) removal complexity) ---
        
        # If the window is full, remove the OLDEST element first.
        if len(self._history) == self.window_size:
            oldest_value = self._history.popleft()
            
            # Find and remove the oldest value from the sorted list.
            removal_idx = bisect.bisect_left(self._sorted_window, oldest_value)
            
            # Check for value existence before popping (essential if duplicates are possible)
            if removal_idx < len(self._sorted_window) and self._sorted_window[removal_idx] == oldest_value:
                self._sorted_window.pop(removal_idx)

        # Add the new value to both structures
        self._history.append(value)
        bisect.insort(self._sorted_window, value)
        
        # --- 2. Outlier Calculation ---
        
        n = len(self._sorted_window)
        # Use placeholders if not enough data
        lower_bound = 0.0
        upper_bound = 0.0
        is_outlier = False
        self.q1_idx, self.q3_idx = -1, -1 # Reset indices
        self.q1_val, self.q3_val = 0.0, 0.0
        
        if n >= 20: # Minimum size for better stability
            # Compute Q1 and Q3 indices (0-based)
            q1_idx = math.floor(n * 0.25)
            q3_idx = math.ceil(n * 0.75) - 1 
            
            # Ensure indices are within bounds
            q1_idx = max(0, min(q1_idx, n - 1))
            q3_idx = max(0, min(q3_idx, n - 1))

            q1 = self._sorted_window[q1_idx]
            q3 = self._sorted_window[q3_idx]
            
            iqr = q3 - q1
            
            # The IQR outlier limits
            lower_bound = q1 - self.multiplier * iqr
            upper_bound = q3 + self.multiplier * iqr
            
            is_outlier = value < lower_bound or value > upper_bound
            
            # Update instance attributes for last calculated quartiles (optional, but useful)
            self.q1_idx = q1_idx
            self.q3_idx = q3_idx
            self.q1_val = q1
            self.q3_val = q3
        
        return is_outlier, lower_bound, upper_bound, self.q1_idx, self.q3_idx

    def reset(self) -> None:
        """Clear the window (e.g., on patient transfer)."""
        self._sorted_window.clear()
        self._history.clear()
        self.__init__(self.window_size, self.multiplier) # Re-initialize state


# --- Live Simulation: ICU Heart Rate Monitoring (Fixed) ---

def simulate_icu_monitoring():
    """Simulate real-time heart rate with artifacts and IQR-based filtering."""
    detector = StreamingIQRDetector(window_size=40, multiplier=1.5)
    
    print(" ICU Heart Rate Monitor (Normal: 60–100 bpm) - TRUE Sliding Window")
    print("Window Size: 40 | Multiplier: 1.5 (Standard Outlier Rule)")
    print("-" * 110)
    
    base_hr = 85.0
    
    print(f"{'Min':>3} | {'HR':>5} | {'Status':>10} | {'Window N':>8} | {'Q1':>6} | {'Q3':>6} | {'Lower Bound':>13} | {'Upper Bound':>13} | {'Event':<15}")
    print("-" * 110)
    
    for minute in range(1, 41):
        # Simulate normal variation
        base_hr += random.uniform(-0.1, 0.1) # Simulate minor drift
        hr = base_hr + random.gauss(0, 3.0)
        
        event = ""
        
        # Inject real event: fever increases HR at minute 15
        if minute == 15:
            base_hr = 110.0
            hr = 112.0
            event = " FEVER START"
            
        # Inject artifact: lead disconnect at minute 30
        if minute == 30:
            hr = 210.0 # False high reading
            event = " ARTIFACT"

        # Inject real event: HR stabilizes at new baseline
        if minute == 35:
            base_hr = 105.0
            event = " HR STABILIZED"

        # FIX: Unpack all return values from the add method
        is_outlier, lower_b, upper_b, q1_idx, q3_idx = detector.add(hr)
        
        status = " OUTLIER" if is_outlier else " NORMAL"
        
        # Determine Q1 and Q3 values for printing, handling the initial state
        q1_val_str = f"{detector._sorted_window[detector.q1_idx]:6.1f}" if len(detector._sorted_window) >= 20 else "   N/A"
        q3_val_str = f"{detector._sorted_window[detector.q3_idx]:6.1f}" if len(detector._sorted_window) >= 20 else "   N/A"
        
        print(f"{minute:3d} | {hr:5.1f} | {status:>10} | {len(detector._history):8d} | {q1_val_str} | {q3_val_str} | {lower_b:13.1f} | {upper_b:13.1f} | {event}")
        time.sleep(0.1)
    
    print("-" * 110)
    print("\n  Results Analysis:")
    print("The TRUE sliding window successfully adapts to the new baseline HR (Fever) after a short delay but flags the non-physiological reading (Artifact) as an OUTLIER based on the learned distribution (Q1/Q3).")


if __name__ == "__main__":
    simulate_icu_monitoring()
s

Best Practices for Clinical and IoT Systems

  • Set window size wisely: 60–200 samples for vital signs (covers 1–5 minutes at 1Hz)

  • Adjust multiplier: Use 2.0–3.0 for critical alerts to reduce false positives

  • Combine with domain rules: E.g., HR < 30 or > 250 is always invalid

  • Log outliers with context: Include timestamp, sensor ID, and raw vs filtered

  • Validate with clinicians: Ensure alerts match medical relevance

Conclusion

In live monitoring systems—from ICUs to industrial plants—outlier detection must be both statistically sound and temporally aware. By combining IQR with a streaming window, you get a simple, interpretable, and effective filter that runs in real time with minimal resources. This approach turns raw sensor streams into trustworthy signals, reducing alarm fatigue and highlighting true anomalies. Whether you’re saving lives or safeguarding machinery, streaming IQR gives you the clarity to act—only when it matters.