HDInsight - Iceberg Open-Source Table Format
Author(s): Arun Sethia is a Program Manager in the Azure HDInsight Customer Success Engineering (CSE) team.

Introduction

In my previous blog, we talked about leveraging Delta Lake on Azure HDInsight; this blog is dedicated to the Iceberg open-source table format. At this moment, HDInsight does not support Iceberg out of the box, so we will see how you can leverage the Iceberg open table format with HDInsight. We will also see code examples that use tables from three data sources: Delta, Iceberg, and Spark native Parquet files.

A quick touch on Apache Iceberg: Apache Iceberg is an open-source table format that provides a transactional and versioned storage layer for big data analytics needs, originally developed to address issues in Apache Hive. It provides an alternative to Spark's default Parquet-based storage. It adds tables to compute engines, including Spark, Trino, Flink, Hive, etc., using a high-performance table format that works like a SQL table. A few salient features:

- Schema evolution without side effects.
- Partition layout evolution based on data volume and query patterns.
- Hidden partitioning – there is no need to maintain a partition column (by transformation), so users do not need to supply partition layout information when querying Iceberg tables.
- Time travel, useful to examine changes over time.
- Version rollback, which helps users correct problems by resetting tables to a stable data version.

Apache Iceberg provides an easy way to extend Spark with table specifications. In the next few sections, we will understand how to extend Spark (limited to Spark SQL) and configure the Apache Iceberg catalog with HDInsight.

Spark Extension

Before we jump into how to use Iceberg with HDInsight 5.0 and Spark, let us spend some time understanding a few concepts related to Spark extensions. They help explain the why and how of the Iceberg configuration with HDInsight.

SparkSessionExtensions provides various extension points to extend the Spark session. A Spark SQL extension is a custom class that extends the behavior of Spark SQL; it is configured using the configuration property spark.sql.extensions, which accepts a comma-separated list of fully qualified class names for the extension classes you want to use. With these extensions, you can extend and customize the behavior of Spark SQL. Some common Spark SQL extensions are CSV data sources, Avro data sources, query optimization rules, etc. (A minimal illustration of such an extension class appears at the end of this section.)

Such application-specific Spark configuration can be supplied in multiple ways, based on your deployment method:

- by passing the configuration as part of the spark-submit command,
- by using the %%configure magic command from Jupyter, or
- programmatically, from the Spark job code, with the SparkSession object.
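To make the extension-point idea concrete, here is a generic, hedged sketch of a custom Spark SQL extension. It is not Iceberg-specific, and the class name and rule are hypothetical: a Spark SQL extension is simply a function from SparkSessionExtensions to Unit that registers parsers, analyzer rules, or optimizer rules through the injection points.

    import org.apache.spark.sql.SparkSessionExtensions
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Hypothetical extension class: registers a single pass-through optimizer rule.
    class MyNoOpExtensions extends (SparkSessionExtensions => Unit) {
      override def apply(extensions: SparkSessionExtensions): Unit = {
        extensions.injectOptimizerRule { _ =>
          new Rule[LogicalPlan] {
            // A real rule would rewrite the plan; this one only passes it through.
            override def apply(plan: LogicalPlan): LogicalPlan = plan
          }
        }
      }
    }

    // Activated exactly like the Iceberg extension shown later, for example:
    //   --conf spark.sql.extensions=com.example.MyNoOpExtensions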
Spark Catalog

The Spark catalog is a centralized metadata repository that provides information on data and metadata in Spark. It stores information about tables, databases, partitions, etc., and supports SQL operations on metadata, such as CREATE TABLE, ALTER TABLE, etc. By default, Spark comes with two catalogs, Hive and in-memory: the Hive catalog stores metadata in a Hive Metastore, and the in-memory catalog stores metadata in memory. Spark and Hive use independent catalogs to access tables created using Spark SQL or Hive tables.

A table created by Spark resides in the Spark catalog; a table created from Hive resides in the Hive catalog. We can change the Spark default catalog to the Hive catalog using the configuration property metastore.catalog.default with the value hive. In that case, your Spark tables are managed tables in the Hive catalog and must be transaction enabled, or you can have external tables without transactions enabled.

The Spark configuration property spark.sql.catalog configures the list of catalogs available in Spark SQL. It allows us to configure multiple catalogs, and the configuration property spark.sql.defaultCatalog lets you set the default catalog. We can create custom catalogs by implementing the catalog interface in Spark and registering the catalog with Spark using the spark.sql.catalog.<catalog name> configuration parameter, where <catalog name> is a unique identifier for your custom catalog. In addition to managing metadata, Spark persists Spark SQL data in the warehouse directory configured using spark.sql.warehouse.dir.

Iceberg Extension

Iceberg core library components enable integration with compute engines like Spark, Flink, etc. These connectors are maintained in the Iceberg repository and are built for multiple versions. Iceberg provides a runtime connector for each supported version of Spark; the runtime jar (iceberg-spark-runtime) is the only addition to the classpath needed to use the Iceberg open-source table format. The Iceberg Spark connector provides an extension for Spark SQL through the IcebergSparkSessionExtensions class, which adds support for Iceberg tables. The extension allows users to interact with Iceberg tables in Spark SQL using the DataFrame and SQL APIs.

The following table provides the Iceberg runtime compatibility matrix with respect to the Spark version:

Spark Version | Latest Iceberg Support | Gradle
2.4 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-2.4:1.1.0
3.0 | 1.0.0 | org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:1.0.0
3.1 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0
3.2 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0
3.3 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0

Using HDInsight, you can configure the connector either as part of your uber jar (as a dependent jar) or as part of the spark-submit command, and the SQL extension either in your application code or via spark-submit.

Configure Iceberg Runtime Connector & Extension

Using Spark-Submit

Provide the Iceberg runtime connector dependency by supplying a Maven coordinate with --packages, and provide the Spark SQL extension using the Spark configuration property:

    spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0 \
      --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Using Application Code

Provide the Iceberg runtime connector as a Maven dependency in your application pom.xml file:

    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark-runtime-${spark.major.version}_${scala.major.version}</artifactId>
      <version>1.1.0</version>
    </dependency>

We can use SparkConf to set up the Iceberg Spark SQL extension and the Iceberg runtime connector jar:

    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0")

Use SparkConf to build the Spark session.
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

Using Jupyter Notebook

    %%configure
    { "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
               "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" } }

Iceberg Spark SQL Catalog

Iceberg supplies the following Spark SQL catalog implementations:

- org.apache.iceberg.spark.SparkSessionCatalog – adds support for Iceberg tables to Spark's built-in catalog and delegates to the built-in catalog for non-Iceberg tables.
- org.apache.iceberg.spark.SparkCatalog – supports a Hive Metastore or a Hadoop warehouse as a catalog.

Spark Session Catalog

Iceberg tables can be added to Spark's built-in catalog by configuring spark_catalog to use Iceberg's SparkSessionCatalog. The Iceberg session catalog loads non-Iceberg tables using the built-in Spark catalog. Catalog type hive means the Hive catalog is used for Iceberg tables, while non-Iceberg tables are created in the default Spark catalog.

    spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
    spark.sql.catalog.spark_catalog.type = hive

You can provide these configurations based on the application development and deployment method set by your enterprise:

- If you use the spark-submit command from an edge node or the Livy API to submit a job, you can use the --conf parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession from it.
- From a Jupyter notebook, you can use the %%configure magic command as follows:

    %%configure
    { "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
               "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
               "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
               "spark.sql.catalog.spark_catalog.type": "hive" } }

For example, if you run the following code with the above configuration, you should see iceberg_table created in the Hive catalog as an external table and spark_table created in the Spark catalog as a managed table.

    spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (id string, creation_date string, last_update_time string) USING iceberg""")
    spark.sql("""CREATE TABLE IF NOT EXISTS spark_table (id string, creation_date string, last_update_time string)""")

You can use the following query on the Hive Metastore (HMS) database to find the catalog name for iceberg_table and spark_table:

    SELECT t.TBL_NAME, dbc.ctgName
    FROM TBLS t
    INNER JOIN (SELECT c.NAME AS ctgName, d.DB_ID AS dbId
                FROM CTLGS c, DBS d
                WHERE d.CTLG_NAME = c.NAME) dbc ON dbc.dbId = t.DB_ID
    WHERE TBL_NAME IN ('iceberg_table', 'spark_table')

The output shows iceberg_table registered under the hive catalog and spark_table under the spark catalog.
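Once the table exists, it behaves like any other Spark SQL table, and the time-travel capability mentioned in the introduction can be exercised through Iceberg's metadata tables. The following is a minimal sketch, assuming the iceberg_table created above and a notebook or shell where spark is the active session; the snapshot id shown is a placeholder you would copy from the snapshots output.

    // Add a couple of rows to the Iceberg table created above
    spark.sql("INSERT INTO iceberg_table VALUES ('1', '2023-01-01', '2023-01-01')")
    spark.sql("INSERT INTO iceberg_table VALUES ('2', '2023-01-02', '2023-01-02')")

    // Inspect Iceberg metadata tables to see commit history and snapshots
    spark.sql("SELECT * FROM default.iceberg_table.history").show(truncate = false)
    spark.sql("SELECT snapshot_id, committed_at FROM default.iceberg_table.snapshots").show(truncate = false)

    // Time travel: read the table as of an earlier snapshot id taken from the output above
    val earlierSnapshotId = 1234567890123456789L  // placeholder value
    spark.read
      .format("iceberg")
      .option("snapshot-id", earlierSnapshotId)
      .load("default.iceberg_table")
      .show()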
Custom Catalog

Iceberg supports multiple data catalog types such as Hive, Hadoop, JDBC, or custom catalog implementations. These catalogs are configured using the Spark configuration property spark.sql.catalog.<<catalogname>>, and the type of catalog using spark.sql.catalog.<<catalogname>>.type. A short usage sketch follows the configuration examples below.

Hive Catalog

    spark.sql.catalog.<<catalogname>> = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<<catalogname>>.type = hive
    spark.sql.catalog.<<catalogname>>.uri = thrift://metastore-host:port
    spark.sql.catalog.<<catalogname>>.warehouse = abfs://<<warehouse path>>

Hadoop Catalog

    spark.sql.catalog.iceberg_hadoop = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.iceberg_hadoop.type = hadoop
    spark.sql.catalog.iceberg_hadoop.warehouse = abfs://<<warehouse path>>

The catalog uri is optional; by default, it uses hive.metastore.uris from hive-site.xml (Ambari -> Hive -> Advanced). The catalog warehouse is optional; by default, it uses the 'Hive Metastore Warehouse directory' from hive-site.xml (Ambari -> Hive -> Advanced).

You can provide these configurations based on the application development and deployment method set by your enterprise:

- If you use the spark-submit command from an edge node or the Livy API to submit a job, you can use the --conf parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession from it.
- From a Jupyter notebook, you can use the %%configure magic command as follows (in this case we are using a Hadoop catalog):

    %%configure
    { "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
               "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
               "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
               "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
               "spark.sql.catalog.iceberg.type": "hadoop",
               "spark.sql.catalog.iceberg.warehouse": "/iceberg/warehouse" } }
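With the Hadoop catalog registered above under the name iceberg, tables in that catalog are addressed with three-part names (<catalog>.<namespace>.<table>). A minimal sketch under that assumption follows; the namespace, table, and column names are illustrative.

    // Create a namespace and an Iceberg table inside the custom 'iceberg' (Hadoop) catalog
    spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.db")
    spark.sql("""CREATE TABLE IF NOT EXISTS iceberg.db.customer_orders (
        order_id string,
        state string,
        amount double)
      USING iceberg
      PARTITIONED BY (state)""")

    // Write and read using the three-part identifier
    spark.sql("INSERT INTO iceberg.db.customer_orders VALUES ('o-1', 'WA', 10.5D)")
    spark.sql("SELECT state, count(*) AS orders FROM iceberg.db.customer_orders GROUP BY state").show()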
Iceberg Hive Tables With Custom Catalog

The current HDInsight supports Apache Hive version 3.1.1. If you want to use Iceberg tables from Hive (catalog type hive), the HiveIcebergStorageHandler and supporting classes need to be made available on Hive's classpath. These classes are available as part of the Iceberg-Hive runtime jar. You can add the Iceberg-Hive runtime jar to Hive's auxiliary classpath, so it is available by default, or, if you use the Hive shell, by running:

    add jar /<<path>>/iceberg-hive-runtime-1.1.0.jar;

Iceberg Hive tables using a custom catalog can be created in two ways:

1. Using a Hive DDL command from hive-cli or Beeline. The table property 'iceberg.catalog'='iceberg' sets the table catalog to 'iceberg'.

    CREATE EXTERNAL TABLE customer (
      id bigint,
      name string
    )
    PARTITIONED BY (
      state string
    )
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    TBLPROPERTIES ('iceberg.catalog'='iceberg');

2. Using the Iceberg Hive catalog Java API – example code is available in the git repository:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.types.Types
    import org.apache.iceberg.{PartitionSpec, TableProperties, Schema => IcebergSchema}
    import org.apache.iceberg.CatalogProperties
    import org.apache.spark.sql.SparkSession

    val catalogName = "iceberg"
    val nameSpace = "default"
    val tableName = "customer"

    def createTableByHiveCatalog(spark: SparkSession): Unit = {
      import scala.collection.JavaConverters._
      // table specification starts
      val schema = new IcebergSchema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.required(2, "name", Types.StringType.get()),
        Types.NestedField.required(3, "state", Types.StringType.get())
      )
      val spec = PartitionSpec.builderFor(schema).bucket("state", 128).build()
      import org.apache.iceberg.catalog.TableIdentifier
      val tableIdentifier: TableIdentifier = TableIdentifier.of(nameSpace, tableName)
      val tblProperties = Map(TableProperties.ENGINE_HIVE_ENABLED -> "true", "iceberg.catalog" -> "iceberg")
      // table specification ends
      val catalog = new HiveCatalog()
      catalog.setConf(spark.sparkContext.hadoopConfiguration)
      val properties = Map(CatalogProperties.WAREHOUSE_LOCATION -> "/iceberg/warehouse/")
      catalog.initialize(catalogName, properties.asJava)
      catalog.createTable(tableIdentifier, schema, spec, s"/iceberg/warehouse/${tableName}", tblProperties.asJava)
    }

Example - Delta, Iceberg and Spark Parquet

Let us jump into the example code. In this example we create sample data for a product master and for sales and return transactions using Mockneat, and store the data in three formats – Spark Parquet, Iceberg Parquet, and Delta Parquet. The sample Jupyter notebook code is available at hdinotebookexamples; a condensed sketch of the same idea follows.
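This is not the original notebook, only a minimal sketch of the pattern under the configuration from the previous sections: the custom iceberg catalog for the Iceberg table, spark_catalog pointed at Delta's DeltaCatalog (see the limitation below), the delta-core and iceberg-spark-runtime packages on the classpath, and a notebook or shell where spark is the active session. The sample DataFrame stands in for the Mockneat-generated data.

    import spark.implicits._

    // Stand-in for the Mockneat-generated product master data
    val products = Seq(
      ("p-1", "Keyboard", 49.99),
      ("p-2", "Monitor", 189.00)
    ).toDF("product_id", "product_name", "price")

    // 1. Spark native Parquet table (managed table in the session catalog)
    products.write.mode("overwrite").format("parquet").saveAsTable("product_parquet")

    // 2. Iceberg table in the custom 'iceberg' catalog
    products.writeTo("iceberg.db.product_iceberg").using("iceberg").createOrReplace()

    // 3. Delta table (requires spark_catalog = DeltaCatalog and the delta-core package)
    products.write.mode("overwrite").format("delta").saveAsTable("product_delta")

    // The three tables can then be queried side by side with Spark SQL
    spark.sql("SELECT * FROM product_parquet").show()
    spark.sql("SELECT * FROM iceberg.db.product_iceberg").show()
    spark.sql("SELECT * FROM product_delta").show()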
Limitation: At this moment, Delta Lake and Hudi do not support custom catalogs out of the box. Since Iceberg does support a custom catalog, the only option is to set spark_catalog to DeltaCatalog and create a custom catalog for Iceberg.

References:

- https://iceberg.apache.org/
- https://github.com/apache/hudi/issues/5537

HDInsight Open Table Formats:

- Hudi on HDInsight - https://murggu.medium.com/apache-hudi-on-hdinsight-8d981269a97a
- Delta Lake on HDInsight - Microsoft Community Hub

Delta Lake on HDInsight

Introduction

Azure HDInsight is a managed, full-spectrum, open-source analytics service in the cloud for enterprises. The HDInsight Apache Spark cluster is a parallel processing framework that supports in-memory processing; it is based on open-source Apache Spark. Apache Spark keeps evolving; its efficiency and ease of use make it a preferred big data tool among big data engineers and data scientists. A few essential features are missing from Spark, though, one of them being ACID (Atomicity, Consistency, Isolation, Durability) transactions. The majority of databases support ACID out of the box, but for a storage layer such as ADLS Gen2 it is hard to provide a similar level of ACID guarantees.

Delta Lake is a storage layer that brings ACID transactions to Apache Spark and big data workloads, for both streaming and batch operations. Delta Lake uses versioned Parquet files to store your data in your cloud storage. Apart from the versions, Delta Lake also stores a transaction log to keep track of all the commits made to the table or blob store directory in order to provide ACID transactions.

This blog is not about Delta Lake itself; we talk about how you can leverage Delta Lake with an HDInsight Spark cluster, a few code snippets, and the required configurations. Before we jump into code and configurations, check your Spark version from the Ambari user interface of the HDI cluster; you need to pick the right Delta Lake version based on your cluster's Spark version. The following table lists Delta Lake versions and their compatible Apache Spark versions:

HDI Version | Spark Version | Delta Lake Version | API URL
4.0 | Spark 2.4.4 | < 0.7.0 | 0.6.1 API Doc
5.0 | Spark 3.1.2 | 1.0.x | 1.0.1 API Doc

HDInsight - Delta Lake Configuration

Before we jump into code and configurations, we need to look at the following extensibility configurations provided by Spark:

- spark.sql.extensions – used to configure Spark session extensions, by providing the name of the extension class.
- spark.sql.catalog.spark_catalog – this plugin configuration is used to configure a custom catalog implementation. You can find the current catalog implementation from the CatalogManager: spark.sessionState.catalogManager.currentCatalog. Spark 3.x uses SessionCatalog as the default catalog.

When you want to use Delta Lake on Spark 3.x on HDI 5.0, you need to configure the SQL extensions and the Delta Lake catalog with the following values:

Configuration Property | Delta Lake Value | Description
spark.sql.extensions | io.delta.sql.DeltaSparkSessionExtension | An extension for Spark SQL that activates the Delta SQL parser to support Delta SQL grammar.
spark.sql.catalog.spark_catalog | org.apache.spark.sql.delta.catalog.DeltaCatalog | Replaces Spark's default catalog with Delta Lake's DeltaCatalog.

The above configurations need to be provided as part of the Spark configuration before any Spark session is created. Apart from these Spark configurations, the Spark application uber jar should provide the Delta Lake dependency. When working with Spark 2.4.x on HDI 4.0, we just need to supply the Delta Lake dependency; no additional Spark configurations are required. To avoid class loading conflicts due to duplicate classes on the cluster classpath, we need to use the maven-shade-plugin to create an uber jar with the Jackson dependencies.
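Before pointing you at the full example, here is a minimal, self-contained sketch of the HDI 5.0 / Spark 3.1 / Delta Lake 1.0.x combination described above; the table path is illustrative and the data is throwaway.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf()
      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // Write a small Delta table, then read it back (the path on the cluster's default storage is illustrative)
    val df = Seq((1, "open"), (2, "closed")).toDF("id", "status")
    df.write.format("delta").mode("overwrite").save("/delta/status_demo")

    spark.read.format("delta").load("/delta/status_demo").show()

    // Delta keeps versioned Parquet files plus a transaction log, so earlier versions stay readable
    spark.read.format("delta").option("versionAsOf", 0).load("/delta/status_demo").show()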
Example Code

You can clone the example code from GitHub; the code is written in Scala. You can run the example code using any one of these options:

- Copy the application jar to the Azure Storage blob associated with the cluster.
- SSH to the headnode and run spark-submit from the headnode, or
- use the Livy API, or
- use the Azure Toolkit for IntelliJ.

The example application generates stdout logs and Delta Lake Parquet files with commit logs. The output examples are listed on GitHub.

Summary

Delta Lake is an open-source storage framework that extends Parquet data files with a file-based transaction log for ACID transactions and scalable metadata handling. Delta Lake is fully compatible with Apache Spark APIs. Since the HDInsight Spark cluster is an installation of the Apache Spark library onto an HDInsight Hadoop cluster, you can use compatible Delta Lake versions to take advantage of Delta Lake on HDInsight.
HDInsight Tools for Visual Studio Code: Create, Run and Debug Notebook

We are pleased to announce Visual Studio Code Notebook support for HDInsight clusters in the HDInsight Spark & Hive extension. The new feature lets you perform Jupyter-like notebook operations and boosts collaboration with one-click conversion between IPYNB and PY files.

The Visual Studio Code Notebook supports working with Jupyter notebooks natively, allowing you to create a new IPYNB notebook, open an existing notebook, and perform cell-level operations for code and markdown cells. Moreover, you can fully enjoy the language service for Python, debug your notebook, and watch variables. The HDInsight Spark & Hive extension also delivers seamless integration with HDInsight clusters. You can quickly access HDInsight standard clusters, ESP clusters, and HIB clusters through Azure sign-in, or link to a cluster manually, for PySpark interactive queries and batch job submission.

Key customer benefits

- Single Azure sign-in to access HDInsight clusters, including ESP and HIB clusters.
- Create, open, and save a Jupyter notebook (IPYNB).
- Work with cells in the Notebook Editor.
- IntelliSense support in the Jupyter Notebook Editor.
- View, inspect, and filter variables through the Variable Explorer and Data Viewer.
- Debug a Jupyter notebook.
- Run notebooks against HDInsight clusters for PySpark queries.

How to get started

First, install Visual Studio Code and download Mono 4.2.x (for Linux and Mac). Then, get the latest HDInsight tools by going to the Visual Studio Code Extension repository or the Visual Studio Code Marketplace and searching for Spark & Hive Tools.

For more information about the HDInsight Spark & Hive Tools for Visual Studio Code, please see the following resources:

- Visual Studio Code Notebook: Working with Jupyter Notebooks in Visual Studio Code
- User Manual: Spark & Hive Tools in Visual Studio Code
- User Manual: Set Up PySpark Interactive Environment

If you have questions, feedback, comments, or bug reports, please send a note to hdivstool@microsoft.com.
Enhanced autoscale capabilities in HDInsight clusters

HDInsight now has enhanced autoscale capabilities, including improved latency and a feedback loop, alongside support for recommissioning NodeManagers in the case of load-aware autoscale. This improves cluster utilization massively and lowers the total cost of ownership significantly.
Best Practices and sizing details for Hiveserver2 Interactive and LLAP daemons

Introduction

This document provides information about the overall working of LLAP daemons and performance considerations. In general, performance tuning is an ongoing activity and there is no silver bullet that solves all performance issues. Performance improvement should be done in two stages, performance analysis and performance tuning: in the analysis step the goal is to find the bottlenecks and areas to focus on, and the second step is incremental tuning of the bottlenecks identified.

LLAP Architecture/Components

LLAP execution and sizing configurations

The following are some of the configurations that can be adjusted, based on your workload, for improved performance. Please refer to the Azure HDInsight LLAP sizing guide for detailed information.

Use a dedicated YARN queue for LLAP daemons; this ensures priority scheduling and resource management for other applications.

The total memory available for the LLAP queue per worker node can be calculated as follows. This value depends on the total amount of memory available for all YARN containers on a node (yarn.nodemanager.resource.memory-mb) and the percentage of capacity configured for the llap queue (yarn.scheduler.capacity.root.llap.capacity):

    Total memory for LLAP queue on a worker node = Total memory available for all YARN containers on a node x Percentage of capacity for llap queue

Please note: if your workload consists of read-only operations, setting it as high as 90% of the capacity should work. However, if your workload is a mix of update/delete/merge operations using managed tables, it's recommended to give 85% of the capacity to the llap queue. The remaining 15% can be used by other tasks, such as compaction, to allocate containers from the default queue; that way, tasks in the default queue won't deprive YARN of resources. For an E32 v3 worker, this value is (200 GB x 0.85) = 170 GB.

Configure the number of concurrent queries for the peak usage scenario to avoid manual intervention, because autoscale events don't change the Hive configuration Maximum Total Concurrent Queries in Ambari. This means the Hiveserver2 Interactive service can handle only the given number of concurrent queries at any point in time, even if the Interactive Query daemon count is scaled up and down based on load and schedule.

Note (limitation with respect to autoscaling): In autoscale-enabled Interactive Query clusters, an autoscale up/down event also scales the number of Interactive Query daemons up or down to the number of active worker nodes. The change in the number of daemons is not persisted in the num_llap_nodes configuration in Ambari. If Hive services are restarted manually, the number of Interactive Query daemons is reset as per the configuration in Ambari. If the Interactive Query service is manually restarted, you need to manually change the num_llap_nodes configuration (the number of nodes needed to run the Hive Interactive Query daemon) under Advanced hive-interactive-env to match the current active worker node count.

Configure yarn.scheduler.maximum-allocation-vcores. This value indicates the maximum number of virtual CPU cores for every container request at the Resource Manager; requesting a higher number of vcores than this value won't take effect. It is a global property of the YARN scheduler. For the LLAP daemon container, this value can be set to 75% of the total available vcores; the remaining 25% should be reserved for the NodeManager, DataNode, and other services running on the worker nodes.
For E32 v3 VMs, with 75% of the total 32 vcores available to the LLAP daemon container, the recommended value is 24.

Configure the LLAP daemon size:

    LLAP daemon container size = (Total memory for LLAP queue on a worker node) - (Tez AM memory per node) - (Service Master container size)

There is only one Service Master (Application Master for the LLAP service) on the cluster, spawned on one of the worker nodes; for calculation purposes, we consider one Service Master per worker node. For an E32 v3 worker node on HDI 4.0, the recommended value is (170 GB - 4 GB - 1 GB) = 165 GB.

Number of executors per LLAP daemon: this value depends on the number of vcores, the amount of memory used per executor, and the total memory available for the LLAP daemon container. The number of executors can be oversubscribed to 120% of the available vcores per worker node. For E32 v3, the recommended value is (32 vcores x 120%) ~= 38 per worker node, assuming 3 GB per executor. Configure hive.llap.io.threadpool.size to the same value as the number of executors.

The recommendation is to monitor heap usage at peak workload and increase the number of executors, making sure that the number of executors times the heap usage per executor does not exceed the total LLAP daemon heap size. For example, assume the LLAP daemon is configured for 20 executors and 80 GB of heap, but your maximum heap usage is 60 GB. The average use per executor is then 3 GB (60 GB divided by 20 executors). In that case your configured heap is underutilized and you still have 20 GB of room (80 GB - 60 GB), so you can increase the number of executors by 6 (20 GB / 3 GB ~= 6), from 20 to 26. This provides increased task parallelism and may result in higher throughput. A thumb rule would be, for each LLAP daemon:

    (Number of executors x Heap size per executor (Tez container size)) <= (Total heap size for the LLAP daemon)

You can get the heap space usage and executor usage from the Grafana dashboards, both for individual daemons and averaged across all daemons; pick the maximum value for calculations. Increasing the number of executors beyond the above thumb rule may occasionally give better performance, but can also result in failures and instability. Please refer to the HDInsight Hive/LLAP sizing guide for details.

Configuration hive.llap.daemon.yarn.container.mb: the LLAP daemon runs as a YARN container on each worker node. The total memory size of the LLAP daemon container consists of the following components:

- Headroom
- Heap memory used by executors (Xmx)
- In-memory cache per daemon (its off-heap memory size; not applicable when the SSD cache is enabled)
- In-memory cache metadata size (applicable only when the SSD cache is enabled)

Headroom size: this is a portion of off-heap memory used for Java VM overhead (metaspace, thread stacks, GC, data structures, etc.). Generally, this overhead is about 6% of the heap size (Xmx). To be on the safer side, this value can be calculated as 6% of the total LLAP daemon memory size. For E32 v3, the recommended value is ceil(170 GB x 0.06) ~= 10 GB.

Heap size (Xmx): the amount of heap memory available to all executors. Total heap size = Number of executors x 3 GB. For E32 v3, this value is 38 x 3 GB = 114 GB.
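The E32 v3 numbers quoted in this section follow directly from the formulas above; the small sketch below simply reproduces that arithmetic so the relationships between the settings are explicit (the constants are the values used in this article, not universal defaults).

    // Reproduces the E32 v3 sizing arithmetic used in this section
    val yarnMemoryPerNodeGb = 200.0  // total YARN container memory on an E32 v3 worker
    val llapQueueCapacity   = 0.85   // llap queue share (85% for mixed read/write workloads)
    val tezAmMemoryGb       = 4.0    // Tez AM memory per node
    val serviceMasterGb     = 1.0    // LLAP Service Master container
    val memoryPerExecutorGb = 3.0
    val vcoresPerNode       = 32

    val llapQueueGb      = yarnMemoryPerNodeGb * llapQueueCapacity        // 170 GB
    val llapDaemonSizeGb = llapQueueGb - tezAmMemoryGb - serviceMasterGb  // 165 GB
    val numExecutors     = (vcoresPerNode * 1.2).toInt                    // ~38 (120% oversubscription)
    val headroomGb       = llapQueueGb * 0.06                             // ~10 GB JVM headroom
    val totalHeapGb      = numExecutors * memoryPerExecutorGb             // 114 GB (Xmx)

    println(s"llap queue=$llapQueueGb GB, daemon=$llapDaemonSizeGb GB, " +
            s"executors=$numExecutors, headroom=~$headroomGb GB, heap=$totalHeapGb GB")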
LLAP cache size: LLAP supports both an in-memory (RAM) cache and an SSD cache. The cache size calculation differs when the SSD cache is enabled; setting hive.llap.io.allocator.mmap = true enables SSD caching.

When the SSD cache is enabled, a portion of RAM is used to store the metadata (headers, index, etc.) for the data cached on SSD. The metadata is expected to be approximately 8% of the SSD cache size and is stored in off-heap memory.

    SSD cache in-memory metadata size = LLAP daemon container size - (Headroom + Heap size)

For E32 v3, with HDI 4.0, SSD cache in-memory metadata size = 170 GB - (4 GB + 114 GB) = 52 GB.

When the SSD cache is disabled, the in-memory cache is the amount of memory that is left after taking out the headroom and the heap size from the LLAP daemon container size:

    In-memory (off-heap) cache size = LLAP daemon container size - (Headroom + Heap size)

The recommendation is to monitor LLAP cache usage by checking the Grafana dashboards or by running sample workload queries via Beeline and checking the execution summary. LLAP is optimized for interactive queries; ETL workloads that read data only once do not benefit from caching. In such cases you can disable caching at the session level by setting hive.llap.io.enabled=false.

Adjusting map join memory: this configuration determines the threshold for MapJoin selection by the Hive optimizer, which considers oversubscription of memory from other executors to make more room for in-memory hash tables and allow more map join conversions. Considering 3 GB per executor, this size can be oversubscribed to 3 GB, but some heap memory may also be used for sort buffers, shuffle buffers, etc. by other operations. For E32 v3, with 3 GB of memory per executor, it's recommended to set this value to 2048 MB.

The values for num_llap_nodes and num_llap_nodes_for_llap_daemons are recommended to be set to the number of LLAP worker nodes.

Depending on the volume of data being queried and returned, tune tez.grouping.max.size and tez.grouping.min.size: decrease them for better latency, or increase them for more throughput.

Data storage format: the data storage format plays a critical role in Hive/LLAP performance. It is recommended to use the ORC storage format to take advantage of efficient compression and fast reads; ORC gives the best performance overall. Avoid using text-based storage formats for large production systems. If you have data generated in text formats (CSV, JSON, etc.), you can create an external Hive table over it and then convert it to ORC format for regular querying and analysis (for example: CREATE TABLE orc_table STORED AS ORC AS SELECT * FROM text_table). Both ORC and Parquet provide a highly efficient way to store Hive data in a columnar format. The LLAP implementation natively supports ORC and Parquet, so LLAP does not have to perform additional encoding/decoding steps, and it supports efficient vectorized operations on data stored in these formats. The Hive/LLAP optimizer and execution engine can also make use of the metadata and indexes stored alongside the data in ORC or Parquet; some queries can be served directly from metadata/indexes without scanning the whole data set. In the case of text formats (CSV, JSON, etc.), LLAP requires additional steps to encode/decode data before it is stored in the cache and used in vectorized operations, and this overhead may contribute to delays in processing the data. Depending on your environment, you can further tune ORC storage to optimize Hive/LLAP performance by using bloom filters, configuring the compression type and stripe size, and using partitions and buckets.
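As a small illustration of the text-to-ORC conversion above, the sketch below creates an ORC copy of an assumed text-format table and queries it; the table and column names are illustrative, and the table property shown is one example of the tuning knobs (compression, bloom filters, stripe size) mentioned in this section. It is written with spark.sql for consistency with the earlier Scala snippets, but the same DDL can be run from Beeline.

    // Convert an existing text-format table (sales_text is assumed to exist) into ORC
    spark.sql("""
      CREATE TABLE sales_orc
      STORED AS ORC
      TBLPROPERTIES ('orc.compress' = 'ZLIB')
      AS SELECT * FROM sales_text
    """)

    // Interactive queries then read the compact, columnar ORC data
    spark.sql("SELECT customer_id, sum(amount) AS total FROM sales_orc GROUP BY customer_id").show()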
Hiveserver2 Interactive High Availability

Hiveserver2 Interactive (HSI) supports high availability (HA) in the form of an Active/Passive standby configuration. Only one HSI can be in Active mode, while one or more additional HSI instances are in passive standby mode, ready to take over on Active HSI failure. To connect to the active leader HSI instance, clients use dynamic service discovery, for example:

    jdbc:hive2://<zookeeper_quorum>;serviceDiscoveryMode=zooKeeperHA;zooKeeperNamespace=hiveserver2-interactive

It is important to note that the zooKeeperHA service discovery mode is distinct from the zooKeeper discovery mode used with traditional Hiveserver2. With zooKeeperHA, the Hive connection iterates through the list of HSI instances registered in ZooKeeper and identifies the elected leader. The node information (hostname:port) of the leader is returned to the client, allowing it to connect to the Active HSI instance. If the Active HSI loses its leadership (for example, a long GC pause resulting in a session/connection timeout, or a network split), leadership is revoked. Before implementing HSI HA, you should confirm that all JDBC and ODBC drivers used within your organization support zooKeeperHA service discovery.
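To make the connection string above concrete, here is a minimal JDBC sketch that resolves the active HSI leader through ZooKeeper; the ZooKeeper hosts, credentials, and query are placeholders for your cluster's values, and the Hive JDBC driver must be on the classpath.

    import java.sql.DriverManager

    // ZooKeeper quorum and credentials below are placeholders
    val url = "jdbc:hive2://zk0-llap:2181,zk1-llap:2181,zk2-llap:2181/;" +
              "serviceDiscoveryMode=zooKeeperHA;zooKeeperNamespace=hiveserver2-interactive"

    val conn = DriverManager.getConnection(url, "hiveuser", "password")
    val stmt = conn.createStatement()
    val rs   = stmt.executeQuery("SELECT current_database()")
    while (rs.next()) println(rs.getString(1))
    rs.close(); stmt.close(); conn.close()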
LLAP Resource Monitoring and Recommendations

Hive Interactive UI: currently, the Hive Interactive UI does not work for ESP LLAP clusters. The following workaround can be applied to make it work.

Step 1: Add the following configs in Ambari:

- hive.server2.webui.use.spnego -> true
- hive.server2.webui.spnego.principal -> HTTP/_HOST@domain.com (e.g. HTTP/_HOST@SECUREHADOOPRC.ONMICROSOFT.COM for TIP clusters)
- hive.server2.webui.spnego.keytab -> /etc/security/keytabs/spnego.service.keytab
- Custom hive-site: hive.users.in.admin.role -> comma-separated list of users who can access the UI, e.g. rsadmin2 (for TIP clusters), or * if all users should be able to access it.

Step 2: Restart Hive.

All the pages work except /logs in the HS2 UI; that needs a backport of HIVE-14737 (Problem accessing /logs in a Kerberized Hive Server 2 Web UI - ASF JIRA).

Sample curl command to access these pages programmatically:

    curl --negotiate -u : http://hn0-kevx5t.securehadooprc.onmicrosoft.com:10502/jmx

The following table details the available UI pages and useful endpoints for metric scraping:

http://<host>:15002 | Active sessions, open queries, and last 25 closed queries overview
http://<host>:15002/jmx | Hiveserver2 system metrics
http://<host>:15002/conf | Current Hiveserver2 configuration
http://<host>:15002/peers | Overview of Hiveserver2 Interactive instances in the cluster
http://<host>:15002/stacks | Stack trace of all JVM threads
http://<host>:15002/llap.html | Status of the LLAP daemons
http://<host>:15002/conflog

LLAP Monitor: each LLAP daemon has a monitor that listens on port 15002 by default. You can use the llap status command or the LLAP Daemons tab on the Hiveserver2 Interactive Web UI to quickly determine where the LLAP daemons are running.

LLAP IO Counters: set hive.tez.exec.print.summary to true in order to report data and metadata cache hits and misses for each query you run.

Grafana Dashboards: LLAP Executor metrics and LLAP Cache metrics dashboards. Make sure execution slots are available to run the query fragments, and check evicted, killed, and rejected task requests. If more than 90% of your execution slots are occupied, you may have to scale out your LLAP cluster to handle the increased workload.

Sampling of workload

Run sample queries (especially long-running ones) from your workload and find out how many resources they consume, for a better understanding of the overall workload; also accommodate any future additional workload. Using Beeline, you can set the following at session level and then run your queries:

    set hive.tez.exec.print.summary=true;
    set hive.query.results.cache.enabled=false;

Analyze the query execution summary: find out the time consumed in the different stages of query processing.

Analyze LLAP cache usage: usually, the higher the DATA_HIT number, the better the performance, as data can be read directly from the cache and no additional IO is required.

Analyze the LLAP task execution pattern: with an active workload management plan, you will have to run your sample queries with the actual user, group, or application name, based on the query pool mapping in your resource plan. More time spent in SPECULATIVE_RUNNING and SPECULATIVE_QUEUED would mean that the query pool corresponding to the query needs more resources and is using resources from the other query pools when possible; however, these speculative tasks will get evicted when the other query pools need resources to serve their own queries.

References:

- https://docs.microsoft.com/en-us/azure/hdinsight/interactive-query/hive-llap-sizing-guide
- https://docs.microsoft.com/en-us/azure/hdinsight/interactive-query/apache-interactive-query-get-started
- https://docs.microsoft.com/en-us/azure/cloud-services/cloud-services-sizes-specs#dv2-series
Azure HDInsight HBase cluster performance comparison using YCSB

The HDInsight HBase accelerated writes feature enables writes and reads that are many orders of magnitude faster than regular HBase clusters. In this lab we benchmark an HBase cluster with the accelerated writes feature using the industry-standard YCSB test.
Microsoft Learning content available for Azure HDInsight

You may have heard that Microsoft has a free learning platform that offers interactive material to help you up-level your skills. But did you know Azure HDInsight has a whole learning path, with over three hours of material available for you to learn how Azure HDInsight can meet your business's growing need for analytics?
Migration of Apache Spark from HDInsight 5.0 to HDInsight 5.1

Azure HDInsight Spark 5.0 to HDI 5.1 Migration

A new version, HDInsight 5.1, has been released with Spark 3.3.1. This release improves join query performance via Bloom filters, increases the Pandas API coverage with support for popular Pandas features such as datetime.timedelta and merge_asof, simplifies migration from traditional data warehouses by improving ANSI compliance, and supports dozens of new built-in functions. In this article we discuss the migration of user applications from HDInsight Spark 3.1 to HDInsight Spark 3.3.
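As one small, hedged illustration of the stricter ANSI behavior mentioned above (Spark 3.3 on HDInsight 5.1), an invalid cast fails fast under ANSI mode instead of silently returning NULL, which is worth checking for when migrating existing jobs:

    // With spark.sql.ansi.enabled=false (the default), an invalid cast yields NULL
    spark.conf.set("spark.sql.ansi.enabled", "false")
    spark.sql("SELECT CAST('abc' AS INT) AS v").show()   // v = null

    // With ANSI mode enabled, the same cast raises a runtime error,
    // surfacing data-quality issues during migration instead of hiding them
    spark.conf.set("spark.sql.ansi.enabled", "true")
    // spark.sql("SELECT CAST('abc' AS INT) AS v").show()  // throws a CAST_INVALID_INPUT error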