
Build a Log Aggregator with Structured JSON Parsing Using Python

Table of Contents

  • Introduction

  • Why Structured Logging Matters

  • Real-World Scenario: Debugging a Payment Failure During Black Friday Sales

  • Core Components of a Log Aggregator

  • Complete Python Implementation

  • Running and Testing the Aggregator

  • Conclusion

Introduction

When your application spans dozens of microservices, debugging a single user issue feels like finding a needle in a haystack. Unstructured logs—lines of free-form text—are nearly useless at scale. But structured logs, especially in JSON format, turn chaos into clarity.

In this article, you’ll build a lightweight, real-time log aggregator that collects, parses, and filters JSON logs from multiple sources—inspired by how engineering teams debug critical outages during high-stakes events like Black Friday.

Why Structured Logging Matters

Structured logging means every log entry is a machine-readable object (usually JSON) with consistent fields like:

  • timestamp

  • level (info, error, etc.)

  • service

  • trace_id

  • message

  • user_id
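As a minimal sketch of how a service might emit entries with these fields, the snippet below plugs a custom formatter into Python's standard logging module. The JsonFormatter class and build_logger helper are illustrative names, not part of any library, and passing service, trace_id, and user_id through the extra= argument is one possible convention among several.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object (hypothetical helper)."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            # record.created is epoch seconds; emit it as ISO 8601 in UTC
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            # These three attributes exist only if the caller passed them via extra=
            "service": getattr(record, "service", "unknown"),
            "trace_id": getattr(record, "trace_id", None),
            "user_id": getattr(record, "user_id", None),
            "message": record.getMessage(),
        }
        return json.dumps(entry)


def build_logger(name: str) -> logging.Logger:
    """Attach the JSON formatter to a stream handler (stderr by default)."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


if __name__ == "__main__":
    log = build_logger("payment")
    log.error(
        "Bank API timeout on transaction commit",
        extra={"service": "payment", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345"},
    )
```

Each call then produces one JSON object per line, which is the shape the aggregator later in this article expects.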

This allows you to:

  • Search across services using user_id or trace_id

  • Alert on level: "error" with specific error codes

  • Build dashboards showing error rates per service

  • Correlate frontend and backend events

Without structure, you’re grepping through gigabytes of text. With it, you get observability.
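To make the dashboard idea concrete, here is a small sketch that computes an error rate per service from JSON log lines. The function name error_rate_by_service and the sample lines are made up for illustration; a real dashboard would more likely run this kind of aggregation inside a metrics system.

```python
import json
from collections import Counter


def error_rate_by_service(lines):
    """Return {service: fraction of entries whose level is 'error'}."""
    totals, errors = Counter(), Counter()
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # unstructured line, nothing to count
        if not isinstance(entry, dict):
            continue  # valid JSON, but not a log object
        service = entry.get("service", "unknown")
        totals[service] += 1
        if str(entry.get("level", "")).lower() == "error":
            errors[service] += 1
    return {service: errors[service] / totals[service] for service in totals}


sample = [
    '{"service": "payment", "level": "error", "message": "Bank API timeout"}',
    '{"service": "payment", "level": "info", "message": "Charge accepted"}',
    '{"service": "web", "level": "info", "message": "Request received"}',
    "not json at all",
]

print(error_rate_by_service(sample))  # {'payment': 0.5, 'web': 0.0}
```

The same loop over free-form text would need a separate regular expression per service and per message format, which is the practical difference structure makes.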

Real-World Scenario: Debugging a Payment Failure During Black Friday Sales

It’s 6 p.m. on Black Friday. Your e-commerce site is doing 10x normal traffic. Suddenly, customer support is flooded with “Payment failed” complaints.

In a world of unstructured logs:

  • You’d SSH into 10 servers

  • Manually search for “payment” or “error”

  • Hope timestamps align across services

With structured JSON logs and a live aggregator:

  • You run: cat logs.jsonl | python log_aggregator.py --filter 'user_id=U12345'

  • Instantly see the full trace:
    web → auth → cart → payment → bank_api (timeout)

  • Discover the bank’s API is throttling requests

  • Roll out a fallback in minutes
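The "full trace" view in the steps above can be sketched as follows: collect one user's entries, order them by timestamp, and join the service names. This is an illustration only. The trace_path function is a hypothetical helper, and it assumes every entry carries an ISO 8601 timestamp string.

```python
import json
from datetime import datetime


def trace_path(lines, user_id):
    """Return a 'web → cart → payment (...)' style summary for one user."""
    entries = []
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if (
            isinstance(entry, dict)
            and entry.get("user_id") == user_id
            and isinstance(entry.get("timestamp"), str)
        ):
            entries.append(entry)

    # Logs from different services arrive interleaved, so order by event time
    entries.sort(key=lambda e: datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00")))

    hops = []
    for entry in entries:
        hop = entry.get("service", "unknown")
        if str(entry.get("level", "")).lower() == "error":
            hop += f" ({entry.get('message', 'error')})"  # annotate the failing hop
        hops.append(hop)
    return " → ".join(hops)


sample = [
    '{"timestamp": "2025-10-10T18:00:01.500000Z", "level": "ERROR", "service": "payment", "user_id": "U12345", "message": "Bank API timeout"}',
    '{"timestamp": "2025-10-10T18:00:01.123456Z", "level": "INFO", "service": "web", "user_id": "U12345", "message": "Request received"}',
    '{"timestamp": "2025-10-10T18:00:01.350000Z", "level": "INFO", "service": "cart", "user_id": "U12345", "message": "Cart items retrieved"}',
    '{"timestamp": "2025-10-10T18:00:02.000000Z", "level": "WARNING", "service": "web", "user_id": "U98765", "message": "Client side retry"}',
]

print(trace_path(sample, "U12345"))  # web → cart → payment (Bank API timeout)
```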

This isn’t hypothetical—it’s how top engineering teams operate under fire.

Core Components of a Log Aggregator

Our aggregator has three parts:

  1. Input: Read log lines from stdin; files and network streams are piped in by whatever command feeds the aggregator

  2. Parser: Safely parse each line as JSON; skip invalid entries

  3. Filter & Output: Apply filters (e.g., by service or error level) and pretty-print results

It’s designed to be composable: feed it from cat, tail -f, or any command that writes JSON lines, and pipe its output on to grep or a file.
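The three stages can also be sketched as chained generators. The version below is a rough illustration rather than the implementation that follows: it uses the standard fileinput module so the input stage reads from named files as well as stdin, and the function names (read_lines, parse, matching, run) are chosen here for the example.

```python
import fileinput
import json
from typing import Iterable, Iterator


def read_lines(paths: list[str]) -> Iterator[str]:
    """Input: yield lines from each file in turn, or from stdin if paths is empty."""
    with fileinput.input(files=paths) as stream:
        yield from stream


def parse(lines: Iterable[str]) -> Iterator[dict]:
    """Parser: yield JSON objects and skip anything else."""
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(entry, dict):
            yield entry


def matching(entries: Iterable[dict], **required: str) -> Iterator[dict]:
    """Filter: keep entries whose fields equal every required value (case-insensitive)."""
    for entry in entries:
        if all(str(entry.get(key, "")).lower() == value.lower() for key, value in required.items()):
            yield entry


def run(paths: list[str], **required: str) -> list[dict]:
    """Chain the three stages and collect the matching entries."""
    return list(matching(parse(read_lines(paths)), **required))
```

Calling run(["web.jsonl", "payment.jsonl"], level="error") would return the error entries from both files (the file names are placeholders). Because each stage is a generator, lines flow through one at a time instead of being loaded into memory up front.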

Complete Python Implementation

import sys
import json
import argparse
from datetime import datetime

# --- Configuration for Pretty Printing ---
# ANSI escape codes for coloring terminal output
class Colors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

# Mapping log levels to colors
COLOR_MAP = {
    'ERROR': Colors.FAIL + Colors.BOLD,
    'WARNING': Colors.WARNING,
    'INFO': Colors.OKCYAN,
    'DEBUG': Colors.OKBLUE,
    'TRACE': Colors.HEADER,
}

def parse_log_line(line: str) -> dict | None:
    """
    Safely attempts to parse a string line into a JSON dictionary.

    Args:
        line: The raw log line string.

    Returns:
        A dictionary if parsing succeeds, otherwise None.
    """
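    try:
        # Strip potential leading/trailing whitespace or newlines
        entry = json.loads(line.strip())
    except json.JSONDecodeError:
        # Ignore non-JSON lines (unstructured logs)
        return None
    # Valid JSON that is not an object (e.g. a bare number or a list) has no
    # fields to filter on, so treat it like an unparseable line
    return entry if isinstance(entry, dict) else None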

def parse_filter_args(filter_args: list[str]) -> dict:
    """
    Converts a list of 'key=value' filter strings into a dictionary.

    Args:
        filter_args: List of filter strings from command line (e.g., ['level=error', 'service=auth']).

    Returns:
        A dictionary mapping filter keys to values.
    """
    filters = {}
    for arg in filter_args:
        if '=' in arg:
            # Splits only on the first '=' to allow value to contain '='
            key, value = arg.split('=', 1)
            filters[key.strip()] = value.strip()
    return filters

def apply_filters(log_entry: dict, filters: dict) -> bool:
    """
    Checks if a log entry matches ALL specified filters.

    Args:
        log_entry: The parsed log dictionary.
        filters: The dictionary of required key-value pairs.

    Returns:
        True if the log entry matches all filters, False otherwise.
    """
    if not filters:
        return True

    for key, expected_value in filters.items():
        # Case-insensitive comparison is often helpful for logs
        log_value = log_entry.get(key)
        if log_value is None:
            return False

        # Convert to string and compare (handle int/bool values in log_entry)
        if str(log_value).lower() != expected_value.lower():
            return False

    return True

def pretty_print(log_entry: dict):
    """
    Formats and prints a log entry with color-coding and structured output.
    Uses common keys (timestamp, level, service, trace_id, message).
    """
    # 1. Format Timestamp
    ts_raw = log_entry.get('timestamp')
    try:
        if ts_raw:
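            # Timestamps may be epoch seconds (int/float) or ISO 8601 strings
            if isinstance(ts_raw, (int, float)):
                # Note: epoch values are rendered in the machine's local time zone,
                # whereas the ISO branch below keeps the offset given in the string
                dt_object = datetime.fromtimestamp(ts_raw)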
            else:
                # Handle ISO 8601 (2025-10-10T23:59:59.123456Z)
                dt_object = datetime.fromisoformat(ts_raw.replace('Z', '+00:00'))
            timestamp_fmt = dt_object.strftime('%H:%M:%S.%f')[:-3] # Keep ms
        else:
            timestamp_fmt = 'N/A'
    except Exception:
        timestamp_fmt = 'Invalid TS'

    # 2. Get Color and Level
    level = str(log_entry.get('level', 'INFO')).upper()
    color = COLOR_MAP.get(level, Colors.WARNING)
    
    # 3. Extract Core Fields
    service = str(log_entry.get('service', 'unknown')).ljust(10)
    trace_id = str(log_entry.get('trace_id', '---'))[:8].ljust(8)
    message = log_entry.get('message', 'No message provided')

    # 4. Construct the output line
    output = (
        f"[{Colors.BOLD}{timestamp_fmt}{Colors.ENDC}] "
        f"[{color}{level:<5}{Colors.ENDC}] "
        f"{Colors.OKGREEN}{service}{Colors.ENDC} "
        f"(TID:{Colors.OKBLUE}{trace_id}{Colors.ENDC}): "
        f"{message}"
    )
    
    # 5. Add any custom fields not covered
    custom_fields = []
    reserved_keys = {'timestamp', 'level', 'service', 'trace_id', 'message'}
    for key, value in log_entry.items():
        if key not in reserved_keys:
            custom_fields.append(f"{key}={repr(value)}")
            
    if custom_fields:
        output += f" {Colors.ENDC}| {Colors.WARNING}{', '.join(custom_fields)}{Colors.ENDC}"

    print(output)

def main():
    """
    Main entry point for the log aggregator.
    """
    parser = argparse.ArgumentParser(
        description="A real-time JSON log aggregator and filter inspired by SRE debugging.",
        epilog="Usage: cat logs.jsonl | python log_aggregator.py --filter 'user_id=U12345' --filter 'level=error'"
    )
    parser.add_argument(
        '-f', '--filter',
        action='append',
        default=[],
        help="Filter logs by 'key=value'. Can be used multiple times."
    )
    args = parser.parse_args()
    
    # Parse command line filters into a key-value dictionary
    filters = parse_filter_args(args.filter)

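    # Status banner goes to stderr so that stdout carries only log lines
    print(f"{Colors.BOLD}--- Log Aggregator Initialized ---{Colors.ENDC}", file=sys.stderr)
    if filters:
        print(f"Applying filters: {Colors.OKGREEN}{filters}{Colors.ENDC}", file=sys.stderr)
    print("Reading from STDIN. Press Ctrl+C or Ctrl+D to exit.\n", file=sys.stderr)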

    try:
        # Read lines in real-time from standard input (e.g., piped from another service)
        for line in sys.stdin:
            log_entry = parse_log_line(line)
            
            if log_entry is None:
                # Log line was not valid JSON, skip it
                continue
            
            if apply_filters(log_entry, filters):
                pretty_print(log_entry)

    except KeyboardInterrupt:
        print(f"\n{Colors.WARNING}--- Log Aggregator Terminated by User ---{Colors.ENDC}")
    except Exception as e:
        print(f"\n{Colors.FAIL}An unexpected error occurred: {e}{Colors.ENDC}", file=sys.stderr)

if __name__ == '__main__':
    main()

# Example JSON log lines (paste these into a file named 'sample_logs.jsonl' 
# or directly pipe them into the script when running it):
# {"timestamp": "2025-10-10T18:00:01.123456Z", "level": "INFO", "service": "web", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Request received: /checkout/start"}
# {"timestamp": "2025-10-10T18:00:01.200000Z", "level": "DEBUG", "service": "auth", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Token validated for session: XXXXXX"}
# {"timestamp": "2025-10-10T18:00:01.350000Z", "level": "INFO", "service": "cart", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Cart items retrieved (2 items)"}
# {"timestamp": "2025-10-10T18:00:01.500000Z", "level": "ERROR", "service": "payment", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Bank API timeout on transaction commit", "error_code": 504}
# {"timestamp": "2025-10-10T18:00:02.000000Z", "level": "WARNING", "service": "web", "trace_id": "g7h8i9j0k1l2", "user_id": "U98765", "message": "Client side retry initiated"}
# NOT A JSON LINE
# {"timestamp": "2025-10-10T18:00:02.100000Z", "level": "INFO", "service": "inventory", "trace_id": "m3n4o5p6q7r8", "message": "Stock check successful for product 42"}

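Running and Testing the Aggregator

Save the script as log_aggregator.py and the example lines above, without the leading "# ", as sample_logs.jsonl. The dict | None type hints require Python 3.10 or newer. Then try:

  1. Pipe a log file: cat sample_logs.jsonl | python log_aggregator.py

  2. Filter by user: cat sample_logs.jsonl | python log_aggregator.py --filter 'user_id=U12345'

  3. Filter by level and service: cat sample_logs.jsonl | python log_aggregator.py --filter 'level=error' --filter 'service=payment'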
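To exercise the real-time path rather than a static file, one option is a small generator script that writes synthetic JSON lines with a delay between them. The sketch below is an assumption-laden test helper: the file name emit_logs.py, the service list, and the field values are all invented for the example.

```python
import json
import sys
import time
from datetime import datetime, timezone

SERVICES = ["web", "auth", "cart", "payment"]  # hypothetical service names


def make_entry(i: int) -> dict:
    """Build one synthetic log entry; every fourth one is a payment error."""
    service = SERVICES[i % len(SERVICES)]
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": "ERROR" if service == "payment" else "INFO",
        "service": service,
        "trace_id": f"trace{i // len(SERVICES):04d}",  # one trace per pass over SERVICES
        "user_id": "U12345",
        "message": f"synthetic event {i}",
    }


def emit(count: int, delay: float = 0.5, out=sys.stdout) -> None:
    """Write `count` JSON lines to `out`, pausing `delay` seconds after each."""
    for i in range(count):
        out.write(json.dumps(make_entry(i)) + "\n")
        out.flush()  # flush so a downstream reader sees the line immediately
        time.sleep(delay)


if __name__ == "__main__":
    emit(count=8, delay=0.25)
```

Saved as emit_logs.py, it could be run as python emit_logs.py | python log_aggregator.py --filter 'level=error', and the matching lines should appear one by one as they are produced.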

Alternative Implementation

import sys
import json
import argparse
import re
from datetime import datetime, timezone
from typing import Any, Callable, Tuple

# --- Configuration for Pretty Printing ---
# ANSI escape codes for coloring terminal output
class Colors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

# Mapping log levels to colors and comparative value (for level filtering)
LEVEL_MAP = {
    'TRACE': (10, Colors.HEADER),
    'DEBUG': (20, Colors.OKBLUE),
    'INFO': (30, Colors.OKCYAN),
    'WARNING': (40, Colors.WARNING),
    'ERROR': (50, Colors.FAIL + Colors.BOLD),
    'CRITICAL': (60, Colors.FAIL + Colors.BOLD),
}

# --- Parsing Utilities ---

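def parse_log_line(line: str) -> dict | None:
    """Safely attempts to parse a string line into a JSON dictionary."""
    try:
        entry = json.loads(line.strip())
    except json.JSONDecodeError:
        return None
    # Skip valid JSON that is not an object (e.g. a bare number or a list)
    return entry if isinstance(entry, dict) else None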

def parse_timestamp(ts_raw: Any) -> datetime | None:
    """Attempts to parse common timestamp formats (ISO 8601 or epoch)."""
    if ts_raw is None:
        return None

    try:
        if isinstance(ts_raw, (int, float)):
            # Handle epoch timestamp (seconds)
            return datetime.fromtimestamp(ts_raw, tz=timezone.utc)
        
        if isinstance(ts_raw, str):
            # Handle ISO 8601 with optional timezone (e.g., '2025-10-10T18:00:01.123456Z')
            # Replace Z with +00:00 for compatibility
            ts_str = ts_raw.replace('Z', '+00:00')
            return datetime.fromisoformat(ts_str).astimezone(timezone.utc)
    except ValueError:
        pass
    
    return None

def standardize_level(level: str) -> int:
    """Converts a log level string to its integer value for comparison."""
    return LEVEL_MAP.get(level.upper(), (0, Colors.WARNING))[0]

# --- Advanced Filtering Logic ---

# Define supported comparison operators and their check functions
OPERATORS = {
    '==': lambda a, b: str(a).lower() == str(b).lower(), # Exact match (case insensitive)
    '!=': lambda a, b: str(a).lower() != str(b).lower(),
    '>=': lambda a, b: a >= b,
    '<=': lambda a, b: a <= b,
    '>': lambda a, b: a > b,
    '<': lambda a, b: a < b,
    '=~': lambda a, b: re.search(b, str(a)) is not None, # Regex match
}

def parse_filter_args(filter_args: list[str]) -> list[Tuple[str, str, str, Callable]]:
    """
    Parses 'key[op]value' strings into (key, op, value, comparator_function) tuples.
    
    Supported ops: ==, !=, >=, <=, >, <, =~
    """
    parsed_filters = []
    
    # Combined regex to find key, operator, and value.
    # It looks for keys, then one of the operators, and captures the rest as the value.
    op_pattern = r'(==|!=|>=|<=|>|<|=~)'
    
    for arg in filter_args:
        match = re.match(r'(.+?)\s*' + op_pattern + r'\s*(.+)', arg)

        if match:
            key, op, value = match.groups()
        elif '=' in arg:
            # Fallback for simple key=value (treat as key==value)
            key, value = arg.split('=', 1)
            op = '=='
        else:
            print(f"{Colors.WARNING}Warning: Invalid filter format '{arg}'. Skipping.{Colors.ENDC}", file=sys.stderr)
            continue

        comparator = OPERATORS.get(op)
        if comparator is None:
            print(f"{Colors.WARNING}Warning: Unknown operator '{op}' in '{arg}'. Skipping.{Colors.ENDC}", file=sys.stderr)
            continue

        parsed_filters.append((key.strip(), op, value.strip(), comparator))
        
    return parsed_filters

def apply_filters(log_entry: dict, parsed_filters: list[Tuple[str, str, str, Callable]]) -> bool:
    """
    Checks if a log entry matches ALL specified filters using complex operators.
    """
    for key, op, expected_value, comparator in parsed_filters:
        log_value = log_entry.get(key)
        
        if log_value is None:
            # If the required key is missing, the log entry fails the filter
            return False

        # Custom handling for comparison operations on specific field types
        try:
            if op in ('>', '>=', '<', '<='):
                if key == 'timestamp':
                    log_value_comp = parse_timestamp(log_value)
                    expected_value_comp = parse_timestamp(expected_value)
                elif key == 'level':
                    log_value_comp = standardize_level(str(log_value))
                    expected_value_comp = standardize_level(expected_value)
                else:
                    # Default numeric comparison
                    log_value_comp = float(log_value)
                    expected_value_comp = float(expected_value)

                # Both sides must be comparable (not None for timestamp/level)
                if log_value_comp is None or expected_value_comp is None:
                    # If parsing fails for comparison, it fails the filter
                    return False 

                if not comparator(log_value_comp, expected_value_comp):
                    return False
            else:
                # Default comparison (string comparison or regex match)
                if not comparator(log_value, expected_value):
                    return False
        except Exception:
            # Catch errors during type conversion (e.g., trying to float() a string)
            # This log line fails the current filter expression
            return False

    return True

# --- Output Logic ---

def pretty_print(log_entry: dict, fields: list[str]):
    """
    Formats and prints a log entry based on the requested fields.
    """
    
    # 1. Prepare Core Metadata
    level_str = str(log_entry.get('level', 'INFO')).upper()
    _, color_code = LEVEL_MAP.get(level_str, (0, Colors.WARNING))
    
    # Get the raw timestamp and format it
    ts_raw = log_entry.get('timestamp')
    timestamp_fmt = 'N/A'
    if ts_raw:
        dt_obj = parse_timestamp(ts_raw)
        if dt_obj:
            timestamp_fmt = dt_obj.strftime('%H:%M:%S.%f')[:-3] # Keep ms
            
    # Default values for common fields
    core_values = {
        'timestamp': f"[{Colors.BOLD}{timestamp_fmt}{Colors.ENDC}]",
        'level': f"[{color_code}{level_str:<5}{Colors.ENDC}]",
        'service': f"{Colors.OKGREEN}{str(log_entry.get('service', 'unknown')).ljust(10)}{Colors.ENDC}",
        'trace_id': f"(TID:{Colors.OKBLUE}{str(log_entry.get('trace_id', '---'))[:8].ljust(8)}{Colors.ENDC})",
        'message': log_entry.get('message', 'No message provided'),
    }

    output_parts = []
    
    # 2. Build Output based on requested fields
    for field_name in fields:
        field_name = field_name.strip()
        core_key = field_name.lower()

        if core_key in core_values:
            # Use pre-formatted core values (includes color/structure)
            output_parts.append(core_values[core_key])
        elif field_name in log_entry:
            # Look up custom fields by their original spelling so that
            # mixed-case keys such as 'errorCode' can still be displayed
            value = log_entry[field_name]
            # Use repr() to safely display any type of value
            output_parts.append(f"{Colors.WARNING}{field_name}={repr(value)}{Colors.ENDC}")

    print(" ".join(output_parts))


def main():
    """Main entry point for the log aggregator."""
    parser = argparse.ArgumentParser(
        description="A real-time, advanced JSON log aggregator and filter.",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="""\
Examples:
  cat logs.jsonl | python log_aggregator_v2.py --filter 'user_id==U12345'
  cat logs.jsonl | python log_aggregator_v2.py -f 'level>=WARNING' -f 'service=~auth|payment'
  cat logs.jsonl | python log_aggregator_v2.py --fields timestamp,trace_id,user_id,message

Supported Filters:
  key==value    (Case-insensitive equality)
  key!=value
  key>=value    (Supports numerical, timestamp, and level comparison)
  key<=value
  key>value
  key<value
  key=~regex    (Regular expression match)
"""
    )
    parser.add_argument(
        '-f', '--filter',
        action='append',
        default=[],
        help="Filter logs by 'key[op]value'. Can be used multiple times."
    )
    parser.add_argument(
        '--fields',
        type=str,
        default='timestamp,level,service,trace_id,message',
        help="Comma-separated list of fields to display in output. E.g., 'timestamp,user_id,message'."
    )
    args = parser.parse_args()
    
    # Parse command line filters and output fields
    parsed_filters = parse_filter_args(args.filter)
    output_fields = [f.strip() for f in args.fields.split(',')]

    # Status banner goes to stderr so that stdout carries only log lines
    print(f"{Colors.BOLD}--- Log Aggregator Initialized ---{Colors.ENDC}", file=sys.stderr)
    if parsed_filters:
        print(f"Applying filters: {Colors.OKGREEN}{[f'{k}{o}{v}' for k, o, v, _ in parsed_filters]}{Colors.ENDC}", file=sys.stderr)
    print(f"Displaying fields: {Colors.OKBLUE}{', '.join(output_fields)}{Colors.ENDC}", file=sys.stderr)
    print("Reading from STDIN. Press Ctrl+C or Ctrl+D to exit.\n", file=sys.stderr)

    try:
        for line in sys.stdin:
            log_entry = parse_log_line(line)
            
            if log_entry is None:
                continue
            
            if apply_filters(log_entry, parsed_filters):
                pretty_print(log_entry, output_fields)

    except KeyboardInterrupt:
        print(f"\n{Colors.WARNING}--- Log Aggregator Terminated by User ---{Colors.ENDC}")
    except Exception as e:
        print(f"\n{Colors.FAIL}An unexpected error occurred: {e}{Colors.ENDC}", file=sys.stderr)

if __name__ == '__main__':
    main()


Output (startup banner when run with no filters, before any log lines arrive):

--- Log Aggregator Initialized ---
Displaying fields: timestamp, level, service, trace_id, message
Reading from STDIN. Press Ctrl+C or Ctrl+D to exit.
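A filter such as level>=WARNING comes down to ranking level names as integers and comparing the ranks. The sketch below isolates that idea with one deliberate variation from the listing above: unknown level names are rejected instead of being ranked as 0, so a typo in the threshold does not silently match everything. LEVEL_RANK and at_least are names chosen for this example.

```python
# Same numeric ranks as LEVEL_MAP in the listing, without the color codes
LEVEL_RANK = {
    "TRACE": 10,
    "DEBUG": 20,
    "INFO": 30,
    "WARNING": 40,
    "ERROR": 50,
    "CRITICAL": 60,
}


def at_least(entry_level, threshold: str) -> bool:
    """Return True if entry_level ranks at or above threshold (case-insensitive)."""
    rank = LEVEL_RANK.get(str(entry_level).upper())
    floor = LEVEL_RANK.get(threshold.upper())
    if rank is None or floor is None:
        # Unknown names on either side cannot be compared, so the filter fails
        return False
    return rank >= floor


print(at_least("error", "WARNING"))   # True
print(at_least("INFO", "warning"))    # False
print(at_least("verbose", "INFO"))    # False
```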

Conclusion

Structured logging transforms debugging from guesswork into a data-driven process. With a simple aggregator like the one above, you can cut mean time to resolution (MTTR) from hours to minutes, even on your busiest days. You don’t need expensive SaaS tools to start: enforce JSON logging, add a few key fields such as trace_id and user_id, and build (or use) a lightweight parser. The result is a system that is not just observable but understandable.