#!/usr/bin/env python3
"""
telelog.py -- UDP telemetry logger using a triplestore SQLite backend

Usage:
    python3 telelog.py --init-db        # create schema if missing and exit
    python3 telelog.py                  # listen on default port, write to db
    python3 telelog.py --port 3001      # specify port
    python3 telelog.py --db mydata.db   # specify db file
    python3 telelog.py --quiet          # no per-packet console output
"""

import argparse
import json
import signal
import socket
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timezone

# ---------------------------------------------------------------------------
# Config defaults
# ---------------------------------------------------------------------------

DEFAULT_PORT = 3000
DEFAULT_DB = "telemetry.db"
DEFAULT_HOST = "0.0.0.0"
BUFFER_SIZE = 65535

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# Every statement is guarded by IF NOT EXISTS, so running the script against
# an existing database is idempotent and never drops data.
SCHEMA = """
CREATE TABLE IF NOT EXISTS packets (
    id      INTEGER PRIMARY KEY AUTOINCREMENT,
    ts      TEXT NOT NULL,
    source  TEXT,
    tag     TEXT
);
CREATE INDEX IF NOT EXISTS idx_packets_tag ON packets(tag);

CREATE TABLE IF NOT EXISTS triples (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    packet_id   INTEGER NOT NULL REFERENCES packets(id),
    subject     TEXT NOT NULL,
    predicate   TEXT NOT NULL,
    object      TEXT
);
CREATE INDEX IF NOT EXISTS idx_triples_subject ON triples(subject);
CREATE INDEX IF NOT EXISTS idx_triples_predicate ON triples(predicate);
CREATE INDEX IF NOT EXISTS idx_triples_packet ON triples(packet_id);
"""


def init_db(db_path):
    """Create the tables and indexes in *db_path* if they do not exist yet.

    Existing tables and rows are left untouched (the schema only uses
    ``CREATE ... IF NOT EXISTS``). Prints a confirmation line on success.

    Args:
        db_path: Path of the SQLite database file (created when missing).

    Raises:
        sqlite3.Error: If the database cannot be opened or the schema fails.
    """
    # closing() guarantees the connection is released even if the schema
    # script raises; sqlite3's own context manager does not close.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.executescript(SCHEMA)
        conn.commit()
    print(f"[telelog] Schema initialized: {db_path}")


# ---------------------------------------------------------------------------
# Ingest
# ---------------------------------------------------------------------------

def insert_packet(conn, ts, source, tag, triples):
    """Insert one packet and its subject-predicate-object triples atomically.

    Args:
        conn: Open sqlite3 connection whose schema matches SCHEMA.
        ts: Timestamp string stored in ``packets.ts``.
        source: Sender description stored in ``packets.source``.
        tag: Optional session label stored in ``packets.tag`` (may be None).
        triples: Iterable of (subject, predicate, object) tuples.

    Returns:
        The id of the newly inserted ``packets`` row.

    Raises:
        sqlite3.Error: If either insert fails. In that case the transaction
            is rolled back, so no packet row is left behind without triples.
    """
    # "with conn" commits on success and rolls back on any exception, which
    # keeps the packet row and its triples consistent with each other.
    with conn:
        cur = conn.execute(
            "INSERT INTO packets (ts, source, tag) VALUES (?, ?, ?)",
            (ts, source, tag),
        )
        packet_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO triples (packet_id, subject, predicate, object) VALUES (?, ?, ?, ?)",
            [(packet_id, s, p, o) for s, p, o in triples],
        )
    return packet_id


def _stringify(value):
    """Render a parsed JSON value as text: lists as JSON, the rest via str()."""
    if isinstance(value, list):
        return json.dumps(value)
    return str(value)


def _utf8_safe(text):
    """Escape characters that cannot be encoded as UTF-8 (lone surrogates).

    json.loads() turns an escape such as "\\ud800" into a lone surrogate,
    which sqlite3 refuses to bind; backslash-escaping keeps the packet
    storable instead of raising UnicodeEncodeError at insert time.
    """
    return text.encode("utf-8", "backslashreplace").decode("utf-8")


def _make_triple(subject, predicate, value):
    """Build one storable (subject, predicate, object) tuple of strings."""
    return (
        _utf8_safe(str(subject)),
        _utf8_safe(str(predicate)),
        _utf8_safe(_stringify(value)),
    )


def extract_triples(obj, fallback_subject):
    """Convert a parsed JSON payload into (subject, predicate, object) tuples.

    Rules:
      - If a top-level value is a dict, the top-level key is the subject and
        its child keys are predicates:
            {"left_motor": {"power": 0.8, "temp": 42}}
            -> ("left_motor", "power", "0.8"), ("left_motor", "temp", "42")
      - If a top-level value is a scalar or a list, the top-level key is the
        predicate and fallback_subject is the subject:
            {"heading": 1.57}
            -> (fallback_subject, "heading", "1.57")
      - Deeper nesting collapses predicates with dot notation:
            {"left_motor": {"pid": {"p": 1.0}}}
            -> ("left_motor", "pid.p", "1.0")
      - Lists are stored as JSON strings, at every level.
      - A non-dict top-level payload becomes
        (fallback_subject, "value", <text of the payload>).
      - An empty nested dict contributes no triples.

    Args:
        obj: Value produced by json.loads().
        fallback_subject: Subject used when the payload does not name one.

    Returns:
        A list of (subject, predicate, object) tuples; all members are str.
    """
    if not isinstance(obj, dict):
        return [_make_triple(fallback_subject, "value", obj)]

    triples = []
    for top_key, top_val in obj.items():
        if isinstance(top_val, dict):
            # top_key is the subject; flattened children are the predicates.
            triples.extend(
                _make_triple(top_key, pred, obj_val)
                for pred, obj_val in _flatten_predicates(top_val).items()
            )
        else:
            # top_key is the predicate; the sender stands in as the subject.
            triples.append(_make_triple(fallback_subject, top_key, top_val))
    return triples


