mirror of
https://github.com/daniviga/django-ram.git
synced 2025-08-04 13:17:50 +02:00
130 lines
3.3 KiB
Markdown
130 lines
3.3 KiB
Markdown
# Asset telemetry monitoring
|
|
|
|
> [!CAUTION]
> This is a PoC, not suitable for real-world use due to the lack of any authentication or security.
|
|
|
|
## Pre-requisites
|
|
|
|
- Python 3.12
|
|
- Podman (or Docker)
|
|
|
|
## Architecture
|
|
|
|
The `dispatcher.py` script collects data (`cab` commands) from a CommandStation and sends it to an MQTT broker.
|
|
|
|
The command being monitored is the `<l cab reg speedByte functMap>` one returned by the `<t cab speed dir>` throttle command. See the [DCC-EX command reference](https://dcc-ex.com/reference/software/command-summary-consolidated.html#t-cab-speed-dir-set-cab-loco-speed).
|
|
|
|
`mosquitto` is the MQTT broker.
|
|
|
|
The `handler.py` script subscribes to the MQTT broker and saves relevant data to the Timescale database.
|
|
|
|
Data is finally saved into a Timescale hypertable.
|
|
|
|
## How to run
|
|
|
|
### Deploy Timescale
|
|
|
|
```bash
|
|
$ podman run -d -p 5432:5432 -v $(pwd)/data:/var/lib/postgresql/data -e "POSTGRES_USER=dccmonitor" -e "POSTGRES_PASSWORD=dccmonitor" --name timescale timescale/timescaledb:latest-pg17
|
|
```
|
|
> [!IMPORTANT]
> A volume should be created for persistent data.
|
|
|
|
Tables and hypertables are automatically created by the `handler.py` script
|
|
|
|
### Deploy Mosquitto
|
|
|
|
```bash
|
|
$ podman run --userns=keep-id -d -p 1883:1883 -v $(pwd)/config/mosquitto.conf:/mosquitto/config/mosquitto.conf --name mosquitto eclipse-mosquitto:2.0
|
|
```
|
|
|
|
### Run the dispatcher and the handler
|
|
|
|
```bash
|
|
$ python dispatcher.py
|
|
```
|
|
|
|
```bash
|
|
$ python handler.py
|
|
```
|
|
|
|
## Debug data in Timescale
|
|
|
|
### Create a 10 secs aggregated data table
|
|
|
|
```sql
|
|
-- Continuous aggregate over the `telemetry` table: one row per 10-second
-- bucket and cab, carrying the average speed (rounded to 1 decimal place)
-- plus the minimum and maximum speed seen in that bucket.
CREATE MATERIALIZED VIEW telemetry_10secs
|
|
WITH (timescaledb.continuous) AS
|
|
SELECT
|
|
time_bucket('10 seconds', timestamp) AS bucket,
|
|
cab,
|
|
ROUND(CAST(AVG(speed) AS NUMERIC), 1) AS avg_speed, -- cast to NUMERIC so ROUND can take a digits argument
|
|
MIN(speed) AS min_speed,
|
|
MAX(speed) AS max_speed
|
|
FROM telemetry
|
|
GROUP BY bucket, cab;
|
|
```
|
|
|
|
and set the update policy:
|
|
|
|
```sql
|
|
-- Schedule automatic refreshes of the telemetry_10secs continuous aggregate.
SELECT add_continuous_aggregate_policy(
|
|
'telemetry_10secs',
|
|
start_offset => INTERVAL '1 hour', -- Go back 1 hour for updates
|
|
end_offset => INTERVAL '1 minute', -- Stop 1 minute before now (the most recent minute is not refreshed)
|
|
schedule_interval => INTERVAL '1 minute' -- Run every minute
|
|
);
|
|
|
|
```
|
|
|
|
### Running statistics from 10 seconds table
|
|
|
|
```sql
|
|
-- Pair every 10-second bucket with the next bucket of the same cab, so each
-- row shows how long that cab's aggregated speed reading lasted.
WITH speed_durations AS (
    SELECT
        t.cab,
        t.avg_speed,
        t.max_speed,
        t.bucket AS start_time,
        LEAD(t.bucket) OVER next_bucket AS end_time,
        LEAD(t.bucket) OVER next_bucket - t.bucket AS duration
    FROM telemetry_10secs AS t
    -- Named window: the following bucket within the same cab, in time order.
    WINDOW next_bucket AS (PARTITION BY t.cab ORDER BY t.bucket)
)
-- The last bucket of each cab has no successor (end_time is NULL); drop it.
SELECT sd.*
FROM speed_durations AS sd
WHERE sd.end_time IS NOT NULL;
|
|
```
|
|
|
|
and filtered by `cab` number, via a function
|
|
|
|
```sql
|
|
-- Running statistics from the 10-second aggregate, restricted to one cab.
--
-- Parameters:
--   cab_id  cab number to report on.
-- Returns one row per 10-second bucket of that cab that has a following
-- bucket: the bucket's average and maximum speed, its start time, the start
-- time of the next bucket (end_time) and the gap between the two (duration).
--
-- The declared result columns mirror what the query actually selects
-- (avg_speed, max_speed); the earlier `speed` / `dir` declaration did not
-- match the selected columns.
-- NOTE(review): max_speed is declared DOUBLE PRECISION on the assumption that
-- telemetry.speed has that type -- confirm against the table created by
-- handler.py.
-- If an older version of this function already exists, DROP it first: the
-- result column list has changed.
CREATE FUNCTION get_speed_durations(cab_id INT)
RETURNS TABLE (
    cab INT,
    avg_speed NUMERIC,
    max_speed DOUBLE PRECISION,
    start_time TIMESTAMPTZ,
    end_time TIMESTAMPTZ,
    duration INTERVAL
)
AS $$
    WITH speed_durations AS (
        SELECT
            -- Columns are table-qualified so they cannot be confused with
            -- the identically named output columns of this function.
            t.cab,
            t.avg_speed,
            t.max_speed,
            t.bucket AS start_time,
            LEAD(t.bucket) OVER (
                PARTITION BY t.cab ORDER BY t.bucket
            ) AS end_time,
            LEAD(t.bucket) OVER (PARTITION BY t.cab ORDER BY t.bucket) - t.bucket AS duration
        FROM telemetry_10secs AS t
    )
    SELECT
        sd.cab,
        sd.avg_speed,
        sd.max_speed,
        sd.start_time,
        sd.end_time,
        sd.duration
    FROM speed_durations AS sd
    WHERE sd.end_time IS NOT NULL  -- last bucket of a cab has no successor
      AND sd.cab = get_speed_durations.cab_id;
$$ LANGUAGE sql STABLE;
|
|
|
|
-- Refresh the telemetry_10secs continuous aggregate (NULL is passed for both
-- the start and end of the refresh window), then list the speed durations for cab 1.
|
|
CALL refresh_continuous_aggregate('telemetry_10secs', NULL, NULL);
|
|
SELECT * FROM get_speed_durations(1);
|
|
```
|