While working on a support case, we faced the need to replicate a substantial volume of data in a table to conduct performance comparisons between two different SQL databases. During this troubleshooting process, we crafted a small yet powerful Python script to accomplish this task, which might be beneficial in similar scenarios. Given that the source table contained several million records, the utilization of multithreading was crucial to handle this massive data efficiently. Moreover, every execution of the script clearly indicated the GUID of the most recently processed data, adding a layer of traceability to our operations. As always, this script serves as an example and should be thoroughly tested in a controlled environment, with users taking responsibility for its deployment.
In the realm of database management, particularly during support and performance tuning tasks, mirroring large datasets between different databases is a common yet complex challenge. Our objective was straightforward but technically demanding: replicate extensive data volumes from one SQL database table to another to facilitate accurate performance benchmarking.
Overview of the Python Script
Our script employs Python’s `pyodbc` module, which facilitates interaction with SQL Server databases, and utilizes multi-threading via the `concurrent.futures` module to enhance performance. The script features:
Robust Database Connection: Using `pyodbc`, the script establishes connections with both source and target databases, implementing retry logic for reliability.
Concurrent Data Processing: The `ThreadPoolExecutor` is used for parallel processing, speeding up the data synchronization.
Intelligent Data Handling: The script employs SQL `MERGE` statements, ensuring that data is either updated or inserted as needed, enhancing efficiency.
Transaction Management: With `autocommit` set to False, the script handles transactions manually, ensuring data integrity.
Process Monitoring: Utilization of GUIDs and timestamps to track the synchronization process.
Script Walkthrough
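Connection Configuration: The script defines one connection string for the source database (A) and one for the target database (B), and opens one connection per worker thread to each, retrying failed attempts.
Total Rows Calculation: An estimated row count for the source table is read from `sys.dm_db_partition_stats` and divided evenly among the workers.
Data Processing in Batches: Each worker reads its slice of the source table, ordered by ID, using `OFFSET`/`FETCH`, and forwards the rows to the target in batches of 10,000.
Merge Operation: Using `MERGE` SQL statements, the script ensures that each record is either updated or inserted in Database B, marked with a unique process GUID for traceability.
Transaction Management: Each batch is committed explicitly and rolled back if an error occurs.
Performance Tracking: The script prints the process GUID, the start and end times, and the total duration of the run.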
-- Structure for Server A
-- Source table: ID is an auto-generated bigint (IDENTITY, seed 1, increment 1)
-- and is the clustered primary key; Item1 is a nullable text column.
CREATE TABLE [dbo].[Table1](
[ID] [bigint] IDENTITY(1,1) NOT NULL,
[Item1] [nvarchar](200) NULL,
CONSTRAINT [PK__Table1__3214EC2758D12B45] PRIMARY KEY CLUSTERED
(
[ID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO
-- Seed the source table with 20,000 sample rows ('Item 1' .. 'Item 20000'),
-- one INSERT per loop iteration.
DECLARE @count INT = 0;
WHILE @count < 20000
BEGIN
INSERT INTO [dbo].[Table1] ([Item1])
VALUES (CONCAT('Item ', @count + 1));
SET @count = @count + 1;
END;
-- Script for ServerB
-- Target table: same ID/Item1 columns as the Server A table, but ID is a plain
-- bigint (no IDENTITY) because the sync script inserts the source ID values
-- explicitly. ProcessGUID is an extra nullable column that the sync script
-- fills with the GUID of the run that wrote the row.
CREATE TABLE [dbo].[Table1](
[ID] [bigint] NOT NULL,
[Item1] [nvarchar](200) NULL,
[ProcessGUID] [uniqueidentifier] NULL,
CONSTRAINT [PK__Table1__3214EC27FB61D54C] PRIMARY KEY CLUSTERED
(
[ID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO
import pyodbc
import time
import sys
import uuid
from concurrent.futures import ThreadPoolExecutor
import datetime
# Connection settings for the source (A) and target (B) databases.
# The APP keyword sets a distinct application name for the reader and the
# writer connection.
# NOTE(review): server names and credentials look like placeholders -- replace
# them before running, and avoid committing real passwords to source control.
conn_str_A = 'DRIVER={ODBC Driver 18 for SQL Server};SERVER=servernamea.database.windows.net;DATABASE=A;UID=user1;PWD=pwd1!;APP=Test-Sync Reader'
conn_str_B = 'DRIVER={ODBC Driver 18 for SQL Server};SERVER=servernameb.database.windows.net;DATABASE=B;UID=user2;PWD=pwd2!;APP=Test-Sync Writer'
def create_connections(conn_str, count, max_retries=3, delay=5):
    """Open ``count`` database connections, retrying each failed attempt.

    Args:
        conn_str: ODBC connection string passed to ``pyodbc.connect``.
        count: Number of connections to open.
        max_retries: Maximum number of attempts per connection. Values below
            1 are treated as 1 so that every connection is tried at least
            once instead of being silently skipped.
        delay: Seconds to wait between two attempts.

    Returns:
        A list of ``count`` open connections with autocommit disabled.

    Raises:
        SystemExit: With status 1 when a connection still cannot be opened
            after the allowed number of attempts. Connections opened before
            the failure are closed first.
    """
    attempts_allowed = max(1, max_retries)
    connections = []
    for _ in range(count):
        for attempt in range(1, attempts_allowed + 1):
            try:
                conn = pyodbc.connect(conn_str, timeout=30, autocommit=False)
            except pyodbc.Error as e:
                print(f"Failed to connect to database: {e}")
                if attempt >= attempts_allowed:
                    print("Maximum retry attempts reached. Exiting the application.")
                    # Release what was already opened so the fatal exit does
                    # not leave sessions behind on the server.
                    for opened in connections:
                        try:
                            opened.close()
                        except pyodbc.Error as close_error:
                            print(f"Failed to close connection: {close_error}")
                    sys.exit(1)
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                connections.append(conn)
                break  # Connected; move on to the next connection.
    return connections
def get_total_rows(conn, table_name):
    """Return the estimated number of rows in ``table_name``.

    The estimate is the sum of ``row_count`` in
    ``sys.dm_db_partition_stats`` over the table's heap (index_id 0) or
    clustered index (index_id 1).

    Args:
        conn: Open database connection exposing ``cursor()``.
        table_name: Name of the table to look up. It is bound as a query
            parameter rather than spliced into the SQL text.

    Returns:
        The row count reported by the DMV (always a non-zero value).

    Raises:
        SystemExit: With status 1 when the query fails, or when it reports
            no rows (unknown table, empty table, or NULL result).
    """
    query = """
        SELECT SUM(row_count)
        FROM sys.dm_db_partition_stats
        WHERE object_id = OBJECT_ID(?)
        AND index_id IN (0, 1);
    """
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, (table_name,))
            result = cursor.fetchone()
    except pyodbc.Error as e:
        print(f"Error retrieving row count: {e}")
        sys.exit(1)

    if result and result[0]:
        return result[0]

    print("No rows found or unable to retrieve row count.")
    sys.exit(1)
def process_block_A_to_B(conn_A, conn_B, offset, limit, process_guid, batch_size=10000):
    """Copy one slice of ``Table1`` from database A to database B.

    Rows are read ordered by ID, starting at ``offset`` and spanning at most
    ``limit`` rows, and handed to ``merge_into_database_B`` in batches.

    Args:
        conn_A: Open connection to the source database.
        conn_B: Open connection to the target database.
        offset: Number of rows (in ID order) to skip before reading.
        limit: Maximum number of rows to read. A value of zero or less means
            there is nothing to copy and the function returns immediately.
        process_guid: Identifier attached to every copied row.
        batch_size: Number of rows sent to the target per merge call.
    """
    # FETCH NEXT rejects a non-positive row count, so an empty slice must not
    # reach the server at all.
    if limit <= 0:
        return

    records = []
    with conn_A.cursor() as cursor_A:
        print(f"Thread: {offset} {limit}")
        cursor_A.execute(
            "SELECT ID, Item1 FROM Table1 ORDER BY ID "
            "OFFSET ? ROWS FETCH NEXT ? ROWS ONLY",
            (offset, limit),
        )
        for row_id, item1 in cursor_A:
            records.append((row_id, item1, process_guid))
            if len(records) == batch_size:
                merge_into_database_B(conn_B, records)
                records = []
        # Flush the final, partially filled batch.
        if records:
            merge_into_database_B(conn_B, records)
def merge_into_database_B(conn, records):
    """Upsert ``records`` into ``Table1`` on the target database.

    The rows are first bulk-inserted into a session-local ``#TempTable`` and
    committed, then merged into ``Table1`` (update on matching ID, insert
    otherwise) and committed again. Errors are printed and rolled back; they
    are not raised to the caller.

    Args:
        conn: Open connection to the target database, with autocommit off.
        records: Sequence of ``(ID, Item1, ProcessGUID)`` tuples. An empty
            sequence is a no-op.
    """
    # executemany() cannot run with an empty parameter list.
    if not records:
        return

    merge_query = """
    MERGE INTO Table1 AS Target
    USING #TempTable AS Source
    ON Target.ID = Source.ID
    WHEN MATCHED THEN
    UPDATE SET Target.Item1 = Source.Item1, Target.ProcessGUID = Source.ProcessGUID
    WHEN NOT MATCHED THEN
    INSERT (ID, Item1, ProcessGUID) VALUES (Source.ID, Source.Item1, Source.ProcessGUID);
    """

    # No explicit BEGIN TRANSACTION: with autocommit off the driver already
    # runs statements inside a transaction that commit()/rollback() ends, and
    # an explicit BEGIN would only nest another level on top of it.
    with conn.cursor() as cursor:
        cursor.fast_executemany = True
        try:
            # Clear any temp table left behind by an earlier failed cleanup on
            # this connection (DROP ... IF EXISTS needs SQL Server 2016+ or
            # Azure SQL).
            cursor.execute("DROP TABLE IF EXISTS #TempTable")
            cursor.execute("CREATE TABLE #TempTable (ID bigint, Item1 nvarchar(200), ProcessGUID uniqueidentifier)")
            cursor.executemany("INSERT INTO #TempTable (ID, Item1, ProcessGUID) VALUES (?, ?, ?)", records)
            conn.commit()
        except Exception as e:
            print(f"Error during table creation and data insertion: {e}")
            conn.rollback()
            return

        try:
            cursor.execute(merge_query)
            conn.commit()
        except Exception as e:
            print(f"Error during MERGE operation: {e}")
            conn.rollback()
        finally:
            # Best-effort cleanup: a failure here must not hide the outcome
            # of the merge itself.
            try:
                cursor.execute("DROP TABLE IF EXISTS #TempTable")
                conn.commit()
            except Exception as e:
                print(f"Error dropping temporary table: {e}")
def main():
    """Copy ``Table1`` from database A to database B with parallel workers.

    Opens one source and one target connection per worker, splits the
    estimated row count of the source table into contiguous slices, and runs
    ``process_block_A_to_B`` for each slice on a thread pool. The process
    GUID, start time, end time and total duration are printed.
    """
    start_time = datetime.datetime.now()
    print(f"Process started at: {start_time}")
    num_workers = 10
    process_guid = generate_guid()
    print(f"Process GUID: {process_guid}")

    connections_A = []
    connections_B = []
    try:
        connections_A = create_connections(conn_str_A, num_workers)
        connections_B = create_connections(conn_str_B, num_workers)
        total_rows_A = get_total_rows(connections_A[0], "Table1")
        # Ceiling division so the slices together cover every row.
        limit_A = (total_rows_A + num_workers - 1) // num_workers

        futures = []
        with ThreadPoolExecutor(num_workers) as executor:
            for i in range(num_workers):
                offset_A = i * limit_A
                # Trim the slice to the rows that remain; for small tables the
                # trailing workers have nothing left and are not started.
                actual_limit_A = min(limit_A, total_rows_A - offset_A)
                if actual_limit_A <= 0:
                    break
                futures.append(
                    executor.submit(
                        process_block_A_to_B,
                        connections_A[i],
                        connections_B[i],
                        offset_A,
                        actual_limit_A,
                        process_guid,
                    )
                )

        # Surface exceptions raised inside workers; an unchecked future would
        # discard them silently.
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Worker failed: {e}")
    finally:
        for conn in connections_A + connections_B:
            try:
                conn.close()
            except pyodbc.Error as e:
                print(f"Failed to close connection: {e}")

    end_time = datetime.datetime.now()
    print(f"Process completed at: {end_time}")
    print(f"Total duration: {end_time - start_time}")
def generate_guid():
    """Return a newly created random (version 4) UUID."""
    new_guid = uuid.uuid4()
    return new_guid
# Start the synchronisation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.