The source code is available on GitHub
https://github.com/Azure/logicapps-connector-extensions
In this article I will show how to build an ActiveMQ trigger using the service provider capability in Logic Apps Standard. The project was inspired by the article Azure Logic Apps Running Anywhere: Built-in connector extensibility - Microsoft Tech Community.
The ServiceProvider serves two consumers: the Logic Apps designer and the Logic Apps runtime.
The designer can be VS Code or the portal. It requests the skeleton/Swagger of the trigger by calling a REST API hosted on the function app runtime.
The runtime reads the logic app JSON definition and executes the InvokeOperation method.
The trigger works by polling, so the Logic Apps runtime calls InvokeOperation at the time interval configured in the recurrence of the trigger definition:
"triggers": {
    "Receive_Messages": {
        "type": "ServiceProvider",
        "kind": "Polling",
        "inputs": {
            "parameters": {
                "queue": "TransactionQueue",
                "MaximumNo": 44
            },
            "serviceProviderConfiguration": {
                "connectionName": "activemq",
                "operationId": "ActiveMQ : ReceiveMessages",
                "serviceProviderId": "/serviceProviders/activemq"
            }
        },
        "recurrence": {
            "frequency": "Second",
            "interval": 15
        }
    }
},
To enable this, first set the recurrence type to Basic so that the designer knows this is a polling trigger:
Recurrence = new RecurrenceSetting
{
    Type = RecurrenceType.Basic,
},
The designer then adds the keyword kind = Polling to the trigger definition.
If the kind keyword is not added, add it manually.
InvokeOperation steps
The operation does the following:
- Read the connection properties as well as the trigger request properties.
- Create a connection to ActiveMQ using Apache.NMS.AMQP, as documented on the Apache site https://activemq.apache.org/components/nms/providers/amqp/
- Read the available messages, up to the user-configured value MaximumNo, and combine them into an array that matches the configured trigger output.
- If there are no messages in the queue, return System.Net.HttpStatusCode.Accepted, which the Logic Apps runtime interprets as a skipped trigger.
public Task<ServiceOperationResponse> InvokeOperation(string operationId, InsensitiveDictionary<JToken> connectionParameters,
    ServiceOperationRequest serviceOperationRequest)
{
    string Error = "";
    try
    {
        ServiceOpertionsProviderValidation.OperationId(operationId);

        // Collect the connection properties and the trigger request properties.
        triggerPramsDto _triggerPramsDto = new triggerPramsDto(connectionParameters, serviceOperationRequest);

        var connectionFactory = new NmsConnectionFactory(_triggerPramsDto.UserName, _triggerPramsDto.Password, _triggerPramsDto.BrokerUri);
        using (var connection = connectionFactory.CreateConnection())
        {
            connection.ClientId = _triggerPramsDto.ClientId;
            using (var session = connection.CreateSession(AcknowledgementMode.Transactional))
            using (var queue = session.GetQueue(_triggerPramsDto.QueueName))
            using (var consumer = session.CreateConsumer(queue))
            {
                connection.Start();
                List<JObject> receiveMessages = new List<JObject>();

                // Read at most MaximumNo messages, waiting up to one second for each.
                for (int i = 0; i < _triggerPramsDto.MaximumNo; i++)
                {
                    var message = consumer.Receive(new TimeSpan(0, 0, 0, 1)) as ITextMessage;
                    if (message != null)
                    {
                        receiveMessages.Add(new JObject
                        {
                            { "contentData", message.Text },
                            { "Properties", new JObject { { "NMSMessageId", message.NMSMessageId } } },
                        });
                    }
                    else
                    {
                        // Exit the loop when there are no more messages.
                        break;
                    }
                }

                // Commit the transacted session so the received messages are acknowledged.
                session.Commit();
                session.Close();
                connection.Close();

                if (receiveMessages.Count == 0)
                {
                    // 202 Accepted: the runtime treats this poll as a skipped trigger.
                    return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JObject.FromObject(new { message = "No messages" }), System.Net.HttpStatusCode.Accepted));
                }
                else
                {
                    // 200 OK: return the batch as the trigger output.
                    return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JArray.FromObject(receiveMessages), System.Net.HttpStatusCode.OK));
                }
            }
        }
    }
    catch (Exception e)
    {
        Error = e.Message;
    }

    // Reached only when an exception was caught above.
    return Task.FromResult((ServiceOperationResponse)new ActiveMQTriggerResponse(JObject.FromObject(new { message = Error }), System.Net.HttpStatusCode.InternalServerError));
}
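The batching and status-code logic of InvokeOperation can be tried without a broker. The following is a minimal sketch, not the project's code: the class PollingSketch and the methods ReadBatch and StatusFor are hypothetical names, and an in-memory queue stands in for the NMS consumer, with null playing the role of a Receive call that timed out.

```csharp
using System;
using System.Collections.Generic;
using System.Net;

public static class PollingSketch
{
    // Calls receive() up to maximumNo times and stops at the first null,
    // which stands in for "no message arrived before the timeout".
    public static List<string> ReadBatch(Func<string> receive, int maximumNo)
    {
        var batch = new List<string>();
        for (int i = 0; i < maximumNo; i++)
        {
            string message = receive();
            if (message == null)
            {
                break;
            }
            batch.Add(message);
        }
        return batch;
    }

    // An empty batch maps to 202 Accepted (skipped trigger); otherwise 200 OK.
    public static HttpStatusCode StatusFor(IReadOnlyCollection<string> batch) =>
        batch.Count == 0 ? HttpStatusCode.Accepted : HttpStatusCode.OK;

    public static void Main()
    {
        // Hypothetical in-memory stand-in for the ActiveMQ queue.
        var queue = new Queue<string>(new[] { "m1", "m2", "m3" });
        Func<string> receive = () => queue.Count > 0 ? queue.Dequeue() : null;

        List<string> first = ReadBatch(receive, 2);   // takes m1 and m2
        List<string> second = ReadBatch(receive, 2);  // takes m3, then stops on null
        List<string> third = ReadBatch(receive, 2);   // nothing left to read

        Console.WriteLine($"{first.Count} {StatusFor(first)}");
        Console.WriteLine($"{second.Count} {StatusFor(second)}");
        Console.WriteLine($"{third.Count} {StatusFor(third)}");
    }
}
```

With three queued messages and a maximum of two per poll, the first two polls return batches with 200 OK and the third returns an empty batch with 202 Accepted, so that poll would be recorded as a skipped trigger.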
Development environment
For the designer to recognize the new service provider, the information for the DLL must be added to extensions.json, which can be found at the following path:
C:\Users\...\.azure-functions-core-tools\Functions\ExtensionBundles\Microsoft.Azure.Functions.ExtensionBundle.Workflows\1.1.7\bin\extensions.json
The DLL and its dependencies must also be copied to the bin folder next to the extensions.json file. This is done by a PowerShell script that runs after the build.
The PowerShell script can be found in the C# project folder.
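To make the registration step concrete, here is a rough sketch of adding an entry to the content of extensions.json. It is written in C# for consistency with the rest of the article, while the project's own script is PowerShell and may work differently. It assumes the usual Azure Functions layout of that file, an "extensions" array of objects with "name" and "typeName". The class ExtensionsJsonSketch, the method AddExtension, and the type name Contoso.ActiveMQ.Startup are hypothetical; the real typeName is the assembly-qualified name of the startup class in your DLL. The sketch works on a string rather than on the file itself and needs .NET 6 or later for System.Text.Json.Nodes.

```csharp
using System;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Nodes;

public static class ExtensionsJsonSketch
{
    // Returns the JSON with an entry for the given extension appended,
    // without adding a duplicate when an entry with that name already exists.
    public static string AddExtension(string json, string name, string typeName)
    {
        JsonObject root = JsonNode.Parse(json)!.AsObject();

        // Create the "extensions" array when the file does not have one yet.
        if (root["extensions"] is not JsonArray extensions)
        {
            extensions = new JsonArray();
            root["extensions"] = extensions;
        }

        bool exists = extensions.Any(e => e?["name"]?.GetValue<string>() == name);
        if (!exists)
        {
            extensions.Add(new JsonObject { ["name"] = name, ["typeName"] = typeName });
        }

        return root.ToJsonString(new JsonSerializerOptions { WriteIndented = true });
    }

    public static void Main()
    {
        // Hypothetical starting content and type name, for illustration only.
        string before = "{\"extensions\":[]}";
        string after = AddExtension(before, "ActiveMQ", "Contoso.ActiveMQ.Startup, Contoso.ActiveMQ");
        Console.WriteLine(after);
    }
}
```

Checking for an existing entry by name keeps the step safe to repeat after every build.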
After building the ServiceProvider project, you can switch to VS Code with the Logic Apps designer extension installed. More information can be found on Azure Logic Apps (Preview) - Visual Studio Marketplace.
To add the service provider package to the logic app:
First, convert the logic app project to a NuGet-based project as described in Create Logic Apps Preview workflows in Visual Studio Code - Azure Logic Apps | Microsoft Docs.
Then, to get the nupkg file, you can enable the checkbox below.
Then, to add the package, you can run the PowerShell file Common/tools/add-extension.ps1:
1- Run Import-Module C:\path to the file\add-extension.ps1
2- Run add-extension Path "ActiveMQ"
You may run into problems with the NuGet package cache, so keep in mind that you may need to manually delete the package file from the cache.
Set up the ActiveMQ server
I used the Docker image rmohr/activemq (docker.com).
Then create messages using the management page at http://localhost:8161/