Recently, Azure Synapse Analytics has made significant investments in the overall performance for Apache Spark workloads. As Azure Synapse brings the worlds of data warehousing, big data, and data integration into a single unified analytics platform, we continue to invest in improving performance for customers that choose Azure Synapse for multiple types of analytical workloads—including big data workloads.
Our efforts have focused on:
To compare the performance, we derived queries from TPC-DS with 1TB scale and ran them on 8 nodes Azure E8V3 cluster (15 executors – 28g memory, 4 cores). Even though our version running inside Azure Synapse today is a derivative of Apache Spark™ 2.4.4, we compared it with the latest open-source release of Apache Spark™ 3.0.1 and saw Azure Synapse was 2x faster in total runtime for the Test-DS comparison.
Also, we observed up to 18x query performance improvement on Azure Synapse compared to the open-source Apache Spark™ 3.0.1.
Below are some of the techniques and optimizations we implemented to achieve these results.
Many queries refer to the same table multiple times. This is not a mistake --- self-joins are a common example. But an engine does not actually have to scan the same data multiple times --- it can combine multiple logical scans into a single physical one.
This optimization introduces new super operators which can avoid redundant IOs. In a TPC-DS like benchmark, 40% of the queries have redundant IO and a large fraction of them spend half of their time in stages with redundant IOs.
For instance, one query shows close to 3x improvement with this optimization.
As part of this optimization, partial aggregation can be pushed down below join and thus help in reducing shuffle and compute. This is because a pushed-down aggregation can then be partially evaluated on the mappers, thus dramatically reducing the amount of data being shuffled. And, of course, joining the aggregated results is cheaper than joining the original data.
Determining when to deploy this technique is non-trivial. Sometimes, the aggregation key is nearly unique (e.g. where aggregation is being used to remove duplicates). In other cases, there is insufficient reduction within each partition, to “pay” the hashing costs of pre-aggregation.
When doing a join, it is wasteful to shuffle the rows that are not going to match the other side. We construct a Bloom filter using one (smaller) side of the join, and then feed it to the other side. We can then use the Bloom filter to eliminate the large majority of non-matching rows from the shuffle. The gains are amplified for plans with deep join trees because now we are dropping the non-matching rows at the point of the scan rather than processing them in several operators.
We extended the standard technique of pushing filters below joins to disjunctive filters, greatly increasing its applicability.
We now choose sort orders based on the rest of the query plan, eliminating unnecessary shuffles. And we are capable of pushing aggregations even below anti-semi and left-semi joins.
Autoscaling is at the heart of the Azure Synapse promise for big data workloads to minimize maintenance of clusters and control costs. When jobs are submitted (or notebooks connected), we launch clusters on demand. We then grow and shrink those clusters as the load changes, and auto-pause them when no longer needed. We then grow and shrink those clusters as the load changes, and auto-pause them when no longer needed.
Autoscaling operates at two levels:
Together, these come close to striking a balance between cost and speed.
For instance, this graph shows the resources that Azure Synapse used over time and the actions it took (automatically) while running several particularly complex TPC-DS derived queries. The pool was configured to automatically scale between 3 and 100 nodes, and the queries were submitted in a tight loop.
For autoscale to be truly efficient, new clusters (and cluster expansions) need to come up very quickly. A series of investments in Azure Synapse allowed us to cut cluster creation times significantly, to Let’s look at some of the techniques that were used to achieve this.
In Azure Synapse, Apache Spark clusters are created (automatically, on-demand) by the Spark Cluster Service, by provisioning Azure VMs using a Spark image that has necessary binaries already pre-installed. The virtual machines that host the Spark cluster components are created in Azure Synapse managed system subscriptions and the network interfaces associated with the VMs
This behind-the-scenes provisioning allows us to offer several workspace flavors, from Microsoft-managed VNET, to customer-supplied managed VNET, and even an exfiltration-protected VNET.
We worked on the following optimizations in the Spark cluster provisioning and setup flow to improve the cluster startup times.
Hyperspace is a project we have recently open-sourced and shipped into Azure Synapse. Hyperspace introduces the ability for users to build multiple types of secondary indexes on their data, maintain them through a multi-user concurrency model, and leverage them automatically – without any change to their application code – for query/workload acceleration. All the meta-data is stored in the lake and available for other engines.
Several features of Hyperspace stem from our interaction with various enterprise customers. Some of the key highlights include:
When running test queries derived from industry-standard TPC benchmarks (Test-H and Test-DS) over 1 TB of Parquet data, we have seen Hyperspace deliver up to 11x acceleration in query performance for individual queries. We ran all benchmark derived queries using open-source Apache Spark™ 2.4 running on a 7-node Azure E8 V3 cluster (7 executors, each executor having 8 cores and 47 GB memory) and a scale factor of 1000 (i.e., 1 TB data).
Overall, we have seen an approximate 2x and 1.8x acceleration in query performance time, respectively, all using commodity hardware.
Much as cloud providers aim to optimize their raw storage solutions, there is still a big difference between data stored in a general-purpose remote store and locally in the compute cluster. This is why we built a caching solution for Azure Synapse. It operates inside the cluster and serves several purposes.
First, IO caching. We use the locally-attached NVMe SSDs to avoid repeatedly going to Azure Storage (e.g. ADLS) to fetch the data --- they have orders of magnitude for more IO bandwidth. We cache and thus optimize away file enumerations --- a major time sink in scenarios where large numbers of smallish data files exist. Our caching solution is implemented in native code, mostly for careful memory and IO management.
Second, location awareness. We place data on the nodes that need that (portion of the) file, and then influence Spark scheduling to locate future tasks on nodes that have the data.
But third, we realized that caching original data is often a waste! Most of the time, the data is, at the very least, parsed every time it is read. Why repeat this work every time the cache is accessed?
Therefore, our cache is designed to parse the incoming data, and cache in the format that is most amenable to subsequent processing, such as the Spark-native Tungsten.
Here, we benefit from running in native code. We use state-of-the-art SIMD-optimized parsers from Microsoft Research for CSV and JSON files (still the workhorse of Big Data processing). And we execute it in the download path, hiding the parsing costs under IO. Going forward, we are aiming to move even more processing into this layer, including filter evaluation and shuffle prep.
But why use CPU cores (even highly optimized), when a custom hardware could be faster and cheaper? This brings us to the next section.
We can use FPGA VM SKUs to accelerate Spark. The data formats CSV, JSON and Parquet account for 90% of our customers' workloads. Based on profiling data we concluded that parsing the CSV and JSON data formats is very CPU intensive --- often 70% to 80% of the query. So, we have accelerated CSV parsing using FPGAs. The FPGA parser reads CSV data, parses it, and generates a VStream formatted binary data whose format is close to Apache Spark internal row format which is Tungsten. The internal raw performance of CSV parser at FPGA level is 8GB/sec, although it's not possible to get all of that in end to end application currently.
We are using a Xilinx accelerator PCIe card. The usable bandwidth is 12GB/sec. It has 64GB of DDR4 memory locally connected to FPGA over 4-channels. The peak write/read bandwidth between FPGA and DDR4 memory is 76.8 GB/sec. But the usable bandwidth is 65.0 GB/sec.
The CSV parsing logic occupies 25% of the FPGA and provides a raw CSV performance of 8.0 GB/sec. The parser is micro-code driven and by changing the micro-program we can handle parsing of JSON too.
Performance improvements in Azure Synapse today (from query optimization, to autoscaling, to in-the-lake indexing) make running big data workloads in Azure Synapse both easy and cost-effective. With on-demand scaling, data professionals do not need to choose between saving time and saving money—they can have both, coupled with the ease-of-use of a fully managed, unified analytics service.
Customers with *qualifying subscription types can now try the Apache Spark pool resources in Azure Synapse using free quantities until July 31st, 2021 (up to 120 free vCore-hours per month).
*Free quantities apply only to the following subscription types: Pay-As-You-Go, Microsoft Azure Enterprise, Microsoft Azure Plan, Azure in CSP, Enterprise Dev/Test. These included free quantities aggregate at the enrollment level for enterprise agreements and at the subscription level for pay-as-you-go subscriptions.
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.