Azure Integration Services Blog
Build ActiveMQ trigger for Logic App Standard

Mohammed_Barqawi
May 18, 2021

 

The source code is available on GitHub:

https://github.com/Azure/logicapps-connector-extensions

In this article I show how to build an ActiveMQ trigger using the service provider capability in Logic Apps Standard. The project was inspired by the article "Azure Logic Apps Running Anywhere: Built-in connector extensibility" on the Microsoft Tech Community.

Note: this project is only a proof of concept (POC) and has not been fully tested.

The service provider serves two consumers: the Logic Apps designer and the Logic Apps runtime.

The designer can be either VS Code or the Azure portal. It requests the skeleton (Swagger) of the trigger by calling a REST API hosted on the Functions runtime.

The runtime reads the Logic App JSON definition and executes the InvokeOperation method.

The trigger developed here is based on a polling mechanism, so the Logic Apps runtime calls InvokeOperation at the time interval configured in the trigger, as in the following trigger definition:

"triggers": {
    "Receive_Messages": {
        "type": "ServiceProvider",
        "kind": "Polling",
        "inputs": {
            "parameters": {
                "queue": "TransactionQueue",
                "MaximumNo": 44
            },
            "serviceProviderConfiguration": {
                "connectionName": "activemq",
                "operationId": "ActiveMQ : ReceiveMessages",
                "serviceProviderId": "/serviceProviders/activemq"
            }
        },
        "recurrence": {
            "frequency": "Second",
            "interval": 15
        }
    }
},
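To make the shape of this definition concrete, here is a minimal sketch, not part of the project, that reads the same fields with System.Text.Json: the queue name, the MaximumNo batch size, and the recurrence converted to a TimeSpan. The class name TriggerDefinitionReader and its Read method are hypothetical, and the Logic Apps runtime does its own parsing; the sketch only illustrates which values drive the polling. It expects the JSON object stored under "Receive_Messages".

```csharp
using System;
using System.Text.Json;

// Hypothetical helper for illustration only; not part of the Logic Apps SDK.
public static class TriggerDefinitionReader
{
    // Reads the queue name, batch size and polling interval from a trigger object.
    public static (string Queue, int MaximumNo, TimeSpan Interval) Read(string triggerJson)
    {
        using JsonDocument doc = JsonDocument.Parse(triggerJson);
        JsonElement parameters = doc.RootElement.GetProperty("inputs").GetProperty("parameters");
        JsonElement recurrence = doc.RootElement.GetProperty("recurrence");

        string queue = parameters.GetProperty("queue").GetString();
        int maximumNo = parameters.GetProperty("MaximumNo").GetInt32();
        string frequency = recurrence.GetProperty("frequency").GetString();
        int interval = recurrence.GetProperty("interval").GetInt32();

        // Only a few frequency values are handled in this sketch.
        TimeSpan every = frequency switch
        {
            "Second" => TimeSpan.FromSeconds(interval),
            "Minute" => TimeSpan.FromMinutes(interval),
            "Hour" => TimeSpan.FromHours(interval),
            "Day" => TimeSpan.FromDays(interval),
            _ => throw new NotSupportedException($"Frequency '{frequency}' is not handled by this sketch."),
        };

        return (queue, maximumNo, every);
    }
}
```

For the definition above, Read would yield the queue TransactionQueue, a batch size of 44 and an interval of 15 seconds, which means InvokeOperation is called roughly every 15 seconds and receives at most 44 messages per call.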

The polling behavior is specified by first setting the Recurrence setting to Basic, so that the designer knows that this is a polling trigger:

Recurrence = new RecurrenceSetting
{
    Type = RecurrenceType.Basic,
},

The designer then adds "kind": "Polling" to the trigger definition. If the kind keyword is not added, add it manually.

InvokeOperation steps

The operation does the following:

It reads the connection properties as well as the trigger request properties.

If there are no messages in the queue, the response is System.Net.HttpStatusCode.Accepted, which the Logic Apps runtime engine interprets as a skipped trigger. Otherwise, the received messages are returned as a JSON array with System.Net.HttpStatusCode.OK.

public Task<ServiceOperationResponse> InvokeOperation(string operationId, InsensitiveDictionary<JToken> connectionParameters,
        ServiceOperationRequest serviceOperationRequest)
{
    string Error = "";
    try
    {
        // Validate the operation id, then read the connection and trigger parameters.
        ServiceOpertionsProviderValidation.OperationId(operationId);
        triggerPramsDto _triggerPramsDto = new triggerPramsDto(connectionParameters, serviceOperationRequest);

        var connectionFactory = new NmsConnectionFactory(_triggerPramsDto.UserName, _triggerPramsDto.Password, _triggerPramsDto.BrokerUri);
        using (var connection = connectionFactory.CreateConnection())
        {
            connection.ClientId = _triggerPramsDto.ClientId;
            // Transactional session: received messages are acknowledged only on Commit().
            using (var session = connection.CreateSession(AcknowledgementMode.Transactional))
            {
                using (var queue = session.GetQueue(_triggerPramsDto.QueueName))
                {
                    using (var consumer = session.CreateConsumer(queue))
                    {
                        connection.Start();
                        List<JObject> receiveMessages = new List<JObject>();
                        // Receive up to MaximumNo messages, waiting at most one second for each.
                        for (int i = 0; i < _triggerPramsDto.MaximumNo; i++)
                        {
                            // Note: a message that is not an ITextMessage also yields null here.
                            var message = consumer.Receive(new TimeSpan(0, 0, 0, 1)) as ITextMessage;
                            if (message != null)
                            {
                                receiveMessages.Add(new JObject
                                {
                                    { "contentData", message.Text },
                                    { "Properties", new JObject { { "NMSMessageId", message.NMSMessageId } } },
                                });
                            }
                            else
                            {
                                // Exit the loop when there are no more messages.
                                break;
                            }
                        }
                        session.Commit();
                        session.Close();
                        connection.Close();
                        if (receiveMessages.Count == 0)
                        {
                            // 202 Accepted: the Logic Apps runtime treats this as a skipped trigger.
                            return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JObject.FromObject(new { message = "No messages" }), System.Net.HttpStatusCode.Accepted));
                        }
                        else
                        {
                            // 200 OK: the received messages become the trigger output.
                            return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JArray.FromObject(receiveMessages), System.Net.HttpStatusCode.OK));
                        }
                    }
                }
            }
        }
    }
    catch (Exception e)
    {
        Error = e.Message;
    }
    // Reached only when an exception was caught above.
    return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JObject.FromObject(new { message = Error }), System.Net.HttpStatusCode.InternalServerError));
}
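
The batching loop and the status-code decision in InvokeOperation can be looked at separately from the ActiveMQ plumbing. The following is a minimal sketch under stated assumptions, not the project's code: PollingResult, Drain and ForMessages are hypothetical names, and tryReceive is a stand-in for consumer.Receive that returns null when the queue is empty.

```csharp
using System;
using System.Collections.Generic;
using System.Net;

// Hypothetical helper for illustration only; not part of the project.
public static class PollingResult
{
    // Receives at most maximumNo items and stops at the first null (empty queue),
    // mirroring the receive loop in InvokeOperation.
    public static List<string> Drain(Func<string> tryReceive, int maximumNo)
    {
        var received = new List<string>();
        for (int i = 0; i < maximumNo; i++)
        {
            string text = tryReceive();
            if (text == null)
            {
                break;
            }
            received.Add(text);
        }
        return received;
    }

    // 202 Accepted when nothing was received (trigger skipped), 200 OK otherwise.
    public static HttpStatusCode ForMessages(IReadOnlyCollection<string> messages)
    {
        if (messages == null) throw new ArgumentNullException(nameof(messages));
        return messages.Count == 0 ? HttpStatusCode.Accepted : HttpStatusCode.OK;
    }
}
```

With an in-memory queue holding three items and maximumNo set to 2, Drain returns two items and ForMessages gives OK; with an empty queue, Drain returns an empty list and ForMessages gives Accepted, so the run is recorded as skipped.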


Development environment

To let the designer recognize the new service provider, the information for the DLL must be added to the extensions.json file, which can be found at the following path:

C:\Users\...\.azure-functions-core-tools\Functions\ExtensionBundles\Microsoft.Azure.Functions.ExtensionBundle.Workflows\1.1.7\bin\extensions.json
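
Entries in extensions.json are name/typeName pairs inside an "extensions" array. As a rough illustration of the registration step, the sketch below adds such an entry if one with the same name is not already present. It assumes .NET 6 or later (for System.Text.Json.Nodes); the class name ExtensionsJsonEditor is hypothetical, and the name and typeName values you pass in must come from your own startup class and assembly, so compare with the entries already in your file.

```csharp
using System;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Nodes;

// Hypothetical helper for illustration only; not part of the project.
public static class ExtensionsJsonEditor
{
    // Returns the extensions.json content with an entry added for the given
    // extension. The content is returned without a new entry when one with
    // the same name already exists.
    public static string AddExtension(string extensionsJson, string name, string typeName)
    {
        JsonObject root = JsonNode.Parse(extensionsJson)?.AsObject()
            ?? throw new ArgumentException("Expected a JSON object.", nameof(extensionsJson));

        // Create the "extensions" array when it is missing.
        JsonArray extensions = root["extensions"] as JsonArray;
        if (extensions == null)
        {
            extensions = new JsonArray();
            root["extensions"] = extensions;
        }

        bool exists = extensions.Any(e =>
            e is JsonObject entry
            && entry["name"] is JsonValue value
            && value.TryGetValue(out string existing)
            && existing == name);

        if (!exists)
        {
            extensions.Add(new JsonObject { ["name"] = name, ["typeName"] = typeName });
        }

        return root.ToJsonString(new JsonSerializerOptions { WriteIndented = true });
    }
}
```

A caller would read the file, pass its content together with a name such as "ActiveMQ" and the assembly-qualified name of the extension's startup type, and write the result back. Calling it twice with the same name leaves a single entry.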

We also need to copy the DLL and its dependencies to the bin folder next to the extensions.json file. This is done by a PowerShell script that runs after the build; the script can be found in the C# project folder.

After building the ServiceProvider project, you can switch to VS Code with the Logic Apps designer extension installed. More information can be found on the Azure Logic Apps (Preview) page in the Visual Studio Marketplace.

To add the service provider package to the logic app:

First, convert the logic app project to a NuGet-based project as described in "Create Logic Apps Preview workflows in Visual Studio Code - Azure Logic Apps | Microsoft Docs".

Then, to get the .nupkg file, enable the project option (a checkbox) that generates the NuGet package.

Then, to add the package, run the PowerShell file Common/tools/add-extension.ps1:

1- run Import-Module C:\path to the file\add-extension.ps1

2- add-extension Path "ActiveMQ"

You may run into problems with the NuGet package cache, so keep in mind that you may need to manually delete the package file from the cache.

Set up the ActiveMQ server

I used the Docker image rmohr/activemq (docker.com).

Then create messages using the management page at http://localhost:8161/

Updated Jun 21, 2021
Version 2.0