Today, we worked on a service request where our customer wanted to improve the performance of a bulk insert process. In the following, I would like to share my experience working on it.
Our customer mentioned that inserting data (100,000 rows) was taking 14 seconds in a Business Critical database. I was able to reproduce this time using a single thread against a table with 20 columns.
In order to improve this Python code, I suggested running the bulk insert in parallel in batches of 10,000 rows, and I also followed these best practices to reduce the execution time of the process:
- Client virtual machine level:
  - Enable accelerated networking.
  - Provision as many CPUs/vCores as the number of parallel processes you need; in this case, 10 vCores.
  - Place the virtual machine in the same region as the database.
- Database level:
  - Create a table with 20 columns.
  - As the primary key is a sequential key, include OPTIMIZE_FOR_SEQUENTIAL_KEY = ON in the clustered index definition.
  - Configure the number of CPUs/vCores to match the maximum number of parallel processes you would like to have; in this case, 10 vCores.
  - Depending on the amount of data, use Business Critical to reduce storage latency.
- Python code level (a minimal sketch of these settings follows this list):
  - Use the executemany method (with fast_executemany enabled) to reduce network round trips, sending only the parameter values.
  - Run the inserts in batches (for example, 1,000 or 10,000 rows) instead of one single process.
  - Use SET NOCOUNT ON to avoid returning the messages about how many rows were inserted.
  - In the connection string, use autocommit=False.
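As a quick illustration of the Python-level settings, here is a minimal sketch that combines fast_executemany, SET NOCOUNT ON, and autocommit=False for a single batch. It assumes the [test_data] table created by the full script below already exists, and the connection details and generated rows are placeholders:

import pyodbc

# Placeholder connection details -- replace with your own server, database, and credentials.
cnxn = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};SERVER=tcp:servername.database.windows.net,1433;"
                      "DATABASE=databasename;UID=username;PWD=password", autocommit=False)
cursor = cnxn.cursor()
cursor.fast_executemany = True  # Send the parameter array in bulk instead of row by row.
rows = [(i, i * 10) for i in range(1, 10001)]  # One batch of 10,000 parameter tuples.
# SET NOCOUNT ON suppresses the rows-affected messages sent back to the client.
cursor.executemany("SET NOCOUNT ON;INSERT INTO [test_data] ([Key], [Num_TEST]) VALUES (?,?)", rows)
cnxn.commit()  # With autocommit=False, the whole batch is committed as a single transaction.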
You can find an example of the Python code below. This script reads a CSV file and, for every 10,000 rows, executes a bulk insert on a separate thread.
import csv
import pyodbc
import threading
import os
import datetime
class ThreadsOrder: # Class to run the insert process in parallel threads.
    def ExecuteSQL(self, a, s, n):
        TExecutor = threading.Thread(target=ExecuteSQL, args=(a, s, n,))
        TExecutor.start()
def SaveResults(Message, bSaveFile): # Print the message and optionally save it to the log file.
    try:
        print(Message)
        if bSaveFile:
            file_object = open(filelog, "a")
            file_object.write(datetime.datetime.strftime(datetime.datetime.now(), '%d/%m/%y %H:%M:%S') + '-' + Message + '\n')
            file_object.close()
    except BaseException as e:
        print('An error occurred - ', format(e))
def ExecuteSQLcc(sTableName): # Drop and recreate the test table and its clustered index.
    try:
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ";PWD=" + SQL_password, autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.execute("DROP TABLE IF EXISTS " + sTableName)
        cursor.commit()
        cursor.execute("CREATE TABLE " + sTableName + " (" \
            " [Key] [int] NOT NULL," \
            " [Num_TEST] [int] NULL," \
            " [TEST_01] [varchar](6) NULL," \
            " [TEST_02] [varchar](6) NULL," \
            " [TEST_03] [varchar](6) NULL," \
            " [TEST_04] [varchar](6) NULL," \
            " [TEST_05] [varchar](6) NULL," \
            " [TEST_06] [varchar](6) NULL," \
            " [TEST_07] [varchar](6) NULL," \
            " [TEST_08] [varchar](6) NULL," \
            " [TEST_09] [varchar](6) NULL," \
            " [TEST_10] [varchar](6) NULL," \
            " [TEST_11] [varchar](6) NULL," \
            " [TEST_12] [varchar](6) NULL," \
            " [TEST_13] [varchar](6) NULL," \
            " [TEST_14] [varchar](6) NULL," \
            " [TEST_15] [varchar](6) NULL," \
            " [TEST_16] [varchar](6) NULL," \
            " [TEST_17] [varchar](6) NULL," \
            " [TEST_18] [varchar](6) NULL," \
            " [TEST_19] [varchar](6) NULL," \
            " [TEST_20] [varchar](6) NULL)")
        cursor.commit()
        # As [Key] is a sequential key, the index is created with OPTIMIZE_FOR_SEQUENTIAL_KEY = ON.
        cursor.execute("CREATE CLUSTERED INDEX [ix_ms_example] ON " + sTableName + " ([Key] ASC) WITH (STATISTICS_NORECOMPUTE = OFF, DROP_EXISTING = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = ON) ON [PRIMARY]")
        cursor.commit()
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)
def ExecuteSQL(a, sTableName, n): # Insert one batch of rows using fast_executemany.
    try:
        Before = datetime.datetime.now()
        if n == -1:
            sTypeProcess = "NoAsync"
        else:
            sTypeProcess = "Async - Thread:" + str(n)
        SaveResults('Executing at ' + str(Before) + " Process Type: " + sTypeProcess, True)
        cnxn1 = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ";PWD=" + SQL_password, autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.fast_executemany = True
        cursor.executemany("SET NOCOUNT ON;INSERT INTO " + sTableName + " ([Key], Num_TEST, TEST_01, TEST_02, TEST_03, TEST_04, TEST_05, TEST_06, TEST_07, TEST_08, TEST_09, TEST_10, TEST_11, TEST_12, TEST_13, TEST_14, TEST_15, TEST_16, TEST_17, TEST_18, TEST_19, TEST_20) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", a)
        cursor.commit()
        SaveResults('Time Difference INSERT process ' + str(datetime.datetime.now() - Before) + " " + sTypeProcess, True)
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)
#Connectivity details.
SQL_server = 'tcp:servername.database.windows.net,1433'
SQL_database = 'databasename'
SQL_user = 'username'
SQL_password = 'password'
#File details to read.
filepath = 'c:\\k\\' #Folder that contains the demo CSV file.
filelog = filepath + 'Error.log' #File where the log is saved.
chunksize = 10000 #Transaction batch size in rows.
sTableName = "[test_data]" #Table name (dummy).
pThreadOrder = ThreadsOrder()
nThread = 0 #Thread counter -- right now the number of threads is not limited.
ExecuteSQLcc(sTableName)
Before = datetime.datetime.now()
line_count = 0
for directory, subdirectories, files in os.walk(filepath):
    for file in files:
        name, ext = os.path.splitext(file)
        if ext == '.csv':
            a = []
            SaveResults('Reading the file ' + name, True)
            BeforeFile = datetime.datetime.now()
            with open(os.path.join(directory, file), mode='r') as csv_file:
                csv_reader = csv.reader(csv_file, delimiter=',')
                for row in csv_reader:
                    line_count += 1
                    if line_count > 1: #Skip the header row.
                        a.append(row)
                    if (line_count % chunksize) == 0:
                        deltaFile = datetime.datetime.now() - BeforeFile
                        nThread = nThread + 1
                        SaveResults('Time Difference Reading file is ' + str(deltaFile) + ' for ' + str(line_count) + ' rows', True)
                        pThreadOrder.ExecuteSQL(a, sTableName, nThread) #Open a new thread per transaction batch.
                        #ExecuteSQL(a, sTableName, -1) #Alternative: run the batch synchronously.
                        a = []
                        BeforeFile = datetime.datetime.now()
                if a: #Insert any remaining rows that did not fill a complete batch.
                    nThread = nThread + 1
                    pThreadOrder.ExecuteSQL(a, sTableName, nThread)
            SaveResults('Total Time Difference Reading file is ' + str(datetime.datetime.now() - Before) + ' for ' + str(line_count) + ' rows for the file: ' + name, True)
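Note that the script above starts a new thread for every batch without an upper bound on concurrency (nThread only counts them). If you prefer to cap the number of parallel inserts to match the vCores, a bounded pool could be used instead. The following is only a sketch under that assumption: run_batches_bounded is a hypothetical helper, batches would be the list of row chunks built while reading the CSV, ExecuteSQL is the function from the script above, and max_workers=10 matches the 10 vCores mentioned earlier.

from concurrent.futures import ThreadPoolExecutor

def run_batches_bounded(batches, sTableName, max_workers=10):
    # Submit every batch to a bounded thread pool and wait for all inserts to finish.
    # max_workers=10 is an assumption matching the 10 vCores mentioned above.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(ExecuteSQL, batch, sTableName, n + 1)
                   for n, batch in enumerate(batches)]
        for future in futures:
            future.result()  # Re-raise any exception that happened in a worker thread.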
During the execution, if you need to see the connections, the number of rows inserted, and the impact in terms of resources, you could use the following T-SQL:
SELECT
      SUBSTRING(REPLACE(REPLACE(SUBSTRING(ST.text, (req.statement_start_offset/2) + 1,
        ((CASE statement_end_offset WHEN -1 THEN DATALENGTH(ST.text) ELSE req.statement_end_offset END
        - req.statement_start_offset)/2) + 1), CHAR(10), ' '), CHAR(13), ' '), 1, 512) AS statement_text
    , req.database_id
    , program_name
    , req.session_id
    , req.cpu_time AS cpu_time_ms
    , req.status
    , wait_time
    , wait_resource
    , wait_type
    , last_wait_type
    , req.total_elapsed_time
    , total_scheduled_time
    , req.row_count AS [Row Count]
    , command
    , scheduler_id
    , memory_usage
    , req.writes
    , req.reads
    , req.logical_reads
    , blocking_session_id
FROM sys.dm_exec_requests AS req
INNER JOIN sys.dm_exec_sessions AS sess ON sess.session_id = req.session_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) AS ST
WHERE req.session_id <> @@SPID
SELECT COUNT(*) FROM test_data

SELECT * FROM sys.dm_db_resource_stats ORDER BY end_time DESC
Enjoy!