Komunikacja w systemach rozproszonych – MQTT
Zadanie (0-10 punktów)
Cel
Utwórz sieć symulowanych sensorów oraz jednostkę monitorującą, która otrzymuje dane z sensorów i wyświetla je wraz z informacją o nazwie sensora (przykładowa struktura sieci została przedstawiona na poniższym diagramie). Węzły powinny komunikować się ze sobą za pomocą MQTT.
Uwagi:
-
Powinny być dwa typy sensorów w sieci: sensory temperatury oraz sensory wilgotności. Dodaj co najmniej dwa sensory każdego typu.
-
Każdy typ sensorów powinien publikować zmierzoną wartość oraz swoją nazwę (na przykład T1, H1) na odpwowiednim topicu.
-
Czujnik temperatury powinien zwracać zrandomizowaną wartość z zakresu od 0 do 30.
-
Czujnik wilgotności powinien zwracać zrandomizowaną wartość z zakresu od 20% do 100%.
-
Otrzymane dane powinny być walidowane z użyciem pydantic. Oznacza to, że zmierzona wartość oraz nazwa czujnika powinna zostać umieszczona w odpowiedniej strukturze danych przed wysłaniem.
Przykładowa struktura danych
from pydantic import BaseModel class TemperatureData(BaseModel): sensor_name: str temperature: float
Instalacja pydantic
sudo apt install python3-pydantic
lub
pip install pydantic
-
Przykład konwersji obiektów pochodnych BaseModel do plików json oraz ich odtwarzania z plików json dla starszych wersji Pydantic (np. v1.10).
td = TemperatureData(sensor_name="t1", temperature=10)
# Convert to json (serialize to string)
json_data=td.json()
# Create object from json (deserialize from string)
td_restored = TemperatureData.parse_raw(json_data)
Instalacja MQTT na Ubuntu (MQTT powinno być zainstalowane na komputerach w laboratorium)
Broker Mosquitto
sudo apt install mosquitto
Klient Python (Paho)
sudo apt install python3-paho-mqtt
Alternatywnie może być zainstalowany z pip
pip install paho-mqtt
Przykłady subscribera i publishera
paho-mqtt version >= 2.0
# MQTT subscriber example
import paho.mqtt.client as mqtt
def on_message(client, userdata, msg):
"""Callback called when new message is received."""
print(f'Received: {msg.topic} {msg.qos} {msg.payload}')
def on_connect(client, userdata, flags, rc, properties):
print(f"Connected with result code {rc}")
if __name__ == '__main__':
# Create an MQTT client instance
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
# Connect to broker
client.connect("localhost", 1883, 60)
# Subscribe to temp topic
client.subscribe("temp")
try:
while client.loop() == 0:
pass
except KeyboardInterrupt:
print("Subscriber stopped.")
client.disconnect()
# MQTT publisher example
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc, properties):
if rc == 0:
print("Connected successfully to the broker.")
else:
print(f"Connection failed with code {rc}")
if __name__ == '__main__':
# Create an MQTT client instance
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
# Set callback for connection
client.on_connect = on_connect
# Connect to the broker
client.connect("localhost", 1883, 60)
# Start the loop
client.loop_start()
temperature = 10 # Initial temperature value
try:
while True:
# Publish the temperature to the "temp" topic
client.publish("temp", temperature)
print(f"Published temperature: {temperature}")
# Update temperature
temperature += 1
# Wait 2 seconds before sending the next message
time.sleep(2)
except KeyboardInterrupt:
print("Publisher stopped.")
# Stop the loop and disconnect
client.loop_stop()
client.disconnect()
paho-mqtt <2.0
# Subscriber example
import paho.mqtt.client as paho
def on_message(mosq, obj, msg):
"""Callback called when new message is received."""
print(f'Received: {msg.topic} {msg.qos} {msg.payload}')
if __name__ == '__main__':
client = paho.Client()
client.on_message = on_message
# Connect to broker
client.connect("localhost", 1883, 60)
# Subscribe to temp topic
client.subscribe("temp")
# Waiting while client is running
while client.loop() == 0:
pass
# MQTT publisher example
import paho.mqtt.publish as publish
import time
temperature = 10
while True:
# Set temperature value
temperature += 1
# Publish the temperature to the "temp" topic
publish.single(topic="temp", payload=temperature, hostname="localhost")
print(f"Published temperature: {temperature}")
# Sleep for a while before publishing the next message
time.sleep(1)
Uruchamianie
Przed startem publishera/subscribera, upewnij się, że masz uruchomiony broker MQTT, na przykład mosquitto.
Aby uruchomić broker mosquitto można użyć poniższej komendy (-v is for verbose mode):
mosquitto -v