
Internet of Things Blog

Bridge Building – Mosquitto MQTT to Azure IoT Using Paho MQTT and the Azure IoT Device SDKs

shanebaldacchino
Former Employee
Dec 08, 2021

Paho MQTT with the Azure IoT Device SDK = Awesome!

I have been on a journey to explore Azure IoT and push the thousands of events that flow through my local MQTT broker (Mosquitto) into Azure IoT Hub, from connecting directly to Azure IoT Hub (via MQTT and SAS tokens) through to running Azure IoT Edge locally with MQTT. I have achieved my goals with varying levels of success, but I have a few concerns about the approaches I have tried so far.

  • Direct connection to Azure IoT Hub introduces latency to the cloud.
  • Authentication, from SAS tokens to X.509 certificates: it's not anonymous, and some of my tiny devices (Tasmota) don't handle it well.
  • Topic structure: it is fixed (devices/{DeviceID}/messages/events/) rather than free-form. That means reconfiguring devices, which isn't hard, but adds a lot of friction.
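The last two points are easier to appreciate in code. The sketch below (standard library only) shows what each device would have to do to talk to IoT Hub directly over MQTT: sign a time-limited SAS token and publish only to its fixed device-to-cloud topic. It follows the token format from Microsoft's IoT Hub security documentation; `generate_sas_token` and `d2c_topic` are hypothetical helper names, and the hub name, device ID and key are placeholders.

```python
import base64
import hashlib
import hmac
from typing import Optional
from urllib.parse import quote_plus, urlencode


def generate_sas_token(resource_uri: str, key_b64: str, expiry_epoch: int,
                       policy_name: Optional[str] = None) -> str:
    """Sign an IoT Hub SAS token for resource_uri, valid until expiry_epoch (Unix seconds)."""
    # String to sign: URL-encoded resource URI, a newline, then the expiry time.
    string_to_sign = f"{quote_plus(resource_uri)}\n{expiry_epoch}"
    digest = hmac.new(base64.b64decode(key_b64),
                      string_to_sign.encode("utf-8"), hashlib.sha256).digest()
    fields = {
        "sr": resource_uri,
        "sig": base64.b64encode(digest).decode("ascii"),
        "se": str(expiry_epoch),
    }
    if policy_name is not None:  # only for shared access policies, not device keys
        fields["skn"] = policy_name
    # urlencode also URL-encodes the sr and sig values.
    return "SharedAccessSignature " + urlencode(fields)


def d2c_topic(device_id: str) -> str:
    """IoT Hub only accepts device-to-cloud publishes on this fixed topic."""
    return f"devices/{device_id}/messages/events/"


# Placeholder values, not a real hub or key.
token = generate_sas_token("myhub.azure-devices.net/devices/tasmota-01",
                           base64.b64encode(b"not-a-real-key").decode("ascii"),
                           expiry_epoch=1700000000)
print(token)
print(d2c_topic("tasmota-01"))
```

Every Tasmota device would need its own key, a token refreshed before `se` passes, TLS on port 8883 and exactly this topic, which is the reconfiguration the bridge is meant to avoid.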

Channeling my inner Steve Ballmer (developers, developers, developers), it's time to build. My goals for building a solution:

  1. No reconfiguration of any of my MQTT devices (Home Assistant, PLC, Arduino Mega 2560, ~75 Tasmota devices).
  2. Bridge my existing MQTT broker (Mosquitto) into Azure IoT Hub.
  3. Must run on aarch64 architecture, a Raspberry Pi.

Pretty lofty goals; you may even say I am being lazy, but the reality is I want a low-friction way to derive operational intelligence from the many thousands of events each day (read below, it's over 10K per day!). For that we need to get our hands dirty, write some code and use SDKs.

 

What we are going to build

 

Using Paho + Azure IoT SDK we will listen to Mosquitto

To overcome the limitations described above, we are going to build an application in Python using the Paho MQTT library and the Azure IoT SDK for Python. Let's quickly talk about both of these.

 

Paho MQTT

 

Paho is a Python client class that enables applications to connect to an MQTT broker to publish messages, subscribe to topics and receive published messages. It also provides some helper functions that make publishing one-off messages to an MQTT server very straightforward. This is what we will use to listen to messages on our Mosquitto broker. You can find examples and more information about the Paho MQTT module on the pypi.org website. To install Paho, you can use pip.

pip install paho-mqtt

and can leverage Paho with

import paho.mqtt.client as mqtt

 

Azure IoT SDK for Python

 

Once Paho has read the messages from Mosquitto, we need to get them into Azure IoT. The Azure IoT SDK for Python lets us talk to IoT Hub directly from Python, without handling the hub's MQTT specifics (topics, SAS tokens, TLS) ourselves. The SDK takes care of ‘Authentication’, ‘Send device-to-cloud message’, ‘Receive cloud-to-device messages’, ‘Device twins’, ‘Direct methods’, ‘Connection status and error reporting’, ‘Retry policies’ and ‘Upload file to blob’. A lot of the heavy lifting I need is performed by this SDK. To install the Azure IoT SDK for Python you can use pip. You can find code examples and more information about this device module on the pypi.org website.

pip install azure-iot-device

and can leverage Azure IoT SDK for Python

from azure.iot.device import IoTHubDeviceClient

Let's write some code.

 

Code Summary

 

See the steps below as I tease out this solution, or my GitHub repo for the full Python script. To give you a better understanding of how this works, I will break it down into the logical steps required to receive messages from Mosquitto over MQTT using Paho and then re-publish them into Azure IoT Hub using the Azure IoT SDK for Python.

 

Step 1 – Import Modules

We need to import a few modules, mainly Paho and the Azure IoT SDK, to provide additional functionality:

import paho.mqtt.client as mqtt
import os
import asyncio
import uuid
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
from datetime import datetime

 

Step 2 – Connect To Mosquitto & Subscribe To MQTT Topics

After importing our modules, we need to connect to our MQTT broker. Paho calls our on_connect callback when the broker answers the connection request, and that is where we subscribe to topics:

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print(str(datetime.now()) + " | Connecting to MQTT Broker : " + MQTTBrokerIP)
    print(str(datetime.now()) + " | Connected with result code {0}".format(str(rc)))
    print(str(datetime.now()) + " | We are connected!")
    print()
    print(str(datetime.now()) + " | Subscribing to MQTT Topics")
    print(str(datetime.now()) + " | Subscribing to " + MQTTTopicSubscribe)
    client.subscribe(MQTTTopicSubscribe)
    print()

client = mqtt.Client(client_id=MQTTClientName)  # client_id by keyword: paho-mqtt 2.x made the first positional argument the callback API version
client.on_connect = on_connect  # Register the callback once it has been defined
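The on_connect callback above announces "We are connected!" whatever rc says, but a non-zero rc means the broker refused the connection. A minimal sketch of a stricter variant, assuming an MQTT 3.1.1 connection (paho's default protocol); `describe_connack` is a hypothetical helper, and paho 1.x also ships `mqtt.connack_string(rc)` for the same purpose.

```python
# MQTT 3.1.1 CONNACK return codes.
CONNACK_CODES = {
    0: "Connection accepted",
    1: "Refused: unacceptable protocol version",
    2: "Refused: identifier rejected",
    3: "Refused: server unavailable",
    4: "Refused: bad user name or password",
    5: "Refused: not authorized",
}

MQTTTopicSubscribe = "stat/+/POWER"  # same topic filter as in the post


def describe_connack(rc: int) -> str:
    """Turn a numeric CONNACK return code into readable text."""
    return CONNACK_CODES.get(rc, f"Unknown or reserved return code {rc}")


def on_connect(client, userdata, flags, rc):
    """Variant of on_connect that only subscribes when the broker accepted us."""
    print(f"Connect result: {describe_connack(rc)}")
    if rc != 0:
        return  # refused: subscribing would fail, so just report the reason
    client.subscribe(MQTTTopicSubscribe)
```

With Mosquitto's default anonymous access you should only ever see code 0; codes 4 and 5 typically appear once authentication or ACLs are turned on.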

 

After connecting, we need to tell our MQTT broker which topics we want to subscribe to. This way we can be more precise about what we replicate into Azure. We can use MQTT topic filters to do this. Initially I started with # but decided to use the single-level wildcard +.

MQTTTopicSubscribe = "stat/+/POWER" #MQTT Topic Filter

  • Plus sign (+): a single-level wildcard that matches any name for one specific topic level. We can use this wildcard instead of a name for any topic level in the topic filter.
  • Hash (#): a multi-level wildcard that can only be used at the end of the topic filter, as the last level. It matches any topic whose first levels are the same as the topic levels to the left of the # symbol.

Understanding wildcards | MQTT Essentials – A Lightweight IoT Protocol (packtpub.com)
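To make the two wildcards concrete, here is a small stdlib-only matcher (a hypothetical `topic_matches()` helper written for illustration) following the MQTT 3.1.1 matching rules, including the rule that a wildcard at the first level never matches topics starting with $. In the bridge itself you would not need it, since the broker does the matching, and paho also provides `topic_matches_sub()` if you want it client-side.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a subscription filter."""
    # Wildcards at the first level never match topics starting with '$' (e.g. $SYS/...).
    if topic.startswith("$") and topic_filter[:1] in ("+", "#"):
        return False
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level; matches the parent
            # level itself and any number of levels below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs ('+' matches exactly one level)
    return len(filter_levels) == len(topic_levels)


for t in ("stat/kitchen/POWER", "stat/kitchen/RESULT", "stat/a/b/POWER"):
    print(t, topic_matches("stat/+/POWER", t))
```

The + filter is why "stat/+/POWER" picks up every Tasmota relay state without also pulling in RESULT messages or deeper topic trees, which "stat/#" would.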

 

Step 3 – Listen For Messages

We have now subscribed to MQTT topics, and we need to listen for and act on these incoming messages. I am taking the MQTT topic and MQTT payload and passing them into my Python azure() function (not an Azure Function ;)), which will push the payload into Azure.

def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    payload = msg.payload.decode("utf-8", errors="replace")  # Paho delivers the payload as bytes
    print(str(datetime.now()) + " | Message received")
    print(str(datetime.now()) + " | MQTT Topic and Payload: " + msg.topic + " " + payload)  # Print a received msg
    asyncio.run(azure(msg.topic, payload))  # Hand topic and payload to azure() as arguments

client.on_message = on_message  # Register the callback once it has been defined



Step 4 – Send Messages To Azure

With the MQTT topic and payload we can now push these messages into Azure. I am sending these as a JSON object built with json.dumps, which takes care of quoting and escaping, so payloads that are themselves JSON still produce a valid message. Because on_message decodes the payload, there are no stray b'...' characters to strip off here.

import json  # json.dumps builds the message body

async def azure(mqtt_topic, mqtt_payload):
    # Create instance of the device client using the connection string
    device_client = IoTHubDeviceClient.create_from_connection_string(AzureIOTHub_conn_str)

    # Connect the device client.
    await device_client.connect()
    print(str(datetime.now()) + " | Async connection established to Azure IoT")

    # Send a single message; json.dumps escapes any quotes in the topic or payload
    print(str(datetime.now()) + " | Sending message to Azure IoT Hub")
    msg = Message(json.dumps({"DateTime": str(datetime.now()), "MQTT Topic": mqtt_topic, "Payload": mqtt_payload}))
    msg.message_id = str(uuid.uuid4())
    msg.content_encoding = "utf-8"
    msg.content_type = "application/json"

    await device_client.send_message(msg)
    print(str(datetime.now()) + " | Message sent, tearing down Azure IoT Hub connection")
    print()

    # Finally, shut down the client
    await device_client.shutdown()
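Two details of the message handling are easy to get wrong: Paho delivers msg.payload as bytes, and building JSON by string concatenation breaks as soon as a payload contains quotes (Tasmota, for instance, publishes JSON on topics such as tele/<device>/SENSOR). This standalone sketch, using a hypothetical `bridge_body()` helper, decodes the payload and lets json.dumps do the escaping; it runs without a broker or an IoT Hub.

```python
import json
from datetime import datetime


def bridge_body(mqtt_topic: str, payload: bytes) -> str:
    """Build the JSON document sent to IoT Hub for one MQTT message."""
    # Decode the bytes rather than slicing str(payload): str() gives the repr
    # ("b'ON'"), and non-ASCII bytes would show up as \x escape sequences.
    text = payload.decode("utf-8", errors="replace")
    # json.dumps escapes quotes and backslashes, so a JSON payload such as
    # {"POWER":"ON"} becomes a string value instead of breaking the document.
    return json.dumps({
        "DateTime": str(datetime.now()),
        "MQTT Topic": mqtt_topic,
        "Payload": text,
    })


raw = b'{"POWER":"ON"}'
print(str(raw))  # b'{"POWER":"ON"}'  (the repr, not the text)
concatenated = '{ "Payload": "' + str(raw)[2:][:-1] + '" }'
try:
    json.loads(concatenated)
except json.JSONDecodeError as err:
    print("string concatenation produced invalid JSON:", err.msg)
print(bridge_body("stat/kitchen/POWER", raw))
```

Keeping content_type "application/json" and content_encoding "utf-8" on the Message matters as well: IoT Hub message routing can only query the message body when both are set.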

 

Pulling It All Together

 

Here is a complete copy of the above, plus a bit more. Assuming you have installed Paho and the Azure IoT SDK via pip, you can cut and paste the code below or clone my GitHub repository.

import paho.mqtt.client as mqtt
import asyncio
import json
import uuid
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
from datetime import datetime

# -----------------------------------------------------------------------
# EDIT BELOW THIS LINE

ScriptVersion = "1.0"
ModifiedDate = "Monday 15, November 2021"
MQTTBrokerIP = "10.0.0.200" #IP Address of your local MQTT Broker (Mosquitto), not your Azure IoT Hub host name
MQTTTopicSubscribe = "stat/+/POWER" #MQTT Topic Filter
MQTTClientName = "RaspiPI4" #Used to identify the device to your MQTT Broker
AzureIOTHub_conn_str = "********************************************************" #Azure IoT Hub device connection string (HostName=...;DeviceId=...;SharedAccessKey=...)

# EDIT ABOVE THIS LINE
# -----------------------------------------------------------------------

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print(str(datetime.now()) + " | Connecting to MQTT Broker : " + MQTTBrokerIP)
    print(str(datetime.now()) + " | Connected with result code {0}".format(str(rc)))
    print(str(datetime.now()) + " | We are connected!")
    print()
    print(str(datetime.now()) + " | Subscribing to MQTT Topics")
    print(str(datetime.now()) + " | Subscribing to " + MQTTTopicSubscribe)
    client.subscribe(MQTTTopicSubscribe)
    print()

def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    payload = msg.payload.decode("utf-8", errors="replace")  # Paho delivers the payload as bytes
    print(str(datetime.now()) + " | Message received")
    print(str(datetime.now()) + " | MQTT Topic and Payload: " + msg.topic + " " + payload)  # Print a received msg
    try:
        asyncio.run(azure(msg.topic, payload))
    except Exception as err:
        # Continue on errors (e.g. internet connectivity issues) instead of stopping the bridge
        print(str(datetime.now()) + " | Sending to Azure IoT Hub failed: " + repr(err))
        print()

async def azure(mqtt_topic, mqtt_payload):
    # Create instance of the device client using the connection string
    device_client = IoTHubDeviceClient.create_from_connection_string(AzureIOTHub_conn_str)
    try:
        # Connect the device client.
        await device_client.connect()
        print(str(datetime.now()) + " | Async connection established to Azure IoT")

        # Send a single message; json.dumps takes care of quoting and escaping
        print(str(datetime.now()) + " | Sending message to Azure IoT Hub")
        msg = Message(json.dumps({"DateTime": str(datetime.now()), "MQTT Topic": mqtt_topic, "Payload": mqtt_payload}))
        msg.message_id = str(uuid.uuid4())
        msg.content_encoding = "utf-8"
        msg.content_type = "application/json"

        await device_client.send_message(msg)
        print(str(datetime.now()) + " | Message sent, tearing down Azure IoT Hub connection")
        print()
    finally:
        # Finally, shut down the client, even if connecting or sending failed
        await device_client.shutdown()

print("*********************************************************************")
print("*                                                                   *")
print("*                                                                   *")
print("*               MQTT --> Azure IOT Bridge                           *")
print("*                                                                   *")
print("*                                                                   *")
print("*                                                                   *")
print("* shane@baldacchino.net                                             *")
print(f"* Version : {ScriptVersion}                                                     *")
print(f"* Modified Date : {ModifiedDate}                          *")
print("*                                                                   *")
print("*********************************************************************")


client = mqtt.Client(client_id=MQTTClientName)  # client_id by keyword: paho-mqtt 2.x made the first positional argument the callback API version
client.on_connect = on_connect
client.on_message = on_message
print(str(datetime.now()) + " | Listening for messages")
print()
client.connect(MQTTBrokerIP, 1883, 60)  # Connect to (broker, port, keepalive-time)
client.loop_forever()  # Blocks here, handling network traffic and reconnects, until the script is stopped
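The two settings at the top of the script are easy to mix up. MQTTBrokerIP is the local Mosquitto broker on port 1883, while AzureIOTHub_conn_str is a device connection string of the form HostName=<hub>.azure-devices.net;DeviceId=<device>;SharedAccessKey=<key>. IoT Hub itself only accepts MQTT over TLS on port 8883 (or WebSockets on 443), so pointing the plain port-1883 connect() at it will time out. Below is a minimal sanity check, a sketch using hypothetical `parse_connection_string()` and `check_bridge_config()` helpers; the checks are heuristics, not an official validation.

```python
def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key=Value;Key=Value' pairs, keeping '=' padding inside base64 keys."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key] = value
    return parts


def check_bridge_config(broker_ip: str, conn_str: str) -> list:
    """Return human-readable problems with the two settings (heuristics only)."""
    problems = []
    parts = parse_connection_string(conn_str)
    for required in ("HostName", "DeviceId", "SharedAccessKey"):
        if required not in parts:
            problems.append(f"connection string is missing {required}")
    if "SharedAccessKeyName" in parts:
        problems.append("this looks like a hub-level shared access policy string; "
                        "use the connection string of an IoT device instead")
    if broker_ip.endswith(".azure-devices.net"):
        problems.append("MQTTBrokerIP should point at the local broker (Mosquitto), "
                        "not at the IoT Hub host name")
    return problems


# Placeholder values, not real credentials.
device_str = "HostName=myhub.azure-devices.net;DeviceId=RaspiPI4;SharedAccessKey=bm90LWEtcmVhbC1rZXk="
print(check_bridge_config("10.0.0.200", device_str))
hub_str = "HostName=myhub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=abc="
for problem in check_bridge_config("myhub.azure-devices.net", hub_str):
    print("-", problem)
```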

 

Seeing This In Action

 

Let's drop to a video to see this working end to end. To validate that messages are flowing into Azure IoT Hub, I can use the Azure CLI (az, with the azure-iot extension) to monitor the output.

az iot hub monitor-events --hub-name hubname --device-id devicename --output json

 

For the purpose of this demo, I have left a handful of messages at QoS level 2 and set LWT (Last Will and Testament) to true.

 

 

After 24 hours of running, we can see I have published 10.52K messages into Azure IoT Hub, and there are certain ebbs and flows that occur in my house. You can even tell when my kids have screen time late in the afternoon, as the number of messages drops considerably. There is still temperature, humidity and other telemetry, but movement (captured by passive infrared sensors) almost vanishes from the event stream.
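To put 10.52K messages a day into perspective, here is a quick back-of-the-envelope calculation. It only gives averages; as the ebbs and flows show, the real load is bursty.

```python
messages_per_day = 10_520            # the ~10.52K messages from the 24-hour run
seconds_per_day = 24 * 60 * 60

per_hour = messages_per_day / 24
per_second = messages_per_day / seconds_per_day
mean_gap_s = seconds_per_day / messages_per_day  # average time between messages

print(f"{per_hour:.0f} messages/hour, {per_second:.2f} messages/second, "
      f"one every {mean_gap_s:.1f} s on average")
# prints: 438 messages/hour, 0.12 messages/second, one every 8.2 s on average
```

Since the script opens and tears down an IoT Hub connection for every message, that is also roughly one connection every 8 seconds on average, which is manageable at this volume but worth revisiting if the message rate grows.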

 

Can you see where my children have screen time?

 

Conclusion

 

There are many ways to skin this code cat. My requirement was to publish messages into Azure, and we have been able to achieve this in several ways (I am sure there are more). Automation is a journey; which path will you take?

We illustrated a transparent side-car approach that will listen to an existing broker, on topics you desire and push these in to Azure IoT, all without making any configuration changes (the most important thing for my implementation).

Are there any drawbacks? Sure there are. Right now this is one-way (simplex): it lets me push messages into Azure IoT Hub but not receive messages back. All of my local devices are represented as a single Azure IoT device, which is less than ideal.

Azure IoT Edge or direct MQTT publishing to Azure IoT Hub would be duplex communication resulting in each local device being represented as a unique device in Azure IoT Hub.

The Azure IoT SDK for Python is capable of duplex communication (receiving cloud-to-device messages), but I have yet to implement it. Will I? I am unsure, but it's nice to know I can. Today I am using this approach to pass in telemetry, and I am able to distinguish my devices based on the MQTT topic structure. In a future post, my data will be massaged within Azure, removing the constraints of MQTT. There are many ways to do this, from a pure code approach (as mentioned above, the Azure IoT SDK for Python has a lot more smarts) through to Device Twins and Module Twins. Personally, I like the SDK approach: it's my code and my choices, but I do understand this is not for everyone. We now have my messages, my events, in Azure, and now it's time to make some friends and learn how to derive operational intelligence, from visualisations through to machine learning and beyond.
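Distinguishing devices by topic can be as simple as reading the second topic level, because Tasmota's default full topic is %prefix%/%topic%/ (cmnd, stat or tele, then the device topic). A minimal sketch with a hypothetical `device_from_topic()` helper; the resulting name could then be carried in the JSON body or, assuming you want it visible to IoT Hub routing, as a custom property on the SDK's Message (msg.custom_properties).

```python
from typing import Optional

TASMOTA_PREFIXES = ("cmnd", "stat", "tele")  # Tasmota's default topic prefixes


def device_from_topic(mqtt_topic: str) -> Optional[str]:
    """Return the device name from a Tasmota-style '<prefix>/<device>/<suffix>' topic."""
    levels = mqtt_topic.split("/")
    if len(levels) >= 3 and levels[0] in TASMOTA_PREFIXES and levels[1]:
        return levels[1]
    return None  # not a Tasmota-style topic (e.g. Home Assistant or PLC topics)


for topic in ("stat/kitchen_light/POWER", "tele/garage/SENSOR", "homeassistant/status"):
    print(topic, "->", device_from_topic(topic))
```

This keeps the single-IoT-device design but still lets queries downstream group telemetry per physical device.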

Think big and happy building!

Shane

Updated Dec 08, 2021
Version 3.0

7 Comments

  • Thanks for the replies Christian Beier 

    Alessanddro 
    I will just add a bit more context on using the standard Paho libraries. Azure IoT Hub is not an MQTT broker. Whilst it talks MQTT, it has a few nuances, as Christian has mentioned, such as a strict message format, TLS and more. I authored this post because I ran into so many challenges trying to communicate with standard MQTT (Paho). In order to meet my needs I did need to use the SDK.

    If you have devices transmitting telemetry, my recommendation would be to see if you can pivot to transmitting with an SDK versus standard MQTT. If your sensors are connected to an MCU, Azure IoT Hub provides support for ESP, STM and more devices. 

    If you can tell me more about the devices that are sending messages, I would be happy to try and help you find a solution.

    Shane

  • Do you mind to post your challenges in the Q&A and tag it with IoT Hub?

    https://docs.microsoft.com/en-us/answers/index.html

     

    Please also share more details on your data volume, frequency, number of devices as well as the planned data processing for your use case.

    There are several design considerations to have optimized throughput. Please let's discuss in the Q&A so it's easier to find for others having the same challenges.

    BR

    Christian 

  • Alessanddro
    Copper Contributor

    Unfortunately, my initial enthusiasm at being able to connect, send and receive with Azure IoT was not justified. In reality we were only able to connect using the Azure Event Handler. However, with this method, if the amount of data transmitted is quite large, as in our case, it is unable to keep up, and the latency in data consumption kept increasing. Therefore we stopped using that method.

     

    About the example you mentioned using the base Paho library: we had already seen it, but there was no way to make it work or to find the right connection strings to make it work.  

  • Christian_Beier
    Copper Contributor

    Hi Alessanddro,

     

    Great that you managed to have your device sending messages to IoT Hub and, as I understood, you also created a service that can receive and process these messages from IoT Hub. This also explains why at least two connection strings come into play:

    • one connection string for each IoT device out in the wild and potentially exposed to hackers.
    • one or more connection strings for your (trustworthy?) consuming backend service(s)

    To allow separation of the different areas and provide access control (a device usually should only send and receive data, but not alter the device registry or read other messages), you should not use one connection string to connect a device and the same one for your backend service to consume the messages. To learn more about the other access control options for IoT Hub, you can look at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security and for security by design at https://docs.microsoft.com/en-us/azure/iot-fundamentals/security-recommendations

     

    On your second question, how to connect your device to IoT Hub with Paho or any other MQTT client, I recommend reading https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support. This doc article even contains a Python snippet showing how to connect a device. You just need to add the code to subscribe for receiving Cloud2Device messages from IoT Hub, as described further down in the doc. I would still highly recommend using the Azure IoT Device SDK, as it already solves typical base functionality and is regularly updated with new features, bug fixes and security-related fixes.

     

    Your use case sounds interesting, and I do not see why this should not be possible with IoT Hub. You are right, IoT Hub is not a full MQTT broker and does not provide subscriptions to read only a particular device, although, if required, you can filter in IoT Hub on a specific device ID or other message properties, and even on the payload, to route traffic into a specific processing pipeline. https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-d2c

     

    Best regards

    Christian

  • Alessanddro
    Copper Contributor

    dear Shane

     

    Thanks for your explanation. With the help of a colleague, we understood that we had to read from a standard MQTT channel. Through that we succeeded in sending messages to IoT Hub and, by googling, we found how to receive the messages. What is totally unclear in all the examples online is that the sending connection string is different from the receiving connection string in IoT Hub.

     

    Also, I have not found any example of using the standard Paho libraries instead of the SDK to write to and read from Azure IoT Hub. I think that Microsoft should make an effort if it wants its messaging system to be widely used.

     

    Last but not least, the concept of a 'device' is limiting in my opinion, because it would be better to have a 'channel' and then several topics. In this way it is possible to establish a channel and then send all the messages of a group of devices inside the channel. To receive one specific device, one subscribes to a channel, as in the standard MQTT concept. With IoT Hub it is necessary to define one device for each physical device; otherwise, when listening, you have to listen to all the messages and then filter what you are interested in. 

     

    Maybe I am new to this MQTT business and what I wrote is not true. However, our scenario is that we have about 100 devices measuring the sea level and transmitting 7 points per second, which I would like to store and analyse on a remote server. 

     

    Thanks again for your time

  • Hello Alessanddro,

    Thanks for taking the time to not only comment but to try this code snippet. 

    Looking at the error you are getting, the issue is that you are trying to connect to Azure IoT Hub, which is not fully MQTT compliant today. My code connects to my local MQTT broker (Eclipse Mosquitto) and then uses the Azure IoT Python SDK to forward each message to your Azure IoT Hub. 

    Mosquitto does not require (by default) TLS, Authentication or a specific topic structure. 

    Can you explain what you are trying to achieve and I will be more than happy to see if I can help you find a solution. 

    My code's flow looks something like
    - Never ending Python loop

    - Reading from Local MQTT broker (you cannot use Azure as yet)

    - For each message, convert MQTT message into Azure IOT SDK call and publish back to Azure IOT-Hub.

     

    Happy to help in any way I can. 

  • Alessanddro
    Copper Contributor

    dear Shane, 

     

    thanks for the very comprehensive piece of code,  which is very clear.  However if I try  I always get 

     

    2022-06-15 06:44:19.618960 | Listening for messages

    Traceback (most recent call last):
    File "E:\RPI\software\prog\MQTT\new test\AZURESDK_send.py", line 84, in <module>
    client.connect(MQTTBrokerIP, 1883, 60) # Connect to (broker, port, keepalive-time)
    File "C:\Users\annunal\.conda\envs\pyTAD\lib\site-packages\paho\mqtt\client.py", line 914, in connect
    return self.reconnect()
    File "C:\Users\annunal\.conda\envs\pyTAD\lib\site-packages\paho\mqtt\client.py", line 1044, in reconnect
    sock = self._create_socket_connection()
    File "C:\Users\annunal\.conda\envs\pyTAD\lib\site-packages\paho\mqtt\client.py", line 3685, in _create_socket_connection
    return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source)
    File "C:\Users\annunal\.conda\envs\pyTAD\lib\socket.py", line 844, in create_connection
    raise err
    File "C:\Users\annunal\.conda\envs\pyTAD\lib\socket.py", line 832, in create_connection
    sock.connect(sa)
    socket.timeout: timed out

     

    For MQTTBrokerIP I have used "JRC-Telemetry.azure-devices.net", which is our IoT Hub host name. I have also tried the IP address (in your article it is not clear whether you have to use exactly the IP you indicated, 10.0.0.200, or the IP obtained from an nslookup of the URL). However, the example should also work with the URL.

     

    About the connection string I copied it from the IoTHub.  Nevertheless it does not connect.

     

    Any idea ?

     

    Thanks