Initial commit
This commit is contained in:
238
telelog.py
Executable file
238
telelog.py
Executable file
@@ -0,0 +1,238 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
telelog.py -- UDP telemetry logger using a triplestore SQLite backend
|
||||
|
||||
Usage:
|
||||
python3 telelog.py --init-db # create/reset schema and exit
|
||||
python3 telelog.py # listen on default port, write to db
|
||||
python3 telelog.py --port 3001 # specify port
|
||||
python3 telelog.py --db mydata.db # specify db file
|
||||
python3 telelog.py --quiet # no console output
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import signal
|
||||
import socket
|
||||
import sqlite3
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config defaults
|
||||
# ---------------------------------------------------------------------------
|
||||
DEFAULT_PORT = 3000
|
||||
DEFAULT_DB = "telemetry.db"
|
||||
DEFAULT_HOST = "0.0.0.0"
|
||||
BUFFER_SIZE = 65535
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schema
|
||||
# ---------------------------------------------------------------------------
|
||||
SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS packets (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts TEXT NOT NULL,
|
||||
source TEXT,
|
||||
tag TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_packets_tag ON packets(tag);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS triples (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
packet_id INTEGER NOT NULL REFERENCES packets(id),
|
||||
subject TEXT NOT NULL,
|
||||
predicate TEXT NOT NULL,
|
||||
object TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_triples_subject ON triples(subject);
|
||||
CREATE INDEX IF NOT EXISTS idx_triples_predicate ON triples(predicate);
|
||||
CREATE INDEX IF NOT EXISTS idx_triples_packet ON triples(packet_id);
|
||||
"""
|
||||
|
||||
|
||||
def init_db(db_path):
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.executescript(SCHEMA)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
print(f"[telelog] Schema initialized: {db_path}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Ingest
|
||||
# ---------------------------------------------------------------------------
|
||||
def insert_packet(conn, ts, source, tag, triples):
|
||||
"""
|
||||
Insert one packet and its subject-predicate-object triples.
|
||||
triples is a list of (subject, predicate, object) tuples.
|
||||
Returns the new packet_id.
|
||||
"""
|
||||
cur = conn.execute(
|
||||
"INSERT INTO packets (ts, source, tag) VALUES (?, ?, ?)",
|
||||
(ts, source, tag)
|
||||
)
|
||||
packet_id = cur.lastrowid
|
||||
|
||||
rows = [(packet_id, s, p, o) for s, p, o in triples]
|
||||
conn.executemany(
|
||||
"INSERT INTO triples (packet_id, subject, predicate, object) VALUES (?, ?, ?, ?)",
|
||||
rows
|
||||
)
|
||||
conn.commit()
|
||||
return packet_id
|
||||
|
||||
|
||||
def extract_triples(obj, fallback_subject):
|
||||
"""
|
||||
Convert a parsed JSON payload into a list of (subject, predicate, object) tuples.
|
||||
|
||||
Rules:
|
||||
- If the top-level value is a dict of dicts, the top-level key is the subject
|
||||
and its child keys are predicates:
|
||||
{"left_motor": {"power": 0.8, "temp": 42}}
|
||||
-> ("left_motor", "power", "0.8"), ("left_motor", "temp", "42")
|
||||
|
||||
- If the top-level value is a scalar or mixed, the top-level key is the
|
||||
predicate and fallback_subject (source IP) is the subject:
|
||||
{"heading": 1.57}
|
||||
-> (fallback_subject, "heading", "1.57")
|
||||
|
||||
- Deeper nesting collapses predicates with dot notation:
|
||||
{"left_motor": {"pid": {"p": 1.0}}}
|
||||
-> ("left_motor", "pid.p", "1.0")
|
||||
|
||||
- Lists are stored as JSON strings.
|
||||
- Non-dict top-level payload falls back to (fallback_subject, "value", raw).
|
||||
"""
|
||||
triples = []
|
||||
|
||||
if not isinstance(obj, dict):
|
||||
triples.append((fallback_subject, "value", str(obj)))
|
||||
return triples
|
||||
|
||||
for top_key, top_val in obj.items():
|
||||
if isinstance(top_val, dict):
|
||||
# top_key is the subject; flatten children as predicates
|
||||
subject = top_key
|
||||
for pred, obj_val in _flatten_predicates(top_val).items():
|
||||
triples.append((subject, pred, str(obj_val)))
|
||||
else:
|
||||
# top_key is the predicate; use fallback subject
|
||||
if isinstance(top_val, list):
|
||||
triples.append((fallback_subject, top_key, json.dumps(top_val)))
|
||||
else:
|
||||
triples.append((fallback_subject, top_key, str(top_val)))
|
||||
|
||||
return triples
|
||||
|
||||
|
||||
def _flatten_predicates(obj, prefix=""):
|
||||
"""Recursively flatten a dict into dot-separated predicate keys."""
|
||||
out = {}
|
||||
for k, v in obj.items():
|
||||
key = f"{prefix}.{k}" if prefix else k
|
||||
if isinstance(v, dict):
|
||||
out.update(_flatten_predicates(v, key))
|
||||
elif isinstance(v, list):
|
||||
out[key] = json.dumps(v)
|
||||
else:
|
||||
out[key] = v
|
||||
return out
|
||||
|
||||
|
||||
def parse_payload(raw_bytes, fallback_subject):
|
||||
"""
|
||||
Attempt to parse payload as JSON. Fall back to raw string.
|
||||
Returns a list of (subject, predicate, object) tuples.
|
||||
"""
|
||||
try:
|
||||
text = raw_bytes.decode("utf-8").strip()
|
||||
except UnicodeDecodeError:
|
||||
return [(fallback_subject, "_raw_hex", raw_bytes.hex())]
|
||||
|
||||
try:
|
||||
data = json.loads(text)
|
||||
return extract_triples(data, fallback_subject)
|
||||
except json.JSONDecodeError:
|
||||
return [(fallback_subject, "_raw", text)]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main loop
|
||||
# ---------------------------------------------------------------------------
|
||||
def run(db_path, host, port, tag, quiet):
|
||||
conn = sqlite3.connect(db_path)
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind((host, port))
|
||||
sock.settimeout(1.0) # allows clean shutdown on Ctrl-C
|
||||
|
||||
tag_display = tag if tag else "(untagged)"
|
||||
print(f"[telelog] Listening on {host}:{port} db={db_path} tag={tag_display}")
|
||||
print(f"[telelog] Ctrl-C to stop")
|
||||
|
||||
running = True
|
||||
|
||||
def handle_signal(sig, frame):
|
||||
nonlocal running
|
||||
running = False
|
||||
|
||||
signal.signal(signal.SIGINT, handle_signal)
|
||||
signal.signal(signal.SIGTERM, handle_signal)
|
||||
|
||||
packet_count = 0
|
||||
|
||||
while running:
|
||||
try:
|
||||
data, addr = sock.recvfrom(BUFFER_SIZE)
|
||||
except socket.timeout:
|
||||
continue
|
||||
except OSError:
|
||||
break
|
||||
|
||||
ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
|
||||
source = f"{addr[0]}:{addr[1]}"
|
||||
triples = parse_payload(data, source)
|
||||
|
||||
packet_id = insert_packet(conn, ts, source, tag, triples)
|
||||
packet_count += 1
|
||||
|
||||
if not quiet:
|
||||
subjects = sorted(set(s for s, p, o in triples))
|
||||
print(f"[{ts}] src={source} id={packet_id} subjects={subjects}")
|
||||
|
||||
sock.close()
|
||||
conn.close()
|
||||
print(f"\n[telelog] Stopped. {packet_count} packets written.")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="UDP telemetry triplestore logger")
|
||||
parser.add_argument("--db", default=DEFAULT_DB, help=f"SQLite db file (default: {DEFAULT_DB})")
|
||||
parser.add_argument("--port", default=DEFAULT_PORT, type=int, help=f"UDP listen port (default: {DEFAULT_PORT})")
|
||||
parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind address (default: {DEFAULT_HOST})")
|
||||
parser.add_argument("--init-db", action="store_true", help="Initialize/reset schema and exit")
|
||||
parser.add_argument("--tag", default=None, help="Label for this session (e.g. 'auto_match_3')")
|
||||
parser.add_argument("--quiet", action="store_true", help="Suppress per-packet console output")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.init_db:
|
||||
init_db(args.db)
|
||||
sys.exit(0)
|
||||
|
||||
# Ensure schema exists before running (idempotent)
|
||||
init_db(args.db)
|
||||
run(args.db, args.host, args.port, args.tag, args.quiet)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user