mirror of
https://github.com/daniviga/django-ram.git
synced 2025-08-04 13:17:50 +02:00
88 lines
2.6 KiB
Python
Executable File
88 lines
2.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import datetime
|
|
import psycopg2
|
|
import paho.mqtt.client as mqtt
|
|
|
|
# MQTT Broker Configuration
|
|
MQTT_BROKER = "localhost"
|
|
MQTT_PORT = 1883
|
|
MQTT_TOPIC = "telemetry/commandstation"
|
|
|
|
# TimescaleDB Configuration
|
|
DB_HOST = "localhost"
|
|
DB_NAME = "dccmonitor"
|
|
DB_USER = "dccmonitor"
|
|
DB_PASSWORD = "dccmonitor"
|
|
|
|
|
|
# The callback for when the client receives a CONNACK response from the server.
|
|
def on_connect(client, userdata, flags, reason_code, properties):
|
|
logging.info(f"Connected with result code {reason_code}")
|
|
# Subscribing in on_connect() means that if we lose the connection and
|
|
# reconnect then subscriptions will be renewed.
|
|
client.subscribe(MQTT_TOPIC)
|
|
|
|
|
|
# MQTT Callback: When a new message arrives
|
|
def on_message(client, userdata, msg):
|
|
try:
|
|
payload = json.loads(msg.payload.decode("utf-8"))
|
|
cab = payload["cab"]
|
|
speed = payload["speed"]
|
|
direction = payload["dir"]
|
|
timestamp = datetime.datetime.now(datetime.UTC)
|
|
|
|
# Insert into TimescaleDB
|
|
cur.execute(
|
|
"INSERT INTO telemetry (timestamp, cab, speed, dir) VALUES (%s, %s, %s, %s)", # noqa: E501
|
|
(timestamp, cab, speed, direction),
|
|
)
|
|
conn.commit()
|
|
logging.debug(
|
|
f"Inserted: {timestamp} | Cab: {cab} | Speed: {speed} | Dir: {direction}" # noqa: E501
|
|
)
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error processing message: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=os.getenv("DCC_LOGLEVEL", "INFO").upper())
|
|
|
|
# Connect to TimescaleDB
|
|
conn = psycopg2.connect(
|
|
dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST
|
|
)
|
|
cur = conn.cursor()
|
|
|
|
# Ensure hypertable exists
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS telemetry (
|
|
timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
cab INT NOT NULL,
|
|
speed DOUBLE PRECISION NOT NULL,
|
|
dir TEXT NOT NULL
|
|
);
|
|
""")
|
|
conn.commit()
|
|
|
|
# Convert table to hypertable if not already
|
|
cur.execute("SELECT EXISTS (SELECT 1 FROM timescaledb_information.hypertables WHERE hypertable_name = 'telemetry');") # noqa: E501
|
|
if not cur.fetchone()[0]:
|
|
cur.execute("SELECT create_hypertable('telemetry', 'timestamp');")
|
|
conn.commit()
|
|
|
|
# Setup MQTT Client
|
|
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
|
client.on_connect = on_connect
|
|
client.on_message = on_message
|
|
client.connect(MQTT_BROKER, MQTT_PORT)
|
|
|
|
# Start listening for messages
|
|
logging.info(f"Listening for MQTT messages on {MQTT_TOPIC}...")
|
|
client.loop_forever()
|