Here is a snippet of the code used to write out the DataFrame with the Spark JDBC connector. We used a batch size of 200,000 rows; changing the batch size to 50,000 did not produce a material difference in performance.
dfOrders.write.mode("overwrite").format("jdbc")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", "jdbc:sqlserver://server.westus.cloudapp.azure.com;databaseName=TestDB")
  .option("dbtable", "TestDB.dbo.orders")
  .option("user", "myuser")
  .option("batchsize", "200000")
  .option("password", "MyComplexPassword!001")
  .save()
Since the load was taking longer than expected, we examined the sys.dm_exec_requests DMV while the load was running and saw a fair amount of latch contention on various pages, which would not be expected if the data were being loaded via a bulk API.
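If it is more convenient to check this from the Spark driver than from a query window, the same DMV can be queried over a plain JDBC connection. The sketch below is illustrative only and reuses the placeholder server name and credentials from the write snippet above; it simply prints the current wait type and wait resource for each active user request (this requires VIEW SERVER STATE permission).
// Illustrative only: inspect current waits from a separate JDBC connection while the load runs.
import java.sql.DriverManager

val monitorConn = DriverManager.getConnection(
  "jdbc:sqlserver://server.westus.cloudapp.azure.com;databaseName=TestDB",
  "myuser", "MyComplexPassword!001")
val waitRs = monitorConn.createStatement().executeQuery(
  """SELECT session_id, command, wait_type, wait_resource
    |FROM sys.dm_exec_requests
    |WHERE session_id > 50""".stripMargin)   // session_id > 50 skips system sessions
while (waitRs.next()) {
  println(s"${waitRs.getInt("session_id")}  ${waitRs.getString("command")}  " +
    s"${waitRs.getString("wait_type")}  ${waitRs.getString("wait_resource")}")
}
monitorConn.close()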
Examining the statements being executed, we saw that the JDBC driver uses sp_prepare followed by sp_execute for each inserted row; the operation is therefore not a bulk insert. Examining the Spark JDBC connector source code confirms this: it builds a batch of singleton INSERT statements and then executes that batch via the prepare/execute model.
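To make that concrete, here is a simplified, hypothetical sketch of what the generic JDBC writer effectively does for each partition: a parameterized single-row INSERT added to a batch row by row. This is not the connector's actual source, and only the two columns shown later are used; the point is that every batched row still turns into an sp_prepare/sp_execute round trip on SQL Server rather than a bulk-load operation.
// Simplified sketch, not the connector's actual code: one singleton INSERT per row,
// batched client-side and executed via the prepare/execute model.
import java.sql.DriverManager

case class Order(orderKey: Int, custKey: Int)
val partitionRows = Seq(Order(1, 10), Order(2, 20)) // stand-in for one partition's rows

val conn = DriverManager.getConnection(
  "jdbc:sqlserver://server.westus.cloudapp.azure.com;databaseName=TestDB",
  "myuser", "MyComplexPassword!001")
val ps = conn.prepareStatement(
  "INSERT INTO dbo.orders (o_orderkey, o_custkey) VALUES (?, ?)")
partitionRows.foreach { r =>
  ps.setInt(1, r.orderKey)
  ps.setInt(2, r.custKey)
  ps.addBatch()     // still a single-row INSERT, just queued on the client
}
ps.executeBatch()   // sent as repeated prepare/execute calls, not as INSERT BULK
conn.close()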
This was an 8-node Spark cluster, each executor with 4 CPUs, and because of Spark's default parallelism there were 32 tasks running simultaneously, each with multiple insert statements batched together. The primary contention was on PAGELATCH_EX, and as with any latch contention, the more parallel sessions requesting the same resource, the greater the contention.
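If the generic JDBC path has to be used, one lever for this contention is the write parallelism itself. The following is only a sketch of that idea, not something from the original test: it coalesces the DataFrame to an arbitrary 8 partitions so that at most 8 concurrent sessions insert into the table.
// Sketch only: fewer partitions means fewer concurrent JDBC sessions competing for the
// same pages. The value 8 is arbitrary and would need tuning.
println(s"Write parallelism before coalesce: ${dfOrders.rdd.getNumPartitions} partitions")

dfOrders.coalesce(8)
  .write.mode("overwrite").format("jdbc")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", "jdbc:sqlserver://server.westus.cloudapp.azure.com;databaseName=TestDB")
  .option("dbtable", "TestDB.dbo.orders")
  .option("user", "myuser")
  .option("password", "MyComplexPassword!001")
  .option("batchsize", "200000")
  .save()
The trade-off is reduced write parallelism on the Spark side, which is why switching to the bulk-copy path described next is the better fix rather than simply throttling the JDBC writer.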
The SQL Spark connector also uses the Microsoft JDBC driver. However, unlike the Spark JDBC connector, it uses the JDBC SQLServerBulkCopy class to efficiently load data into a SQL Server table. Given that in this case the table is a heap, we also use the TABLOCK hint ("bulkCopyTableLock" -> "true") in the code below so that the parallel streams can bulk load concurrently, as discussed here. It is a best practice to use the BulkCopyMetadata class to define the structure of the table; otherwise, there is additional overhead from querying the database to determine the table schema.
import com.microsoft.azure.sqldb.spark.config._
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._
import com.microsoft.azure.sqldb.spark._
import com.microsoft.azure.sqldb.spark.bulkcopy._

// Define the destination table structure up front so the connector does not have to
// query the database for the schema.
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "o_orderkey", java.sql.Types.INTEGER, 0, 0)
bulkCopyMetadata.addColumnMetadata(2, "o_custkey", java.sql.Types.INTEGER, 0, 0)
// Trimming the other columns for brevity; only the first two columns are shown being added to BulkCopyMetadata.

val bulkCopyConfig = Config(Map(
  "url"               -> "server.westus.cloudapp.azure.com",
  "databaseName"      -> "testdb",
  "user"              -> "denzilr",
  "password"          -> "MyComplexPassword1!",
  "dbTable"           -> "dbo.orders",
  "bulkCopyBatchSize" -> "200000",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

// Pass the metadata along with the config so it is actually used for the bulk copy.
dfOrders.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
Looking at the Spark UI, we see that this DataFrame write is broken into a total of 50 tasks, each loading a subset of the data.
Further investigating the statements, we see the familiar INSERT BULK statement, an internal statement used by the SQL Server bulk-load APIs. This confirms that we are indeed using the SQLServerBulkCopy API.
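For reference, this is roughly the mechanism the connector drives through the Microsoft JDBC driver. The sketch below uses SQLServerBulkCopy directly against a hypothetical destination table, dbo.orders_copy, with the placeholder connection details from earlier; it is only meant to show where the INSERT BULK behavior comes from, not to replace the connector.
// Sketch of the underlying mechanism: SQLServerBulkCopy from the Microsoft JDBC driver.
// The destination table dbo.orders_copy is hypothetical.
import java.sql.DriverManager
import com.microsoft.sqlserver.jdbc.{SQLServerBulkCopy, SQLServerBulkCopyOptions}

val url = "jdbc:sqlserver://server.westus.cloudapp.azure.com;databaseName=testdb"
val srcConn  = DriverManager.getConnection(url, "denzilr", "MyComplexPassword1!")
val destConn = DriverManager.getConnection(url, "denzilr", "MyComplexPassword1!")

// Any ResultSet can act as the source of the bulk copy.
val srcRows = srcConn.createStatement()
  .executeQuery("SELECT o_orderkey, o_custkey FROM dbo.orders")

val opts = new SQLServerBulkCopyOptions()
opts.setBatchSize(200000)    // analogous to bulkCopyBatchSize
opts.setTableLock(true)      // analogous to bulkCopyTableLock for the heap case
opts.setBulkCopyTimeout(600)

val bulkCopy = new SQLServerBulkCopy(destConn)
bulkCopy.setBulkCopyOptions(opts)
bulkCopy.setDestinationTableName("dbo.orders_copy")
bulkCopy.writeToServer(srcRows)   // this is what surfaces as INSERT BULK on the server
bulkCopy.close()
srcConn.close(); destConn.close()
The connector's bulkCopy* configuration keys presumably map onto options like these; the sketch just makes the relationship visible.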
There is no longer any page latch contention; instead, we are now waiting on network I/O or on the client fetching results.
Load performance in the columnstore case is far worse with the JDBC connector than in the heap case. Because the JDBC connector emits single-row insert statements, all of this data lands in the delta store, and we are back to latch contention, more severe this time around. You can read many more details about loading data into columnstore tables in this blog.
If we examine the sys.dm_db_column_store_row_group_physical_stats DMV, we notice that all rows are going into an OPEN rowgroup (the delta store) until that rowgroup fills up and is closed. Those rows then have to be compressed by the tuple mover into compressed segments later.
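As a quick way to watch this from the driver, a small JDBC query against that DMV (again with the placeholder connection details) summarizes where the rows are landing: state_desc distinguishes OPEN delta-store rowgroups from COMPRESSED ones, and trim_reason_desc helps explain any undersized compressed rowgroups.
// Illustrative only: summarize rowgroup state for dbo.orders.
import java.sql.DriverManager

val csConn = DriverManager.getConnection(
  "jdbc:sqlserver://server.westus.cloudapp.azure.com;databaseName=testdb",
  "denzilr", "MyComplexPassword1!")
val rgRs = csConn.createStatement().executeQuery(
  """SELECT state_desc, trim_reason_desc, COUNT(*) AS rowgroup_count, SUM(total_rows) AS rows_total
    |FROM sys.dm_db_column_store_row_group_physical_stats
    |WHERE object_id = OBJECT_ID('dbo.orders')
    |GROUP BY state_desc, trim_reason_desc""".stripMargin)
while (rgRs.next()) {
  println(s"${rgRs.getString("state_desc")}  ${rgRs.getString("trim_reason_desc")}  " +
    s"${rgRs.getLong("rowgroup_count")} rowgroups  ${rgRs.getLong("rows_total")} rows")
}
csConn.close()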
For the bulk load into the clustered columnstore table, we adjusted the batch size to 1,048,576 rows, which is the maximum number of rows per rowgroup, to maximize compression benefits. Having a batch size of at least 102,400 rows enables the data to go directly into a compressed rowgroup, bypassing the delta store. Also, the TABLOCK hint has to be set to false ("bulkCopyTableLock" -> "false"); otherwise, the parallel streams will be serialized.
import com.microsoft.azure.sqldb.spark.config._
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._
import com.microsoft.azure.sqldb.spark._
import com.microsoft.azure.sqldb.spark.bulkcopy._

val bulkCopyConfig = Config(Map(
  "url"               -> "server.westus.cloudapp.azure.com",
  "databaseName"      -> "testdb",
  "user"              -> "denzilr",
  "password"          -> "MyComplexPassword1!",
  "dbTable"           -> "dbo.orders",
  "bulkCopyBatchSize" -> "1048576",
  "bulkCopyTableLock" -> "false",
  "bulkCopyTimeout"   -> "600"
))

dfOrders.bulkCopyToSqlDB(bulkCopyConfig)
When bulk loading in parallel into a columnstore table, there are a few considerations. In this case, you can see that rows land directly in compressed rowgroups. We had 32 parallel streams going against a DS16sV3 VM (64 GB RAM). There are cases where having too many parallel streams, given the memory required for each bulk insert, can cause rowgroups to be trimmed due to memory limitations.