Optimizing Service Bus message processing concurrency using Logic Apps stateless flows
In this article, I show how to use the host configuration for the Service Bus trigger in Logic Apps Standard that completes messages automatically (autocomplete).
These are function app settings, and because the Logic Apps Standard Service Bus trigger is built on top of the function app runtime, they apply here as well.
The peek-lock trigger is not covered in this article.
The difference between stateful and stateless
There is an architectural difference between stateful and stateless flows, which the diagrams below illustrate.
Stateless flow
Stateful flow
The main difference is that stateless mode is designed to run in a single process on a single worker. For the Service Bus autocomplete trigger, the benefit of this behavior is that a batch is completed only when all of its messages are processed successfully; if there is a failure, the whole batch is sent back to the queue.
How the test case was designed
In host.json, I set maxMessageBatchSize and minMessageBatchSize to study how the batch size affects message handling.
I used JSON like the following:
{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle.Workflows",
    "version": "[1.*, 2.0.0)"
  },
  "extensions": {
    "serviceBus": {
      "maxMessageBatchSize": 10,
      "minMessageBatchSize": 5
    }
  }
}
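Before deploying a host.json like this, it can help to sanity-check the batch settings. The following is a minimal sketch in Python, assuming that both values should be positive integers and that the minimum should not exceed the maximum. The function name check_batch_settings and these rules are illustrative assumptions, not the runtime's own validation.

```python
import json

# Same serviceBus values as the host.json above (extensionBundle omitted for brevity).
HOST_JSON = """
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "maxMessageBatchSize": 10,
      "minMessageBatchSize": 5
    }
  }
}
"""


def check_batch_settings(host_json_text):
    """Return a list of problems with the serviceBus batch-size settings.

    The rules are assumed sanity checks, not the runtime's own validation.
    """
    config = json.loads(host_json_text)
    service_bus = config.get("extensions", {}).get("serviceBus", {})
    problems = []
    sizes = {}
    for name in ("minMessageBatchSize", "maxMessageBatchSize"):
        value = service_bus.get(name)
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(value, int) or isinstance(value, bool) or value < 1:
            problems.append(f"{name} should be a positive integer, got {value!r}")
        else:
            sizes[name] = value
    if len(sizes) == 2 and sizes["minMessageBatchSize"] > sizes["maxMessageBatchSize"]:
        problems.append("minMessageBatchSize exceeds maxMessageBatchSize")
    return problems


print(check_batch_settings(HOST_JSON))  # [] for the values used in this article
```

An empty list means that the settings passed these assumed checks; each string in a non-empty list names one setting to review.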
I used the same flow in both stateful and stateless scenarios.
In real-life scenarios, the HTTP call that simulates a delay would be replaced by a call to the actual stateful flow that processes the messages.
I did not use the "Call Logic App action" because it is not called synchronously in stateless flows.
Another benefit of calling the message processor over HTTP is that the load is distributed between apps, so the message handler app can have different scaling settings than the message processor.
It is also important to align the Service Bus lock duration with the expected execution time of one run.
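As a rough way to reason about this, the sketch below compares a lock duration with the expected run time plus a safety margin. The function name lock_covers_run, the 20% margin, and the sample numbers are assumptions for illustration; they are not values measured in these tests or defaults of Service Bus.

```python
def lock_covers_run(lock_duration_seconds, expected_run_seconds, margin=0.2):
    """Return True if the lock outlasts one run plus a safety margin.

    margin is a hypothetical buffer (20% by default) for latency and slow calls.
    """
    if lock_duration_seconds <= 0 or expected_run_seconds < 0:
        raise ValueError("lock duration must be positive and run time non-negative")
    return expected_run_seconds * (1 + margin) <= lock_duration_seconds


# Hypothetical numbers: a 60-second lock, and runs that take about 30 or 55 seconds.
print(lock_covers_run(60, 30))  # True: about 36 seconds needed, 60 available
print(lock_covers_run(60, 55))  # False: about 66 seconds needed, 60 available
```

If the check fails, the options are to raise the lock duration on the queue or to shorten the run, for example with a smaller batch size.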
Test Cases
Besides the nature of the flow (stateless or stateful), I tested the flow with and without sessions enabled.
1- Stateless without sessions enabled.
The diagram shows that batches are sometimes processed in parallel, but not on the same worker.
2- Stateless with one session enabled.
There are no overlaps in batch execution, even though multiple workers handle the runs.
3- Stateless with multiple sessions and maxMessageBatchSize=1.
There were two sessions in the queue. Messages were handled in parallel, but only across different sessions.
4- Stateless without sessions, maxMessageBatchSize=1, and Maximum Burst set to one.
All the message handling is done by one worker.
5- Stateful with sessions enabled.
There are overlaps: the next run does not wait for the previous run to complete.
6- Stateful without sessions.
There are overlaps: the next run does not wait for the previous run to complete.
Below is the logic app code that I used for testing:
{
  "definition": {
    "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
    "actions": {
      "HTTP": {
        "inputs": {
          "method": "GET",
          "retryPolicy": {
            "type": "none"
          },
          "uri": "****"
        },
        "operationOptions": "DisableAsyncPattern",
        "runAfter": {
          "Initialize_COMPUTER_NAME": [
            "SUCCEEDED"
          ]
        },
        "type": "Http"
      },
      "Initialize_COMPUTER_NAME": {
        "inputs": {
          "variables": [
            {
              "name": "COMPUTERNAME",
              "type": "string",
              "value": "@{appsetting('COMPUTERNAME')}"
            }
          ]
        },
        "runAfter": {
          "Initialize_count": [
            "SUCCEEDED"
          ]
        },
        "type": "InitializeVariable"
      },
      "Initialize_count": {
        "inputs": {
          "variables": [
            {
              "name": "countt",
              "type": "integer",
              "value": "@length(triggerBody())"
            }
          ]
        },
        "runAfter": {
          "Initialize_sessionID": [
            "SUCCEEDED"
          ]
        },
        "type": "InitializeVariable"
      },
      "Initialize_sessionID": {
        "inputs": {
          "variables": [
            {
              "name": "sessionID",
              "type": "string",
              "value": "@{triggerBody()?[0]?['sessionId']}"
            }
          ]
        },
        "runAfter": {},
        "type": "InitializeVariable"
      },
      "Insert_or_Update_Entity": {
        "inputs": {
          "parameters": {
            "entity": {
              "COMPUTERNAME": "@{variables('COMPUTERNAME')}",
              "MessageCount": "@{variables('countt')}",
              "PartitionKey": "name",
              "RowKey": "@{workflow()?['run']?['name']}",
              "Session": "@{variables('sessionID')}",
              "end": "@{utcNow()}",
              "kind": "Stateless",
              "triggerStart": "@{trigger()['startTime']}"
            },
            "tableName": "LASBlog"
          },
          "serviceProviderConfiguration": {
            "connectionName": "azureTables",
            "operationId": "upsertEntity",
            "serviceProviderId": "/serviceProviders/azureTables"
          }
        },
        "runAfter": {
          "HTTP": [
            "SUCCEEDED"
          ]
        },
        "type": "ServiceProvider"
      }
    },
    "contentVersion": "1.0.0.0",
    "outputs": {},
    "triggers": {
      "When_messages_are_available_in_a_queue": {
        "inputs": {
          "parameters": {
            "isSessionsEnabled": true,
            "queueName": "session"
          },
          "serviceProviderConfiguration": {
            "connectionName": "serviceBus",
            "operationId": "receiveQueueMessages",
            "serviceProviderId": "/serviceProviders/serviceBus"
          }
        },
        "type": "ServiceProvider"
      }
    }
  },
  "kind": "Stateless"
}
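The workflow above writes one table row per run, with the fields RowKey, triggerStart, and end. Overlaps between runs can then be found by comparing those time ranges. The following is a minimal sketch in Python of such a comparison; the helper names parse_utc and find_overlaps are hypothetical, and the sample rows are made up for illustration rather than taken from the test results.

```python
from datetime import datetime, timezone
from itertools import combinations


def parse_utc(timestamp):
    """Parse a UTC timestamp such as 2024-01-01T10:00:10.1234567Z."""
    text = timestamp.rstrip("Z")
    if "." in text:
        head, fraction = text.split(".", 1)
        # strptime's %f accepts at most 6 digits, so truncate longer fractions.
        text = f"{head}.{fraction[:6]}"
        fmt = "%Y-%m-%dT%H:%M:%S.%f"
    else:
        fmt = "%Y-%m-%dT%H:%M:%S"
    return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)


def find_overlaps(rows):
    """Return (RowKey, RowKey) pairs of runs whose time ranges overlap."""
    runs = sorted(
        ((parse_utc(r["triggerStart"]), parse_utc(r["end"]), r["RowKey"]) for r in rows),
        key=lambda run: run[0],
    )
    overlaps = []
    for (start_a, end_a, key_a), (start_b, end_b, key_b) in combinations(runs, 2):
        # Two ranges overlap when each one starts before the other ends.
        if start_a < end_b and start_b < end_a:
            overlaps.append((key_a, key_b))
    return overlaps


# Made-up sample rows using the field names written by Insert_or_Update_Entity.
SAMPLE_ROWS = [
    {"RowKey": "run1", "triggerStart": "2024-01-01T10:00:00Z", "end": "2024-01-01T10:00:10.1234567Z"},
    {"RowKey": "run2", "triggerStart": "2024-01-01T10:00:05Z", "end": "2024-01-01T10:00:15Z"},
    {"RowKey": "run3", "triggerStart": "2024-01-01T10:00:20Z", "end": "2024-01-01T10:00:30Z"},
]

print(find_overlaps(SAMPLE_ROWS))  # [('run1', 'run2')]
```

The pairwise comparison is quadratic in the number of runs, which is fine for a small test table. Grouping the rows by COMPUTERNAME or Session before calling find_overlaps would show overlaps per worker or per session.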