|
- import os
- import time
- import paho.mqtt.client as mqtt
-
class MqttSubscriber:
    """Subscribe to a single MQTT topic and forward messages to a user callback.

    The instance wraps a paho-mqtt ``Client`` created with callback API
    version 2. Assign a callable taking ``(topic, payload_bytes)`` to
    ``on_message_user`` to receive incoming messages, then call
    ``start_forever()`` to connect and run the network loop.
    """

    def __init__(self, host: str, port: int, topic: str, protocol: str = "mqttv311",
                 client_id: str = "ble-ai-localizer", keepalive: int = 60,
                 username: str = "", password: str = "", qos: int = 0):
        """Configure the underlying client; no network activity happens here.

        Args:
            host: Broker host passed to the client on connect.
            port: Broker port passed to the client on connect.
            topic: Topic subscribed to after every successful connection.
            protocol: One of "mqttv311", "mqttv31" or "mqttv5"
                (case-insensitive). Empty or unknown values fall back to
                MQTT v3.1.1.
            client_id: Client identifier handed to the paho client.
            keepalive: Keepalive value passed to the client on connect.
            username: When non-empty, credentials are set on the client.
            password: Password for ``username``; an empty string is sent
                as ``None``.
            qos: QoS level used for the subscription.
        """
        self.host = host
        self.port = port
        self.topic = topic
        self.keepalive = keepalive
        self.qos = qos

        # Map the textual protocol name onto the paho protocol constant.
        proto_map = {
            "mqttv311": mqtt.MQTTv311,
            "mqttv31": mqtt.MQTTv31,
            "mqttv5": mqtt.MQTTv5,
        }
        proto = proto_map.get((protocol or "mqttv311").lower(), mqtt.MQTTv311)

        self.client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id=client_id,
            protocol=proto,
        )

        if username:
            self.client.username_pw_set(username=username, password=password or None)

        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect

        # User callback, invoked as on_message_user(topic, payload_bytes).
        self.on_message_user = None
        self.client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        """Subscribe to the configured topic once the broker accepts the connection.

        The callback also fires when the broker refuses the connection; in
        that case the failure is logged and no subscription is attempted.
        """
        # getattr keeps the check safe if reason_code is a plain value
        # without the ``is_failure`` attribute.
        if getattr(reason_code, "is_failure", False):
            print(f"MQTT connection failed rc={reason_code}", flush=True)
            return

        # Subscribing here (rather than once at startup) renews the
        # subscription each time this callback reports a connection.
        client.subscribe(self.topic, qos=self.qos)
        print(f"MQTT connected rc={reason_code}, subscribed to {self.topic}", flush=True)

    def _on_disconnect(self, client, userdata, reason_code, properties=None):
        """Log the disconnection together with its reason code."""
        print(f"MQTT disconnected rc={reason_code}", flush=True)

    def _on_message(self, client, userdata, msg):
        """Forward the message topic and raw payload to the user callback, if set."""
        if self.on_message_user:
            self.on_message_user(msg.topic, msg.payload)

    def start_forever(self):
        """Connect to the broker and run the network loop.

        ``connect_async`` is used instead of the blocking ``connect`` so that
        the first connection attempt is made inside ``loop_forever``; only
        then does ``retry_first_connection=True`` apply, letting the loop
        retry when the broker is unreachable at startup instead of raising.
        """
        self.client.connect_async(self.host, self.port, self.keepalive)
        # Connection retries and reconnects are handled by loop_forever.
        self.client.loop_forever(retry_first_connection=True)
|