
Robotic Programming Environments: L3

Communication in distributed systems – MQTT

Task (0-10 points)

Goal

Create a network of simulated sensors and a monitoring unit that receives data from the sensors and prints each value together with the name of the sensor that sent it (an example network structure is shown in the diagram below). All nodes in the network should communicate using MQTT.

Remarks:

  • There should be two types of sensors in the network: temperature sensors and humidity sensors. Add at least two sensors of each kind.
  • Each type of sensor should publish its measured value and sensor name (for example T1, H1) to a specific topic.
  • Each temperature sensor should return a random value in the range 0 to 30.
  • Each humidity sensor should return a random value in the range 20% to 100%.
  • Received data should be validated using pydantic. This means the sensor name and measured value should be placed in a proper data structure (a pydantic model) before being sent from the sensor nodes, so that the monitoring unit can validate what it receives.

    Example data structure

    from pydantic import BaseModel
    
    class TemperatureData(BaseModel):
        sensor_name: str
        temperature: float

    Pydantic installation

    sudo apt install python3-pydantic

    or

    pip install pydantic

    Example of converting objects (derived from BaseModel) to JSON and restoring them from JSON strings in older versions of Pydantic (for example v1.10):

    td = TemperatureData(sensor_name="t1", temperature=10)
    
    # Convert to json (serialize to string)
    json_data = td.json()
    
    # Create object from json (deserialize from string)
    td_restored = TemperatureData.parse_raw(json_data)
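For newer versions of Pydantic, the same round trip could look roughly like the sketch below. It assumes Pydantic 2.x is installed, where `model_dump_json()` and `model_validate_json()` take the place of `.json()` and `.parse_raw()`. The `Field` bounds are an optional addition that is not part of the example above; they encode the 0 to 30 range from the task so that out-of-range values are rejected.

```python
from pydantic import BaseModel, Field, ValidationError


class TemperatureData(BaseModel):
    sensor_name: str
    # Assumption: bounds taken from the task description (0 to 30)
    temperature: float = Field(ge=0, le=30)


td = TemperatureData(sensor_name="T1", temperature=10)

# Pydantic v2: convert to JSON (serialize to string)
json_data = td.model_dump_json()

# Pydantic v2: create object from JSON (validate and deserialize)
td_restored = TemperatureData.model_validate_json(json_data)

# A value outside the declared range raises ValidationError
try:
    TemperatureData(sensor_name="T1", temperature=99)
    rejected = False
except ValidationError:
    rejected = True
```

A humidity model can be written the same way, with bounds of 20 and 100.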

Installation of MQTT on Ubuntu (should already be installed on the laboratory PCs)

Mosquitto broker
sudo apt install mosquitto
Python client (Paho)
sudo apt install python3-paho-mqtt

Alternatively, the Python client can be installed with pip:

pip install paho-mqtt
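Both paho-mqtt and Pydantic changed their API between major versions, so it can help to check which version is actually installed before choosing an example. The following is a minimal sketch using only the standard library (Python 3.8 or newer); the helper name `installed_major` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_major(package: str):
    """Return the major version of an installed package, or None if it is missing."""
    try:
        return int(version(package).split(".")[0])
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    for name in ("paho-mqtt", "pydantic"):
        print(f"{name}: major version {installed_major(name)}")
```

A result of 2 for paho-mqtt points to the examples for version >= 2.0; a result of 1 points to the examples for older versions.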

Examples of subscriber and publisher

Example for paho-mqtt version >= 2.0

# MQTT subscriber example
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    """Callback called when new message is received."""
    print(f'Received: {msg.topic} {msg.qos} {msg.payload}')

def on_connect(client, userdata, flags, rc, properties):
    print(f"Connected with result code {rc}")

if __name__ == '__main__':
    # Create an MQTT client instance
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    # Set callbacks
    client.on_connect = on_connect
    client.on_message = on_message

    # Connect to broker
    client.connect("localhost", 1883, 60)

    # Subscribe to temp topic
    client.subscribe("temp")

    try:
        while client.loop() == 0:
            pass
    except KeyboardInterrupt:
        print("Subscriber stopped.")

    client.disconnect()
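For the monitoring unit, the `on_message` callback could validate each payload before printing it. The sketch below is one possible approach, not the required solution: the topic names `temp` and `humidity`, the `HumidityData` model and the helper `format_reading` are assumptions made for illustration. Building the model from the parsed dictionary is used here because it works in both Pydantic v1 and v2.

```python
import json

from pydantic import BaseModel


class TemperatureData(BaseModel):
    sensor_name: str
    temperature: float


class HumidityData(BaseModel):
    sensor_name: str
    humidity: float


# Hypothetical topic names: one topic per sensor type
MODELS = {"temp": TemperatureData, "humidity": HumidityData}


def format_reading(topic: str, payload: bytes) -> str:
    """Validate a JSON payload for the given topic and return a printable line."""
    model = MODELS.get(topic)
    if model is None:
        return f"Ignored message on unknown topic '{topic}'"
    try:
        data = model(**json.loads(payload))
    except (ValueError, TypeError):
        # ValueError covers malformed JSON and pydantic's ValidationError;
        # TypeError covers JSON that is not an object (for example a bare number)
        return f"Rejected invalid payload on topic '{topic}'"
    if isinstance(data, TemperatureData):
        return f"{data.sensor_name}: temperature = {data.temperature}"
    return f"{data.sensor_name}: humidity = {data.humidity} %"


def on_message(client, userdata, msg):
    """paho-mqtt message callback: print each validated reading."""
    print(format_reading(msg.topic, msg.payload))
```

With this callback, the subscriber would also need to subscribe to the second topic, for example with an additional `client.subscribe("humidity")` call.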

# MQTT publisher example

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc, properties):
    if rc == 0:
        print("Connected successfully to the broker.")
    else:
        print(f"Connection failed with code {rc}")

if __name__ == '__main__':
    # Create an MQTT client instance
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    # Set callback for connection
    client.on_connect = on_connect

    # Connect to the broker
    client.connect("localhost", 1883, 60)

    # Start the loop
    client.loop_start()

    temperature = 10  # Initial temperature value

    try:
        while True:
            # Publish the temperature to the "temp" topic
            client.publish("temp", temperature)
            print(f"Published temperature: {temperature}")

            # Update temperature
            temperature += 1

            # Wait 2 seconds before sending the next message
            time.sleep(2)

    except KeyboardInterrupt:
        print("Publisher stopped.")

    # Stop the loop and disconnect
    client.loop_stop()
    client.disconnect()
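The publisher above sends an incrementing counter, whereas the task asks for randomized readings. A simulated sensor could generate its value as sketched below. The ranges come from the task description; the topic names and the helpers `read_sensor` and `make_reading` are hypothetical.

```python
import random

# Ranges come from the task description; the topic names are assumptions
SENSOR_KINDS = {
    "temperature": {"topic": "temp", "low": 0.0, "high": 30.0},
    "humidity": {"topic": "humidity", "low": 20.0, "high": 100.0},
}


def read_sensor(kind: str) -> float:
    """Return a simulated measurement within the range for this sensor kind."""
    spec = SENSOR_KINDS[kind]
    return round(random.uniform(spec["low"], spec["high"]), 1)


def make_reading(sensor_name: str, kind: str):
    """Return a (topic, sensor_name, value) tuple for one simulated measurement."""
    return SENSOR_KINDS[kind]["topic"], sensor_name, read_sensor(kind)


if __name__ == "__main__":
    print(make_reading("T1", "temperature"))
    print(make_reading("H1", "humidity"))
```

In a sensor node, the name and value would then be placed in the matching pydantic model, serialized to JSON, and passed to `client.publish()` together with the topic.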

Example for older versions of paho-mqtt (< 2.0)

# Subscriber example
import paho.mqtt.client as paho

def on_message(mosq, obj, msg):
    """Callback called when new message is received."""
    print(f'Received: {msg.topic} {msg.qos} {msg.payload}')

if __name__ == '__main__':
    client = paho.Client()
    client.on_message = on_message

    # Connect to broker
    client.connect("localhost", 1883, 60)

    # Subscribe to temp topic
    client.subscribe("temp")

    # Keep processing network events until loop() reports an error
    while client.loop() == 0:
        pass

# MQTT publisher example
import paho.mqtt.publish as publish
import time

temperature = 10

while True:
    # Set temperature value
    temperature += 1
    # Publish the temperature to the "temp" topic
    publish.single(topic="temp", payload=temperature, hostname="localhost")

    print(f"Published temperature: {temperature}")

    # Sleep for a while before publishing the next message
    time.sleep(1)

Running

Before starting any publishers or subscribers, make sure that the MQTT broker (for example mosquitto) is already running.

To run the mosquitto broker, use the command below (-v enables verbose mode):

mosquitto -v
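To confirm that a broker is listening before launching the nodes, a plain TCP connection attempt is often enough. The sketch below uses only the standard library and assumes the default MQTT port 1883 on localhost, as in the examples above; the helper name `broker_reachable` is hypothetical. It only checks that the port accepts connections, not that the listener actually speaks MQTT.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 1883,
                     timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution errors
        return False


if __name__ == "__main__":
    if broker_reachable():
        print("Something is listening on localhost:1883.")
    else:
        print("No broker reachable on localhost:1883; start mosquitto first.")
```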

Resources