Przejdź do treści

Rozproszone Systemy Sterowania: L3

Komunikacja w systemach rozproszonych – MQTT

Zadanie (0-10 punktów)

Cel

Utwórz sieć symulowanych sensorów oraz jednostkę monitorującą, która otrzymuje dane z sensorów i wyświetla je wraz z informacją o nazwie sensora (przykładowa struktura sieci została przedstawiona na poniższym diagramie). Węzły powinny komunikować się ze sobą za pomocą MQTT.

Uwagi:

  • Powinny być dwa typy sensorów w sieci: sensory temperatury oraz sensory wilgotności. Dodaj co najmniej dwa sensory każdego typu.

  • Każdy typ sensorów powinien publikować zmierzoną wartość oraz swoją nazwę (na przykład T1, H1) na odpwowiednim topicu.

  • Czujnik temperatury powinien zwracać zrandomizowaną wartość z zakresu od 0 do 30.

  • Czujnik wilgotności powinien zwracać zrandomizowaną wartość z zakresu od 20% do 100%.

  • Otrzymane dane powinny być walidowane z użyciem pydantic. Oznacza to, że zmierzona wartość oraz nazwa czujnika powinna zostać umieszczona w odpowiedniej strukturze danych przed wysłaniem.

    Przykładowa struktura danych

    from pydantic import BaseModel
    
    class TemperatureData(BaseModel):
        sensor_name: str
        temperature: float

    Instalacja pydantic

    sudo apt install python3-pydantic

    lub

    pip install pydantic
  • Przykład konwersji obiektów pochodnych BaseModel do plików json oraz ich odtwarzania z plików json dla starszych wersji Pydantic (np. v1.10).

td = TemperatureData(sensor_name="t1", temperature=10)

# Convert to json (serialize to string)
json_data=td.json()

# Create object from json (deserialize from string)
td_restored = TemperatureData.parse_raw(json_data)

Instalacja MQTT na Ubuntu (MQTT powinno być zainstalowane na komputerach w laboratorium)

Broker Mosquitto
sudo apt install mosquitto
Klient Python (Paho)
sudo apt install python3-paho-mqtt

Alternatywnie może być zainstalowany z pip

pip install paho-mqtt

Przykłady subscribera i publishera

paho-mqtt version >= 2.0

# MQTT subscriber example
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    """Callback called when new message is received."""
    print(f'Received: {msg.topic} {msg.qos} {msg.payload}')

def on_connect(client, userdata, flags, rc, properties):
    print(f"Connected with result code {rc}")

if __name__ == '__main__':
    # Create an MQTT client instance
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    # Set callbacks
    client.on_connect = on_connect
    client.on_message = on_message

    # Connect to broker
    client.connect("localhost", 1883, 60)

    # Subscribe to temp topic
    client.subscribe("temp")

    try:
        while client.loop() == 0:
            pass
    except KeyboardInterrupt:
        print("Subscriber stopped.")

    client.disconnect()
# MQTT publisher example

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc, properties):
    if rc == 0:
        print("Connected successfully to the broker.")
    else:
        print(f"Connection failed with code {rc}")

if __name__ == '__main__':
    # Create an MQTT client instance
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    # Set callback for connection
    client.on_connect = on_connect

    # Connect to the broker
    client.connect("localhost", 1883, 60)

    # Start the loop
    client.loop_start()

    temperature = 10  # Initial temperature value

    try:
        while True:
            # Publish the temperature to the "temp" topic
            client.publish("temp", temperature)
            print(f"Published temperature: {temperature}")

            # Update temperature
            temperature += 1

            # Wait 2 seconds before sending the next message
            time.sleep(2)

    except KeyboardInterrupt:
        print("Publisher stopped.")

    # Stop the loop and disconnect
    client.loop_stop()
    client.disconnect()

paho-mqtt <2.0

# Subscriber example
import paho.mqtt.client as paho

def on_message(mosq, obj, msg):
    """Callback called when new message is received."""
    print(f'Received: {msg.topic} {msg.qos} {msg.payload}')

if __name__ == '__main__':
    client = paho.Client()
    client.on_message = on_message

    # Connect to broker
    client.connect("localhost", 1883, 60)

    # Subscribe to temp topic
    client.subscribe("temp")

    # Waiting while client is running
    while client.loop() == 0:
        pass
# MQTT publisher example
import paho.mqtt.publish as publish
import time

temperature = 10

while True:
    # Set temperature value
    temperature += 1
    # Publish the temperature to the "temp" topic
    publish.single(topic="temp", payload=temperature, hostname="localhost")

    print(f"Published temperature: {temperature}")

    # Sleep for a while before publishing the next message
    time.sleep(1)

Uruchamianie

Przed startem publishera/subscribera, upewnij się, że masz uruchomiony broker MQTT, na przykład mosquitto.

Aby uruchomić broker mosquitto można użyć poniższej komendy (-v is for verbose mode):

mosquitto -v

Materiały