239 lines
7.8 KiB
Python
Executable File
239 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
telelog.py -- UDP telemetry logger using a triplestore SQLite backend
|
|
|
|
Usage:
|
|
    python3 telelog.py --init-db        # create schema if missing (existing data is kept) and exit
|
|
python3 telelog.py # listen on default port, write to db
|
|
python3 telelog.py --port 3001 # specify port
|
|
python3 telelog.py --db mydata.db # specify db file
|
|
python3 telelog.py --quiet # no console output
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import signal
|
|
import socket
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config defaults
|
|
# ---------------------------------------------------------------------------
|
|
# UDP port the listener binds to unless --port overrides it.
DEFAULT_PORT: int = 3000

# SQLite database file used unless --db overrides it.
DEFAULT_DB: str = "telemetry.db"

# Bind address used unless --host overrides it.
DEFAULT_HOST: str = "0.0.0.0"

# Maximum number of bytes requested from the socket per recvfrom() call.
BUFFER_SIZE: int = 65535
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema
|
|
# ---------------------------------------------------------------------------
|
|
# DDL for the triplestore. Every statement uses IF NOT EXISTS, so running the
# script against an existing database leaves existing tables and rows alone.
SCHEMA = """
CREATE TABLE IF NOT EXISTS packets (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts TEXT NOT NULL,
    source TEXT,
    tag TEXT
);

CREATE INDEX IF NOT EXISTS idx_packets_tag ON packets(tag);

CREATE TABLE IF NOT EXISTS triples (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    packet_id INTEGER NOT NULL REFERENCES packets(id),
    subject TEXT NOT NULL,
    predicate TEXT NOT NULL,
    object TEXT
);

CREATE INDEX IF NOT EXISTS idx_triples_subject ON triples(subject);
CREATE INDEX IF NOT EXISTS idx_triples_predicate ON triples(predicate);
CREATE INDEX IF NOT EXISTS idx_triples_packet ON triples(packet_id);
"""


