1
0
mirror of https://github.com/daniviga/bite.git synced 2024-11-22 21:16:12 +01:00

Run black

This commit is contained in:
Daniele Viganò 2023-09-09 10:00:48 +02:00
parent da92935001
commit 398a62ded3
3 changed files with 88 additions and 72 deletions

View File

@ -35,12 +35,12 @@ from api.models import Device
class Command(BaseCommand): class Command(BaseCommand):
help = 'MQTT to DB deamon' help = "Telemetry dispatcher"
MQTT_HOST = settings.MQTT_BROKER['HOST'] MQTT_HOST = settings.MQTT_BROKER["HOST"]
MQTT_PORT = int(settings.MQTT_BROKER['PORT']) MQTT_PORT = int(settings.MQTT_BROKER["PORT"])
KAFKA_HOST = settings.KAFKA_BROKER['HOST'] KAFKA_HOST = settings.KAFKA_BROKER["HOST"]
KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"])
producer = None producer = None
@sync_to_async @sync_to_async
@ -52,10 +52,7 @@ class Command(BaseCommand):
@sync_to_async @sync_to_async
def dispatch(self, message): def dispatch(self, message):
self.producer.send( self.producer.send("telemetry", {"transport": "mqtt", "body": message})
'telemetry', {"transport": 'mqtt',
"body": message}
)
async def mqtt_broker(self): async def mqtt_broker(self):
async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client: async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client:
@ -66,12 +63,13 @@ class Command(BaseCommand):
device = await self.get_device(message.topic) device = await self.get_device(message.topic)
if device is not None: if device is not None:
message_body = json.loads( message_body = json.loads(
message.payload.decode('utf-8')) message.payload.decode("utf-8")
)
await self.dispatch(message_body) await self.dispatch(message_body)
else: else:
self.stdout.write( self.stdout.write(
self.style.ERROR( self.style.ERROR("DEBUG: message discarded")
'DEBUG: message discarded')) )
def handle(self, *args, **options): def handle(self, *args, **options):
client = mqtt.Client() client = mqtt.Client()
@ -81,24 +79,26 @@ class Command(BaseCommand):
break break
except (socket.gaierror, ConnectionRefusedError): except (socket.gaierror, ConnectionRefusedError):
self.stdout.write( self.stdout.write(
self.style.WARNING('WARNING: MQTT broker not available')) self.style.WARNING("WARNING: MQTT broker not available")
)
time.sleep(5) time.sleep(5)
while True: while True:
try: try:
self.producer = KafkaProducer( self.producer = KafkaProducer(
bootstrap_servers='{}:{}'.format( bootstrap_servers="{}:{}".format(
self.KAFKA_HOST, self.KAFKA_PORT self.KAFKA_HOST, self.KAFKA_PORT
), ),
value_serializer=lambda v: json.dumps(v).encode('utf-8'), value_serializer=lambda v: json.dumps(v).encode("utf-8"),
retries=5 retries=5,
) )
break break
except NoBrokersAvailable: except NoBrokersAvailable:
self.stdout.write( self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available')) self.style.WARNING("WARNING: Kafka broker not available")
)
time.sleep(5) time.sleep(5)
self.stdout.write(self.style.SUCCESS('INFO: Brokers subscribed')) self.stdout.write(self.style.SUCCESS("INFO: Brokers subscribed"))
client.disconnect() client.disconnect()
asyncio.run(self.mqtt_broker()) asyncio.run(self.mqtt_broker())

View File

@ -31,10 +31,10 @@ from telemetry.models import Telemetry
class Command(BaseCommand): class Command(BaseCommand):
help = 'MQTT to DB deamon' help = "Telemetry handler"
KAFKA_HOST = settings.KAFKA_BROKER['HOST'] KAFKA_HOST = settings.KAFKA_BROKER["HOST"]
KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"])
def get_device(self, serial): def get_device(self, serial):
try: try:
@ -47,7 +47,7 @@ class Command(BaseCommand):
transport=transport, transport=transport,
device=self.get_device(message["device"]), device=self.get_device(message["device"]),
clock=message["clock"], clock=message["clock"],
payload=message["payload"] payload=message["payload"],
) )
def handle(self, *args, **options): def handle(self, *args, **options):
@ -55,22 +55,22 @@ class Command(BaseCommand):
try: try:
consumer = KafkaConsumer( consumer = KafkaConsumer(
"telemetry", "telemetry",
bootstrap_servers='{}:{}'.format( bootstrap_servers="{}:{}".format(
self.KAFKA_HOST, self.KAFKA_PORT self.KAFKA_HOST, self.KAFKA_PORT
), ),
group_id="handler", group_id="handler",
value_deserializer=lambda m: json.loads(m.decode('utf8')), value_deserializer=lambda m: json.loads(m.decode("utf8")),
) )
break break
except NoBrokersAvailable: except NoBrokersAvailable:
self.stdout.write( self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available')) self.style.WARNING("WARNING: Kafka broker not available")
)
time.sleep(5) time.sleep(5)
self.stdout.write(self.style.SUCCESS('INFO: Kafka broker subscribed')) self.stdout.write(self.style.SUCCESS("INFO: Kafka broker subscribed"))
for message in consumer: for message in consumer:
self.store_telemetry( self.store_telemetry(
message.value["transport"], message.value["transport"], message.value["body"]
message.value["body"]
) )
consumer.unsuscribe() consumer.unsuscribe()

