Lesson Learned #479: Loading Data from Parquet to Azure SQL Database using C# and SqlBulkCopy
Published Mar 15 2024 12:13 PM

In the realm of big data and cloud computing, efficiently managing and transferring data between different platforms and formats is paramount. Azure SQL Database, a fully managed relational database service by Microsoft, offers robust capabilities for handling large volumes of data. However, when it comes to importing data from Parquet files, a popular columnar storage format, Azure SQL Database's native BULK INSERT command does not directly support this format. This article presents a practical solution using a C# console application to bridge this gap, leveraging the Microsoft.Data.SqlClient.SqlBulkCopy class for high-performance bulk data loading.

 

Understanding Parquet and Its Significance

Parquet is an open-source, columnar storage file format optimized for use with big data processing frameworks. Its design is particularly beneficial for complex nested data structures and efficient data compression and encoding schemes, making it a favored choice for data warehousing and analytical processing tasks.

 

The Challenge with Direct Data Loading

Azure SQL Database's BULK INSERT command is a powerful tool for importing large volumes of data quickly. However, it does not directly support the Parquet format, so a Parquet file cannot simply be handed to BULK INSERT; the data first has to be read by an external process and then sent to the database.

A C# Solution: Bridging the Gap

To overcome this limitation, we can develop a C# console application that reads Parquet files, processes the data, and utilizes SqlBulkCopy for efficient data transfer to Azure SQL Database. This approach offers flexibility and control over the data loading process, making it suitable for a wide range of data integration scenarios.

 

Step 1: Setting Up the Environment

Before diving into the code, ensure your development environment is set up with the following (sample .NET CLI commands are shown after the list):

  • .NET Core or .NET Framework compatible with Microsoft.Data.SqlClient.
  • Microsoft.Data.SqlClient package installed in your project.
  • Parquet.Net package to facilitate Parquet file reading.
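
For reference, assuming a .NET CLI workflow, the two NuGet packages can be added to a console project with commands along these lines (the project name is illustrative):

dotnet new console -n ParquetToAzureSql
cd ParquetToAzureSql
dotnet add package Microsoft.Data.SqlClient
dotnet add package Parquet.Net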

 

Step 2: Create the Target Table in Azure SQL Database

 

CREATE TABLE Parquet (id INT, city VARCHAR(30))

 

Step 3: Create a Parquet File in C#

The following C# code creates a Parquet file with two columns, id and city.

 

using System;
using System.IO;
using System.Threading.Tasks;
using Parquet;
using Parquet.Data;
using Parquet.Schema;

namespace ManageParquet
{
    class ClsWriteParquetFile
    {
        public async Task CreateFile(string filePath)
        {
            // Define a two-column schema (id, city) and two sample rows of data.
            var schema = new ParquetSchema(new DataField<int>("id"), new DataField<string>("city"));
            var idColumn = new DataColumn(schema.DataFields[0], new int[] { 1, 2 });
            var cityColumn = new DataColumn(schema.DataFields[1], new string[] { "L", "D" });

            using (Stream fileStream = System.IO.File.OpenWrite(filePath))
            {
                using (ParquetWriter parquetWriter = await ParquetWriter.CreateAsync(schema, fileStream))
                {
                    parquetWriter.CompressionMethod = CompressionMethod.Gzip;
                    parquetWriter.CompressionLevel = System.IO.Compression.CompressionLevel.Optimal;

                    // Create a new row group in the file and write both columns into it.
                    using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
                    {
                        await groupWriter.WriteColumnAsync(idColumn);
                        await groupWriter.WriteColumnAsync(cityColumn);
                    }
                }
            }
        }
    }
}
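
As a quick usage sketch (the file path is just an example), the writer can be called from any async context:

// Example invocation: writes a small two-row Parquet file (path is illustrative).
var writer = new ClsWriteParquetFile();
await writer.CreateFile(@"C:\Temp\cities.parquet");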

 

 

Step 4: Reading Parquet Files in C#

The first part of the solution involves reading the Parquet file. We leverage the ParquetReader class to access the data stored in the Parquet format.

 

The following source code contains two methods: the first one only reads the Parquet file and prints its schema and data, while the second one reads the file and saves the data into a table in Azure SQL Database.

 

 

using Parquet;
using Parquet.Data;
using Parquet.Schema;
using System;
using System.Data;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;


namespace ManageParquet
{

    class ClsReadParquetFile
    {
        public async Task ReadFile(string filePath)
        {
            // Open the Parquet file and make sure the reader is disposed when done.
            using (ParquetReader parquetReader = await ParquetReader.CreateAsync(filePath))
            {
                // Print the column names found in the file's schema.
                ParquetSchema schema = parquetReader.Schema;
                Console.WriteLine("Schema Parquet file:");
                foreach (var field in schema.Fields)
                {
                    Console.WriteLine($"{field.Name}");
                }

                // Walk every row group and print the values of each column.
                for (int i = 0; i < parquetReader.RowGroupCount; i++)
                {
                    using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(i))
                    {
                        foreach (DataField field in schema.GetDataFields())
                        {
                            Parquet.Data.DataColumn column = await groupReader.ReadColumnAsync(field);

                            Console.WriteLine($"Column Data of '{field.Name}':");
                            foreach (var value in column.Data)
                            {
                                Console.WriteLine(value);
                            }
                        }
                    }
                }
            }
        }

        public async Task ReadFileLoadSQL(string filePath)
        {
            // Stage the Parquet data in a DataTable whose layout matches the target table.
            DataTable dataTable = new DataTable();
            dataTable.Columns.Add("id", typeof(int));
            dataTable.Columns.Add("city", typeof(string));

            using (ParquetReader parquetReader = await ParquetReader.CreateAsync(filePath))
            {
                ParquetSchema schema = parquetReader.Schema;

                for (int i = 0; i < parquetReader.RowGroupCount; i++)
                {
                    using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(i))
                    {
                        // Read the id and city columns of the current row group from the file.
                        Parquet.Data.DataColumn idColumn = await groupReader.ReadColumnAsync(schema.DataFields[0]);
                        Parquet.Data.DataColumn cityColumn = await groupReader.ReadColumnAsync(schema.DataFields[1]);

                        for (int j = 0; j < idColumn.Data.Length; j++)
                        {
                            var row = dataTable.NewRow();
                            row["id"] = idColumn.Data.GetValue(j);
                            row["city"] = cityColumn.Data.GetValue(j);
                            dataTable.Rows.Add(row);
                        }
                    }
                }
            }

            using (SqlConnection dbConnection = new SqlConnection("Server=tcp:servername.database.windows.net,1433;User Id=MyUser;Password=MyPassword!;Initial Catalog=MyDb;Persist Security Info=False;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;Pooling=true;Max Pool Size=100;Min Pool Size=1;ConnectRetryCount=3;ConnectRetryInterval=10;Application Name=ConnTest"))
            {
                await dbConnection.OpenAsync();
                using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
                {
                    // Map the DataTable columns to the target table columns by name.
                    s.DestinationTableName = "Parquet";
                    foreach (System.Data.DataColumn column in dataTable.Columns)
                    {
                        s.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                    }

                    await s.WriteToServerAsync(dataTable);
                }
            }
        }
    }
}
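
To tie the pieces together, a minimal console entry point could look like the following sketch; the class and method names match the examples above, while the Program class and the file path are illustrative and assume the target table and connection string are already in place:

using System.Threading.Tasks;

namespace ManageParquet
{
    class Program
    {
        static async Task Main()
        {
            string filePath = @"C:\Temp\cities.parquet"; // example path

            // Step 3: create the sample Parquet file.
            await new ClsWriteParquetFile().CreateFile(filePath);

            // Step 4: read it back, then bulk-load it into the Parquet table in Azure SQL Database.
            var reader = new ClsReadParquetFile();
            await reader.ReadFile(filePath);
            await reader.ReadFileLoadSQL(filePath);
        }
    }
}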

 

Conclusion

This C# console application demonstrates an effective workaround for loading data from Parquet files into Azure SQL Database, circumventing the limitations of the BULK INSERT command. By leveraging the .NET ecosystem and the powerful SqlBulkCopy class, developers can facilitate seamless data integration processes, enhancing the interoperability between different data storage formats and Azure SQL Database.
