Reusing dialogs with a dialog pool
Published Mar 23 2019 10:59 AM
Microsoft
First published on MSDN on Jul 24, 2008

As noted in various Service Broker sources, it is often advantageous to minimize the overhead of creating dialogs on which to send messages. This post shows how to create a shared pool of dialogs so that existing dialogs can be reused instead of new ones being created. The dialog pool is a variation of Remus Rusanu's technique for reusing and recycling conversations, as described in his blog. One of the main differences is that the dialog pool is keyed only on services and contract, not on SPID. This allows the same SPID to obtain multiple dialogs from the pool should the need arise. Just as importantly, different SPIDs can reuse the same dialog sequentially instead of each creating its own. Measurements show equivalent performance using the dialog pool compared to the SPID-based reuse scheme.



The following code shows how to get, free, and delete dialogs from a dialog pool table. The pool is initially empty; a new dialog is created and added to the pool whenever a request for a free dialog cannot be met. Thus the pool grows during bursts of high demand.
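As a rough illustration of the get and free steps, the sketch below claims two dialogs from the same session and then returns them. It assumes the sample script in this post has already been run, so that usp_get_dialog, usp_free_dialog, the [DialogPool] table and the Ssb* services and contract exist. The variable names are illustrative only.

```sql
-- Sketch only: assumes the objects created by the sample script already exist.
DECLARE @h1 UNIQUEIDENTIFIER, @h2 UNIQUEIDENTIFIER;
DECLARE @count1 BIGINT, @count2 BIGINT;

-- Claim two dialogs from the same session. The first call marks its row with
-- this session's SPID, so the second call cannot be handed the same row: it
-- either claims another free dialog or creates a new one.
EXEC usp_get_dialog N'SsbInitiatorService', N'SsbTargetService', N'SsbContract',
     @h1 OUTPUT, @count1 OUTPUT;
EXEC usp_get_dialog N'SsbInitiatorService', N'SsbTargetService', N'SsbContract',
     @h2 OUTPUT, @count2 OUTPUT;

-- Both rows are owned by the current session at this point.
SELECT Handle, OwnerSPID, CreationTime, SendCount
FROM [DialogPool]
WHERE Handle IN (@h1, @h2);

-- Nothing was sent, so return both dialogs with their send counts unchanged.
EXEC usp_free_dialog @h1, @count1;
EXEC usp_free_dialog @h2, @count2;
```

Because the pool is keyed on services and contract only, any other session can claim either dialog once it has been freed.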



The dialog pool entries also contain creation time and send count fields that ease the auditing and "recycling" of dialogs in the pool based on application requirements. Recycling consists of gracefully ending an existing dialog between services and beginning a new one. If done prudently, this technique can ease the handling of dialog errors by limiting the number of messages affected. For example, the application may choose to constrain a dialog to a certain number of messages before it is recycled. Recycling might also be based on the age of a dialog. See the end of the usp_send procedure for an example of recycling.
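The sample itself recycles on send count only. One possible way to recycle by age is sketched below. The procedure name usp_recycle_aged_dialogs and its @maxAgeMinutes parameter are hypothetical and not part of the original sample. The sketch assumes the [DialogPool] table and the EndOfStream message type from the sample script, and a target that ends the dialog when it receives EndOfStream.

```sql
-- Hypothetical helper, not part of the original sample.
CREATE PROCEDURE [usp_recycle_aged_dialogs] (
    @maxAgeMinutes INT)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @retired TABLE (Handle UNIQUEIDENTIFIER NOT NULL);
    DECLARE @handle UNIQUEIDENTIFIER;

    BEGIN TRANSACTION;

    -- Remove only free dialogs (OwnerSPID = -1) older than the threshold.
    -- READPAST skips rows that concurrent sessions currently hold locked.
    DELETE FROM [DialogPool] WITH (READPAST)
    OUTPUT DELETED.Handle INTO @retired
    WHERE OwnerSPID = -1
      AND CreationTime < DATEADD(MINUTE, -@maxAgeMinutes, GETDATE());

    -- Ask the target to end each retired dialog, as usp_send does
    -- when its send-count criterion is met.
    WHILE EXISTS (SELECT 1 FROM @retired)
    BEGIN
        SELECT TOP (1) @handle = Handle FROM @retired;
        SEND ON CONVERSATION @handle MESSAGE TYPE [EndOfStream];
        DELETE @retired WHERE Handle = @handle;
    END

    COMMIT;
END;
```

A scheduled job could then call, for example, EXEC usp_recycle_aged_dialogs 60; to retire free dialogs more than an hour old. The threshold is an application decision.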



An example application that exercises the dialog pool is also included.



--------------------------------------------------------------------------


-- Dialog Pool Sample.


-- This sample shows how to create and use a shared pool of reusable dialogs.


-- The purpose of reusing dialogs is to reduce the overhead of creating them.


-- The sample also shows how dialogs in the pool can be "recycled" by deleting


-- dialogs based on application criteria, such as number of messages sent.


-- This sample is largely based on Remus Rusanu's tutorials on reusing and


-- recycling conversations (rusanu.com/blog).


-- Contents: dialog pool and application using the pool.


----------------------------------------------------



USE master



GO



--------------------------------------------------------------------------


-- Create demo database section


--------------------------------------------------------------------------



IF EXISTS (SELECT name FROM sys.databases WHERE name = 'SsbDemoDb')


DROP DATABASE [SsbDemoDb];



CREATE DATABASE [SsbDemoDb]



GO



USE [SsbDemoDb];


GO



-- Create master key


IF NOT EXISTS(SELECT name FROM sys.symmetric_keys WHERE name = '##MS_DatabaseMasterKey##')


CREATE MASTER KEY ENCRYPTION BY PASSWORD='Password#123'



GO



--------------------------------------------------------------------------


-- Dialog pool section


--------------------------------------------------------------------------



--------------------------------------------------------------------------


-- The dialog pool table.


-- Obtain a conversation handle using from service, to service, and contract.


-- Also indicates age and usage of dialog for auditing purposes.


--------------------------------------------------------------------------


IF EXISTS (SELECT name FROM sys.tables WHERE name = 'DialogPool')


DROP TABLE [DialogPool]


GO


CREATE TABLE [DialogPool] (


FromService SYSNAME NOT NULL,


ToService SYSNAME NOT NULL,


OnContract SYSNAME NOT NULL,


Handle UNIQUEIDENTIFIER NOT NULL,


OwnerSPID INT NOT NULL,


CreationTime DATETIME NOT NULL,


SendCount BIGINT NOT NULL,


UNIQUE (Handle));


GO



--------------------------------------------------------------------------


-- Get dialog procedure.


-- Reuse a free dialog in the pool or create a new one in case


-- no free dialogs exist.


-- Input is from service, to service, and contract.


-- Output is dialog handle and count of messages previously sent on dialog.


--------------------------------------------------------------------------


IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_get_dialog')


DROP PROC usp_get_dialog


GO


CREATE PROCEDURE [usp_get_dialog] (


@fromService SYSNAME,


@toService SYSNAME,


@onContract SYSNAME,


@dialogHandle UNIQUEIDENTIFIER OUTPUT,


@sendCount BIGINT OUTPUT)


AS


BEGIN


SET NOCOUNT ON;


DECLARE @dialog TABLE


(


FromService SYSNAME NOT NULL,


ToService SYSNAME NOT NULL,


OnContract SYSNAME NOT NULL,


Handle UNIQUEIDENTIFIER NOT NULL,


OwnerSPID INT NOT NULL,


CreationTime DATETIME NOT NULL,


SendCount BIGINT NOT NULL


);



-- Try to claim an unused dialog in [DialogPool]


-- READPAST option avoids blocking on locked dialogs.


BEGIN TRANSACTION;


DELETE @dialog;


UPDATE TOP(1) [DialogPool] WITH(READPAST)


SET OwnerSPID = @@SPID


OUTPUT INSERTED.* INTO @dialog


WHERE FromService = @fromService


AND ToService = @toService


AND OnContract = @onContract


AND OwnerSPID = -1;


IF @@ROWCOUNT > 0


BEGIN


SET @dialogHandle = (SELECT Handle FROM @dialog);


SET @sendCount = (SELECT SendCount FROM @dialog);


END


ELSE


BEGIN


-- No free dialogs: need to create a new one


BEGIN DIALOG CONVERSATION @dialogHandle


FROM SERVICE @fromService


TO SERVICE @toService


ON CONTRACT @onContract


WITH ENCRYPTION = OFF;


INSERT INTO [DialogPool]


(FromService, ToService, OnContract, Handle, OwnerSPID,


CreationTime, SendCount)


VALUES


(@fromService, @toService, @onContract, @dialogHandle, @@SPID,


GETDATE(), 0);


SET @sendCount = 0;


END


COMMIT


END;


GO



--------------------------------------------------------------------------


-- Free dialog procedure.


-- Return the dialog to the pool.


-- Inputs are dialog handle and updated send count.


--------------------------------------------------------------------------


IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_free_dialog')


DROP PROC usp_free_dialog


GO


CREATE PROCEDURE [usp_free_dialog] (


@dialogHandle UNIQUEIDENTIFIER,


@sendCount BIGINT)


AS


BEGIN


SET NOCOUNT ON;


DECLARE @rowcount INT;


DECLARE @string VARCHAR(50);



BEGIN TRANSACTION;



-- Release dialog by setting OwnerSPID to -1.


UPDATE [DialogPool] SET OwnerSPID = -1, SendCount = @sendCount WHERE Handle = @dialogHandle;


SELECT @rowcount = @@ROWCOUNT;


IF @rowcount = 0


BEGIN


SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));


RAISERROR('usp_free_dialog: dialog %s not found in dialog pool', 16, 1, @string) WITH LOG;


END


ELSE IF @rowcount > 1


BEGIN


SET @string = (SELECT CAST( @dialogHandle AS VARCHAR(50)));


RAISERROR('usp_free_dialog: duplicate dialog %s found in dialog pool', 16, 1, @string) WITH LOG;


END



COMMIT


END;


GO



--------------------------------------------------------------------------


-- Delete dialog procedure.


-- Delete the dialog from the pool. This does not end the dialog.


-- Input is dialog handle.


--------------------------------------------------------------------------


IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_delete_dialog')


DROP PROC usp_delete_dialog


GO


CREATE PROCEDURE [usp_delete_dialog] (


@dialogHandle UNIQUEIDENTIFIER)


AS


BEGIN


SET NOCOUNT ON;



BEGIN TRANSACTION;


DELETE [DialogPool] WHERE Handle = @dialogHandle;


COMMIT


END;


GO



--------------------------------------------------------------------------


-- Application setup section.


--------------------------------------------------------------------------



--------------------------------------------------------------------------


-- Send messages from initiator to target.


-- Initiator uses dialogs from the dialog pool.


-- Initiator also retires dialogs based on application criteria,


-- which results in recycling dialogs in the pool.


--------------------------------------------------------------------------



-- This table stores the messages on the target side


IF EXISTS (SELECT name FROM sys.tables WHERE name = 'MsgTable')


DROP TABLE MsgTable


GO


CREATE TABLE MsgTable ( message_type SYSNAME, message_body NVARCHAR(4000))


GO



-- Activated stored proc for the initiator to receive messages.


CREATE PROCEDURE initiator_queue_activated_procedure


AS


BEGIN


DECLARE @handle UNIQUEIDENTIFIER;


DECLARE @message_type SYSNAME;



BEGIN TRANSACTION;


WAITFOR (


RECEIVE TOP(1) @handle = [conversation_handle],


@message_type = [message_type_name]


FROM [SsbInitiatorQueue]), TIMEOUT 5000;



IF @@ROWCOUNT = 1


BEGIN


-- Expect target response to EndOfStream message.


IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'


BEGIN


END CONVERSATION @handle;


END


END


COMMIT


END;


GO



-- Activated stored proc for the target to receive messages.


CREATE PROCEDURE target_queue_activated_procedure


AS


BEGIN


-- Variable table for received messages.


DECLARE @receive_table TABLE(


queuing_order BIGINT,


conversation_handle UNIQUEIDENTIFIER,


message_type_name SYSNAME,


message_body NVARCHAR(MAX));



-- Cursor for received message table.


DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY


FOR SELECT


conversation_handle,


message_type_name,


message_body


FROM @receive_table ORDER BY queuing_order;



DECLARE @conversation_handle UNIQUEIDENTIFIER;


DECLARE @message_type SYSNAME;


DECLARE @message_body NVARCHAR(4000);



-- Error variables.


DECLARE @error_number INT;


DECLARE @error_message VARCHAR(4000);


DECLARE @error_severity INT;


DECLARE @error_state INT;


DECLARE @error_procedure SYSNAME;


DECLARE @error_line INT;


DECLARE @error_dialog VARCHAR(50);



BEGIN TRY


WHILE (1 = 1)


BEGIN


BEGIN TRANSACTION;



-- Receive all available messages into the table.


-- Wait 5 seconds for messages.


WAITFOR (


RECEIVE


[queuing_order],


[conversation_handle],


[message_type_name],


-- Bodies are sent as NVARCHAR (UTF-16), so cast to NVARCHAR rather than VARCHAR.


CAST([message_body] AS NVARCHAR(4000))


FROM [SsbTargetQueue]


INTO @receive_table


), TIMEOUT 5000;



IF @@ROWCOUNT = 0


BEGIN


COMMIT;


BREAK;


END


ELSE


BEGIN


OPEN message_cursor;


WHILE (1=1)


BEGIN


FETCH NEXT FROM message_cursor


INTO @conversation_handle,


@message_type,


@message_body;



IF (@@FETCH_STATUS != 0) BREAK;



-- Process a message.


-- If an exception occurs, catch and attempt to recover.


BEGIN TRY



IF @message_type = 'SsbMsgType'


BEGIN


-- process the msg. Here we will just insert it into a table


INSERT INTO MsgTable values(@message_type, @message_body);


END


ELSE IF @message_type = 'EndOfStream'


BEGIN


-- initiator is signaling end of message stream: end the dialog


END CONVERSATION @conversation_handle;


END


ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'


BEGIN


-- If the message_type indicates that the message is an error,


-- raise the error and end the conversation.


WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)


SELECT


@error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),


@error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');


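SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));


-- End the conversation first: RAISERROR transfers control to the CATCH block,


-- so any statement placed after it would never run.


END CONVERSATION @conversation_handle;


RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);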


END


END TRY


BEGIN CATCH


SET @error_number = ERROR_NUMBER();


SET @error_message = ERROR_MESSAGE();


SET @error_severity = ERROR_SEVERITY();


SET @error_state = ERROR_STATE();


SET @error_procedure = ERROR_PROCEDURE();


SET @error_line = ERROR_LINE();



IF XACT_STATE() = -1


BEGIN


-- The transaction is doomed. Only rollback possible.


-- This could disable the queue if done 5 times consecutively!


ROLLBACK TRANSACTION;



-- Record the error.


BEGIN TRANSACTION;


INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,


@error_severity, @error_state, @error_procedure, @error_line, 1);


COMMIT;



-- For this level of error, it is best to exit the proc


-- and give the queue monitor control.


-- Breaking to the outer catch will accomplish this.


RAISERROR ('Message processing error', 16, 1);


END


ELSE IF XACT_STATE() = 1


BEGIN


-- Record error and continue processing messages.


-- Failing message could also be put aside for later processing here.


-- Otherwise it will be discarded.


INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,


@error_severity, @error_state, @error_procedure, @error_line, 0);


END


END CATCH


END


CLOSE message_cursor;


DELETE @receive_table;


END


COMMIT;


END


END TRY


BEGIN CATCH


-- Process the error and exit the proc to give the queue monitor control


SET @error_number = ERROR_NUMBER();


SET @error_message = ERROR_MESSAGE();


SET @error_severity = ERROR_SEVERITY();


SET @error_state = ERROR_STATE();


SET @error_procedure = ERROR_PROCEDURE();


SET @error_line = ERROR_LINE();



IF XACT_STATE() = -1


BEGIN


-- The transaction is doomed. Only rollback possible.


-- This could disable the queue if done 5 times consecutively!


ROLLBACK TRANSACTION;



-- Record the error.


BEGIN TRANSACTION;


INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,


@error_severity, @error_state, @error_procedure, @error_line, 1);


COMMIT;


END


ELSE IF XACT_STATE() = 1


BEGIN


-- Record error and commit transaction.


-- Here you could also save anything else you want before exiting.


INSERT INTO target_processing_errors VALUES(NULL, @error_number, @error_message,


@error_severity, @error_state, @error_procedure, @error_line, 0);


COMMIT;


END


END CATCH


END;


GO



-- Table to store processing errors.


IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')


DROP TABLE target_processing_errors;


GO



CREATE TABLE target_processing_errors (error_conversation UNIQUEIDENTIFIER, error_number INT,


error_message VARCHAR(4000), error_severity INT, error_state INT, error_procedure SYSNAME NULL,


error_line INT, doomed_transaction TINYINT)


GO



-- Create Initiator and Target side SSB entities


CREATE MESSAGE TYPE SsbMsgType VALIDATION = WELL_FORMED_XML;


CREATE MESSAGE TYPE EndOfStream;


CREATE CONTRACT SsbContract


(


SsbMsgType SENT BY INITIATOR,


EndOfStream SENT BY INITIATOR


);


CREATE QUEUE SsbInitiatorQueue


WITH ACTIVATION (


STATUS = ON,


MAX_QUEUE_READERS = 1,


PROCEDURE_NAME = [initiator_queue_activated_procedure],


EXECUTE AS OWNER);


CREATE QUEUE SsbTargetQueue


WITH ACTIVATION (


STATUS = ON,


MAX_QUEUE_READERS = 1,


PROCEDURE_NAME = [target_queue_activated_procedure],


EXECUTE AS OWNER);



CREATE SERVICE SsbInitiatorService ON QUEUE SsbInitiatorQueue;


CREATE SERVICE SsbTargetService ON QUEUE SsbTargetQueue (SsbContract);


GO



-- SEND procedure. Uses a dialog from the dialog pool.


--


IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_send')


DROP PROC usp_send


GO


CREATE PROCEDURE [usp_send] (


@fromService SYSNAME,


@toService SYSNAME,


@onContract SYSNAME,


@messageType SYSNAME,


@messageBody NVARCHAR(MAX))


AS


BEGIN


SET NOCOUNT ON;


DECLARE @dialogHandle UNIQUEIDENTIFIER;


DECLARE @sendCount BIGINT;


DECLARE @counter INT;


DECLARE @error INT;



SELECT @counter = 1;


BEGIN TRANSACTION;


-- Will need a loop to retry in case the dialog is


-- in a state that does not allow transmission


--


WHILE (1=1)


BEGIN


-- Claim a dialog from the dialog pool.


-- A new one will be created if none are available.


--


EXEC usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT;



-- Attempt to SEND on the dialog


--


IF (@messageBody IS NOT NULL)


BEGIN


-- If the @messageBody is not null it must be sent explicitly


SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);


END


ELSE


BEGIN


-- Messages with no body must *not* specify the body,


-- cannot send a NULL value argument


SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;


END



SELECT @error = @@ERROR;


IF @error = 0


BEGIN


-- Successful send, increment count and exit the loop


--


SET @sendCount = @sendCount + 1;


BREAK;


END



SELECT @counter = @counter+1;


IF @counter > 10


BEGIN


-- We failed 10 times in a row, something must be broken


--


RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;


BREAK;


END



-- Delete the associated dialog from the table and try again


--


EXEC usp_delete_dialog @dialogHandle;


SELECT @dialogHandle = NULL;


END



-- "Criterion" for dialog pool removal is send count > 1000.


-- Modify to suit application.


-- When deleting also inform the target to end the dialog.


IF @sendCount > 1000


BEGIN


EXEC usp_delete_dialog @dialogHandle ;


SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [EndOfStream];


END


ELSE


BEGIN


-- Free the dialog.


EXEC usp_free_dialog @dialogHandle, @sendCount;


END


COMMIT


END;


GO



------------------------------------------------------------------------------------


-- Run application section


------------------------------------------------------------------------------------



-- Send some messages


exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message1.</xml>'


exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message2.</xml>'


exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message3.</xml>'


exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message4.</xml>'


exec usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message5.</xml>'


GO



-- Show the dialog pool


SELECT * FROM [DialogPool]


GO



-- Show the dialogs used.


SELECT * FROM sys.conversation_endpoints;


GO



-- Check whether the TARGET side has processed the messages


SELECT * FROM MsgTable


GO


TRUNCATE TABLE MsgTable


GO


