Lesson Learned #391: Data Loading - Parallel BULK INSERTs with JDBC and Azure Blob Storage
Published Jul 03 2023 11:18 AM

Today, we worked on a service request from a customer who loads data from multiple threads using prepared statements. In this article, I would like to explore a Java code snippet that connects to a SQL Server database and loads data in bulk using the BULK INSERT statement. I will explain the purpose and advantages of the BULK INSERT statement for large-scale data inserts, and then examine each part of the code in detail and explain its functionality.

 

Advantages of BULK INSERT

 

The BULK INSERT statement is a powerful feature provided by SQL Server that allows for efficient loading of large volumes of data into a table. Here are some advantages of using the BULK INSERT statement:

  1. Improved Performance: The BULK INSERT statement is optimized for high-performance data loading. It is designed to handle large data sets and can outperform traditional row-by-row inserts performed using regular INSERT statements. This can significantly reduce the time required to load data.

  2. Reduced Logging and Transaction Overhead: Under the right conditions, BULK INSERT can write far less to the transaction log than individual row inserts. Whether a load is minimally logged depends on factors such as the database recovery model and the target table, so the saving is not guaranteed in every configuration. When it applies, it improves overall performance and reduces the pressure on transaction log resources.

  3. Efficient Data Streaming: The BULK INSERT statement supports various data formats, including CSV (Comma-Separated Values). This allows data to be streamed directly from a file into the database, eliminating the need for parsing and transforming the data within the application code. By leveraging the built-in capabilities of SQL Server, data loading can be performed more efficiently.

  4. Batch Processing: The BULK INSERT statement loads rows in batches rather than one at a time. With the BATCHSIZE option, each batch of rows is sent to the server and committed as a separate transaction. This avoids a round trip to the database for every row and improves overall performance.
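To make the batch option more concrete, here is a minimal sketch of how the options discussed above end up in the WITH clause, and how many batches a load would produce. The class `BulkInsertSql` and its methods are hypothetical helpers, not part of any library. The row count of 10,000,000 is an assumption taken from the name of the sample file. Because the statement is built by string concatenation, the sketch also assumes that the table, file, and data source names are trusted constants.

```java
final class BulkInsertSql {

    /** Builds a BULK INSERT statement with the same options used in this article. */
    static String build(String table, String file, String dataSource, int batchSize) {
        return "BULK INSERT " + table + " FROM '" + file + "'"
             + " WITH (DATA_SOURCE = '" + dataSource + "', FORMAT = 'CSV', FIELDTERMINATOR = ',',"
             + " ROWTERMINATOR = '0x0A', FIRSTROW = 2, BATCHSIZE = " + batchSize + ")";
    }

    /** Number of batches needed for the given number of data rows (ceiling division). */
    static long batchCount(long rows, int batchSize) {
        return (rows + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        // Names match the sample in this article; the row count is an assumption.
        System.out.println(build("[dbo].[test_data_DIAG]", "test_data_DIAG_10000000.csv",
                                 "YourDataSource", 102400));
        System.out.println("Batches: " + batchCount(10_000_000L, 102400));
    }
}
```

Under that row-count assumption, a BATCHSIZE of 102400 gives 98 batches: 97 full batches plus a final partial one.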

 

 

package testconnectionms;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SQLTestLoadingBulk
{
	static {
		try {
			Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
		} catch (Exception ex) {
			System.err.println("Unable to load JDBC driver");
			ex.printStackTrace();
			System.exit(1);
		}
	}

	public static void main(String[] args)
		throws SQLException
	{

		String usernameSQL      = "UserName";
		String passwordSQL      = "Password";

		String envname    = "env";
		String seconds   = "10";

		System.out.println("Arguments are (SQL): username=" + usernameSQL + ",envname=" + envname + ",seconds=" + seconds);

		// The URL contains no format specifiers, so a plain string literal is enough.
		// trustServerCertificate=false keeps server certificate validation enabled.
		String url = "jdbc:sqlserver://servername.database.windows.net:1433;database=dbname;sslProtocol=TLSv1.2;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;";

		System.out.println("-- Connecting to " + url);
		// Two worker threads: one per BULK INSERT statement executed below.
		ExecutorService executorService = Executors.newFixedThreadPool(2);
		
		String insertQuery = "BULK INSERT [dbo].[test_data_DIAG] FROM 'test_data_DIAG_10000000.csv' " +
		                     " WITH (DATA_SOURCE = 'YourDataSource', FORMAT = 'CSV', FIELDTERMINATOR = ','," +
		                     " ROWTERMINATOR = '0x0A', FIRSTROW = 2, BATCHSIZE=102400)";

		// Empty the target table first; try-with-resources closes the statement and the connection.
		try (Connection connection = DriverManager.getConnection(url, usernameSQL, passwordSQL);
		     Statement statement = connection.createStatement()) {
			statement.executeUpdate("TRUNCATE TABLE [dbo].[test_data_DIAG]");
		}

		// The same statement is queued twice, so the same file is loaded twice in parallel.
		List<String> sqlStatements = new ArrayList<>();
		sqlStatements.add(insertQuery);
		sqlStatements.add(insertQuery);
		List<CompletableFuture<Void>> futures = new ArrayList<>();
		try {
			// Execute each SQL statement asynchronously on the thread pool
			for (String sql : sqlStatements) {
				// Effectively final copy of the URL so the lambda can capture it
				final String jdbcUrl = url;
				CompletableFuture<Void> future = CompletableFuture.runAsync(
					() -> executeBulkInsert(jdbcUrl, usernameSQL, passwordSQL, sql), executorService);
				futures.add(future);
			}

			// Wait for all CompletableFuture objects to complete
			CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// Shut down the executor service
			executorService.shutdown();
		}
	}

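	private static void executeBulkInsert(String jdbcUrl, String username, String password, String sql) {
		// Each task opens its own connection; try-with-resources closes it when the load finishes.
		try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password);
		     PreparedStatement statement = connection.prepareStatement(sql)) {
			// Execute the bulk insert
			statement.executeUpdate();
			System.out.println("Bulk insert executed: " + sql);
		} catch (SQLException e) {
			// Rethrow so the CompletableFuture completes exceptionally and join() reports the failure
			throw new IllegalStateException("Bulk insert failed: " + sql, e);
		}
	}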

}

 

 

Understanding the Code

 

The Java code above demonstrates how to use the BULK INSERT statement for efficient data loading. Here's a breakdown of the code's functionality:

  1. Connection and Configuration: The code sets up the necessary connection parameters, including the username, password, and connection URL. These parameters are used to establish a connection to the SQL Server database.

  2. Bulk Insert Configuration: The insertQuery variable contains the BULK INSERT statement. It specifies the target table and the source file from which the data will be loaded. Additional options, such as the data source, format, field terminator, and row terminator, are also provided. FIRSTROW = 2 skips the header line of the CSV file, and BATCHSIZE = 102400 sets the number of rows per batch.

  3. Clearing the Target Table: Before performing the bulk insert, the code executes a TRUNCATE TABLE statement to clear the target table. This ensures that the table is empty before loading the data.

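  4. Asynchronous Execution: The code uses Java's CompletableFuture with an executor service backed by a pool of two threads. It creates a list of CompletableFuture objects, each representing the asynchronous execution of one bulk insert statement, so the two statements run concurrently and can shorten the total load time. Note that the sample adds the same statement to the list twice, so the same file is loaded twice and the table ends up with two copies of every row. In a real load, each statement would typically point to a different file.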

  5. Executing the Bulk Insert: The executeBulkInsert method is invoked for each bulk insert statement. It establishes a connection to the database, prepares the statement, and executes the bulk insert using the provided JDBC URL, username, password, and SQL statement.
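One point in the steps above deserves a closer look: `join()` can only report a failed load if the task itself throws. The following is a minimal sketch of the same CompletableFuture pattern, not the code from the service request. The class `ParallelBulkLoad` and its `runAll` method are hypothetical, and the code that runs a statement is passed in as a `Consumer<String>` so the example can run without a database. In a real load, that consumer would open a connection and execute the BULK INSERT statement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class ParallelBulkLoad {

    /**
     * Runs each statement on a pool thread, waits for all of them,
     * and returns the number of statements that failed.
     */
    static int runAll(List<String> statements, int threads, Consumer<String> runner) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (String sql : statements) {
                futures.add(CompletableFuture.runAsync(() -> runner.accept(sql), pool));
            }
            int failures = 0;
            for (CompletableFuture<Void> future : futures) {
                try {
                    future.join();
                } catch (CompletionException e) {
                    // join() wraps the exception thrown by the task
                    failures++;
                    System.err.println("Statement failed: " + e.getCause());
                }
            }
            return failures;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Placeholder statements; a stub runner only prints what it would execute.
        int failures = runAll(List.of("load part1.csv", "load part2.csv"), 2,
                sql -> System.out.println(Thread.currentThread().getName() + " runs: " + sql));
        System.out.println("Failures: " + failures);
    }
}
```

Joining each future separately, instead of using a single `allOf(...).join()`, is a design choice for this sketch: it lets the caller count and log every failed statement rather than only the first one.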

 

Definition of the table and data source

 

 

CREATE TABLE [dbo].[test_data_DIAG](
	[Key] [int] NOT NULL,
	[Num_DIAG] [int] NULL,
	[DIAG_01] [varchar](6) NULL,
	[DIAG_02] [varchar](6) NULL,
	[DIAG_03] [varchar](6) NULL,
	[DIAG_04] [varchar](6) NULL,
	[DIAG_05] [varchar](6) NULL,
	[DIAG_06] [varchar](6) NULL,
	[DIAG_07] [varchar](6) NULL,
	[DIAG_08] [varchar](6) NULL,
	[DIAG_09] [varchar](6) NULL,
	[DIAG_10] [varchar](6) NULL,
	[DIAG_11] [varchar](6) NULL,
	[DIAG_12] [varchar](6) NULL,
	[DIAG_13] [varchar](6) NULL,
	[DIAG_14] [varchar](6) NULL,
	[DIAG_15] [varchar](6) NULL,
	[DIAG_16] [varchar](6) NULL,
	[DIAG_17] [varchar](6) NULL,
	[DIAG_18] [varchar](6) NULL,
	[DIAG_19] [varchar](6) NULL,
	[DIAG_20] [varchar](6) NULL);

CREATE CLUSTERED COLUMNSTORE INDEX [test_data_DIAG1] ON [dbo].[test_data_DIAG];

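-- Requires a database master key; if the database does not have one yet, create it first with CREATE MASTER KEY.
CREATE DATABASE SCOPED CREDENTIAL YourCredential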
WITH
    IDENTITY = 'SHARED ACCESS SIGNATURE',
    SECRET = 'sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupiyx&se=2023-07-04T00:38:21Z&st=2023-07-03T16:38:21Z&spr=https&sig=qv2CxZ2Yr%2FhXXXXXXXXXX%3D';

CREATE EXTERNAL DATA SOURCE YourDataSource
WITH (
    TYPE = BLOB_STORAGE,
    LOCATION = 'https://azureblobstorage.blob.core.windows.net/replica',
    CREDENTIAL = YourCredential
);
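The shared access signature in the credential carries its own expiry time in the `se` parameter, and the sample token above is only valid for a few hours. A load that starts after that time would fail when the file is read. As a minimal sketch, the hypothetical `SasToken` helper below reads the expiry from a token before a load is started. It assumes the `se` value is a full UTC timestamp, as in the sample. It also strips a leading `?`, which should not be part of the SECRET value.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;

final class SasToken {

    /** Removes a leading '?' if the token was copied together with one. */
    static String normalize(String token) {
        return token.startsWith("?") ? token.substring(1) : token;
    }

    /** Extracts the expiry time from the 'se' parameter, if present. */
    static Optional<Instant> expiry(String token) {
        for (String pair : normalize(token).split("&")) {
            if (pair.startsWith("se=")) {
                // The value may be URL-encoded (for example %3A instead of ':')
                String value = URLDecoder.decode(pair.substring(3), StandardCharsets.UTF_8);
                // Assumption: full UTC timestamp such as 2023-07-04T00:38:21Z
                return Optional.of(Instant.parse(value));
            }
        }
        return Optional.empty();
    }

    /** True when the token has an expiry time and 'now' is past it. */
    static boolean isExpired(String token, Instant now) {
        return expiry(token).map(now::isAfter).orElse(false);
    }

    public static void main(String[] args) {
        // Shortened version of the sample token, without the signature
        String token = "sv=2022-11-02&se=2023-07-04T00:38:21Z&st=2023-07-03T16:38:21Z&spr=https";
        System.out.println("Expires: " + expiry(token).orElse(null));
        System.out.println("Expired now: " + isExpired(token, Instant.now()));
    }
}
```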

 

 

Structure of the data file

 

 

Key,Num_DIAG,DIAG_01,DIAG_02,DIAG_03,DIAG_04,DIAG_05,DIAG_06,DIAG_07,DIAG_08,DIAG_09,DIAG_10,DIAG_11,DIAG_12,DIAG_13,DIAG_14,DIAG_15,DIAG_16,DIAG_17,DIAG_18,DIAG_19,DIAG_20
1,0,,,,,,,,,,,,,,,,,,,,
2,0,,,,,,,,,,,,,,,,,,,,
3,0,,,,,,,,,,,,,,,,,,,,
4,0,,,,,,,,,,,,,,,,,,,,
5,0,,,,,,,,,,,,,,,,,,,,
6,0,,,,,,,,,,,,,,,,,,,,
7,0,,,,,,,,,,,,,,,,,,,,
8,0,,,,,,,,,,,,,,,,,,,,
9,0,,,,,,,,,,,,,,,,,,,,
10,0,,,,,,,,,,,,,,,,,,,,
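This article does not show how the test file was produced, so here is a minimal sketch of a generator that writes rows in the same layout. The class `TestDataCsv` is hypothetical. It writes a header line followed by rows with a key, a zero, and 20 empty DIAG columns, and it ends every line with a line feed so the file matches ROWTERMINATOR = '0x0A'.

```java
import java.io.PrintWriter;

final class TestDataCsv {

    static final int DIAG_COLUMNS = 20;

    /** Header line: Key,Num_DIAG,DIAG_01,...,DIAG_20 */
    static String header() {
        StringBuilder sb = new StringBuilder("Key,Num_DIAG");
        for (int i = 1; i <= DIAG_COLUMNS; i++) {
            sb.append(",DIAG_").append(i < 10 ? "0" : "").append(i);
        }
        return sb.toString();
    }

    /** One data row: the key, Num_DIAG = 0, and 20 empty DIAG values. */
    static String row(long key) {
        return key + ",0" + ",".repeat(DIAG_COLUMNS);
    }

    /** Writes the header and the requested number of rows, using LF line endings. */
    static void write(PrintWriter out, long rows) {
        out.print(header());
        out.print('\n');
        for (long key = 1; key <= rows; key++) {
            out.print(row(key));
            out.print('\n');
        }
    }

    public static void main(String[] args) {
        // Prints a three-row sample; a real test file would be written to disk instead.
        PrintWriter out = new PrintWriter(System.out);
        write(out, 3);
        out.flush();
    }
}
```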

 

 

Enjoy!
