2 Commits

11 changed files with 446 additions and 2 deletions

monitoring/README.md Normal file

@@ -0,0 +1,129 @@
# Asset telemetry monitoring
> [!CAUTION]
> This is a PoC and is not suitable for real-world use: it lacks any authentication and security hardening.
## Prerequisites
- Python 3.12
- Podman (or Docker)
## Architecture
The `dispatcher.py` script collects data (`cab` commands) from a CommandStation and sends it to an MQTT broker.
The command being monitored is the `<l cab reg speedByte functMap>` broadcast returned in response to the `<t cab speed dir>` throttle command; see the [DCC-EX command reference](https://dcc-ex.com/reference/software/command-summary-consolidated.html#t-cab-speed-dir-set-cab-loco-speed).
`mosquitto` is the MQTT broker.
The `handler.py` script subscribes to the MQTT broker and stores the relevant data in a Timescale hypertable.
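For example (an illustrative broadcast, not a real capture): with cab 3 running forward at speed step 16, the CommandStation emits something like `<l 3 0 145 0>`; `dispatcher.py` decodes the speed byte (145 - 129 = 16, forward) and publishes `{"cab": 3, "speed": 16, "dir": "f"}` to the `telemetry/commandstation` topic.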
## How to run
### Deploy Timescale
```bash
$ podman run -d -p 5432:5432 -v $(pwd)/data:/var/lib/postgresql/data -e "POSTGRES_USER=dccmonitor" -e "POSTGRES_PASSWORD=dccmonitor" --name timescale timescale/timescaledb:latest-pg17
```
> [!IMPORTANT]
> A volume should be created for persistent data.

Tables and hypertables are automatically created by the `handler.py` script.
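For reference, the schema created on first run is roughly the following (a sketch of the DDL embedded in `handler.py`, shown later in this changeset):
```sql
-- Sketch of what handler.py runs at startup
CREATE TABLE IF NOT EXISTS telemetry (
    timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
    cab INT NOT NULL,
    speed DOUBLE PRECISION NOT NULL,
    dir TEXT NOT NULL
);
-- Converted to a hypertable only when it is not one already
SELECT create_hypertable('telemetry', 'timestamp');
```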
### Deploy Mosquitto
```bash
$ podman run --userns=keep-id -d -p 1883:1883 -v $(pwd)/config/mosquitto.conf:/mosquitto/config/mosquitto.conf --name mosquitto eclipse-mosquitto:2.0
```
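The bundled `config/mosquitto.conf` only enables anonymous access on listener 1883, in line with the PoC caveat above.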
### Run the dispatcher and the handler
```bash
$ python dispatcher.py
```
```bash
$ python handler.py
```
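Both scripts honour the `DCC_LOGLEVEL` environment variable (default `INFO`) and expect the MQTT broker on `localhost`; `handler.py` also expects the Timescale database on `localhost`, while the CommandStation address is currently hard-coded in `dispatcher.py` (`TCP_HOST`/`TCP_PORT`). The `paho-mqtt` and `psycopg2-binary` packages must be installed beforehand.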
## Debug data in Timescale
### Create a 10-second aggregated data table
```sql
CREATE MATERIALIZED VIEW telemetry_10secs
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('10 seconds', timestamp) AS bucket,
    cab,
    ROUND(CAST(AVG(speed) AS NUMERIC), 1) AS avg_speed,
    MIN(speed) AS min_speed,
    MAX(speed) AS max_speed
FROM telemetry
GROUP BY bucket, cab;
```
and set the refresh policy:
```sql
SELECT add_continuous_aggregate_policy(
    'telemetry_10secs',
    start_offset => INTERVAL '1 hour',  -- Re-aggregate data up to 1 hour back
    end_offset => INTERVAL '1 minute',  -- Skip the most recent minute, still being written
    schedule_interval => INTERVAL '1 minute'  -- Run every minute
);
```
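To quickly check that buckets are being materialized, a query along these lines can be used (illustrative only):
```sql
-- Latest aggregated buckets, most recent first
SELECT bucket, cab, avg_speed, min_speed, max_speed
FROM telemetry_10secs
ORDER BY bucket DESC
LIMIT 10;
```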
### Running statistics from the 10-second table
```sql
WITH speed_durations AS (
    SELECT
        cab,
        avg_speed,
        max_speed,
        bucket AS start_time,
        LEAD(bucket) OVER (
            PARTITION BY cab ORDER BY bucket
        ) AS end_time,
        LEAD(bucket) OVER (PARTITION BY cab ORDER BY bucket) - bucket AS duration
    FROM telemetry_10secs
)
SELECT * FROM speed_durations WHERE end_time IS NOT NULL;
```
and filtered by `cab` number via a function:
```sql
CREATE FUNCTION get_speed_durations(cab_id INT)
RETURNS TABLE (
    cab INT,
    avg_speed NUMERIC,
    max_speed DOUBLE PRECISION,
    start_time TIMESTAMPTZ,
    end_time TIMESTAMPTZ,
    duration INTERVAL
)
AS $$
    WITH speed_durations AS (
        SELECT
            cab,
            avg_speed,
            max_speed,
            bucket AS start_time,
            LEAD(bucket) OVER (
                PARTITION BY cab ORDER BY bucket
            ) AS end_time,
            LEAD(bucket) OVER (PARTITION BY cab ORDER BY bucket) - bucket AS duration
        FROM telemetry_10secs
    )
    SELECT * FROM speed_durations WHERE end_time IS NOT NULL AND cab = cab_id;
$$ LANGUAGE sql;

-- Refresh the aggregate manually, then query a single cab
CALL refresh_continuous_aggregate('telemetry_10secs', NULL, NULL);
SELECT * FROM get_speed_durations(1);
```
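The `telemetry_10secs` aggregate is also exposed read-only in the Django admin through the `RollingStockTelemetry` model, the `telemetry` database alias and the `TelemetryRouter` introduced in the rest of this changeset.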

monitoring/compose.yml Normal file

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# vim: tabstop=2 shiftwidth=2 softtabstop=2

networks:
  net:

volumes:
  pgdata:
  staticdata:

x-op-service-default: &service_default
  restart: always  # unless-stopped
  init: true

services:
  timescale:
    <<: *service_default
    image: timescale/timescaledb:latest-pg17
    ports:
      - "${CUSTOM_DOCKER_IP:-0.0.0.0}:5432:5432"
    environment:
      POSTGRES_USER: "dccmonitor"
      POSTGRES_PASSWORD: "dccmonitor"
    volumes:
      - "pgdata:/var/lib/postgresql/data"
    networks:
      - net

  broker:
    <<: *service_default
    image: eclipse-mosquitto:2.0
    ports:
      - "${CUSTOM_DOCKER_IP:-0.0.0.0}:1883:1883"
    volumes:
      - "./config/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro"
    networks:
      - net

monitoring/config/mosquitto.conf Normal file

@@ -0,0 +1,2 @@
allow_anonymous true
listener 1883

monitoring/dispatcher.py Executable file

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
import os
import time
import json
import socket
import logging

import paho.mqtt.client as mqtt

# FIXME: create a configuration
# TCP Socket Configuration
TCP_HOST = "192.168.10.110"  # Replace with your TCP server IP
TCP_PORT = 2560  # Replace with your TCP server port

# FIXME: create a configuration
# MQTT Broker Configuration
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "telemetry/commandstation"

# MQTT client (connected lazily by connect_mqtt/safe_publish)
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)


# Connect function with automatic reconnection
def connect_mqtt():
    while True:
        try:
            mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
            mqtt_client.loop_start()  # Start background loop
            logging.info("Connected to MQTT broker!")
            return
        except Exception as e:
            logging.warning(f"Connection failed: {e}. Retrying in 5 seconds...")
            time.sleep(5)  # Wait before retrying


# Ensure connection before publishing
def safe_publish(topic, message):
    if not mqtt_client.is_connected():
        logging.warning("MQTT disconnected! Reconnecting...")
        connect_mqtt()  # Reconnect if disconnected
    result = mqtt_client.publish(topic, message, qos=1)
    result.wait_for_publish()  # Ensure message is published
    logging.debug(f"Published: {message}")


def process_message(message):
    """Parse the '<l cab reg speedByte functMap>' broadcast into JSON."""
    if not message.startswith("<l"):
        return None
    parts = message.strip().split()  # Split by spaces
    if len(parts) != 5:
        logging.debug(f"Invalid speed command: {message}")
        return None
    _, _cab, _, _speed, _ = parts  # Keep cab and speedByte, ignore the rest
    try:
        cab = int(_cab)
        speed = int(_speed)
    except ValueError as e:
        logging.error(f"Error parsing message: {e}")
        return None
    # Decode the speed byte: 2-127 reverse, 130-255 forward, anything else stopped
    if speed > 1 and speed < 128:
        direction = "r"
        speed = speed - 1
    elif speed > 129 and speed < 256:
        direction = "f"
        speed = speed - 129
    else:
        speed = 0
        direction = "n"
    return {
        "cab": cab,
        "speed": speed,
        "dir": direction,
    }


def start_tcp_listener():
    """Listen for incoming TCP messages and publish them to MQTT."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((TCP_HOST, TCP_PORT))
        logging.info(
            f"Connected to TCP server at {TCP_HOST}:{TCP_PORT}"
        )
        while True:
            data = sock.recv(1024).decode("utf-8")  # Read a chunk of data
            if not data:
                break
            lines = data.strip().split("\n")  # Handle multiple lines
            for line in lines:
                json_data = process_message(line)
                if json_data:
                    safe_publish(MQTT_TOPIC, json.dumps(json_data))


# Start the listener
if __name__ == "__main__":
    logging.basicConfig(level=os.getenv("DCC_LOGLEVEL", "INFO").upper())
    start_tcp_listener()

monitoring/handler.py Executable file

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
import os
import json
import logging
import datetime

import psycopg2
import paho.mqtt.client as mqtt

# MQTT Broker Configuration
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "telemetry/commandstation"

# TimescaleDB Configuration
DB_HOST = "localhost"
DB_NAME = "dccmonitor"
DB_USER = "dccmonitor"
DB_PASSWORD = "dccmonitor"


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, reason_code, properties):
    logging.info(f"Connected with result code {reason_code}")
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(MQTT_TOPIC)


# MQTT Callback: When a new message arrives
def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        cab = payload["cab"]
        speed = payload["speed"]
        direction = payload["dir"]
        timestamp = datetime.datetime.now(datetime.UTC)
        # Insert into TimescaleDB
        cur.execute(
            "INSERT INTO telemetry (timestamp, cab, speed, dir) VALUES (%s, %s, %s, %s)",  # noqa: E501
            (timestamp, cab, speed, direction),
        )
        conn.commit()
        logging.debug(
            f"Inserted: {timestamp} | Cab: {cab} | Speed: {speed} | Dir: {direction}"  # noqa: E501
        )
    except Exception as e:
        logging.error(f"Error processing message: {e}")


if __name__ == "__main__":
    logging.basicConfig(level=os.getenv("DCC_LOGLEVEL", "INFO").upper())

    # Connect to TimescaleDB
    conn = psycopg2.connect(
        dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST
    )
    cur = conn.cursor()

    # Ensure the table exists
    cur.execute("""
        CREATE TABLE IF NOT EXISTS telemetry (
            timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
            cab INT NOT NULL,
            speed DOUBLE PRECISION NOT NULL,
            dir TEXT NOT NULL
        );
    """)
    conn.commit()

    # Convert table to hypertable if not already
    cur.execute("SELECT EXISTS (SELECT 1 FROM timescaledb_information.hypertables WHERE hypertable_name = 'telemetry');")  # noqa: E501
    if not cur.fetchone()[0]:
        cur.execute("SELECT create_hypertable('telemetry', 'timestamp');")
        conn.commit()

    # Setup MQTT Client
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_BROKER, MQTT_PORT)

    # Start listening for messages
    logging.info(f"Listening for MQTT messages on {MQTT_TOPIC}...")
    client.loop_forever()


@@ -0,0 +1,2 @@
paho-mqtt
psycopg2-binary

ram/ram/db_router.py Normal file

@@ -0,0 +1,32 @@
class TelemetryRouter:
    db_table = "telemetry_10secs"

    def db_for_read(self, model, **hints):
        """Send read operations to the correct database."""
        if model._meta.db_table == self.db_table:
            return "telemetry"  # Replace with your database name
        return None  # Default database

    def db_for_write(self, model, **hints):
        """Send write operations to the correct database."""
        if model._meta.db_table == self.db_table:
            return False  # Prevent Django from writing RO tables
        return None

    def allow_relation(self, obj1, obj2, **hints):
        """Allow relations if the telemetry table is involved."""
        if (
            obj1._meta.db_table == self.db_table
            or obj2._meta.db_table == self.db_table
        ):
            return True
        return None

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Prevent Django from running migrations against the telemetry database."""
        if db == "telemetry":
            return False  # Prevent Django from creating/modifying tables
        return None


@@ -95,8 +95,16 @@ DATABASES = {
     "default": {
         "ENGINE": "django.db.backends.sqlite3",
         "NAME": STORAGE_DIR / "db.sqlite3",
-    }
+    },
+    "telemetry": {
+        "ENGINE": "django.db.backends.postgresql",
+        "HOST": "127.0.0.1",
+        "NAME": "dccmonitor",
+        "USER": "dccmonitor",
+        "PASSWORD": "dccmonitor",
+    },
 }
 
+DATABASE_ROUTERS = ["ram.db_router.TelemetryRouter"]
 
 # Password validation


@@ -17,6 +17,7 @@ from roster.models import (
     RollingStockImage,
     RollingStockProperty,
     RollingStockJournal,
+    RollingStockTelemetry,
 )
@@ -287,3 +288,29 @@ class RollingStockAdmin(SortableAdminBase, admin.ModelAdmin):
     download_csv.short_description = "Download selected items as CSV"
     actions = [publish, unpublish, download_csv]
+
+
+@admin.register(RollingStockTelemetry)
+class RollingStockTelemetryAdmin(admin.ModelAdmin):
+    list_filter = ("bucket", "cab")
+    list_display = ("bucket_highres", "cab", "max_speed", "avg_speed")
+
+    def bucket_highres(self, obj):
+        return obj.bucket.strftime("%Y-%m-%d %H:%M:%S")
+
+    bucket_highres.admin_order_field = "bucket"  # Enable sorting
+    bucket_highres.short_description = "Bucket"  # Column name in admin
+
+    def get_changelist_instance(self, request):
+        changelist = super().get_changelist_instance(request)
+        changelist.list_display_links = None  # Disable links
+        return changelist
+
+    def has_add_permission(self, request):
+        return False  # Disable adding new objects
+
+    def has_change_permission(self, request, obj=None):
+        return False  # Disable editing objects
+
+    def has_delete_permission(self, request, obj=None):
+        return False  # Disable deleting objects


@@ -224,6 +224,20 @@ class RollingStockJournal(models.Model):
     objects = PublicManager()
 
+
+# trick: this is technically an abstract class
+# it is made readonly via db_router and admin to avoid any unwanted change
+class RollingStockTelemetry(models.Model):
+    bucket = models.DateTimeField(primary_key=True, editable=False)
+    cab = models.PositiveIntegerField(editable=False)
+    avg_speed = models.FloatField(editable=False)
+    max_speed = models.PositiveIntegerField(editable=False)
+
+    class Meta:
+        db_table = "telemetry_10secs"
+        ordering = ["cab", "bucket"]
+        verbose_name_plural = "Telemetries"
+
 # @receiver(models.signals.post_delete, sender=Cab)
 # def post_save_image(sender, instance, *args, **kwargs):
 #     try:


@@ -8,7 +8,7 @@ django-countries
 django-health-check
 django-admin-sortable2
 django-tinymce
-# Optional: # psycopg2-binary
+psycopg2-binary
 # Required by django-countries and not always installed
 # by default on modern venvs (like Python 3.12 on Fedora 39)
 setuptools