diff --git a/monitoring/README.md b/monitoring/README.md
new file mode 100644
index 0000000..1bc5553
--- /dev/null
+++ b/monitoring/README.md
@@ -0,0 +1,129 @@
# Asset telemetry monitoring

> [!CAUTION]
> This is a PoC, not suitable for real-world use due to the lack of any authentication and security.

## Pre-requisites

- Python 3.12
- Podman (or Docker)

## Architecture

The `dispatcher.py` script collects data (`cab` commands) from a CommandStation and sends it to an MQTT broker.

The command being monitored is the `<l CAB REG SPEEDBYTE FUNCTMAP>` response returned by the `<t CAB SPEED DIR>` throttle command. See the [DCC-EX command reference](https://dcc-ex.com/reference/software/command-summary-consolidated.html#t-cab-speed-dir-set-cab-loco-speed).

`mosquitto` is the MQTT broker.

The `handler.py` script subscribes to the MQTT broker and saves the relevant data to the Timescale database.

Data is finally saved into a Timescale hypertable.
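For example, a broadcast such as `<l 3 1 140 0>` is decoded and published on `telemetry/commandstation` as `{"cab": 3, "speed": 11, "dir": "f"}`. A minimal sketch of that mapping (the field positions assume the `<l CAB REG SPEEDBYTE FUNCTMAP>` layout; `dispatcher.py` below contains the full logic):

```python
# Sketch only: mirrors the decoding done by dispatcher.py, assuming the
# '<l cab reg speedByte functMap>' broadcast layout.
def decode_cab_broadcast(line: str) -> dict | None:
    if not line.startswith("<l"):
        return None
    fields = line.strip("<>").split()
    cab, speed_byte = int(fields[1]), int(fields[3])
    if 2 <= speed_byte <= 127:            # reverse (128-step speed encoding)
        return {"cab": cab, "speed": speed_byte - 1, "dir": "r"}
    if 130 <= speed_byte <= 255:          # forward
        return {"cab": cab, "speed": speed_byte - 129, "dir": "f"}
    return {"cab": cab, "speed": 0, "dir": "n"}   # stop / emergency stop


print(decode_cab_broadcast("<l 3 1 140 0>"))  # {'cab': 3, 'speed': 11, 'dir': 'f'}
```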
## How to run

### Deploy Timescale

```bash
$ podman run -d -p 5432:5432 -v $(pwd)/data:/var/lib/postgresql/data -e "POSTGRES_USER=dccmonitor" -e "POSTGRES_PASSWORD=dccmonitor" --name timescale timescale/timescaledb:latest-pg17
```

> [!IMPORTANT]
> A volume (here the `$(pwd)/data` bind mount) is needed so the data persists across container restarts.

Tables and hypertables are automatically created by the `handler.py` script.

### Deploy Mosquitto

```bash
$ podman run --userns=keep-id -d -p 1883:1883 -v $(pwd)/config/mosquitto.conf:/mosquitto/config/mosquitto.conf --name mosquitto eclipse-mosquitto:2.0
```

### Run the dispatcher and the handler

```bash
$ python dispatcher.py
```

```bash
$ python handler.py
```
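To check that decoded payloads are actually reaching the broker, a throwaway subscriber can be pointed at the same topic. A sketch, assuming the Mosquitto defaults used above (`localhost:1883`):

```python
# Throwaway check: print every payload the dispatcher publishes.
import paho.mqtt.client as mqtt

TOPIC = "telemetry/commandstation"


def on_connect(client, userdata, flags, reason_code, properties):
    client.subscribe(TOPIC)


def on_message(client, userdata, msg):
    print(msg.topic, msg.payload.decode("utf-8"))


client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883)
client.loop_forever()
```

Each printed line should match the JSON shape shown in the Architecture section.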
Reconnecting...") + connect_mqtt() # Reconnect if disconnected + + result = mqtt_client.publish(topic, message, qos=1) + result.wait_for_publish() # Ensure message is published + logging.debug(f"Published: {message}") + + +def process_message(message): + """Parses the '' format and converts it to JSON.""" + if not message.startswith(" 1 and speed < 128: + direction = "r" + speed = speed - 1 + elif speed > 129 and speed < 256: + direction = "f" + speed = speed - 129 + else: + speed = 0 + direction = "n" + + try: + json_data = { + "cab": cab, + "speed": speed, + "dir": direction + } + return json_data + except ValueError as e: + logging.error(f"Error parsing message: {e}") + return None + + +def start_tcp_listener(): + """Listens for incoming TCP messages and publishes them to MQTT.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect((TCP_HOST, TCP_PORT)) + logging.info( + f"Connected to TCP server at {TCP_HOST}:{TCP_PORT}" + ) + + while True: + data = sock.recv(1024).decode("utf-8") # Read a chunk of data + if not data: + break + + lines = data.strip().split("\n") # Handle multiple lines + for line in lines: + json_data = process_message(line) + if json_data: + safe_publish(MQTT_TOPIC, json.dumps(json_data)) + + +# Start the listener +if __name__ == "__main__": + logging.basicConfig(level=os.getenv("DCC_LOGLEVEL", "INFO").upper()) + start_tcp_listener() diff --git a/monitoring/handler.py b/monitoring/handler.py new file mode 100755 index 0000000..6ac5f97 --- /dev/null +++ b/monitoring/handler.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 + +import os +import json +import logging +import datetime +import psycopg2 +import paho.mqtt.client as mqtt + +# MQTT Broker Configuration +MQTT_BROKER = "localhost" +MQTT_PORT = 1883 +MQTT_TOPIC = "telemetry/commandstation" + +# TimescaleDB Configuration +DB_HOST = "localhost" +DB_NAME = "dccmonitor" +DB_USER = "dccmonitor" +DB_PASSWORD = "dccmonitor" + + +# The callback for when the client receives a CONNACK response from the server. +def on_connect(client, userdata, flags, reason_code, properties): + logging.info(f"Connected with result code {reason_code}") + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. 
diff --git a/monitoring/compose.yml b/monitoring/compose.yml
new file mode 100644
index 0000000..36cc2ed
--- /dev/null
+++ b/monitoring/compose.yml
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# vim: tabstop=2 shiftwidth=2 softtabstop=2
networks:
  net:

volumes:
  pgdata:
  staticdata:

x-op-service-default: &service_default
  restart: always # unless-stopped
  init: true

services:
  timescale:
    <<: *service_default
    image: timescale/timescaledb:latest-pg17
    ports:
      - "${CUSTOM_DOCKER_IP:-0.0.0.0}:5432:5432"
    environment:
      POSTGRES_USER: "dccmonitor"
      POSTGRES_PASSWORD: "dccmonitor"
    volumes:
      - "pgdata:/var/lib/postgresql/data"
    networks:
      - net

  broker:
    <<: *service_default
    image: eclipse-mosquitto:2.0
    ports:
      - "${CUSTOM_DOCKER_IP:-0.0.0.0}:1883:1883"
    volumes:
      - "./config/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro"
    networks:
      - net
diff --git a/monitoring/config/mosquitto.conf b/monitoring/config/mosquitto.conf
new file mode 100644
index 0000000..76c1e95
--- /dev/null
+++ b/monitoring/config/mosquitto.conf
@@ -0,0 +1,2 @@
allow_anonymous true
listener 1883
diff --git a/monitoring/dispatcher.py b/monitoring/dispatcher.py
new file mode 100755
index 0000000..2d20ffe
--- /dev/null
+++ b/monitoring/dispatcher.py
@@ -0,0 +1,107 @@
#!/usr/bin/env python3

import os
import time
import json
import socket
import logging
import paho.mqtt.client as mqtt

# FIXME: create a configuration
# TCP Socket Configuration
TCP_HOST = "192.168.10.110"  # Replace with your TCP server IP
TCP_PORT = 2560  # Replace with your TCP server port

# FIXME: create a configuration
# MQTT Broker Configuration
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "telemetry/commandstation"

# Connect to MQTT Broker
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)


# Connect function with automatic reconnection
def connect_mqtt():
    while True:
        try:
            mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
            mqtt_client.loop_start()  # Start background loop
            logging.info("Connected to MQTT broker!")
            return
        except Exception as e:
            logging.warning(f"Connection failed: {e}. Retrying in 5 seconds...")
            time.sleep(5)  # Wait before retrying


# Ensure connection before publishing
def safe_publish(topic, message):
    if not mqtt_client.is_connected():
        logging.warning("MQTT disconnected! Reconnecting...")
        connect_mqtt()  # Reconnect if disconnected

    result = mqtt_client.publish(topic, message, qos=1)
    result.wait_for_publish()  # Ensure message is published
    logging.debug(f"Published: {message}")


def process_message(message):
    """Parses the '<l cab reg speedByte functMap>' format and converts it to JSON."""
    if not message.startswith("<l"):
        return None

    # NOTE: reconstructed parsing; the field positions assume the DCC-EX
    # '<l cab reg speedByte functMap>' broadcast layout.
    try:
        fields = message.strip().strip("<>").split()
        cab = int(fields[1])
        speed = int(fields[3])
    except (ValueError, IndexError) as e:
        logging.error(f"Error parsing message: {e}")
        return None

    # DCC 128-step speed byte: 2-127 reverse, 130-255 forward, otherwise stopped
    if speed > 1 and speed < 128:
        direction = "r"
        speed = speed - 1
    elif speed > 129 and speed < 256:
        direction = "f"
        speed = speed - 129
    else:
        speed = 0
        direction = "n"

    json_data = {
        "cab": cab,
        "speed": speed,
        "dir": direction
    }
    return json_data


def start_tcp_listener():
    """Listens for incoming TCP messages and publishes them to MQTT."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((TCP_HOST, TCP_PORT))
        logging.info(
            f"Connected to TCP server at {TCP_HOST}:{TCP_PORT}"
        )

        while True:
            data = sock.recv(1024).decode("utf-8")  # Read a chunk of data
            if not data:
                break

            lines = data.strip().split("\n")  # Handle multiple lines
            for line in lines:
                json_data = process_message(line)
                if json_data:
                    safe_publish(MQTT_TOPIC, json.dumps(json_data))


# Start the listener
if __name__ == "__main__":
    logging.basicConfig(level=os.getenv("DCC_LOGLEVEL", "INFO").upper())
    start_tcp_listener()
diff --git a/monitoring/handler.py b/monitoring/handler.py
new file mode 100755
index 0000000..6ac5f97
--- /dev/null
+++ b/monitoring/handler.py
@@ -0,0 +1,87 @@
#!/usr/bin/env python3

import os
import json
import logging
import datetime
import psycopg2
import paho.mqtt.client as mqtt

# MQTT Broker Configuration
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "telemetry/commandstation"

# TimescaleDB Configuration
DB_HOST = "localhost"
DB_NAME = "dccmonitor"
DB_USER = "dccmonitor"
DB_PASSWORD = "dccmonitor"


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, reason_code, properties):
    logging.info(f"Connected with result code {reason_code}")
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(MQTT_TOPIC)


# MQTT Callback: When a new message arrives
def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        cab = payload["cab"]
        speed = payload["speed"]
        direction = payload["dir"]
        timestamp = datetime.datetime.now(datetime.UTC)

        # Insert into TimescaleDB
        cur.execute(
            "INSERT INTO telemetry (timestamp, cab, speed, dir) VALUES (%s, %s, %s, %s)",  # noqa: E501
            (timestamp, cab, speed, direction),
        )
        conn.commit()
        logging.debug(
            f"Inserted: {timestamp} | Cab: {cab} | Speed: {speed} | Dir: {direction}"  # noqa: E501
        )

    except Exception as e:
        logging.error(f"Error processing message: {e}")


if __name__ == "__main__":
    logging.basicConfig(level=os.getenv("DCC_LOGLEVEL", "INFO").upper())

    # Connect to TimescaleDB
    conn = psycopg2.connect(
        dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST
    )
    cur = conn.cursor()

    # Ensure hypertable exists
    cur.execute("""
        CREATE TABLE IF NOT EXISTS telemetry (
            timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
            cab INT NOT NULL,
            speed DOUBLE PRECISION NOT NULL,
            dir TEXT NOT NULL
        );
    """)
    conn.commit()

    # Convert table to hypertable if not already
    cur.execute("SELECT EXISTS (SELECT 1 FROM timescaledb_information.hypertables WHERE hypertable_name = 'telemetry');")  # noqa: E501
    if not cur.fetchone()[0]:
        cur.execute("SELECT create_hypertable('telemetry', 'timestamp');")
        conn.commit()

    # Setup MQTT Client
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_BROKER, MQTT_PORT)

    # Start listening for messages
    logging.info(f"Listening for MQTT messages on {MQTT_TOPIC}...")
    client.loop_forever()
diff --git a/monitoring/requirements.txt b/monitoring/requirements.txt
new file mode 100644
index 0000000..6c8d651
--- /dev/null
+++ b/monitoring/requirements.txt
@@ -0,0 +1,2 @@
paho-mqtt
psycopg2-binary
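Both scripts need only the two packages above (`pip install -r requirements.txt`). When no CommandStation is available, the handler and the database path can still be exercised by publishing a synthetic payload on the telemetry topic; a sketch with made-up values:

```python
# Sketch: publish one fake telemetry sample so handler.py writes a row.
# The cab number and speed are made up; the broker defaults are localhost:1883.
import json
import paho.mqtt.client as mqtt

payload = {"cab": 3, "speed": 42, "dir": "f"}

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.connect("localhost", 1883)
client.loop_start()
info = client.publish("telemetry/commandstation", json.dumps(payload), qos=1)
info.wait_for_publish()
client.loop_stop()
client.disconnect()
```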