def _flatten_predicates(obj, prefix=""):
    """Recursively flatten a dict into dot-separated predicate keys.

    Nested dicts extend the key path ("pid" + "p" -> "pid.p"); lists are
    replaced by their JSON text; every other value is kept as-is.
    """
    flat = {}
    for name, value in obj.items():
        path = f"{prefix}.{name}" if prefix else name
        if isinstance(value, dict):
            flat.update(_flatten_predicates(value, path))
        elif isinstance(value, list):
            flat[path] = json.dumps(value)
        else:
            flat[path] = value
    return flat


def parse_payload(raw_bytes, fallback_subject):
    """Turn one datagram into triples, never raising on malformed input.

    Args:
        raw_bytes: Payload bytes as received from the socket.
        fallback_subject: Subject used when the payload does not name one.

    Returns:
        A list of (subject, predicate, object) tuples:
          - the result of extract_triples() when the payload is valid JSON;
          - [(fallback_subject, "_raw", text)] when it is UTF-8 text that is
            not usable JSON (invalid, or nested too deeply to process);
          - [(fallback_subject, "_raw_hex", hex)] when it is not valid UTF-8.
    """
    try:
        text = raw_bytes.decode("utf-8").strip()
    except UnicodeDecodeError:
        return [(fallback_subject, "_raw_hex", raw_bytes.hex())]

    try:
        return extract_triples(json.loads(text), fallback_subject)
    except (json.JSONDecodeError, RecursionError):
        # RecursionError: pathologically nested input from the network must
        # not take the logger down; keep the text verbatim instead.
        return [(fallback_subject, "_raw", text)]


# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------

def run(db_path, host, port, tag, quiet):
    """Receive UDP datagrams and store each one until SIGINT/SIGTERM.

    Installs handlers for SIGINT and SIGTERM that request a clean stop; the
    loop notices the request within the one-second socket timeout.

    Args:
        db_path: SQLite database file to write to (schema must exist).
        host: Local address to bind.
        port: UDP port to bind.
        tag: Optional session label stored with every packet (may be None).
        quiet: When true, suppress the per-packet console line.

    Raises:
        OSError: If the socket cannot be bound.
        sqlite3.Error: If a packet cannot be written.
    """
    packet_count = 0
    running = True

    def handle_signal(sig, frame):
        nonlocal running
        running = False

    # Both resources are context-managed so they are released on every exit
    # path, including a failed bind or a database error mid-run.
    with closing(sqlite3.connect(db_path)) as conn:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port))
            sock.settimeout(1.0)  # allows clean shutdown on Ctrl-C

            tag_display = tag if tag else "(untagged)"
            print(f"[telelog] Listening on {host}:{port} db={db_path} tag={tag_display}")
            print("[telelog] Ctrl-C to stop")

            signal.signal(signal.SIGINT, handle_signal)
            signal.signal(signal.SIGTERM, handle_signal)

            while running:
                try:
                    data, addr = sock.recvfrom(BUFFER_SIZE)
                except socket.timeout:
                    # No traffic: loop around to re-check the stop flag.
                    continue
                except OSError:
                    break

                ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
                source = f"{addr[0]}:{addr[1]}"
                triples = parse_payload(data, source)
                packet_id = insert_packet(conn, ts, source, tag, triples)
                packet_count += 1

                if not quiet:
                    subjects = sorted({s for s, _, _ in triples})
                    print(f"[{ts}] src={source} id={packet_id} subjects={subjects}")

    print(f"\n[telelog] Stopped. {packet_count} packets written.")


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main():
    """Parse command-line arguments, ensure the schema exists, and run."""
    parser = argparse.ArgumentParser(description="UDP telemetry triplestore logger")
    parser.add_argument("--db", default=DEFAULT_DB,
                        help=f"SQLite db file (default: {DEFAULT_DB})")
    parser.add_argument("--port", default=DEFAULT_PORT, type=int,
                        help=f"UDP listen port (default: {DEFAULT_PORT})")
    parser.add_argument("--host", default=DEFAULT_HOST,
                        help=f"Bind address (default: {DEFAULT_HOST})")
    # The schema is created with IF NOT EXISTS, so this never wipes data.
    parser.add_argument("--init-db", action="store_true",
                        help="Create schema if missing and exit")
    parser.add_argument("--tag", default=None,
                        help="Label for this session (e.g. 'auto_match_3')")
    parser.add_argument("--quiet", action="store_true",
                        help="Suppress per-packet console output")
    args = parser.parse_args()

    if args.init_db:
        init_db(args.db)
        sys.exit(0)

    # Ensure schema exists before running (idempotent)
    init_db(args.db)
    run(args.db, args.host, args.port, args.tag, args.quiet)


if __name__ == "__main__":
    main()