View File

@ -29,7 +29,7 @@ import argparse
from time import sleep from time import sleep
import paho.mqtt.publish as publish import paho.mqtt.publish as publish
DEBUG = bool(os.environ.get('IOT_DEBUG', False)) DEBUG = bool(os.environ.get("IOT_DEBUG", False))
http = urllib3.PoolManager() http = urllib3.PoolManager()
@ -39,15 +39,16 @@ def post_json(endpoint, url, data):
if DEBUG: if DEBUG:
print(json_data) print(json_data)
encoded_data = json_data.encode('utf8') encoded_data = json_data.encode("utf8")
while True: while True:
try: try:
r = http.request( r = http.request(
'POST', "POST",
endpoint + url, endpoint + url,
body=encoded_data, body=encoded_data,
headers={'content-type': 'application/json'}) headers={"content-type": "application/json"},
)
return r return r
except urllib3.exceptions.MaxRetryError: except urllib3.exceptions.MaxRetryError:
pass pass
@ -57,74 +58,89 @@ def post_json(endpoint, url, data):
def publish_json(transport, endpoint, data): def publish_json(transport, endpoint, data):
json_data = json.dumps(data) json_data = json.dumps(data)
serial = data['device'] serial = data["device"]
if DEBUG: if DEBUG:
print(json_data) print(json_data)
encoded_data = json_data.encode('utf8') encoded_data = json_data.encode("utf8")
publish.single( publish.single(
topic=serial, topic=serial,
payload=encoded_data, payload=encoded_data,
hostname=endpoint.split(':')[0], hostname=endpoint.split(":")[0],
port=int(endpoint.split(':')[1]), port=int(endpoint.split(":")[1]),
client_id=serial, client_id=serial,
transport=('websockets' if transport == 'ws' else 'tcp'), transport=("websockets" if transport == "ws" else "tcp"),
# auth=auth FIXME # auth=auth FIXME
) )
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(description="IoT simulator oprions")
description='IoT simulator oprions')
parser.add_argument('-e', '--endpoint', parser.add_argument(
default=os.environ.get('IOT_HTTP', "-e",
'http://127.0.0.1:8000'), "--endpoint",
help='IoT HTTP endpoint') default=os.environ.get("IOT_HTTP", "http://127.0.0.1:8000"),
parser.add_argument('-m', '--mqtt', help="IoT HTTP endpoint",
default=os.environ.get('IOT_MQTT', )
'127.0.0.1:1883'), parser.add_argument(
help='IoT MQTT endpoint') "-m",
parser.add_argument('-t', '--transport', "--mqtt",
choices=['mqtt', 'ws', 'http'], default=os.environ.get("IOT_MQTT", "127.0.0.1:1883"),
default=os.environ.get('IOT_TL', 'http'), help="IoT MQTT endpoint",
help='IoT transport layer') )
parser.add_argument('-s', '--serial', parser.add_argument(
default=os.environ.get('IOT_SERIAL'), "-t",
help='IoT device serial number') "--transport",
parser.add_argument('-d', '--delay', metavar='s', type=float, choices=["mqtt", "ws", "http"],
default=os.environ.get('IOT_DELAY', 10), default=os.environ.get("IOT_TL", "http"),
help='Delay between requests') help="IoT transport layer",
)
parser.add_argument(
"-s",
"--serial",
default=os.environ.get("IOT_SERIAL"),
help="IoT device serial number",
)
parser.add_argument(
"-d",
"--delay",
metavar="s",
type=float,
default=os.environ.get("IOT_DELAY", 10),
help="Delay between requests",
)
args = parser.parse_args() args = parser.parse_args()
subscribe = '/api/device/subscribe/' subscribe = "/api/device/subscribe/"
telemetry = '/telemetry/' telemetry = "/telemetry/"
if args.serial is None: if args.serial is None:
args.serial = ''.join( args.serial = "".join(
random.choices(string.ascii_lowercase + string.digits, k=8)) random.choices(string.ascii_lowercase + string.digits, k=8)
)
data = {'serial': args.serial} data = {"serial": args.serial}
post_json(args.endpoint, subscribe, data) post_json(args.endpoint, subscribe, data)
while True: while True:
data = { data = {
'device': args.serial, "device": args.serial,
'clock': int(datetime.datetime.now().timestamp()), "clock": int(datetime.datetime.now().timestamp()),
} }
payload = { payload = {
'id': 'device_simulator', "id": "device_simulator",
'light': random.randint(300, 500), "light": random.randint(300, 500),
'temperature': { "temperature": {"celsius": round(random.uniform(20, 28), 1)},
'celsius': round(random.uniform(20, 28), 1)}
} }
if args.transport == 'http': if args.transport == "http":
post_json(args.endpoint, telemetry, {**data, 'payload': payload}) post_json(args.endpoint, telemetry, {**data, "payload": payload})
elif args.transport in ('mqtt', 'ws'): elif args.transport in ("mqtt", "ws"):
publish_json( publish_json(
args.transport, args.mqtt, {**data, 'payload': payload}) args.transport, args.mqtt, {**data, "payload": payload}
)
else: else:
raise NotImplementedError raise NotImplementedError
sleep(args.delay) sleep(args.delay)