Apache Spark in Azure Synapse - Performance Update

Published 03-30-2021 08:35 AM

Recently, Azure Synapse Analytics has made significant investments in the overall performance for Apache Spark workloads. As Azure Synapse brings the worlds of data warehousing, big data, and data integration into a single unified analytics platform, we continue to invest in improving performance for customers that choose Azure Synapse for multiple types of analytical workloads—including big data workloads.

 

Our efforts have focused on:

  • Query optimization: New query execution elements and better plan selection yielded major gains
  • Autoscaling: Automatically scaling clusters up and down with load, even as a job is running
  • Cluster optimizations: Picking the right VM types, rapid provisioning of VMs
  • Intelligent caching: Using local SSDs, native code, and hardware-assisted parsing
  • Indexing: Optimizing data representation in the data lake for the workload yields significant improvement

 

Query Execution and Optimization

To compare performance, we derived queries from TPC-DS at 1TB scale and ran them on an 8-node Azure E8 V3 cluster (15 executors, each with 28 GB memory and 4 cores). Even though the version running inside Azure Synapse today is a derivative of Apache Spark™ 2.4.4, we compared it with the latest open-source release, Apache Spark™ 3.0.1, and saw that Azure Synapse was 2x faster in total runtime for the Test-DS comparison.

 

[Charts: total runtime for TPC-DS derived queries, Azure Synapse Spark vs. open-source Apache Spark™ 3.0.1]

 

Also, we observed up to 18x query performance improvement on Azure Synapse compared to open-source Apache Spark™ 3.0.1.

[Chart: per-query speedup, Azure Synapse Spark vs. open-source Apache Spark™ 3.0.1]

Below are some of the techniques and optimizations we implemented to achieve these results.

 

Scan reuse

Many queries refer to the same table multiple times. This is not a mistake; self-joins are a common example. But an engine does not actually have to scan the same data multiple times: it can combine multiple logical scans into a single physical one.

 

This optimization introduces new super-operators that avoid redundant IO. In a TPC-DS-like benchmark, 40% of the queries have redundant IO, and a large fraction of them spend half of their time in stages with redundant IO.

 

For instance, one query shows close to 3x improvement with this optimization.
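The effect can be sketched outside of Spark: merge logical scans of the same table into one physical scan that reads the union of the required columns. This is a plain-Python illustration, not Synapse internals; the `Scan` node and `dedupe_scans` helper are hypothetical names.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Scan:
    """A logical scan of a table with a set of required columns."""
    table: str
    columns: frozenset

def dedupe_scans(scans):
    """Merge scans of the same table into one physical scan that
    reads the union of the required columns."""
    merged = {}
    for s in scans:
        if s.table in merged:
            merged[s.table] = Scan(s.table, merged[s.table].columns | s.columns)
        else:
            merged[s.table] = s
    return list(merged.values())

# A self-join references `sales` twice with different column sets...
plan_scans = [Scan("sales", frozenset({"id", "price"})),
              Scan("sales", frozenset({"id", "date"}))]
# ...but one physical scan reading the union of columns suffices.
physical = dedupe_scans(plan_scans)
print(len(physical))                # 1
print(sorted(physical[0].columns))  # ['date', 'id', 'price']
```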

 

Preaggregation

As part of this optimization, partial aggregation can be pushed down below join and thus help in reducing shuffle and compute.  This is because a pushed-down aggregation can then be partially evaluated on the mappers, thus dramatically reducing the amount of data being shuffled.  And, of course, joining the aggregated results is cheaper than joining the original data.

 

Determining when to deploy this technique is non-trivial. Sometimes the aggregation key is nearly unique (e.g. where aggregation is being used to remove duplicates). In other cases, there is insufficient reduction within each partition to “pay” the hashing cost of pre-aggregation.
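The map-side/reduce-side split behind pre-aggregation can be sketched in plain Python (partition contents and helper names are illustrative, not Spark code):

```python
from collections import Counter

def partial_aggregate(partition):
    """Map-side (pre-shuffle) partial count per key."""
    return Counter(row["key"] for row in partition)

def final_aggregate(partials):
    """Reduce-side merge of the partial results."""
    total = Counter()
    for p in partials:
        total.update(p)
    return dict(total)

partitions = [
    [{"key": "a"}, {"key": "a"}, {"key": "b"}],
    [{"key": "a"}, {"key": "b"}, {"key": "b"}],
]
# Without pre-aggregation, 6 rows would cross the shuffle;
# with it, only 4 (key, count) pairs do.
partials = [partial_aggregate(p) for p in partitions]
shuffled_items = sum(len(p) for p in partials)
result = final_aggregate(partials)
print(shuffled_items)  # 4
print(result)          # {'a': 3, 'b': 3}
```

When the key is nearly unique, `shuffled_items` approaches the row count and the hashing work buys nothing, which is exactly the case where the optimizer should decline to apply the rewrite.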

 

Bloom Filters

When doing a join, it is wasteful to shuffle the rows that are not going to match the other side.  We construct a Bloom filter using one (smaller) side of the join, and then feed it to the other side.  We can then use the Bloom filter to eliminate the large majority of non-matching rows from the shuffle. The gains are amplified for plans with deep join trees because now we are dropping the non-matching rows at the point of the scan rather than processing them in several operators. 

 

Deeper pushdowns

We extended the standard technique of pushing filters below joins to disjunctive filters, greatly increasing its applicability.

 

We now choose sort orders based on the rest of the query plan, eliminating unnecessary shuffles.  And we are capable of pushing aggregations even below left-anti and left-semi joins.
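To see why disjunctive filters are harder than the standard conjunctive case, consider a filter like `(t.a = 1 AND s.b = 2) OR (t.a = 3 AND s.c = 4)`: no single conjunct can be pushed, but the filter still implies `t.a IN (1, 3)` on `t` alone. A toy derivation in plain Python (the predicate representation is hypothetical, chosen only for illustration):

```python
def derive_pushdown(disjuncts, table):
    """Given a filter in disjunctive normal form (a list of conjunct
    lists, each predicate a (table, column, value) equality), derive the
    weaker disjunction that can be pushed to `table` alone.  If any
    disjunct has no predicate on `table`, that branch admits every row
    of `table` and nothing can be pushed."""
    pushed = []
    for conjuncts in disjuncts:
        local = [p for p in conjuncts if p[0] == table]
        if not local:
            return None
        pushed.append(local)
    return pushed  # still a disjunction, now referencing only `table`

# (t.a = 1 AND s.b = 2) OR (t.a = 3 AND s.c = 4)
filter_dnf = [
    [("t", "a", 1), ("s", "b", 2)],
    [("t", "a", 3), ("s", "c", 4)],
]
print(derive_pushdown(filter_dnf, "t"))  # [[('t', 'a', 1)], [('t', 'a', 3)]]
print(derive_pushdown(filter_dnf, "s"))  # [[('s', 'b', 2)], [('s', 'c', 4)]]
```

The derived predicate is deliberately weaker than the original, so the full filter is still evaluated above the join; the pushed copy only prunes rows early.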

 

Autoscaling

Autoscaling is at the heart of the Azure Synapse promise for big data workloads to minimize maintenance of clusters and control costs. When jobs are submitted (or notebooks connected), we launch clusters on demand. We then grow and shrink those clusters as the load changes, and auto-pause them when no longer needed.

 

Autoscaling operates at two levels:

  • Growing and shrinking individual jobs while running, by adding or removing executors when such growth would result in linear scaling.  We do that by constantly examining queues and task histories.
  • Adding or removing VMs to the cluster to accommodate the executor demands of all of its jobs, via smart probes running inside the cluster.

Together, these come close to striking a balance between cost and speed. 
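A toy version of the executor-level decision: grow only while added executors would actually be used (i.e. pending tasks exceed capacity, so scaling stays roughly linear), and clamp to the pool's configured bounds. This is a sketch with hypothetical names, not the Synapse scaling algorithm:

```python
def target_executors(pending_tasks, tasks_per_executor, min_executors, max_executors):
    """Pick an executor count large enough to run every pending task at
    once (so added executors are actually used and scaling stays roughly
    linear), clamped to the pool's configured bounds."""
    needed = -(-pending_tasks // tasks_per_executor)  # ceiling division
    return max(min_executors, min(max_executors, needed))

# Load spikes: grow toward demand, capped at the pool maximum of 100.
print(target_executors(640, 4, 3, 100))  # 100
# Load drains: shrink back, but never below the configured minimum of 3.
print(target_executors(8, 4, 3, 100))    # 3
```

A real implementation would smooth this over task-history windows rather than reacting to an instantaneous queue length, to avoid thrashing.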

 

For instance, this graph shows the resources that Azure Synapse used over time and the actions it took (automatically) while running several particularly complex TPC-DS derived queries. The pool was configured to automatically scale between 3 and 100 nodes, and the queries were submitted in a tight loop.

 

[Chart: cluster resources over time and automatic scaling actions while running TPC-DS derived queries]

 

 

Cluster Optimizations

For autoscale to be truly efficient, new clusters (and cluster expansions) need to come up very quickly.  A series of investments in Azure Synapse allowed us to cut cluster creation times significantly.  Let’s look at some of the techniques that were used to achieve this.

 

In Azure Synapse, Apache Spark clusters are created (automatically, on-demand) by the Spark Cluster Service, by provisioning Azure VMs using a Spark image that has the necessary binaries pre-installed.  The virtual machines that host the Spark cluster components, and the network interfaces associated with them, are created in Azure Synapse managed system subscriptions.

 

This behind-the-scenes provisioning allows us to offer several workspace flavors, from a Microsoft-managed VNet to a customer-managed VNet, and even an exfiltration-protected VNet.

 

We worked on the following optimizations in the Spark cluster provisioning and setup flow to improve the cluster startup times.

  1. SKU selection: Spark clusters are provisioned on Azure SKUs that are best fitted for Spark workloads. We are now upgrading the SKU we use to one with higher I/O throughput; the cost and implementation of this change are handled by the service.
  2. Bulk VM creation: On receiving a cluster creation request, we send a single bulk VM creation request to Azure. Batching VM creations has improved allocation times compared to creating VMs individually.
  3. Quick start: The Spark Cluster Service waits for at least 3 nodes to heartbeat with an initialization response before handing the cluster over to the Spark Service, which then submits the Spark application to the cluster's Livy endpoint. The Cluster Service adds the rest of the VMs in the background, as not all nodes are needed immediately to start the application.
  4. Overprovisioning of VMs: The Spark Cluster Service overprovisions VMs to absorb transient VM allocation delays from Azure. It always requests a few extra VMs for every cluster and deletes the extras once the cluster has the required number of nodes. This improved cluster creation and startup times.
  5. VM image: Spark VMs are created from a specialized VHD image optimized for Spark performance. The image is tuned via a prefetching process that caches blocks accessed during startup, which improved Spark VM startup time by 25%.
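The overprovisioning step (item 4) amounts to: request a few spare VMs, hand over the cluster once the required count is healthy, and release the surplus. A sketch under those assumptions (names and the failure pattern are illustrative):

```python
def provision_cluster(required, spare, allocate_vm):
    """Request `required + spare` VMs so a few transient allocation
    failures do not delay cluster startup; keep the first `required`
    healthy VMs and release any surplus."""
    healthy = [vm for vm in (allocate_vm(i) for i in range(required + spare))
               if vm is not None]
    if len(healthy) < required:
        raise RuntimeError("allocation shortfall even with overprovisioning")
    return healthy[:required], healthy[required:]

def flaky_allocate(i):
    # Stand-in for an Azure VM allocation that transiently fails twice.
    return None if i in {3, 7} else f"vm-{i}"

keep, release = provision_cluster(required=8, spare=2, allocate_vm=flaky_allocate)
print(len(keep), len(release))  # 8 0
```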

 

Indexing with Hyperspace

Hyperspace is a project we have recently open-sourced and shipped into Azure Synapse. Hyperspace introduces the ability for users to build multiple types of secondary indexes on their data, maintain them through a multi-user concurrency model, and leverage them automatically – without any change to their application code – for query/workload acceleration. All the metadata is stored in the lake and is available to other engines.

 

Several features of Hyperspace stem from our interaction with various enterprise customers. Some of the key highlights include:

  1. Very simple ways to build indexes on your data (e.g., CSV, JSON, Parquet), including support for the Delta Time Travel feature.
  2. The ability to incrementally refresh an index as the underlying data changes, which is suitable for streaming workloads.
  3. An easy-to-use optimize() API to handle index fragmentation.
  4. A novel hybrid scan feature that allows you to exploit indexes even when they are out-of-date.
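Conceptually, a covering index stores the indexed columns plus a set of included columns, so that a matching query is answered entirely from the (smaller, better-organized) index without touching the base data. A toy sketch in plain Python; this is not the Hyperspace API, and the helper name is hypothetical:

```python
def build_covering_index(rows, indexed_col, included_cols):
    """Covering index: a lookup from the indexed column to the included
    columns, so matching queries never scan the base rows."""
    index = {}
    for row in rows:
        entry = {c: row[c] for c in included_cols}
        index.setdefault(row[indexed_col], []).append(entry)
    return index

base = [
    {"id": 1, "city": "Oslo",   "amount": 10},
    {"id": 2, "city": "Lisbon", "amount": 20},
    {"id": 3, "city": "Oslo",   "amount": 30},
]
idx = build_covering_index(base, "city", ["amount"])
# The query "total amount for Oslo" is served entirely from the index.
print(sum(e["amount"] for e in idx.get("Oslo", [])))  # 40
```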

When running test queries derived from industry-standard TPC benchmarks (Test-H and Test-DS) over 1 TB of Parquet data, we have seen Hyperspace deliver up to 11x acceleration in query performance for individual queries. We ran all benchmark derived queries using open-source Apache Spark™ 2.4 running on a 7-node Azure E8 V3 cluster (7 executors, each executor having 8 cores and 47 GB memory) and a scale factor of 1000 (i.e., 1 TB data).

 

[Charts: query runtimes with and without Hyperspace on Test-H and Test-DS derived queries]

Overall, we have seen an approximate 2x acceleration in query performance time on Test-H and 1.8x on Test-DS, all using commodity hardware.

To learn more about Hyperspace, check out our recent presentation at Spark + AI Summit 2020 or better yet, come talk to us on GitHub!

 

Soon: Intelligent Caching

Much as cloud providers aim to optimize their raw storage solutions, there is still a big difference between data stored in a general-purpose remote store and locally in the compute cluster. This is why we built a caching solution for Azure Synapse. It operates inside the cluster and serves several purposes.


First, IO caching. We use the locally attached NVMe SSDs to avoid repeatedly going to Azure Storage (e.g. ADLS) to fetch the data; they offer orders of magnitude more IO bandwidth. We also cache, and thus optimize away, file enumerations, a major time sink in scenarios where large numbers of smallish data files exist. Our caching solution is implemented in native code, mostly for careful memory and IO management.


Second, location awareness. We place data on the nodes that need that (portion of the) file, and then influence Spark scheduling to place future tasks on the nodes that already hold the data.


But third, we realized that caching original data is often a waste! Most of the time, the data is, at the very least, parsed every time it is read. Why repeat this work every time the cache is accessed?

 

Therefore, our cache is designed to parse the incoming data and cache it in the format most amenable to subsequent processing, such as Spark-native Tungsten.
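The idea of caching the post-parse representation, rather than raw bytes, can be sketched as follows (plain Python, with the stdlib CSV parser standing in for conversion to an internal row format; the class and names are illustrative):

```python
import csv
import io

class ParsedCache:
    """Cache the *parsed* representation of a file, not its raw bytes,
    so repeated reads skip both the fetch and the parse."""
    def __init__(self, fetch):
        self.fetch = fetch   # e.g. a remote-storage download function
        self.cache = {}
        self.parse_count = 0

    def read(self, path):
        if path not in self.cache:
            raw = self.fetch(path)
            self.parse_count += 1
            self.cache[path] = list(csv.DictReader(io.StringIO(raw)))
        return self.cache[path]

store = {"sales.csv": "id,price\n1,10\n2,20\n"}
cache = ParsedCache(fetch=store.__getitem__)
first = cache.read("sales.csv")
second = cache.read("sales.csv")   # served from cache: no fetch, no re-parse
print(cache.parse_count)   # 1
print(second[1]["price"])  # 20
```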

 

Here, we benefit from running in native code.  We use state-of-the-art SIMD-optimized parsers from Microsoft Research for CSV and JSON files (still the workhorse of Big Data processing).  And we execute it in the download path, hiding the parsing costs under IO.  Going forward, we are aiming to move even more processing into this layer, including filter evaluation and shuffle prep.
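Hiding parse cost under IO amounts to pipelining: while chunk n+1 downloads, chunk n is parsed on another thread. A toy sketch with stand-in download and parse functions (not the native-code implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def download(chunk_id):
    # Stand-in for fetching one chunk of a CSV file from remote storage.
    return f"{chunk_id},{chunk_id * 10}\n"

def parse(chunk):
    # Stand-in for the CPU-heavy CSV parse of one chunk.
    return [tuple(int(v) for v in line.split(",")) for line in chunk.splitlines()]

def read_pipelined(num_chunks):
    """Overlap parsing of chunk n with the download of chunk n+1,
    hiding parse time under IO time."""
    rows, pending = [], None
    with ThreadPoolExecutor(max_workers=1) as parser:
        for i in range(num_chunks):
            chunk = download(i)                    # IO on this thread...
            if pending is not None:
                rows.extend(pending.result())
            pending = parser.submit(parse, chunk)  # ...parsing on the worker
        if pending is not None:
            rows.extend(pending.result())
    return rows

print(read_pipelined(3))  # [(0, 0), (1, 10), (2, 20)]
```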

 

But why use CPU cores (even highly optimized ones) when custom hardware could be faster and cheaper? This brings us to the next section.

 

Hardware

We can use FPGA VM SKUs to accelerate Spark. The CSV, JSON, and Parquet data formats account for 90% of our customers' workloads. Based on profiling data, we concluded that parsing CSV and JSON is very CPU intensive, often 70% to 80% of the query, so we have accelerated CSV parsing using FPGAs. The FPGA parser reads CSV data, parses it, and generates VStream-formatted binary data whose layout is close to Tungsten, Apache Spark's internal row format. The raw performance of the CSV parser at the FPGA level is 8 GB/sec, although it is not currently possible to realize all of that in an end-to-end application.

 

FPGA choice

We are using a Xilinx accelerator PCIe card with a usable bandwidth of 12 GB/sec. It has 64 GB of DDR4 memory locally connected to the FPGA over 4 channels. The peak write/read bandwidth between the FPGA and DDR4 memory is 76.8 GB/sec, of which 65.0 GB/sec is usable.


The CSV parsing logic occupies 25% of the FPGA and provides raw CSV performance of 8.0 GB/sec. The parser is microcode-driven; by changing the microprogram, we can handle JSON parsing as well.

 

Conclusion

Performance improvements in Azure Synapse today (from query optimization, to autoscaling, to in-the-lake indexing) make running big data workloads in Azure Synapse both easy and cost-effective.  With on-demand scaling, data professionals do not need to choose between saving time and saving money—they can have both, coupled with the ease-of-use of a fully managed, unified analytics service.

 

 

Get Started Today

Customers with *qualifying subscription types can now try the Apache Spark pool resources in Azure Synapse using free quantities until July 31st, 2021 (up to 120 free vCore-hours per month).


*Free quantities apply only to the following subscription types: Pay-As-You-Go, Microsoft Azure Enterprise, Microsoft Azure Plan, Azure in CSP, Enterprise Dev/Test. These included free quantities aggregate at the enrollment level for enterprise agreements and at the subscription level for pay-as-you-go subscriptions.

 

 

 

 

 

 

 
