Azure Integration Services Blog
Optimizing Service Bus message processing concurrency using Logic Apps stateless flows

Mohammed_Barqawi
Dec 12, 2023

In this article, I show how to use host configuration for the Service Bus trigger in Logic Apps Standard, specifically the trigger that auto-completes messages.

These are function app settings; because the Logic Apps Standard Service Bus trigger is built on top of the function app runtime, they apply here as well.

 

The peek-lock trigger is not covered in this article.

 

 

The difference between stateful and stateless

There is an architectural difference between stateful and stateless flows, which the diagrams below illustrate.

 

Stateless flow


Stateful flow

The main difference is that a stateless flow is designed to run in a single process on a single worker. For the Service Bus autocomplete trigger, the benefit is that a batch is completed only when all of its messages are processed successfully; if there is a failure, the whole batch is sent back to the queue.
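The all-or-nothing handling of a batch can be illustrated with a small simulation. This is only a sketch of the behaviour described above, not the trigger's actual implementation; `process_batch`, `fail_on_three`, and the in-memory queue are hypothetical names introduced for illustration.

```python
from collections import deque


def process_batch(queue, batch_size, handler):
    """Take up to batch_size messages and settle them as one unit.

    Returns True if the whole batch was completed, False if it was
    returned to the queue. This models the described behaviour only;
    it is not how the Service Bus trigger is implemented.
    """
    batch = [queue.popleft() for _ in range(min(batch_size, len(queue)))]
    try:
        for message in batch:
            handler(message)
    except Exception:
        # One failure sends every message of the batch back, in order.
        queue.extendleft(reversed(batch))
        return False
    return True


def fail_on_three(message):
    """Hypothetical handler that fails for one specific message."""
    if message == 3:
        raise ValueError("simulated processing failure")


pending = deque([1, 2, 3, 4, 5])
completed = process_batch(pending, batch_size=4, handler=fail_on_three)
```

In this run the handler fails on the third message, so `completed` is `False` and all four messages of the batch are back at the front of `pending`.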

 

How the test case was designed 

In host.json, I set maxMessageBatchSize and minMessageBatchSize to study how they affect message handling.

I used JSON like the following:

 

 

 

 

 

 

 

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle.Workflows",
    "version": "[1.*, 2.0.0)"
  },
  "extensions": {
    "serviceBus": {
      "maxMessageBatchSize": 10,
      "minMessageBatchSize": 5
    }
  }
}
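Before deploying such a file, it can help to check that the two batch settings are consistent. The following is a minimal sketch, assuming the settings live under `extensions.serviceBus` as in the file above; `read_batch_settings` is a hypothetical helper, and the fallback of 1 for a missing `minMessageBatchSize` is an assumption.

```python
import json

# Same batch settings as the host.json shown above (extension bundle omitted).
HOST_JSON = """
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "maxMessageBatchSize": 10,
      "minMessageBatchSize": 5
    }
  }
}
"""


def read_batch_settings(host_json_text):
    """Return (min, max) batch sizes and reject inconsistent values."""
    service_bus = json.loads(host_json_text)["extensions"]["serviceBus"]
    max_size = service_bus["maxMessageBatchSize"]
    # Assumption: fall back to 1 when minMessageBatchSize is not set.
    min_size = service_bus.get("minMessageBatchSize", 1)
    if not 1 <= min_size <= max_size:
        raise ValueError(f"expected 1 <= min ({min_size}) <= max ({max_size})")
    return min_size, max_size


batch_settings = read_batch_settings(HOST_JSON)
```

The check is deliberately strict: a minimum larger than the maximum is treated as a configuration error rather than silently corrected.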

 

 

 

 

 

I used the same flow in both stateful and stateless scenarios.

 

 

In a real-life scenario, the HTTP call that simulates a delay here would instead call the actual stateful flow that processes the messages.

I did not use the "Call Logic App" action because it is not called synchronously in stateless flows.

Another benefit of calling the message processor over HTTP is that the load is distributed between apps, so the message handler app can have different scaling settings than the message processor app.

 

It is also important to align the Service Bus lock duration with the expected execution time of one run, so that a run finishes before the lock on its messages expires.
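A rough way to reason about this is to compare the estimated duration of one run with the lock duration configured on the queue. The sketch below assumes that the messages of a batch are handled one after another and that a run should use at most 80% of the lock duration; `fits_within_lock`, its parameters, and the safety factor are hypothetical and chosen only for illustration.

```python
from datetime import timedelta


def fits_within_lock(batch_size, seconds_per_message, lock_duration, safety_factor=0.8):
    """Estimate whether one run finishes before the message lock expires.

    Assumes messages in a batch are handled sequentially and that the
    run should use at most safety_factor of the lock duration.
    """
    expected = timedelta(seconds=batch_size * seconds_per_message)
    return expected <= lock_duration * safety_factor


one_minute_lock = timedelta(seconds=60)
large_batch_ok = fits_within_lock(10, 5, one_minute_lock)  # 50 s needed, 48 s allowed
small_batch_ok = fits_within_lock(5, 5, one_minute_lock)   # 25 s needed, 48 s allowed
```

If the estimate does not fit, the options are to reduce maxMessageBatchSize, increase the lock duration on the queue, or shorten the work done per message.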

 

 

 

Test Cases 

Besides the nature of the flow (stateless or stateful), I also tested the flow with and without sessions enabled.

 

1-Stateless without session enabled.

The diagram shows that batches are sometimes processed in parallel, but not on the same worker.

2-Stateless with one session enabled.

There are no overlaps in batch execution, even though multiple workers are handling runs.

3-Stateless with multiple sessions, maxMessageBatchSize=1.

There were 2 sessions in the queue, and messages were handled in parallel, but only across different sessions.

4-Stateless without session, maxMessageBatchSize=1 and Maximum Burst set to 1.

All the message handling is done by one worker.

5-Stateful with session enabled.

There are overlaps, so the next run does not wait for the previous run to complete.

 

6-Stateful without session.

There are overlaps, so the next run does not wait for the previous run to complete.
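Whether runs overlap can also be checked programmatically from the start and end time of each run. The following is a minimal sketch; `find_overlaps`, `at_second`, and the sample timestamps are hypothetical and are not the measurements from these tests.

```python
from datetime import datetime


def find_overlaps(runs):
    """Return pairs of run names whose [start, end) intervals overlap.

    runs: iterable of (name, start, end) tuples with datetime values.
    """
    ordered = sorted(runs, key=lambda run: run[1])
    overlaps = []
    for i, (name, _start, end) in enumerate(ordered):
        for other_name, other_start, _other_end in ordered[i + 1:]:
            if other_start >= end:
                break  # sorted by start, so no later run can overlap either
            overlaps.append((name, other_name))
    return overlaps


def at_second(second):
    """Build a made-up timestamp for the sample data below."""
    return datetime(2023, 12, 12, 10, 0, second)


# Illustrative data only: back-to-back runs versus overlapping runs.
sequential_runs = [("run-a", at_second(0), at_second(10)),
                   ("run-b", at_second(10), at_second(20)),
                   ("run-c", at_second(20), at_second(30))]
overlapping_runs = [("run-a", at_second(0), at_second(10)),
                    ("run-b", at_second(5), at_second(15)),
                    ("run-c", at_second(12), at_second(22))]

sequential_overlaps = find_overlaps(sequential_runs)
overlapping_pairs = find_overlaps(overlapping_runs)
```

A run that starts exactly when the previous one ends is not counted as an overlap, which matches the "next run waits for the previous run" pattern seen in the stateless session case.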


Below is the Logic App code that I used for testing:

 

 

 

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "HTTP": {
                "inputs": {
                    "method": "GET",
                    "retryPolicy": {
                        "type": "none"
                    },
                    "uri": "****"
                },
                "operationOptions": "DisableAsyncPattern",
                "runAfter": {
                    "Initialize_COMPUTER_NAME": [
                        "SUCCEEDED"
                    ]
                },
                "type": "Http"
            },
            "Initialize_COMPUTER_NAME": {
                "inputs": {
                    "variables": [
                        {
                            "name": "COMPUTERNAME",
                            "type": "string",
                            "value": "@{appsetting('COMPUTERNAME')}"
                        }
                    ]
                },
                "runAfter": {
                    "Initialize_count": [
                        "SUCCEEDED"
                    ]
                },
                "type": "InitializeVariable"
            },
            "Initialize_count": {
                "inputs": {
                    "variables": [
                        {
                            "name": "countt",
                            "type": "integer",
                            "value": "@length(triggerBody())"
                        }
                    ]
                },
                "runAfter": {
                    "Initialize_sessionID": [
                        "SUCCEEDED"
                    ]
                },
                "type": "InitializeVariable"
            },
            "Initialize_sessionID": {
                "inputs": {
                    "variables": [
                        {
                            "name": "sessionID",
                            "type": "string",
                            "value": "@{triggerBody()?[0]?['sessionId']}"
                        }
                    ]
                },
                "runAfter": {},
                "type": "InitializeVariable"
            },
            "Insert_or_Update_Entity": {
                "inputs": {
                    "parameters": {
                        "entity": {
                            "COMPUTERNAME": "@{variables('COMPUTERNAME')}",
                            "MessageCount": "@{variables('countt')}",
                            "PartitionKey": "name",
                            "RowKey": "@{workflow()?['run']?['name']}",
                            "Session": "@{variables('sessionID')}",
                            "end": "@{utcNow()}",
                            "kind": "Stateless",
                            "triggerStart": "@{trigger()['startTime']}"
                        },
                        "tableName": "LASBlog"
                    },
                    "serviceProviderConfiguration": {
                        "connectionName": "azureTables",
                        "operationId": "upsertEntity",
                        "serviceProviderId": "/serviceProviders/azureTables"
                    }
                },
                "runAfter": {
                    "HTTP": [
                        "SUCCEEDED"
                    ]
                },
                "type": "ServiceProvider"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "triggers": {
            "When_messages_are_available_in_a_queue": {
                "inputs": {
                    "parameters": {
                        "isSessionsEnabled": true,
                        "queueName": "session"
                    },
                    "serviceProviderConfiguration": {
                        "connectionName": "serviceBus",
                        "operationId": "receiveQueueMessages",
                        "serviceProviderId": "/serviceProviders/serviceBus"
                    }
                },
                "type": "ServiceProvider"
            }
        }
    },
    "kind": "Stateless"
}
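The workflow above writes one table entity per run, including COMPUTERNAME and MessageCount. As a sketch of how such entities could be summarized per worker, assuming they have already been read into plain dictionaries (reading from the table is out of scope here), a hypothetical helper `summarize_by_worker` might look like this; the sample values are made up.

```python
from collections import defaultdict


def summarize_by_worker(entities):
    """Return {COMPUTERNAME: (run_count, message_count)}."""
    totals = defaultdict(lambda: [0, 0])
    for entity in entities:
        worker = totals[entity["COMPUTERNAME"]]
        worker[0] += 1
        # MessageCount is written via string interpolation, so convert it.
        worker[1] += int(entity["MessageCount"])
    return {name: tuple(counts) for name, counts in totals.items()}


# Illustrative entities only; the worker names are hypothetical.
sample_entities = [
    {"COMPUTERNAME": "worker-1", "MessageCount": "10"},
    {"COMPUTERNAME": "worker-2", "MessageCount": "5"},
    {"COMPUTERNAME": "worker-1", "MessageCount": "7"},
]
summary = summarize_by_worker(sample_entities)
```

A summary with a single key would correspond to the case where one worker handles all the messages.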

 

 

 

 

 

Updated Dec 12, 2023
Version 1.0