Files
django-ram/monitoring/README.md

3.3 KiB

Asset telemetry monitoring

[!CAUTION] This is a PoC, not suitable for real world due to lack of any authentication and security

Pre-requisites

  • Python 3.12
  • Podman (or Docker)

Architecture

The dispatcher.py script collects data (cab commands) from a CommandStation and sends it a MQTT broker.

The command being monitored is the <l cab reg speedByte functMap> one returned by the <t cab speed dir> throttle command. See the DCC-EX command reference.

mosquitto is the MQTT broker.

The handler.py script subscribes to the MQTT broker and saves relevant data to the Timescale database.

Data is finally save into a Timescale hypertable.

How to run

Deploy Timescale

$ podman run -d -p 5432:5432 -v $(pwd)/data:/var/lib/postgresql/data -e "POSTGRES_USER=dccmonitor" -e "POSTGRES_PASSWORD=dccmonitor" --name timescale timescale/timescaledb:latest-pg17

[!IMPORTANT] A volume should be created for persistent data

Tables and hypertables are automatically created by the handler.py script

Deploy Mosquitto

$ podman run --userns=keep-id -d -p 1883:1883 -v $(pwd)/config/mosquitto.conf:/mosquitto/config/mosquitto.conf --name mosquitto eclipse-mosquitto:2.0

Run the dispatcher and the handler

$ python dispatcher.py
$ python handler.py

Debug data in Timescale

Create a 10 secs aggregated data table

CREATE MATERIALIZED VIEW telemetry_10secs
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('10 seconds', timestamp) AS bucket,
    cab,
    ROUND(CAST(AVG(speed) AS NUMERIC), 1) AS avg_speed,
    MIN(speed) AS min_speed,
    MAX(speed) AS max_speed
FROM telemetry
GROUP BY bucket, cab;

and set the update policy:

SELECT add_continuous_aggregate_policy(
    'telemetry_10secs',
    start_offset => INTERVAL '1 hour',  -- Go back 1 hour for updates
    end_offset   => INTERVAL '1 minute', -- Keep the latest 5 min fresh
    schedule_interval => INTERVAL '1 minute' -- Run every minute
);

Running statistics from 10 seconds table

WITH speed_durations AS (
    SELECT 
        cab,
        avg_speed,
        max_speed,
        bucket AS start_time,
        LEAD(bucket) OVER (
            PARTITION BY cab ORDER BY bucket
        ) AS end_time,
        LEAD(bucket) OVER (PARTITION BY cab ORDER BY bucket) - bucket AS duration
    FROM telemetry_10secs
)
SELECT * FROM speed_durations WHERE end_time IS NOT NULL;

and filtered by cab number, via a function

CREATE FUNCTION get_speed_durations(cab_id INT) 
RETURNS TABLE (
    cab INT, 
    speed DOUBLE PRECISION, 
    dir TEXT, 
    start_time TIMESTAMPTZ, 
    end_time TIMESTAMPTZ, 
    duration INTERVAL
) 
AS $$
WITH speed_durations AS (
    SELECT 
        cab,
        avg_speed,
        max_speed,
        bucket AS start_time,
        LEAD(bucket) OVER (
            PARTITION BY cab ORDER BY bucket
        ) AS end_time,
        LEAD(bucket) OVER (PARTITION BY cab ORDER BY bucket) - bucket AS duration
    FROM telemetry_10secs
)
SELECT * FROM speed_durations WHERE end_time IS NOT NULL AND cab = cab_id;
$$ LANGUAGE sql;

-- Refresh data
CALL refresh_continuous_aggregate('telemetry_10secs', NULL, NULL);
SELECT * FROM get_speed_durations(1);