def init_db(db_path):
    """Create the telemetry tables and indexes in the database at *db_path*.

    The SCHEMA script only creates objects that are missing, so calling this
    on an already-initialized database is a no-op for existing data.

    Args:
        db_path: Path of the SQLite database file to open (created by
            sqlite3 if it does not exist).

    Raises:
        sqlite3.Error: If the database cannot be opened or the schema script
            fails. The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.executescript(SCHEMA)
        conn.commit()
    finally:
        # Close even when the script fails so the file handle is not leaked.
        conn.close()
    print(f"[telelog] Schema initialized: {db_path}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ingest
|
|
# ---------------------------------------------------------------------------
|
|
def insert_packet(conn, ts, source, tag, triples):
    """Insert one packet row and its triples as a single transaction.

    Args:
        conn: Open sqlite3 connection whose database has the packets and
            triples tables.
        ts: Timestamp text stored in packets.ts.
        source: Sender identifier stored in packets.source.
        tag: Session label stored in packets.tag (may be None).
        triples: Iterable of (subject, predicate, object) tuples.

    Returns:
        The id of the newly inserted packets row.

    Raises:
        sqlite3.Error: If either insert fails. The whole packet is rolled
            back, so no packet row is left behind without its triples.
        ValueError: If an element of *triples* is not a 3-tuple; nothing is
            written in that case.
    """
    # "with conn" commits on success and rolls back on any exception, which
    # keeps a failed triples insert from leaving an orphan packet row in the
    # open transaction (it would otherwise be committed by the next call).
    with conn:
        cur = conn.execute(
            "INSERT INTO packets (ts, source, tag) VALUES (?, ?, ?)",
            (ts, source, tag),
        )
        packet_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO triples (packet_id, subject, predicate, object) VALUES (?, ?, ?, ?)",
            [(packet_id, s, p, o) for s, p, o in triples],
        )
    return packet_id
|
|
|
|
|
|
def _object_text(value):
    """Return the text stored in the object column for a parsed JSON value.

    Lists are serialized back to JSON; every other value uses str().
    """
    if isinstance(value, list):
        return json.dumps(value)
    return str(value)


def extract_triples(obj, fallback_subject):
    """Convert a parsed JSON payload into (subject, predicate, object) tuples.

    Rules:
      - If a top-level value is a dict, the top-level key is the subject and
        its child keys are predicates:
            {"left_motor": {"power": 0.8, "temp": 42}}
            -> ("left_motor", "power", "0.8"), ("left_motor", "temp", "42")

      - If a top-level value is not a dict, the top-level key is the
        predicate and fallback_subject is the subject:
            {"heading": 1.57}
            -> (fallback_subject, "heading", "1.57")

      - Deeper nesting collapses predicates with dot notation:
            {"left_motor": {"pid": {"p": 1.0}}}
            -> ("left_motor", "pid.p", "1.0")

      - Lists are stored as JSON strings, including a list that is the whole
        payload.
      - A non-dict top-level payload becomes a single
        (fallback_subject, "value", text) triple.

    Args:
        obj: Value produced by json.loads.
        fallback_subject: Subject used when the payload does not name one.

    Returns:
        A list of (subject, predicate, object) tuples with string objects.
    """
    if not isinstance(obj, dict):
        return [(fallback_subject, "value", _object_text(obj))]

    triples = []
    for top_key, top_val in obj.items():
        if isinstance(top_val, dict):
            # top_key is the subject; flattened child keys are the predicates.
            for pred, obj_val in _flatten_predicates(top_val).items():
                triples.append((top_key, pred, str(obj_val)))
        else:
            # top_key is the predicate; the sender stands in as the subject.
            triples.append((fallback_subject, top_key, _object_text(top_val)))
    return triples


def _flatten_predicates(obj, prefix=""):
    """Flatten a nested dict into a flat dict with dot-separated keys.

    Nested dicts contribute "<parent>.<child>" keys, lists are replaced by
    their JSON text, and other values are kept as-is. Keys appear in the same
    depth-first order as in *obj*.

    The walk uses an explicit stack instead of recursion so that deeply
    nested input cannot raise RecursionError.
    """
    out = {}
    # Each entry is (key prefix, iterator over the remaining items of a dict).
    stack = [(prefix, iter(obj.items()))]
    while stack:
        current_prefix, items = stack[-1]
        for k, v in items:
            key = f"{current_prefix}.{k}" if current_prefix else k
            if isinstance(v, dict):
                # Descend now; the parent iterator resumes after the child
                # dict has been fully processed.
                stack.append((key, iter(v.items())))
                break
            out[key] = json.dumps(v) if isinstance(v, list) else v
        else:
            # Iterator exhausted without descending: this dict is done.
            stack.pop()
    return out
|
|
|
|
|
|
def parse_payload(raw_bytes, fallback_subject):
    """Turn one datagram payload into (subject, predicate, object) tuples.

    The payload is decoded as UTF-8 and parsed as JSON. Payloads that cannot
    be handled that way are still recorded instead of raising:

      - bytes that are not valid UTF-8 -> (fallback_subject, "_raw_hex", hex)
      - text that cannot be decoded as JSON or converted to triples
        -> (fallback_subject, "_raw", stripped text)

    Args:
        raw_bytes: Payload bytes as received from the socket.
        fallback_subject: Subject used when the payload does not name one.

    Returns:
        A non-empty list for the fallback cases; otherwise whatever
        extract_triples returns for the parsed value.
    """
    try:
        text = raw_bytes.decode("utf-8").strip()
    except UnicodeDecodeError:
        return [(fallback_subject, "_raw_hex", raw_bytes.hex())]

    try:
        data = json.loads(text)
        return extract_triples(data, fallback_subject)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError (its subclass) plus other
        # value errors raised while decoding; RecursionError is raised for
        # pathologically nested input. The payload comes from the network,
        # so a single hostile datagram must not take the logger down.
        return [(fallback_subject, "_raw", text)]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main loop
|
|
# ---------------------------------------------------------------------------
|
|
def run(db_path, host, port, tag, quiet):
    """Receive UDP datagrams and store each one as a packet with triples.

    Runs until SIGINT or SIGTERM is received, or until the socket reports an
    error. The socket and the database connection are closed on every exit
    path, including when an exception propagates.

    Args:
        db_path: SQLite database file to write to (schema must exist).
        host: Local address to bind the UDP socket to.
        port: Local UDP port to bind to.
        tag: Session label stored with every packet, or None.
        quiet: When true, suppress the per-packet console line.

    Raises:
        OSError: If the socket cannot be bound.
        sqlite3.Error: If the database cannot be opened or a write fails.
    """
    running = True

    def handle_signal(sig, frame):
        # Only flip the flag; the loop notices it after the next datagram or
        # receive timeout and then shuts down normally.
        nonlocal running
        running = False

    packet_count = 0
    conn = sqlite3.connect(db_path)
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port))
            # Wake up once a second so the running flag is re-checked even
            # when no traffic arrives.
            sock.settimeout(1.0)

            tag_display = tag if tag else "(untagged)"
            print(f"[telelog] Listening on {host}:{port} db={db_path} tag={tag_display}")
            print("[telelog] Ctrl-C to stop")

            signal.signal(signal.SIGINT, handle_signal)
            signal.signal(signal.SIGTERM, handle_signal)

            while running:
                try:
                    data, addr = sock.recvfrom(BUFFER_SIZE)
                except socket.timeout:
                    continue
                except OSError as exc:
                    # Stop on a socket failure, but say why instead of
                    # exiting the loop silently.
                    print(f"[telelog] Socket error, stopping: {exc}", file=sys.stderr)
                    break

                ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
                source = f"{addr[0]}:{addr[1]}"
                triples = parse_payload(data, source)

                packet_id = insert_packet(conn, ts, source, tag, triples)
                packet_count += 1

                if not quiet:
                    subjects = sorted({s for s, _p, _o in triples})
                    print(f"[{ts}] src={source} id={packet_id} subjects={subjects}")
    finally:
        conn.close()

    print(f"\n[telelog] Stopped. {packet_count} packets written.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
def _udp_port(value):
    """argparse type converter: parse *value* as a port number in 0-65535.

    Rejecting out-of-range values here yields a normal usage error instead of
    an OverflowError traceback from socket.bind later on.
    """
    try:
        port = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid port number: {value!r}") from None
    if not 0 <= port <= 65535:
        raise argparse.ArgumentTypeError(f"port must be in 0-65535, got {port}")
    return port


def main():
    """Parse command-line options, make sure the schema exists, and run.

    With --init-db the schema is created (if missing) and the process exits
    with status 0 without listening.
    """
    parser = argparse.ArgumentParser(description="UDP telemetry triplestore logger")
    parser.add_argument("--db", default=DEFAULT_DB, help=f"SQLite db file (default: {DEFAULT_DB})")
    parser.add_argument("--port", default=DEFAULT_PORT, type=_udp_port, help=f"UDP listen port (default: {DEFAULT_PORT})")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind address (default: {DEFAULT_HOST})")
    # The schema script only creates missing objects; it never drops data,
    # so the help text must not promise a reset.
    parser.add_argument("--init-db", action="store_true", help="Create the schema if missing and exit")
    parser.add_argument("--tag", default=None, help="Label for this session (e.g. 'auto_match_3')")
    parser.add_argument("--quiet", action="store_true", help="Suppress per-packet console output")
    args = parser.parse_args()

    if args.init_db:
        init_db(args.db)
        sys.exit(0)

    # Ensure schema exists before running (idempotent)
    init_db(args.db)
    run(args.db, args.host, args.port, args.tag, args.quiet)


if __name__ == "__main__":
    main()
|