Azure Database Support Blog

Lesson Learned #514: Optimizing Bulk Insert Performance in Parallel Data Ingestion - Part 1

Jose_Manuel_Jurado
Nov 13, 2024

While working on a support case, we encountered an issue where bulk inserts were taking longer than our customer’s SLA allowed.

During troubleshooting, we identified three main factors affecting performance:

  1. High Transaction Log (TLOG) Pressure: The transaction log was under significant pressure, slowing down bulk inserts.
  2. Excessive Indexes: The table had multiple indexes, adding overhead to every insert.
  3. High Index Fragmentation: The indexes were heavily fragmented, further slowing down data ingestion (see the diagnostic sketch after this list for one way to check log usage and fragmentation).
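
To confirm these factors, the following is a minimal diagnostic sketch using the standard DMVs sys.dm_db_log_space_usage and sys.dm_db_index_physical_stats. The connection string and the table name dbo.dummyTable are placeholders (the same dummy name used in the sample later in this post); this is not part of the customer's application, just an illustration of how log usage and fragmentation can be checked:

using System;
using Microsoft.Data.SqlClient;

class DiagnosticsSketch
{
    // Prints the percentage of transaction log space in use and the average
    // fragmentation of every index on dbo.dummyTable (placeholder table name).
    static void CheckLogAndFragmentation(string connectionString)
    {
        using (SqlConnection conn = new SqlConnection(connectionString))
        {
            conn.Open();

            using (SqlCommand cmd = new SqlCommand(
                "SELECT used_log_space_in_percent FROM sys.dm_db_log_space_usage;", conn))
            {
                Console.WriteLine($"TLOG space used: {cmd.ExecuteScalar()} %");
            }

            using (SqlCommand cmd = new SqlCommand(
                @"SELECT i.name, s.avg_fragmentation_in_percent
                  FROM sys.dm_db_index_physical_stats(DB_ID(), OBJECT_ID('dbo.dummyTable'), NULL, NULL, 'SAMPLED') AS s
                  INNER JOIN sys.indexes AS i
                      ON i.object_id = s.object_id AND i.index_id = s.index_id
                  WHERE i.index_id > 0;", conn))
            using (SqlDataReader reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    Console.WriteLine($"{reader.GetString(0)}: {reader.GetDouble(1):F2} % fragmented");
                }
            }
        }
    }
}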

We found that the customer was partitioning data within a single table, so we began considering a restructuring approach that would partition the data across multiple databases instead. This approach provided several advantages:

 

  • Individual transaction log thresholds per database, reducing log pressure on each partition.
  • The ability to disable indexes before each insert and rebuild them afterward (see the sketch after this list). In a single table with partitioning, it is currently not possible to disable indexes per partition individually.
  • Specific Service Level Objectives (SLOs) per database, optimizing resource allocation based on data volume and SLA requirements.
  • Indexed Views for Real-Time Data Retrieval: Partitioned databases allowed us to create indexed views that are maintained together with the inserts and refresh faster because each database holds less data, enhancing read performance and consistency.
  • Reduced Fragmentation and Improved Query Performance: By rebuilding indexes after each bulk insert, we maintained an optimal index structure in every database, significantly improving read speeds.
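
As mentioned in the list above, one benefit of having one database per period is that indexes can be disabled before a load and rebuilt afterward. Below is a minimal sketch of that pattern, again using the placeholder table dbo.dummyTable; the bulk load itself would be the SqlBulkCopy logic shown later in this post:

using System;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

class IndexMaintenanceSketch
{
    // Disables only the non-clustered indexes (disabling the clustered index
    // would make the table inaccessible for inserts), runs the load, and then
    // rebuilds all indexes so fragmentation is reset after the bulk insert.
    static async Task LoadWithIndexMaintenanceAsync(string connectionString, Func<SqlConnection, Task> bulkLoad)
    {
        const string disableNonClusteredSql = @"
            DECLARE @sql nvarchar(max) = N'';
            SELECT @sql += N'ALTER INDEX ' + QUOTENAME(name) + N' ON dbo.dummyTable DISABLE;'
            FROM sys.indexes
            WHERE object_id = OBJECT_ID('dbo.dummyTable') AND type_desc = 'NONCLUSTERED';
            EXEC sp_executesql @sql;";

        using (SqlConnection conn = new SqlConnection(connectionString))
        {
            await conn.OpenAsync();

            using (SqlCommand disable = new SqlCommand(disableNonClusteredSql, conn))
            {
                disable.CommandTimeout = 0;   // index operations can take a while
                await disable.ExecuteNonQueryAsync();
            }

            await bulkLoad(conn);             // e.g. SqlBulkCopy into dbo.dummyTable

            using (SqlCommand rebuild = new SqlCommand(
                "ALTER INDEX ALL ON dbo.dummyTable REBUILD;", conn))
            {
                rebuild.CommandTimeout = 0;
                await rebuild.ExecuteNonQueryAsync();
            }
        }
    }
}

Similarly, because every period is its own database, each one can be scaled independently. A hypothetical example of changing the SLO of one monthly database; the database name db202308 and the 'S3' objective are illustrative values, and the statement is executed against the logical server's master database:

using Microsoft.Data.SqlClient;

class SloSketch
{
    // Requests a new service objective for a single monthly database;
    // Azure applies the change asynchronously.
    static void ScaleDatabase(string masterConnectionString)
    {
        using (SqlConnection conn = new SqlConnection(masterConnectionString))
        {
            conn.Open();
            using (SqlCommand cmd = new SqlCommand(
                "ALTER DATABASE [db202308] MODIFY (SERVICE_OBJECTIVE = 'S3');", conn))
            {
                cmd.ExecuteNonQuery();
            }
        }
    }
}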

By implementing these changes, we were able to achieve a significant reduction in bulk insert times, helping our customer meet SLA targets.

 

Finally, I would like to share the following code:

  • This code reads CSV files from a specified folder and performs parallel bulk inserts for each file. Each CSV file has a header row, and the data is separated by the | character.
  • During the reading process, the code reads the value in column 20, which is a date in the format YYYY-MM-DD.
  • This date is converted to the YYYYMM format, which determines the target database for each record. For example, if the value in column 20 is 2023-08-15, the code extracts 202308 and directs the record to the corresponding database for that period, in this case db202308.
  • Once the batch size is reached for a partition (rows per YYYYMM read from the CSV file), the application executes the SqlBulkCopy for that batch in parallel.

 

using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

namespace BulkInsert
{
    class Program
    {
        // {0} is replaced with the target database name (e.g. db202308).
        static string sqlConnectionStringTemplate = "Server=servername.database.windows.net;Database={0};Authentication=Active Directory Managed Identity;User Id=xxxx;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Pooling=true;Max Pool size=300;Min Pool Size=100;ConnectRetryCount=3;ConnectRetryInterval=10;Connection Lifetime=0;Application Name=ConnTest Check Jump;Packet Size=32767";
        static string localDirectory = @"C:\CsvFiles";
        // Rows accumulated per YYYYMM key before a bulk copy batch is sent.
        static int batchSize = 1248000;
        // Limits the number of concurrent bulk copy operations.
        static SemaphoreSlim semaphore = new SemaphoreSlim(10);
        static async Task Main(string[] args)
        {
            var files = Directory.GetFiles(localDirectory, "*.csv");

            Stopwatch totalStopwatch = Stopwatch.StartNew();
            foreach (var filePath in files)
            {
                Console.WriteLine($"Processing file: {filePath}");
                await LoadDataFromCsv(filePath);
            }
            totalStopwatch.Stop();

            Console.WriteLine($"Total processing finished in {totalStopwatch.Elapsed.TotalSeconds} seconds.");
        }

        static async Task LoadDataFromCsv(string filePath)
        {
            Stopwatch fileReadStopwatch = Stopwatch.StartNew();
            var dataTables = new Dictionary<string, DataTable>();
            var bulkCopyTasks = new List<Task>();

            using (StreamReader reader = new StreamReader(filePath, System.Text.Encoding.UTF8, true, 819200))
            {
                string line;
                string key;
                long lineCount = 0;
                long lShow = batchSize / 2;

                reader.ReadLine(); // Skip the header row
                fileReadStopwatch.Stop();
                Console.WriteLine($"File read initialization took {fileReadStopwatch.Elapsed.TotalSeconds} seconds.");

                Stopwatch processStopwatch = Stopwatch.StartNew();
                string[] values = new string[31];

                while (!reader.EndOfStream)
                {
                    line = await reader.ReadLineAsync();
                    lineCount++;

                    if(lineCount % lShow == 0 )
                    {
                        Console.WriteLine($"Read {lineCount}");
                    }

                    values = line.Split('|');
                    // Column 20 (index 19) holds a YYYY-MM-DD date; its YYYYMM part selects the target database.
                    key = DateTime.Parse(values[19]).ToString("yyyyMM");

                    if (!dataTables.ContainsKey(key))
                    {
                        dataTables[key] = CreateTableSchema();
                    }

                    var batchTable = dataTables[key];
                    DataRow row = batchTable.NewRow();

                    for (int i = 0; i < 31; i++)
                    {
                        row[i] = ParseValue(values[i], batchTable.Columns[i].DataType,i);
                    }
                    batchTable.Rows.Add(row);

                    // When a partition reaches the batch size, hand it off to a parallel bulk copy
                    // and start accumulating a fresh DataTable for that YYYYMM key.
                    if (batchTable.Rows.Count >= batchSize)
                    {
                        Console.WriteLine($"BatchSize processing {key} - {batchTable.Rows.Count}.");
                        Stopwatch insertStopwatch = Stopwatch.StartNew();
                        bulkCopyTasks.Add(ProcessBatchAsync(dataTables[key], key, insertStopwatch)); 
                        dataTables[key] = CreateTableSchema();
                    }
                }
                processStopwatch.Stop();
                Console.WriteLine($"File read and processing of {lineCount} lines completed in {processStopwatch.Elapsed.TotalSeconds} seconds.");
            }

            // Flush any remaining partial batches.
            foreach (var key in dataTables.Keys)
            {
                if (dataTables[key].Rows.Count > 0)
                {
                    Stopwatch insertStopwatch = Stopwatch.StartNew();
                    bulkCopyTasks.Add(ProcessBatchAsync(dataTables[key], key, insertStopwatch));
                }
            }

            await Task.WhenAll(bulkCopyTasks);
        }

        static async Task ProcessBatchAsync(DataTable batchTable, string yearMonth, Stopwatch insertStopwatch)
        {
            await semaphore.WaitAsync(); // Throttle the number of concurrent bulk copies
            try
            {
                using (SqlConnection conn = new SqlConnection(string.Format(sqlConnectionStringTemplate, $"db{yearMonth}")))
                {
                    await conn.OpenAsync();
                    using (SqlTransaction transaction = conn.BeginTransaction())
                    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, transaction)
                    {
                        DestinationTableName = "dbo.dummyTable",
                        BulkCopyTimeout = 40000,
                        BatchSize = batchSize,
                        EnableStreaming = true
                    })
                    {
                        await bulkCopy.WriteToServerAsync(batchTable);
                        transaction.Commit();
                    }
                }
                insertStopwatch.Stop();
                Console.WriteLine($"Inserted batch of {batchTable.Rows.Count} rows to db{yearMonth} in {insertStopwatch.Elapsed.TotalSeconds} seconds.");
            }
            finally
            {
                semaphore.Release();
            }
        }

        static DataTable CreateTableSchema()
        {
            DataTable dataTable = new DataTable();
            dataTable.Columns.Add("Column1", typeof(long));
            dataTable.Columns.Add("Column2", typeof(long));
            dataTable.Columns.Add("Column3", typeof(long));
            dataTable.Columns.Add("Column4", typeof(long));
            dataTable.Columns.Add("Column5", typeof(long));
            dataTable.Columns.Add("Column6", typeof(long));
            dataTable.Columns.Add("Column7", typeof(long));
            dataTable.Columns.Add("Column8", typeof(long));
            dataTable.Columns.Add("Column9", typeof(long));
            dataTable.Columns.Add("Column10", typeof(long));
            dataTable.Columns.Add("Column11", typeof(long));
            dataTable.Columns.Add("Column12", typeof(long));
            dataTable.Columns.Add("Column13", typeof(long));
            dataTable.Columns.Add("Column14", typeof(DateTime));
            dataTable.Columns.Add("Column15", typeof(double));
            dataTable.Columns.Add("Column16", typeof(double));
            dataTable.Columns.Add("Column17", typeof(string));
            dataTable.Columns.Add("Column18", typeof(long));
            dataTable.Columns.Add("Column19", typeof(DateTime));
            dataTable.Columns.Add("Column20", typeof(DateTime));
            dataTable.Columns.Add("Column21", typeof(DateTime));
            dataTable.Columns.Add("Column22", typeof(string));
            dataTable.Columns.Add("Column23", typeof(long));
            dataTable.Columns.Add("Column24", typeof(double));
            dataTable.Columns.Add("Column25", typeof(short));
            dataTable.Columns.Add("Column26", typeof(short));
            dataTable.Columns.Add("Column27", typeof(short));
            dataTable.Columns.Add("Column28", typeof(short));
            dataTable.Columns.Add("Column29", typeof(short));
            dataTable.Columns.Add("Column30", typeof(short));
            dataTable.Columns.Add("Column31", typeof(short));

            dataTable.BeginLoadData();
            dataTable.MinimumCapacity = batchSize;
            return dataTable;
        }

        // Infers the value type by trial parsing; the targetType and column index
        // parameters are kept for the call site but are not used in this version.
        static object ParseValue(string value, Type targetType, int i)
        {
            if (string.IsNullOrWhiteSpace(value))
                return DBNull.Value;

            if (long.TryParse(value, out long longVal))
                return longVal;

            if (double.TryParse(value, out double doubleVal))
                return doubleVal;

            if (DateTime.TryParse(value, out DateTime dateVal))
                return dateVal;

            return value;
        }
    }
}
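
For completeness, the destination table dbo.dummyTable must exist in every monthly database with 31 columns matching the DataTable built in CreateTableSchema(). The sketch below is a hypothetical provisioning helper that derives an equivalent CREATE TABLE statement; the SQL types, and the nvarchar lengths in particular, are assumptions for illustration, not the customer's actual schema:

using System.Text;
using Microsoft.Data.SqlClient;

class ProvisioningSketch
{
    // Mirrors CreateTableSchema(): Column1-13 bigint, Column14 datetime2,
    // Column15-16 float, Column17 nvarchar, Column18 bigint, Column19-21 datetime2,
    // Column22 nvarchar, Column23 bigint, Column24 float, Column25-31 smallint.
    static string BuildCreateTableSql()
    {
        string[] sqlTypes =
        {
            "bigint", "bigint", "bigint", "bigint", "bigint", "bigint", "bigint",
            "bigint", "bigint", "bigint", "bigint", "bigint", "bigint",
            "datetime2", "float", "float", "nvarchar(200)", "bigint",
            "datetime2", "datetime2", "datetime2", "nvarchar(200)", "bigint",
            "float", "smallint", "smallint", "smallint", "smallint",
            "smallint", "smallint", "smallint"
        };

        var sb = new StringBuilder("CREATE TABLE dbo.dummyTable (");
        for (int i = 0; i < sqlTypes.Length; i++)
        {
            sb.Append($"Column{i + 1} {sqlTypes[i]} NULL");
            if (i < sqlTypes.Length - 1) sb.Append(", ");
        }
        sb.Append(");");
        return sb.ToString();
    }

    // Creates the table in one target database, e.g. db202308.
    static void CreateTargetTable(string connectionString)
    {
        using (SqlConnection conn = new SqlConnection(connectionString))
        {
            conn.Open();
            using (SqlCommand cmd = new SqlCommand(BuildCreateTableSql(), conn))
            {
                cmd.ExecuteNonQuery();
            }
        }
    }
}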

 

Updated Nov 13, 2024
Version 2.0