Communication in distributed systems – MQTT
Task (0-10 points)
Goal
Create a network of simulated sensors and monitoring unit that receives data from sensors and print it with the name of specific sensor (the example network structure has been shown in below diagram). All nodes in the network should communicate using MQTT.
Remarks:
- There should be two types of sensors in the network: temperature sensors and humidity sensors. Add at least two sensors of each kind.
- Each type of sensor should publish measured value and sensor name (for example T1, H1) to specific topic.
- Temperature sensor should return randomized value in range from 0 to 30.
- Humidity sensor should return randomized value in range from 20% to 100%.
-
Received data should be validated using pydantic. It means you should put sensor name and measured value in proper data structure and then send it from sensor nodes.
Example data structure
from pydantic import BaseModel class TemperatureData(BaseModel): sensor_name: str temperature: float
Pydantic installation
sudo apt install python3-pydantic
or
pip install pydantic
Example of the objects (derived from BaseModel) conversions to json and restoring them from json files for older versions of Pydantic (for example v1.10).
td = TemperatureData(sensor_name="t1", temperature=10) # Convert to json (serialize to string) json_data=td.json() # Create object from json (deserialize from string) td_restored = TemperatureData.parse_raw(json_data)
Installation of MQTT in Ubuntu (should be installed on laboratory PCs)
Mosquitto broker
sudo apt install mosquitto
Python client (Paho)
sudo apt install python3-paho-mqtt
Alternatively it can be installed with pip
pip install paho-mqtt
Examples of subscriber and publisher
Example for paho-mqtt version >= 2.0
# MQTT subscriber example
import paho.mqtt.client as mqtt
def on_message(client, userdata, msg):
"""Callback called when new message is received."""
print(f'Received: {msg.topic} {msg.qos} {msg.payload}')
def on_connect(client, userdata, flags, rc, properties):
print(f"Connected with result code {rc}")
if __name__ == '__main__':
# Create an MQTT client instance
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
# Connect to broker
client.connect("localhost", 1883, 60)
# Subscribe to temp topic
client.subscribe("temp")
try:
while client.loop() == 0:
pass
except KeyboardInterrupt:
print("Subscriber stopped.")
client.disconnect()
# MQTT publisher example
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc, properties):
if rc == 0:
print("Connected successfully to the broker.")
else:
print(f"Connection failed with code {rc}")
if __name__ == '__main__':
# Create an MQTT client instance
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
# Set callback for connection
client.on_connect = on_connect
# Connect to the broker
client.connect("localhost", 1883, 60)
# Start the loop
client.loop_start()
temperature = 10 # Initial temperature value
try:
while True:
# Publish the temperature to the "temp" topic
client.publish("temp", temperature)
print(f"Published temperature: {temperature}")
# Update temperature
temperature += 1
# Wait 2 seconds before sending the next message
time.sleep(2)
except KeyboardInterrupt:
print("Publisher stopped.")
# Stop the loop and disconnect
client.loop_stop()
client.disconnect()
Older versions of paho-mqtt (<2.0)
# Subscriber example
import paho.mqtt.client as paho
def on_message(mosq, obj, msg):
"""Callback called when new message is received."""
print(f'Received: {msg.topic} {msg.qos} {msg.payload}')
if __name__ == '__main__':
client = paho.Client()
client.on_message = on_message
# Connect to broker
client.connect("localhost", 1883, 60)
# Subscribe to temp topic
client.subscribe("temp")
# Waiting while client is running
while client.loop() == 0:
pass
# MQTT publisher example
import paho.mqtt.publish as publish
import time
temperature = 10
while True:
# Set temperature value
temperature += 1
# Publish the temperature to the "temp" topic
publish.single(topic="temp", payload=temperature, hostname="localhost")
print(f"Published temperature: {temperature}")
# Sleep for a while before publishing the next message
time.sleep(1)
Running
Before you start publishers/subscribers make sure that you have already started the MQTT broker, for example mosquitto.
To run mosquitto broker you can use below command (-v is for verbose mode):
mosquitto -v