Azure Synapse Analytics Blog

Reading Delta Lake in Dedicated SQL Pool

Mark Pryce-Maher
Sep 15, 2022

In June, Databricks announced that they are open sourcing Delta Lake 2.0. Delta Lake is quickly becoming the format of choice in data science and data engineering.

 

To import Delta Lake data into a Synapse dedicated SQL pool, you would need Azure Data Factory/Synapse Pipelines or Spark to handle the Delta Lake files.


This is not ideal, because it adds extra overhead in complexity, time, and cost.

 

As an intellectual challenge, I wondered if it's possible to import Delta Lake files directly into the dedicated SQL Pool and support features like time-travel. It turned out to be a great little project and a great way of learning about Delta Lake.

 

I have uploaded the script to the Azure Synapse Toolbox GitHub site. The Azure Synapse Toolbox is an open-source library of useful tools and scripts to use with Azure Synapse Analytics. 

 

Here is a breakdown of how the script works.

 

  1. Read the _last_checkpoint file; this contains the highest checkpoint version number.
  2. Read all the checkpoint files; these contain the 'Add' and 'Remove' files that need to be included, with their timestamps.
  3. Read the latest JSON transaction files (since the last checkpoint); these contain the most recent 'Add' and 'Remove' files, with their timestamps (a small parsing sketch follows this list).
  4. Filter the 'Add' and 'Remove' files based on 'modificationTime' and 'deletionTimestamp'.
  5. Return the data from all the 'Add' files, excluding the 'Remove' files (batching up the COPY INTO statements).
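
To make steps 2 to 4 concrete, below is a minimal parsing sketch using T-SQL's JSON functions. The JSON is a trimmed, illustrative 'Add' entry (the file name and timestamp are made up); the real parsing happens inside the stored procedure further down.

-- A trimmed, illustrative 'Add' entry from a Delta transaction log
declare @add_entry varchar(8000) =
	'{"add":{"path":"part-00000-1234.snappy.parquet","modificationTime":1657132620000}}';

-- Extract the file name and convert the epoch-millisecond timestamp to a datetime,
-- in the same way the procedure handles the checkpoint and json entries
select
	JSON_VALUE(@add_entry, '$.add.path') as fname,
	convert(bigint, JSON_VALUE(@add_entry, '$.add.modificationTime')) as ts,
	dateadd(s, convert(bigint, JSON_VALUE(@add_entry, '$.add.modificationTime')) / 1000, '1970/1/1') as dt;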


/*
IF OBJECT_ID('mpmtest') IS NOT NULL
  BEGIN;
	DROP TABLE mpmtest
END

declare @path varchar(400), @dt datetime2, @credential varchar(500),@outputable varchar(500),@display int, @debug int;
set @path = 'https://storageaccount.blob.core.windows.net/container/delta/demo/'
--set @dt =  convert(datetime2,'2022/07/06 18:37:00'); --getdate(); --  for time travel -- 
set @dt =  getdate(); --  for time travel -- 
set @credential  = 'IDENTITY= ''Shared Access Signature'', SECRET = ''___SECRET____''';
set @outputable = 'mpmtest' -- set to NULL for a temp table
set @display = 1; -- if 1 display results
set @debug = 1;
exec delta @path,@dt,@credential,@outputable,@display, @debug

--select count(*) from mpmtest;
*/



create PROC [dbo].[delta] 
	@folder [varchar](4000), 
	@dt [datetime2], 
	@credential [varchar](4000), 
	@dest_table [varchar](4000),
	@display int, -- 0 don't display, 1 display
	@debug int -- 0 don't debug, 1 debug mode
	AS
begin
-- version info
-- 0.1 - Very basic, but sort of worked.
-- 0.2 - Added support for remove files.
-- 0.3 - Fixed bug in authentication
-- 0.4 - Improved performance in handling add/remove files
-- 0.5 - Added support for checkpoint files
-- 0.6 - Changed the json handling, added latest checkpoint handing

	DECLARE @ts bigint, @tsmil bigint, @json varchar(8000), @tsql varchar(8000);
	DECLARE @json_remove varchar(8000), @tsql_checkpoint varchar(8000);
	DECLARE @td_checkpoint1 datetime2, @td_checkpoint2 datetime2;
	DECLARE @last_checkpoint varchar(25), @jsonversionchar varchar(25),@checkpointversion varchar(25) ;
	DECLARE @output_sql varchar(max);
	DECLARE @checkpoint_file_count bigint;

	-- default: AAD authentication
    IF (@credential IS NULL)
        SET @credential = ''

    IF (@credential <> '')
        SET @credential = ' CREDENTIAL = (' + @credential + ')'

	-- default : display results
	IF @display IS NULL
		set @display = 1

	-- default : debug is off
	IF @debug IS NULL 
		SET @debug = 0
	
	-- if the datetime is null, get the latest version
	IF (@dt IS NULL)
		set @dt = getdate()

	-- number of milliseconds since 1970
	-- DATEDIFF doesn't return a bigint, so this needs to be done in two parts
	set @ts = datediff( s, '1970/1/1', convert(date,@dt) ) 
	set @ts = @ts * 1000;

	set @tsmil = datediff(ms, convert(date,@dt) , @dt)
	set @ts = @ts + @tsmil;
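	-- e.g. @dt = '2022-07-06 18:37:00' gives @ts = 1657132620000 (ms since 1970-01-01)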

	if (@dest_table IS NULL)
		set @dest_table = '#tmp_output'

	---- Holds the raw json information -----------------------------
	IF OBJECT_ID('tempdb..#last_checkpoint') IS NOT NULL
		DROP TABLE #last_checkpoint

	create table #last_checkpoint
	(
		info varchar(max)
	) with (distribution=round_robin, heap)

	------ read _last_checkpoint
	-- If a checkpoint exists, this gives us the starting point
	-- for the json files
	set @tsql = '
	copy into  #last_checkpoint (info)
	from ''' + @folder + '_delta_log/_last_checkpoint''
	 with ( 	' + @credential +  '    ,FILE_TYPE = ''CSV''
	 ,fieldterminator =''0x0b''    ,fieldquote = ''0x0b''     ,rowterminator = ''0x0d'' ) '
 
	if @debug = 1 	
		print @tsql;
 
	set @td_checkpoint1 = getdate();  
	exec(@tsql);
	set @td_checkpoint2 = getdate();

	print 'Loading _last_checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

	if @debug = 1 
	begin 
		select '_Last_checkpoint', *  from #last_checkpoint
	end
	
	select @checkpointversion = ISNULL(JSON_VALUE(info,'$.version'),'00') from #last_checkpoint;
	
	if NOT EXISTS(SELECT * FROM #last_checkpoint)
	BEGIN
		SET @checkpointversion = '00'
	END


	PRINT 'checkpointversion=' +@checkpointversion

	set @jsonversionchar = '00000000000000000000';

	IF OBJECT_ID('tempdb..#delta_checkpoint') IS NOT NULL
		DROP TABLE #delta_checkpoint

	create table #delta_checkpoint
	(
		txn varchar(max),
		[add] varchar(max),
		[remove] varchar(max),
		metaData varchar(max),
		protocol varchar(max)
	) with (distribution=round_robin, heap)

	--- Code to pull the add/remove files from the checkpoint files.
	set @tsql_checkpoint = ' 
	copy into  #delta_checkpoint	from ''' + @folder + '_delta_log/*.checkpoint.parquet''
	 with ( ' + @credential +  '  ,FILE_TYPE = ''PARQUET'', AUTO_CREATE_TABLE = ''ON''   ) '

	 if @debug = 1
	 BEGIN
		print @tsql_checkpoint;
	END
 
	set @td_checkpoint1 = getdate()  
	exec(@tsql_checkpoint);
	set @td_checkpoint2 = getdate()

	print 'Loading checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

	SELECT @checkpoint_file_count = count(*) from #delta_checkpoint

	 if @debug = 1 
	 BEGIN
		print 'No of checkpoint files ' + convert(varchar(10),@checkpoint_file_count);
		
		SELECT * FROM #delta_checkpoint order by [add],[remove]
	END
	-- Holds the information from the checkpoint files ---------
	IF OBJECT_ID('tempdb..#delta_files_checkpoint') IS NOT NULL
	  BEGIN;
		print 'Dropping table #delta_files_checkpoint'
		DROP TABLE #delta_files_checkpoint
	END
	
	create table #delta_files_checkpoint
	(
		fname varchar(99),
		ts bigint,
		dt datetime2,
		[action] char(1)
	) with (distribution=round_robin, heap)

	-- create a table of add/remove files with the datetimestamp
	INSERT INTO #delta_files_checkpoint
	select DISTINCT
		CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN
			JSON_VALUE([remove],'$.path')
		ELSE
			JSON_VALUE([add],'$.path') 
		END fname,
		CASE 
		WHEN JSON_VALUE([add],'$.path') is NULL THEN
			convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp'))
		ELSE
			convert(bigint,JSON_VALUE([add],'$.modificationTime')) 
			END updatetime,
		CASE 
		WHEN JSON_VALUE([add],'$.path') is NULL THEN
			dateadd(s, convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp'))  / 1000 , '1970/1/1')
		ELSE
			dateadd(s, convert(bigint,JSON_VALUE([add],'$.modificationTime'))  / 1000 , '1970/1/1')
		END updatetimedt,
		CASE	WHEN JSON_VALUE([add],'$.path') is NULL THEN
			'R'
		ELSE
			'A'
		END [action]
		from #delta_checkpoint

	DELETE from #delta_files_checkpoint where fname is null

	if @debug = 1 	
		SELECT 'checkpoint', *, @ts, @dt FROM #delta_files_checkpoint order by dt desc

	 -- remove files after the given time 
	 DELETE FROM #delta_files_checkpoint  	WHERE ts > @ts

	if @debug = 1 	
		SELECT 'checkpoint filtered', *, @ts, @dt FROM #delta_files_checkpoint  order by dt desc

	---- Holds the raw json information -----------------------------
	IF OBJECT_ID('tempdb..#delta_json') IS NOT NULL
	  BEGIN;
		DROP TABLE #delta_json
	END

	create table #delta_json
	(
		jsoninput varchar(max)
	) with (distribution=round_robin, heap)
	----------------------------------------------------------------

	set @jsonversionchar=  left(substring(@jsonversionchar,0,len(@jsonversionchar)-len(@checkpointversion)+1) + @checkpointversion,len(@jsonversionchar)-1) + '*'
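	-- e.g. a checkpoint at version 10 turns the mask into '0000000000000000001*',
	-- matching the json log versions (10-19) written since that checkpoint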

	-- Read all the delta transaction logs since the checkpoint; we can't filter on exact file names, so a wildcard pattern is used.
	set @tsql = '
	copy into  #delta_json (jsoninput)
	from ''' + @folder + '_delta_log/'+ @jsonversionchar +'.json''
	 with ( 	' + @credential +  '    ,FILE_TYPE = ''CSV''
	 ,fieldterminator =''0x0b''    ,fieldquote = ''0x0b''     ,rowterminator = ''0x0d'' ) '
 
	if @debug = 1 	
		print @tsql;
 
	set @td_checkpoint1 = getdate();  
	exec(@tsql);
	set @td_checkpoint2 = getdate();

	print 'Loading json files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

	if @debug = 1 	
		SELECT 'JSON Files', *, @ts, @dt FROM #delta_json

	---- delta file details..
	IF OBJECT_ID('tempdb..#delta_active_json') IS NOT NULL
	  BEGIN;
		DROP TABLE #delta_active_json
	END

	create table #delta_active_json
	(
		jsoninput varchar(max) , 
		ts bigint,
		dt datetime2,
		[action] char(1)
	) with (distribution=round_robin, heap)
	-----------------------------------------------------------

	-- inserting into a temp table, so the date and time are associated with each row
	insert into #delta_active_json
		select  
			convert(varchar(max),jsoninput)
			, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  as ts
			, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  / 1000 , '1970/1/1')
			,''
		from #delta_json 
		WHERE convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) < @ts


	if @debug = 1 	
		select 'json filtered by date', *, @ts, @dt	from #delta_active_json order by dt desc

	-- left in for debugging -- shows each version of the JSON
	if @debug = 1 
	begin
		select  'not filtered',
		convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  as ts
		, convert(varchar(8000),jsoninput)
		, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  / 1000 , '1970/1/1')
		,ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY  JSON_VALUE(jsoninput,'$.commitInfo.timestamp') DESC) AS "Row Number" 
		,convert(varchar(8000),jsoninput)
		, @ts, @dt 
		from #delta_json  
		order by convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  desc
	end


	---------------------------------------------------------
	-- insert the JSON adds and removes to the existing list from checkpoint
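	-- the path value starts 7 characters after 'path' (skipping path":"), and a
	-- standard part-*.snappy.parquet file name is 67 characters long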
	insert into #delta_files_checkpoint
	select  substring(value,charindex('path',value)+7 ,67) [path] 
			, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
			, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  / 1000 , '1970/1/1') td
			,'A' as [action]
	from #delta_active_json 
		cross apply STRING_SPLIT(jsoninput,',') 
	where value like '%path%' and  charindex('add',value) > 0
	union all
	select   substring(value,charindex('path',value)+7 ,67)
			, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
			, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  / 1000 , '1970/1/1') td
			, 'R'
	from #delta_active_json 
		cross apply STRING_SPLIT(jsoninput,',') 
	where value like '%path%' and  charindex('remove',value) > 0


	if @debug = 1 
	begin
		-- all the adds and removes from the json
		select 'adds', substring(value,charindex('path',value)+7 ,67) [path] 
			, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
			, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  / 1000 , '1970/1/1') td
			,'A' as [action], @ts, @dt
		from #delta_active_json 
			cross apply STRING_SPLIT(jsoninput,',') 
		where value like '%path%' and  charindex('add',value) > 0

		select  'removes',  substring(value,charindex('path',value)+7 ,67)
				, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
				, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp'))  / 1000 , '1970/1/1') td
				, 'R', @ts, @dt
		from #delta_active_json 
			cross apply STRING_SPLIT(jsoninput,',') 
		where value like '%path%' and  charindex('remove',value) > 0
	
		select 'New list with adds and removes', *, @ts, @dt from #delta_files_checkpoint order by ts desc;
	
		select distinct 'distinct list ', fname, dt, [action], @ts, @dt  from #delta_files_checkpoint order by dt desc;
	end


	---- Holds the final list of parquet files to load -------------
	IF OBJECT_ID('tempdb..#delta_files') IS NOT NULL
	  BEGIN;
		DROP TABLE #delta_files
	END

	create table #delta_files
	(
		filenames varchar(max), rownumber bigint
	) with (distribution=round_robin, heap)

	insert into #delta_files
		select fname, 
		ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY fname DESC) 
		from #delta_files_checkpoint where [action] = 'A' and
			fname not in (select fname from  #delta_files_checkpoint where [action] = 'R') 

	if @debug = 1 
		select '#delta_files', * from #delta_files;

	if @debug = 1 
		select 'parquet to read ', fname, ts, @ts, dt, @dt from #delta_files_checkpoint where [action] = 'A' and
			fname not in (select fname from  #delta_files_checkpoint where [action] = 'R') 
		--where ts < @ts

	--- This batching code might appear to be overkill, but STRING_AGG falls over when the string
	-- is too long, so the inserts are batched up.

	DECLARE @flist varchar(max);
	DECLARE @batchsize int = 100000;
	DECLARE @iParquetFileCount int =0;
	DECLARE @iParquetFilePos int =1;
	select @iParquetFileCount = count(*) from #delta_files

	IF OBJECT_ID('tempdb..#_files') IS NOT NULL
		DROP TABLE #_files

	create table #_files
	( fname varchar(999) ) with (distribution=round_robin,heap)

	---- creating batches of COPY INTO statements
	while(@iParquetFilePos <= @iParquetFileCount)
	BEGIN
		INSERT INTO #_files 
		SELECT [filenames] FROM #delta_files where rownumber between @iParquetFilePos and @iParquetFilePos + @batchsize 

		-- Had issues with STRING_AGG if the string gets too long
		SELECT @flist = 'COPY INTO  ' + @dest_table + '  FROM '  
						+ STRING_AGG(CAST(''''+@folder+fname+'''' AS VARCHAR(MAX)), ',
						') + '   with ( 	' + @credential +  '
							,FILE_TYPE = ''PARQUET''  , AUTO_CREATE_TABLE = ''ON''  )' 
							FROM #_files

		if @debug =1 
			select '_filelist', * from #_files;
		
		if @debug =1 print @flist;
		if @debug =1 print len(@flist);

		set @td_checkpoint1 = getdate();  
		exec(@flist);
		set @td_checkpoint2 = getdate();

		print 'Loading batch of parquet ' + convert(varchar(10),@iParquetFilePos) + '->' +  convert(varchar(10), @iParquetFilePos + @batchsize ) + ' took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'

		truncate table #_files;
		set @iParquetFilePos = @iParquetFilePos + @batchsize +1;
	END
	
	-- if there is no file, there is no table and this causes a problem
	if charindex('#',@dest_table) > 0
		set @output_sql = 'IF OBJECT_ID(''tempdb..' + @dest_table + ''') IS NOT NULL 	 SELECT * from ' + @dest_table + ';'
	ELSE
		set @output_sql = 'IF OBJECT_ID(''' + @dest_table + ''') IS NOT NULL 	 SELECT * from ' + @dest_table + ';'

	if @debug = 1
		PRINT @output_sql;
	
	if @display = 1
		exec(@output_sql );

end
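
To run the procedure, the invocation mirrors the commented example at the top of the script. This is a minimal sketch for a point-in-time ('time travel') read; the storage path, SAS secret, and table name are placeholders to replace with your own.

-- Load the delta table as it existed at a given point in time
declare @path varchar(400) = 'https://storageaccount.blob.core.windows.net/container/delta/demo/';
declare @dt datetime2 = convert(datetime2, '2022/07/06 18:37:00'); -- or getdate() for the latest version
declare @credential varchar(500) = 'IDENTITY= ''Shared Access Signature'', SECRET = ''___SECRET____''';

exec delta @path, @dt, @credential, 'mpmtest', 1, 0; -- display on, debug off

select count(*) from mpmtest;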


Our team publishes blogs each week; you can find them all here: https://aka.ms/synapsecseblog  

For a deeper understanding of Synapse implementation best practices, please refer to the Synapse Success by Design site. 

 

Author(s): Mark Pryce-Maher is a Program Manager on the Azure Synapse Customer Success Engineering (CSE) team. 

 


Updated Sep 14, 2022
Version 1.0

10 Comments

  • MLT4S
    Copper Contributor

    Hi Mark Pryce-Maher, ignore (or delete) my last comment. I was able to corroborate my delta table counts and we have the right number! Thank you again for this solution. It has been very helpful to us as an alternative.

  • MLT4S
    Copper Contributor

    Hello Mark Pryce-Maher, I found the var that stores your path string length and was able to leverage the checkpoint to recurse all files now. I hit a new issue where somehow it is only reading 1.6M rows as a ceiling on big tables. I didn't find any hard-coded limits in your SP, though, so I'm wondering if that's a system limitation or just hidden somewhere in the code.

  • MLT4S
    Copper Contributor

    Hello Mark Pryce-Maher, thank you for this. I tried it for single-partition delta files and it works! Is there a minor change you can introduce for delta tables that are partitioned? When I tried it on one, I got the error below; it seems unable to recurse into the child folders?

     


    Msg 105219, Level 16, State 1, Line 18

    Cannot create new table when provided location is empty or contains only 0 byte files.

     

  • Thanks for the comments.

     

    markwaelterman I will check that out, thanks for spotting it.

     

    dcastell1 There could be a much longer version of this blog with the reasons why I wrote it (but I don't want to bore you!). 

    I thought it should be possible with just T-SQL, so I wanted to prove (or disprove) that hypothesis.  

    In doing so, I learnt a lot about Delta Lake. The journey of learning was more important than the script itself. 

     

  • dcastell1
    Copper Contributor

    First of all, thank you for this enlightening post. I didn't think this was even possible. Well done, Mark Pryce-Maher!

     

    On the other hand, I think the Synapse developers should use your vast knowledge of Delta Lake to better embed its support into Synapse, in the dedicated SQL pool, the serverless one, and the Spark pools. I think the latter still miss some important features (no RESTORE in Spark SQL? 😞)

     

    It would really help us since we are adopting the Data Lakehouse infrastructure in most of our projects.

     

    Thank you!

  • markwaelterman
    Copper Contributor

    Mark Pryce-Maher thank you for posting this!

     

    I think you are missing a variable declaration on line 39.  I believe it should be @checkpoint_file_count, based on the rest of the script.

    The _file_count variable also seems to use the wrong syntax.

  • versydney
    Iron Contributor

    Haha, I really really hope it's a spoiler of a proper integration for the product 😉

  • _MartinB 

    Thanks for the comment. This is a very public forum, but I think this is best summed up by quoting River Song (from Dr Who): 'Spoilers!'

    Mark

  • _MartinB
    Iron Contributor

    Hi Mark Pryce-Maher & Terence_OShea

    First of all: well done and thanks!


    I don't get it: You (= Microsoft Program Manager for Synapse) recognized that Delta Lake is an important file format that should be (better) supported by Dedicated SQL pool.

    But instead of convincing the Product Team to have this functionality built into the product itself, you come up with a 500-line custom script. 😲

     

    Sometimes I am really wondering who is prioritising the Synapse Backlog.