Use Azure Data Explorer to Simplify the Industrial IoT Data Journey
Published Apr 26 2022 09:00 AM 5,730 Views

Simplifying the industrial IoT data pipeline is an important enabler for allowing businesses to benefit from the speedy and targeted analytics of such data.  We present an effective and easy to implement approach for pre-processing and storing Industrial Time Series IoT data in Azure and make it available for further historian-like analytics. The approach extends this tutorial, and involves only one Azure data service, namely Azure Data Explorer (ADX), where all the pre-processing/storage/further-processing take place.


Azure Industrial IoT (IIoT) provides the ability to integrate data from assets and sensors - including those systems that are already operating on the factory floor - into the Azure cloud. An important component of Azure IIoT is the OPC Publisher, which runs on a local gateway as an Azure IoT Edge module. OPC Publisher facilitates connecting to OPC UA server systems and publishing telemetry data to Azure IoT Hub (see Figure 1) as a JSON payload in the standardized OPC UA Pub/Sub format as specified in the OPC UA specification. While an OPC UA publisher knows a-priori the data it will receive from the OPC UA servers, the nature of the application protocol makes the actual payload unpredictable, i.e. not all the data in the schema will be available all the time.


OPC Publisher.png


 Figure 1 Industrial Data Pipeline with Azure


Once the OPC UA messages have been ingested by Azure IoT Hub, there are a variety of data services in Azure that can be used to pre-process and analyse the data in order to address a variety of industrial use cases such as predictive maintenance and anomaly detection. One such service is ADX which is considered as a natural destination for IoT data as it provides managed ingestion from IoT Hub and advanced analytics/ad-hoc queries on the ingested time series data. ADX has built-in advanced features such as Time Series Analysis and Anomaly Detection which are valuable for getting insights about the industrial assets.


In order to provide the data in a simple time series format, some pre-processing needs to be carried out to transform the data from the OPC UA Pub/Sub format emitted by the OPC publisher as shown in the example of Figure 2, to the Time Series format shown in Figure 3. As we can notice from Figure 2, the original JSON message includes an array and potentially an unknown number of Payload tags.


Screenshot 2022-04-22 110942.png


Figure 2 OPC UA Pub/Sub Message Example




Figure 3 Time Series Data Format


There are a number of options to do this pre-processing in the cloud, but ADX’s rich functionality allows us to do all the pre-processing on the fly as the data is ingested from the IoT Hub. This way we can consolidate all the data pipeline inside ADX without the need to invoke other services, thus simplifying the end-to-end solution management.


In scenarios like this where the message structure is a complex JSON, and probably variable (in terms of the tags contained within the Payload) over time, it is good practice to first ingest the data into a staging table with one column of the ADX Dynamic data type. The staging table can then be processed into other tables each of which with a specific schema to serve different analytics use-cases. This processing can be carried out as new data arrives in the staging table using ADX Update Policies.


The starting point for this implementation is to have an Azure IoT Edge device with the OPC Publisher module deployed and connected to OPC UA assets. Find details on how to provision an IoT Edge gateway here and for installing/configuring the OPC publisher module here.
If you don’t have access to an OPC UA server, you can easily deploy one as a container instance in Azure. This is described here.

Then we are ready to build the data pipeline:

1. Create an ADX cluster by following the steps here.


2. Create an ADX database by following the steps here.


For the remaining steps use the ADX web interface to run the necessary queries. Add the ADX cluster to the web interface as explained in the previous link.


3. Create an ADX table with one column of type Dynamic. This will be the staging table. We also need a JSON ingestion mapping to map the incoming data from the IoT Hub to the staging table


.create table ['opcua-raw'] (rawpayload: dynamic)
.create table ['opcua-raw'] ingestion json mapping 'opcua-raw-mapping' '[{ "column" : "rawpayload","path":"$","datatype":"dynamic"}]'


4. Next use the instructions here to connect the IoT Hub to the ADX cluster and start ingesting the data into the staging table. Specify the following parameters when creating the connection: table name “opcua-raw”, ingestion mapping “opcua-raw-mapping”, and data format as “Multiline Json”.


5. Once the connection is verified, data will start flowing to the table. We can use the following query in the ADX Web UI to examine a data sample of 10 rows. We can see now that each cell in the column “rawpayload” has the full complex message of the form shown in Figure 2.


['opcua-raw'] | take 10


6. Create a table that will store the Time Series data shown in Figure 3. We have 5 columns in the table


.create table Parsed_OPCUA (Tags: string, Value: double, SourceTimestamp: datetime, DataSetWriterId: string, MessageId: int)


7. Create an ADX stored function which encapsulates a query to transform the data from the format of Figure 2 to that of Figure 3. The query essentially contains 3 sections: The first section (lines 2-4 in the query snippet below) takes each complex message and expands the Messages array into its individual elements using the ADX mv-expand operator extracting the Payload, MessageId and DataSetWriterId columns. The second (line 5) section simply expands the internal Payload into the different telemetry components. Finally the third section (lines 6-11)extracts the different tags in the Payload and creates one row per tag.  


.create function ParseOPCUA {
| mv-expand rawpayload.Messages
| project data=rawpayload_Messages.Payload, MessageId = rawpayload.MessageId, DataSetWriterId = rawpayload_Messages.DataSetWriterId
| mv-expand data
| extend tags = tostring(bag_keys(data)[0])
| extend Value= todouble(data[tags].Value)
| extend SourceTimestamp = todatetime(data[tags].SourceTimestamp)
| extend MessageId = toint(MessageId)
| extend DataSetWriterId = tostring(DataSetWriterId)
| project tags, Value, SourceTimestamp, DataSetWriterId, MessageId


Note that the data tags carried by the message (e.g. “Solenoid”,  “Negative TrendData”, “Positive TrendData” and “Conveyor Status”) are likely not known in advance and need to be extracted from the message using the ADX bag_keys function.


8. Now that we have the transformation function from the previous step, we can create an update policy that takes the data from the source table (newly ingested data), applies the transformation and stores the data in the destination table


.alter  table Parsed_OPCUA policy update @'[{"Source": "opcua-raw", "Query": " ParseOPCUA", "IsEnabled": "True"}]'


9. Now we can examine the data in the new table to see our Time Series representation


Parsed_OPCUA | take 10


At this stage we have an easy-to-use Time Series data that can be used for further processing.


You can easily replicate these steps and start getting real value out of your OPC UA asset data for scenarios such as optimisation, monitoring, anomaly detection and predictive maintenance. And if you need to visualize your data, please check out our previous article “Azure Data Explorer Dashboards for IoT Data” which explains how ADX can be used to build powerful IoT dashboards. It is straightforward to use the same procedure to visualise the OPC UA data. Additionally, ADX also has powerful integration features that allow, for example, to extend the alarm detection logic by sending emails or triggering other business processes using other Azure capabilities such as Logic Apps.


Version history
Last update:
‎Apr 25 2022 11:51 AM
Updated by: