Table of Contents
Introduction
Why Structured Logging Matters
Real-World Scenario: Debugging a Payment Failure During Black Friday Sales
Core Components of a Log Aggregator
Complete Python Implementation
Running and Testing the Aggregator
Conclusion
Introduction
When your application spans dozens of microservices, debugging a single user issue feels like finding a needle in a haystack. Unstructured logs—lines of free-form text—are nearly useless at scale. But structured logs, especially in JSON format, turn chaos into clarity.
In this article, you’ll build a lightweight, real-time log aggregator that collects, parses, and filters JSON logs from multiple sources—inspired by how engineering teams debug critical outages during high-stakes events like Black Friday.
Why Structured Logging Matters
Structured logging means every log entry is a machine-readable object (usually JSON) with a consistent set of fields, such as timestamp, level, service, trace_id, and message.
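For instance, a single entry from a checkout flow might look like this (illustrative values; the field names match the sample logs used later in this article):

```json
{
  "timestamp": "2025-10-10T18:00:01.500000Z",
  "level": "ERROR",
  "service": "payment",
  "trace_id": "a1b2c3d4e5f6",
  "user_id": "U12345",
  "message": "Bank API timeout on transaction commit",
  "error_code": 504
}
```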
This allows you to:
- Search across services using user_id or trace_id
- Alert on level: "error" with specific error codes
- Build dashboards showing error rates per service
- Correlate frontend and backend events
Without structure, you’re grepping through gigabytes of text. With it, you get observability.
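For example, an ad-hoc search for one user's errors across every service becomes a one-liner. This is a sketch assuming jq is installed and your JSON-lines logs live in logs/*.jsonl:

```bash
# Every ERROR event for user U12345, across all services, in one pass
jq -c 'select(.user_id == "U12345" and .level == "ERROR")' logs/*.jsonl
```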
Real-World Scenario: Debugging a Payment Failure During Black Friday Sales
It’s 6 p.m. on Black Friday. Your e-commerce site is doing 10x normal traffic. Suddenly, customer support is flooded with “Payment failed” complaints.
In a world of unstructured logs:
- You’d SSH into 10 servers
- Manually search for “payment” or “error”
- Hope timestamps align across services

With structured JSON logs and a live aggregator:

- You run: logagg --filter 'user_id=U12345'
- Instantly see the full trace: web → auth → cart → payment → bank_api (timeout)
- Discover the bank’s API is throttling requests
- Roll out a fallback in minutes
This isn’t hypothetical—it’s how top engineering teams operate under fire.
Core Components of a Log Aggregator
Our aggregator has three parts:
Input: Read log lines from stdin, files, or network streams
Parser: Safely parse each line as JSON; skip invalid entries
Filter & Output: Apply filters (e.g., by service or error level) and pretty-print results
It’s designed to be composable: pipe it into grep, jq, or your monitoring system.
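Before the full script, here is a minimal sketch of that three-stage shape as a generator pipeline. The helper names and the hard-coded filter are illustrative only; the complete implementation below folds these stages into one program with proper CLI flags:

```python
import sys
import json

def read_lines(stream):
    """Input stage: yield raw lines from any file-like source."""
    yield from stream

def parse(lines):
    """Parser stage: keep only lines that decode to a JSON object."""
    for line in lines:
        try:
            entry = json.loads(line)
            if isinstance(entry, dict):
                yield entry
        except json.JSONDecodeError:
            continue  # skip unstructured lines

def matches(entry, filters):
    """Filter stage: require every key=value pair to match (case-insensitive)."""
    return all(str(entry.get(k, "")).lower() == v.lower() for k, v in filters.items())

if __name__ == "__main__":
    filters = {"level": "error"}  # hard-coded here; the real script takes --filter flags
    for entry in parse(read_lines(sys.stdin)):
        if matches(entry, filters):
            print(json.dumps(entry))  # output stage: emit matching entries
```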
Complete Python Implementation
import sys
import json
import argparse
from datetime import datetime
# --- Configuration for Pretty Printing ---
# ANSI escape codes for coloring terminal output
class Colors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# Mapping log levels to colors
COLOR_MAP = {
'ERROR': Colors.FAIL + Colors.BOLD,
'WARNING': Colors.WARNING,
'INFO': Colors.OKCYAN,
'DEBUG': Colors.OKBLUE,
'TRACE': Colors.HEADER,
}
def parse_log_line(line: str) -> dict | None:
"""
Safely attempts to parse a string line into a JSON dictionary.
Args:
line: The raw log line string.
Returns:
A dictionary if parsing succeeds, otherwise None.
"""
    try:
        # Strip potential leading/trailing whitespace or newlines
        parsed = json.loads(line.strip())
        # json.loads may also return a list, string, or number; only objects are valid log entries
        return parsed if isinstance(parsed, dict) else None
    except json.JSONDecodeError:
        # Ignore non-JSON lines (unstructured logs)
        return None
def parse_filter_args(filter_args: list[str]) -> dict:
"""
Converts a list of 'key=value' filter strings into a dictionary.
Args:
filter_args: List of filter strings from command line (e.g., ['level=error', 'service=auth']).
Returns:
A dictionary mapping filter keys to values.
"""
filters = {}
for arg in filter_args:
if '=' in arg:
# Splits only on the first '=' to allow value to contain '='
key, value = arg.split('=', 1)
filters[key.strip()] = value.strip()
return filters
def apply_filters(log_entry: dict, filters: dict) -> bool:
"""
Checks if a log entry matches ALL specified filters.
Args:
log_entry: The parsed log dictionary.
filters: The dictionary of required key-value pairs.
Returns:
True if the log entry matches all filters, False otherwise.
"""
if not filters:
return True
for key, expected_value in filters.items():
# Case-insensitive comparison is often helpful for logs
log_value = log_entry.get(key)
if log_value is None:
return False
# Convert to string and compare (handle int/bool values in log_entry)
if str(log_value).lower() != expected_value.lower():
return False
return True
def pretty_print(log_entry: dict):
"""
Formats and prints a log entry with color-coding and structured output.
Uses common keys (timestamp, level, service, trace_id, message).
"""
# 1. Format Timestamp
ts_raw = log_entry.get('timestamp')
try:
if ts_raw:
# Assuming timestamp is ISO format or compatible string/float
if isinstance(ts_raw, (int, float)):
dt_object = datetime.fromtimestamp(ts_raw)
else:
# Handle ISO 8601 (2025-10-10T23:59:59.123456Z)
dt_object = datetime.fromisoformat(ts_raw.replace('Z', '+00:00'))
timestamp_fmt = dt_object.strftime('%H:%M:%S.%f')[:-3] # Keep ms
else:
timestamp_fmt = 'N/A'
except Exception:
timestamp_fmt = 'Invalid TS'
# 2. Get Color and Level
level = str(log_entry.get('level', 'INFO')).upper()
color = COLOR_MAP.get(level, Colors.WARNING)
# 3. Extract Core Fields
service = str(log_entry.get('service', 'unknown')).ljust(10)
trace_id = str(log_entry.get('trace_id', '---'))[:8].ljust(8)
message = log_entry.get('message', 'No message provided')
# 4. Construct the output line
output = (
f"[{Colors.BOLD}{timestamp_fmt}{Colors.ENDC}] "
f"[{color}{level:<5}{Colors.ENDC}] "
f"{Colors.OKGREEN}{service}{Colors.ENDC} "
f"(TID:{Colors.OKBLUE}{trace_id}{Colors.ENDC}): "
f"{message}"
)
# 5. Add any custom fields not covered
custom_fields = []
reserved_keys = {'timestamp', 'level', 'service', 'trace_id', 'message'}
for key, value in log_entry.items():
if key not in reserved_keys:
custom_fields.append(f"{key}={repr(value)}")
if custom_fields:
output += f" {Colors.ENDC}| {Colors.WARNING}{', '.join(custom_fields)}{Colors.ENDC}"
print(output)
def main():
"""
Main entry point for the log aggregator.
"""
parser = argparse.ArgumentParser(
description="A real-time JSON log aggregator and filter inspired by SRE debugging.",
epilog="Usage: cat logs.jsonl | python log_aggregator.py --filter 'user_id=U12345' --filter 'level=error'"
)
parser.add_argument(
'-f', '--filter',
action='append',
default=[],
help="Filter logs by 'key=value'. Can be used multiple times."
)
args = parser.parse_args()
# Parse command line filters into a key-value dictionary
filters = parse_filter_args(args.filter)
print(f"{Colors.BOLD}--- Log Aggregator Initialized ---{Colors.ENDC}")
if filters:
print(f"Applying filters: {Colors.OKGREEN}{filters}{Colors.ENDC}")
print(f"Reading from STDIN. Press Ctrl+C or Ctrl+D to exit.\n")
try:
# Read lines in real-time from standard input (e.g., piped from another service)
for line in sys.stdin:
log_entry = parse_log_line(line)
if log_entry is None:
# Log line was not valid JSON, skip it
continue
if apply_filters(log_entry, filters):
pretty_print(log_entry)
except KeyboardInterrupt:
print(f"\n{Colors.WARNING}--- Log Aggregator Terminated by User ---{Colors.ENDC}")
except Exception as e:
print(f"\n{Colors.FAIL}An unexpected error occurred: {e}{Colors.ENDC}", file=sys.stderr)
if __name__ == '__main__':
main()
# Example JSON log lines (paste these into a file named 'sample_logs.jsonl'
# or directly pipe them into the script when running it):
# {"timestamp": "2025-10-10T18:00:01.123456Z", "level": "INFO", "service": "web", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Request received: /checkout/start"}
# {"timestamp": "2025-10-10T18:00:01.200000Z", "level": "DEBUG", "service": "auth", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Token validated for session: XXXXXX"}
# {"timestamp": "2025-10-10T18:00:01.350000Z", "level": "INFO", "service": "cart", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Cart items retrieved (2 items)"}
# {"timestamp": "2025-10-10T18:00:01.500000Z", "level": "ERROR", "service": "payment", "trace_id": "a1b2c3d4e5f6", "user_id": "U12345", "message": "Bank API timeout on transaction commit", "error_code": 504}
# {"timestamp": "2025-10-10T18:00:02.000000Z", "level": "WARNING", "service": "web", "trace_id": "g7h8i9j0k1l2", "user_id": "U98765", "message": "Client side retry initiated"}
# NOT A JSON LINE
# {"timestamp": "2025-10-10T18:00:02.100000Z", "level": "INFO", "service": "inventory", "trace_id": "m3n4o5p6q7r8", "message": "Stock check successful for product 42"}
How to Run:
# 1. Pipe a log file: cat sample_logs.jsonl | python log_aggregator.py
# 2. Filter by user: cat sample_logs.jsonl | python log_aggregator.py --filter 'user_id=U12345'
# 3. Filter by level and service: cat sample_logs.jsonl | python log_aggregator.py --filter 'level=error' --filter 'service=payment'
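With the sample logs above, the third command should print roughly the following for the single matching entry (ANSI colors stripped; spacing approximate):

```text
--- Log Aggregator Initialized ---
Applying filters: {'level': 'error', 'service': 'payment'}
Reading from STDIN. Press Ctrl+C or Ctrl+D to exit.

[18:00:01.500] [ERROR] payment    (TID:a1b2c3d4): Bank API timeout on transaction commit | user_id='U12345', error_code=504
```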
Alternative Implementation
This version extends the basic aggregator with comparison operators (==, !=, >=, <=, >, <), regex matching (=~), level- and timestamp-aware comparisons, and a --fields option for choosing which columns to display.
import sys
import json
import argparse
import re
from datetime import datetime, timezone
from typing import Any, Callable, Tuple
# --- Configuration for Pretty Printing ---
# ANSI escape codes for coloring terminal output
class Colors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# Mapping log levels to colors and comparative value (for level filtering)
LEVEL_MAP = {
'TRACE': (10, Colors.HEADER),
'DEBUG': (20, Colors.OKBLUE),
'INFO': (30, Colors.OKCYAN),
'WARNING': (40, Colors.WARNING),
'ERROR': (50, Colors.FAIL + Colors.BOLD),
'CRITICAL': (60, Colors.FAIL + Colors.BOLD),
}
# --- Parsing Utilities ---
def parse_log_line(line: str) -> dict | None:
"""Safely attempts to parse a string line into a JSON dictionary."""
    try:
        parsed = json.loads(line.strip())
        # Only dict-shaped JSON counts as a structured log entry
        return parsed if isinstance(parsed, dict) else None
    except json.JSONDecodeError:
        return None
def parse_timestamp(ts_raw: Any) -> datetime | None:
"""Attempts to parse common timestamp formats (ISO 8601 or epoch)."""
if ts_raw is None:
return None
try:
if isinstance(ts_raw, (int, float)):
# Handle epoch timestamp (seconds)
return datetime.fromtimestamp(ts_raw, tz=timezone.utc)
if isinstance(ts_raw, str):
# Handle ISO 8601 with optional timezone (e.g., '2025-10-10T18:00:01.123456Z')
# Replace Z with +00:00 for compatibility
ts_str = ts_raw.replace('Z', '+00:00')
return datetime.fromisoformat(ts_str).astimezone(timezone.utc)
except ValueError:
pass
return None
def standardize_level(level: str) -> int:
"""Converts a log level string to its integer value for comparison."""
return LEVEL_MAP.get(level.upper(), (0, Colors.WARNING))[0]
# --- Advanced Filtering Logic ---
# Define supported comparison operators and their check functions
OPERATORS = {
'==': lambda a, b: str(a).lower() == str(b).lower(), # Exact match (case insensitive)
'!=': lambda a, b: str(a).lower() != str(b).lower(),
'>=': lambda a, b: a >= b,
'<=': lambda a, b: a <= b,
'>': lambda a, b: a > b,
'<': lambda a, b: a < b,
'=~': lambda a, b: re.search(b, str(a)), # Regex match
}
def parse_filter_args(filter_args: list[str]) -> list[Tuple[str, str, str, Callable]]:
"""
Parses 'key[op]value' strings into (key, op, value, comparator_function) tuples.
Supported ops: ==, !=, >=, <=, >, <, =~
"""
parsed_filters = []
# Combined regex to find key, operator, and value.
# It looks for keys, then one of the operators, and captures the rest as the value.
op_pattern = r'(==|!=|>=|<=|>|<|=~)'
for arg in filter_args:
match = re.match(r'(.+?)\s*' + op_pattern + r'\s*(.+)', arg)
if not match:
# Fallback for simple key=value (treat as key==value)
if '=' in arg:
key, value = arg.split('=', 1)
match = (key.strip(), '==', value.strip())
else:
print(f"{Colors.WARNING}Warning: Invalid filter format '{arg}'. Skipping.{Colors.ENDC}", file=sys.stderr)
continue
        key, op, value = match.groups() if isinstance(match, re.Match) else match
comparator = OPERATORS.get(op)
if comparator is None:
print(f"{Colors.WARNING}Warning: Unknown operator '{op}' in '{arg}'. Skipping.{Colors.ENDC}", file=sys.stderr)
continue
parsed_filters.append((key.strip(), op, value.strip(), comparator))
return parsed_filters
def apply_filters(log_entry: dict, parsed_filters: list[Tuple[str, str, str, Callable]]) -> bool:
"""
Checks if a log entry matches ALL specified filters using complex operators.
"""
for key, op, expected_value, comparator in parsed_filters:
log_value = log_entry.get(key)
if log_value is None:
# If the required key is missing, the log entry fails the filter
return False
# Custom handling for comparison operations on specific field types
try:
if op in ('>', '>=', '<', '<='):
if key == 'timestamp':
log_value_comp = parse_timestamp(log_value)
expected_value_comp = parse_timestamp(expected_value)
elif key == 'level':
log_value_comp = standardize_level(str(log_value))
expected_value_comp = standardize_level(expected_value)
else:
# Default numeric comparison
log_value_comp = float(log_value)
expected_value_comp = float(expected_value)
# Both sides must be comparable (not None for timestamp/level)
if log_value_comp is None or expected_value_comp is None:
# If parsing fails for comparison, it fails the filter
return False
if not comparator(log_value_comp, expected_value_comp):
return False
else:
# Default comparison (string comparison or regex match)
if not comparator(log_value, expected_value):
return False
except Exception:
# Catch errors during type conversion (e.g., trying to float() a string)
# This log line fails the current filter expression
return False
return True
# --- Output Logic ---
def pretty_print(log_entry: dict, fields: list[str]):
"""
Formats and prints a log entry based on the requested fields.
"""
# 1. Prepare Core Metadata
level_str = str(log_entry.get('level', 'INFO')).upper()
_, color_code = LEVEL_MAP.get(level_str, (0, Colors.WARNING))
# Get the raw timestamp and format it
ts_raw = log_entry.get('timestamp')
timestamp_fmt = 'N/A'
if ts_raw:
dt_obj = parse_timestamp(ts_raw)
if dt_obj:
timestamp_fmt = dt_obj.strftime('%H:%M:%S.%f')[:-3] # Keep ms
# Default values for common fields
core_values = {
'timestamp': f"[{Colors.BOLD}{timestamp_fmt}{Colors.ENDC}]",
'level': f"[{color_code}{level_str:<5}{Colors.ENDC}]",
'service': f"{Colors.OKGREEN}{str(log_entry.get('service', 'unknown')).ljust(10)}{Colors.ENDC}",
'trace_id': f"(TID:{Colors.OKBLUE}{str(log_entry.get('trace_id', '---'))[:8].ljust(8)}{Colors.ENDC})",
'message': log_entry.get('message', 'No message provided'),
}
output_parts = []
# 2. Build Output based on requested fields
for field_name in fields:
field_name = field_name.strip().lower()
if field_name in core_values:
# Use pre-formatted core values (includes color/structure)
output_parts.append(core_values[field_name])
elif field_name in log_entry:
# Use custom field as key=value pair
value = log_entry[field_name]
# Use repr() to safely display any type of value
output_parts.append(f"{Colors.WARNING}{field_name}={repr(value)}{Colors.ENDC}")
print(" ".join(output_parts))
def main():
"""Main entry point for the log aggregator."""
parser = argparse.ArgumentParser(
description="A real-time, advanced JSON log aggregator and filter.",
formatter_class=argparse.RawTextHelpFormatter,
epilog="""\
Examples:
cat logs.jsonl | python log_aggregator_v2.py --filter 'user_id==U12345'
cat logs.jsonl | python log_aggregator_v2.py -f 'level>=WARNING' -f 'service=~auth|payment'
cat logs.jsonl | python log_aggregator_v2.py --fields timestamp,trace_id,user_id,message
Supported Filters:
key==value (Case-insensitive equality)
key!=value
key>=value (Supports numerical, timestamp, and level comparison)
key<=value
key>value
key<value
key=~regex (Regular expression match)
"""
)
parser.add_argument(
'-f', '--filter',
action='append',
default=[],
help="Filter logs by 'key[op]value'. Can be used multiple times."
)
parser.add_argument(
'--fields',
type=str,
default='timestamp,level,service,trace_id,message',
help="Comma-separated list of fields to display in output. E.g., 'timestamp,user_id,message'."
)
args = parser.parse_args()
# Parse command line filters and output fields
parsed_filters = parse_filter_args(args.filter)
output_fields = [f.strip() for f in args.fields.split(',')]
print(f"{Colors.BOLD}--- Log Aggregator Initialized ---{Colors.ENDC}")
if parsed_filters:
print(f"Applying filters: {Colors.OKGREEN}{[f'{k}{o}{v}' for k, o, v, _ in parsed_filters]}{Colors.ENDC}")
print(f"Displaying fields: {Colors.OKBLUE}{', '.join(output_fields)}{Colors.ENDC}")
print(f"Reading from STDIN. Press Ctrl+C or Ctrl+D to exit.\n")
try:
for line in sys.stdin:
log_entry = parse_log_line(line)
if log_entry is None:
continue
if apply_filters(log_entry, parsed_filters):
pretty_print(log_entry, output_fields)
except KeyboardInterrupt:
print(f"\n{Colors.WARNING}--- Log Aggregator Terminated by User ---{Colors.ENDC}")
except Exception as e:
print(f"\n{Colors.FAIL}An unexpected error occurred: {e}{Colors.ENDC}", file=sys.stderr)
if __name__ == '__main__':
main()
Output:
--- Log Aggregator Initialized ---
Displaying fields: timestamp, level, service, trace_id, message
Reading from STDIN. Press Ctrl+C or Ctrl+D to exit.
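If the sample logs from earlier are piped in with no filters, the body that follows this banner should look roughly like the lines below, with the remaining entries continuing in the same format (ANSI colors stripped; spacing approximate):

```text
[18:00:01.123] [INFO ] web        (TID:a1b2c3d4) Request received: /checkout/start
[18:00:01.200] [DEBUG] auth       (TID:a1b2c3d4) Token validated for session: XXXXXX
[18:00:01.500] [ERROR] payment    (TID:a1b2c3d4) Bank API timeout on transaction commit
```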
Conclusion
Structured logging transforms debugging from guesswork into a data-driven process. With a simple aggregator like the one above, you can cut mean time to resolution (MTTR) from hours to minutes, even during your busiest days. You don’t need expensive SaaS tools to start: enforce JSON logging (a minimal sketch follows below), add a few key fields, and build (or use) a lightweight parser. The result is a system that’s not just observable but understandable.
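As a concrete starting point for the logging side, here is a minimal sketch that emits JSONL using only the standard library. The JsonFormatter class, the hard-coded service name, and the extra-field names are illustrative choices, not part of the aggregator above:

```python
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Render every record as one JSON object per line (JSONL)."""
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "service": "web",  # hypothetical service name; set per deployment
            "message": record.getMessage(),
        }
        # Optional fields passed via logger.info(..., extra={"user_id": ..., "trace_id": ...})
        for key in ("user_id", "trace_id", "error_code"):
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])

logging.info("Request received: /checkout/start", extra={"user_id": "U12345"})
```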