HDInsight

Migration of Apache Spark from HDInsight 5.0 to HDInsight 5.1
Azure HDInsight Spark 5.0 to HDI 5.1 Migration

A new version of HDInsight, 5.1, has been released with Spark 3.3.1. This release improves join query performance via Bloom filters, increases Pandas API coverage with support for popular Pandas features such as datetime.timedelta and merge_asof, and simplifies migration from traditional data warehouses by improving ANSI compliance and adding dozens of new built-in functions. In this article we discuss the migration of user applications from HDInsight Spark 3.1 to HDInsight Spark 3.3.
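As an illustrative aside (not from the original article), two of the Spark 3.3 capabilities mentioned above are controlled by Spark SQL configuration properties; the exact defaults can vary between minor releases, so verify them on your cluster with SET -v before relying on them:

-- Hedged sketch: run from spark-sql, a notebook, or spark.sql()
SET spark.sql.optimizer.runtime.bloomFilter.enabled=true;  -- runtime Bloom filter join optimization (Spark 3.3+)
SET spark.sql.ansi.enabled=true;                           -- stricter ANSI SQL compliance mode (optional)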
HDInsight - Iceberg Open-Source Table Format

Author(s): Arun Sethia is a Program Manager in the Azure HDInsight Customer Success Engineering (CSE) team.

Introduction

In my previous blog, we talked about leveraging Delta Lake on Azure HDInsight; this blog is dedicated to the Iceberg open-source table format. At this moment, HDInsight does not support Iceberg out of the box, so in this blog we will see how to leverage the Iceberg open table format on HDInsight. We will also see code examples that use tables from three data sources: Delta, Iceberg, and Spark native Parquet files.

A quick touch on Apache Iceberg - Apache Iceberg is an open-source table format that provides a transactional and versioned storage layer for big data analytics needs, originally developed to address issues in Apache Hive. It provides an alternative to Spark's default Parquet-based storage. It adds tables to compute engines, including Spark, Trino, Flink, Hive, etc., using a high-performance table format that works like a SQL table. A few salient features:

- Schema evolution without side effects.
- Partition layout evolution based on data volume and query pattern.
- Hidden partitioning – there is no need to maintain a partition column (by transformation), so users do not need to supply partition layout information when querying Iceberg tables.
- Time travel, useful to examine changes over time.
- Version maintenance, which helps users correct problems by resetting tables to a stable data version.

Apache Iceberg provides an easy way to extend Spark with table specifications. In the next few sections, we will understand how to extend Spark (limited to Spark SQL) and configure the Apache Iceberg catalog with HDInsight.

Spark Extension

Before we jump into how to use Iceberg with HDInsight 5.0 and Spark, let us spend some time understanding a few concepts related to Spark extensions. They help with the why and how of the Iceberg configuration on HDInsight.

SparkSessionExtensions provides various extension points to extend the Spark session. A Spark SQL extension is a custom class that extends the behavior of Spark SQL; it is configured using the configuration property spark.sql.extensions, which takes a comma-separated list of fully qualified class names for the extension classes you want to use. Some common Spark SQL extensions are CSV data source, Avro data source, query optimization rules, etc. Such custom application-specific Spark configuration can be supplied in multiple ways, based on your deployment method:

- By passing the configuration as part of the Spark-Submit command,
- Using the configure magic command from Jupyter, or
- Programmatically from the Spark job code with the SparkSession object.

Spark Catalog

The Spark catalog is a centralized metadata repository that provides information on data and metadata in Spark. It stores information about data stored in Spark: tables, databases, partitions, etc. It supports SQL operations on metadata, such as CREATE TABLE, ALTER TABLE, etc. By default, Spark comes with two catalogs, Hive and in-memory. The Hive catalog stores metadata in a Hive Metastore, and the in-memory catalog stores metadata in memory. Spark and Hive use independent catalogs to access tables created using Spark SQL or Hive tables.
A table created by Spark resides in the Spark catalog, and a table created from Hive resides in the Hive catalog. We can change the Spark default catalog to the Hive catalog using the configuration property metastore.catalog.default with the value hive. In that case, your Spark tables are managed tables in the Hive catalog and must be transaction enabled, or you can have external tables without transactions enabled.

The Spark configuration property spark.sql.catalog configures the list of catalogs available in Spark SQL, which allows us to configure multiple catalogs; the configuration property spark.sql.defaultCatalog sets the default catalog. We can create custom catalogs by implementing the Catalog interface in Spark and registering the catalog with Spark using the "spark.sql.catalog.<catalog name>" configuration parameter, where "<catalog name>" is a unique identifier for your custom catalog. In addition to managing metadata, Spark persists Spark SQL data at the warehouse directory configured using spark.sql.warehouse.dir.

Iceberg Extension

Iceberg core library components enable integration with compute engines like Spark, Flink, etc. These connectors are maintained in the Iceberg repository and are built for multiple versions. Iceberg provides a runtime connector for different versions of Spark. The runtime jar (iceberg-spark-runtime) is the only addition to the classpath needed to use the Iceberg open-source table format. The Iceberg Spark connector provides an extension for Spark SQL through the IcebergSparkSessionExtensions class, which adds support for Iceberg tables. The extension allows users to interact with Iceberg tables in Spark SQL using the DataFrame and SQL APIs, just like Parquet tables. The following table provides the Iceberg runtime compatibility matrix with respect to the Spark version:

Spark Version | Latest Iceberg Support | Gradle
2.4 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-2.4:1.1.0
3.0 | 1.0.0 | org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:1.0.0
3.1 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0
3.2 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0
3.3 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0

Using HDInsight, you can configure the connector either as part of your uber jar (as a dependent jar) or as part of the Spark-Submit command, and the SQL extension either as part of your application code or via Spark-Submit.

Configure Iceberg Runtime Connector & Extension

Using Spark-Submit

Provide the Iceberg runtime connector dependency by supplying a Maven coordinate with --packages, and provide the Spark SQL extension using the Spark configuration property.

spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Using Application Code

Provide the Iceberg runtime connector as a Maven dependency in your application pom.xml file.

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-spark-runtime-${spark.major.version}_${scala.major.version}</artifactId>
  <version>1.1.0</version>
</dependency>

We can use SparkConf to set up the Iceberg Spark SQL extension and the Iceberg runtime connector jar.

val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0")

Use SparkConf to build the Spark session.
val spark = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()

Using Jupyter Notebook

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
           "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" }
}

Iceberg Spark SQL Catalog

Iceberg supplies the following Spark SQL catalog implementations:

- org.apache.iceberg.spark.SparkSessionCatalog – adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables.
- org.apache.iceberg.spark.SparkCatalog – supports a Hive Metastore or a Hadoop warehouse as a catalog.

Spark Session Catalog

Iceberg tables can use Spark's built-in catalog; we can configure spark_catalog to use Iceberg's SparkSessionCatalog. The Iceberg session catalog loads non-Iceberg tables using the built-in Spark catalog. The catalog type hive means the Hive catalog is used for Iceberg tables, while non-Iceberg tables are created in the default Spark catalog.

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

You can provide these configurations based on the application development and deployment method set by your enterprise:

- If you use the Spark-Submit command from edge nodes or the Livy API to submit a job, you can use 'conf' parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession using SparkConf.
- From a Jupyter notebook, you can use the configure magic command as follows.

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
           "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
           "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
           "spark.sql.catalog.spark_catalog.type":"hive" }
}

For example, if you run the following code with the above configuration, you should see `iceberg_table` created in the Hive catalog as an external table and `spark_table` in the Spark catalog as a managed table.

spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (id string, creation_date string, last_update_time string) USING iceberg""")
spark.sql("""CREATE TABLE IF NOT EXISTS spark_table (id string, creation_date string, last_update_time string)""")

You can use the following query on the HMS to find the catalog name for `iceberg_table` and `spark_table`:

SELECT t.TBL_NAME, dbc.ctgName
FROM TBLS t
INNER JOIN (SELECT c.Name as ctgName, d.DB_ID as dbId FROM CTLGS c, DBS d WHERE d.CTLG_NAME = c.NAME) dbc ON dbc.dbId = t.DB_ID
WHERE TBL_NAME IN ('iceberg_table','spark_table')

The query output (not reproduced here) lists the catalog name for each of the two tables.
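Since `iceberg_table` above is an Iceberg table, its snapshot history is also directly queryable. The sketch below is illustrative only (not from the original article) and uses Iceberg's metadata tables from Spark SQL; note that the SQL time-travel clauses (VERSION AS OF / TIMESTAMP AS OF) require a newer Spark/Iceberg combination than Spark 3.1, where the snapshot-id DataFrame read option is used instead.

-- Hedged sketch: inspect snapshots and history of the Iceberg table created above
SELECT committed_at, snapshot_id, operation FROM default.iceberg_table.snapshots;
SELECT * FROM default.iceberg_table.history;
-- On Spark 3.3+ with a matching Iceberg runtime, time travel can be expressed directly in SQL, e.g.:
-- SELECT * FROM default.iceberg_table VERSION AS OF <snapshot_id>;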
Custom Catalog

Iceberg supports multiple data catalog types such as Hive, Hadoop, JDBC, or custom catalog implementations. These catalogs are configured using the configuration property spark.sql.catalog.<<catalogname>> and the type of catalog using spark.sql.catalog.<<catalogname>>.type.

Hive Catalog:
spark.sql.catalog.<<catalogname>>=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.<<catalogname>>.type=hive
spark.sql.catalog.<<catalogname>>.uri=thrift://metastore-host:port
spark.sql.catalog.<<catalogname>>.warehouse=abfs://<<warehouse path>>

Hadoop Catalog:
spark.sql.catalog.iceberg_hadoop=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_hadoop.type=hadoop
spark.sql.catalog.iceberg_hadoop.warehouse=abfs://<<warehouse path>>

The catalog.uri is optional; by default, it uses hive.metastore.uris from hive-site.xml (Ambari -> Hive -> Advanced). The catalog.warehouse is optional; by default, it uses the 'Hive Metastore Warehouse directory' from hive-site.xml (Ambari -> Hive -> Advanced).

You can provide these configurations based on the application development and deployment method set by your enterprise:

- If you use the Spark-Submit command from edge nodes or the Livy API to submit a job, you can use 'conf' parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession using SparkConf.
- From a Jupyter notebook, you can use the configure magic command as follows (in this case we are using a Hadoop catalog):

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
           "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
           "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
           "spark.sql.catalog.iceberg":"org.apache.iceberg.spark.SparkCatalog",
           "spark.sql.catalog.iceberg.type":"hadoop",
           "spark.sql.catalog.iceberg.warehouse":"/iceberg/warehouse" }
}

Iceberg Hive Tables With Custom Catalog

The current HDInsight supports Apache Hive version 3.1.1; if you want to use Iceberg tables from Hive (catalog type hive), the HiveIcebergStorageHandler and supporting classes need to be made available on Hive's classpath. These classes are available as part of the Iceberg-Hive runtime jar. You can add the Iceberg-Hive runtime jar to Hive's auxiliary classpath, so it is available by default, or if you use the Hive shell, add it explicitly:

add jar /<<path>>/iceberg-hive-runtime-1.1.0.jar;

Iceberg Hive tables using a custom catalog can be created in two ways:

Using a Hive DDL command from hive-cli or Beeline:

CREATE EXTERNAL TABLE customer (
  id bigint,
  name string
) PARTITIONED BY (
  state string
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES ('iceberg.catalog'='iceberg');

The table property 'iceberg.catalog'='iceberg' sets the table catalog to 'iceberg'.

Using the Iceberg Hive Catalog Java API – example code is available at the git repository.
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{PartitionSpec, TableProperties, Schema => IcebergSchema}
import org.apache.iceberg.CatalogProperties
import org.apache.spark.sql.SparkSession

val catalogName = "iceberg"
val nameSpace = "default"
val tableName = "customer"

def createTableByHiveCatalog(spark: SparkSession): Unit = {
  import scala.collection.JavaConverters._
  // table specification starts
  val schema = new IcebergSchema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.required(2, "name", Types.StringType.get()),
    Types.NestedField.required(3, "state", Types.StringType.get())
  )
  val spec = PartitionSpec.builderFor(schema).bucket("state", 128).build()
  import org.apache.iceberg.catalog.TableIdentifier
  val tableIdentifier: TableIdentifier = TableIdentifier.of(nameSpace, tableName)
  val tblProperties = Map(TableProperties.ENGINE_HIVE_ENABLED -> "true", "iceberg.catalog" -> "iceberg")
  // table specification ends
  val catalog = new HiveCatalog()
  catalog.setConf(spark.sparkContext.hadoopConfiguration)
  val properties = Map(CatalogProperties.WAREHOUSE_LOCATION -> "/iceberg/warehouse/")
  catalog.initialize(catalogName, properties.asJava)
  catalog.createTable(tableIdentifier, schema, spec, s"/iceberg/warehouse/${tableName}", tblProperties.asJava)
}

Example - Delta, Iceberg and Spark Parquet

Let us jump into the example code. In this example, we create sample data for product master, sales, and return transactions using Mockneat. These data are stored in three file formats – Spark Parquet, Iceberg Parquet, and Delta Parquet. The scenario setup is illustrated in the original post; the sample code for using a Jupyter notebook is available at hdinotebookexamples.

Limitation: At this moment, Delta Lake and Hudi do not support custom catalogs (out of the box). However, since Iceberg supports a custom catalog, the only option is to set spark_catalog to DeltaCatalog and create a custom catalog for Iceberg.

References:
https://iceberg.apache.org/
https://github.com/apache/hudi/issues/5537
HDInsight Open Table Formats:
Hudi on HDInsight - https://murggu.medium.com/apache-hudi-on-hdinsight-8d981269a97a
Delta Lake on HDInsight - Microsoft Community Hub
Hive - Materialized Views

Introduction:

Traditionally, one of the most powerful techniques used to accelerate query processing in data warehouses is the pre-computation of relevant summaries or materialized views. The initial implementation introduced in Apache Hive 3.0.0 focuses on introducing materialized views and automatic query rewriting based on those materializations. Materialized views can be stored natively in Hive or in other custom storage handlers (ORC), and they can seamlessly exploit exciting new Hive features such as LLAP acceleration. The optimizer then relies on Apache Calcite to automatically produce full and partial rewritings for a large set of query expressions comprising projections, filters, joins, and aggregation operations. In this document, we provide details about materialized view creation and management in Hive against source Parquet tables.

Materialized view creation:

The syntax to create a materialized view in Hive is very similar to the CTAS statement syntax, supporting common features such as partition columns, custom storage handlers, or passing table properties.

Standard Syntax:

CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db_name.]materialized_view_name
  [DISABLE REWRITE]
  [COMMENT materialized_view_comment]
  [PARTITIONED ON (col_name, ...)]
  [CLUSTERED ON (col_name, ...) | DISTRIBUTED ON (col_name, ...) SORTED ON (col_name, ...)]
  [
    [ROW FORMAT row_format]
    [STORED AS file_format]
    | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
  ]
  [LOCATION hdfs_path]
  [TBLPROPERTIES (property_name=property_value, ...)]
AS <query>;

Example with Parquet source tables:

Table description:

0: jdbc:hive2://zk0-hdilla.xi2kmm3bon0engqedn> desc formatted hive_parquet;
+-------------------------------+----------------------------------------------------+-----------------------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+-----------------------------+
| # col_name | data_type | comment |
| clientid | string | |
| querytime | string | |
| market | string | |
| deviceplatform | string | |
| devicemake | string | |
| devicemodel | string | |
| state | string | |
| country | string | |
| querydwelltime | double | |
| sessionid | bigint | |
| sessionpagevieworder | bigint | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| OwnerType: | USER | NULL |
| Owner: | anonymous | NULL |
| CreateTime: | Mon Jun 21 11:38:49 UTC 2021 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | abfs://hdillap-2021-06-09t16-52-55-399z@hiverepl.dfs.core.windows.net/hive/warehouse/managed/hive_parquet | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | {\"BASIC_STATS\":\"true\"} |
| | bucketing_version | 2 |
| | numFiles | 1 |
| | numRows | 59793 |
| | rawDataSize | 657723 |
| | totalSize | 1419783 |
| | transactional | true |
| | transactional_properties | insert_only |
| | transient_lastDdlTime | 1624275529 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | NULL |
| InputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | serialization.format | 1 |
+-------------------------------+----------------------------------------------------+-----------------------------+

Example 1: Create a materialized view on the Parquet table, partitioned on the country column:

CREATE MATERIALIZED VIEW hivemv1 PARTITIONED ON (country) STORED AS ORC AS SELECT * FROM hive_parquet;

Example 2: Create a materialized view on the Parquet table, partitioned on the country column and with a sort defined on one column:

CREATE MATERIALIZED VIEW hivemv3 PARTITIONED ON (country) STORED AS ORC AS SELECT * FROM hive_parquet ORDER BY deviceplatform;

Other operations for materialized view management:

Currently we support the following operations that aid in managing materialized views in Hive:

-- Drops a materialized view
DROP MATERIALIZED VIEW [db_name.]materialized_view_name;

-- Shows materialized views (with optional filters)
SHOW MATERIALIZED VIEWS [IN database_name] ['identifier_with_wildcards'];

-- Shows information about a specific materialized view
DESCRIBE [EXTENDED | FORMATTED] [db_name.]materialized_view_name;

Example:

0: jdbc:hive2://zk0-hdilla.xi2kmm3bon0engqedn> show materialized views;
+------------+--------------------+-----------------+
| mv_name | rewrite_enabled | mode |
+------------+--------------------+-----------------+
| # MV Name | Rewriting Enabled | Mode |
| hivemv | Yes | Manual refresh |
| hivemv1 | Yes | Manual refresh |
| hivemv2 | Yes | Manual refresh |
| | NULL | NULL |
+------------+--------------------+-----------------+

Materialized view-based query rewriting:

Once a materialized view has been created, the optimizer will be able to exploit its definition semantics to automatically rewrite incoming queries using materialized views, and hence accelerate query execution. The rewriting algorithm can be enabled and disabled globally using the hive.materializedview.rewriting configuration property (default value is true) and at the materialized view level as below:

ALTER MATERIALIZED VIEW [db_name.]materialized_view_name ENABLE|DISABLE REWRITE;

Materialized view maintenance:

When data in the source tables used by a materialized view changes, the rebuild operation for the materialized view needs to be triggered by the user. In particular, the user should execute the following statement:

ALTER MATERIALIZED VIEW [db_name.]materialized_view_name REBUILD;

Hive supports incremental view maintenance, i.e., it only refreshes data that was affected by the changes in the original source tables. Incremental view maintenance decreases the rebuild step execution time and, in addition, preserves the LLAP cache for existing data in the materialized view. Hive will attempt to rebuild a materialized view incrementally, falling back to a full rebuild if that is not possible. The current implementation only supports incremental rebuild when there were INSERT operations over the source tables; UPDATE and DELETE operations force a full rebuild of the materialized view. To execute incremental maintenance, the following conditions should be met:

- The materialized view should only use transactional tables, either micromanaged or ACID.
- If the materialized view definition contains a Group By clause, the materialized view should be stored in an ACID table, since it needs to support the MERGE operation. For materialized view definitions consisting of Scan-Project-Filter-Join, this restriction does not exist.

A rebuild operation acquires an exclusive write lock over the materialized view, i.e., for a given materialized view, only one rebuild operation can be executed at a given time.
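A quick way to verify that rewriting (and a completed rebuild) is actually taking effect is to EXPLAIN a query over the source table. The snippet below is an illustrative sketch against the hive_parquet table and hivemv1 view used in this article; the exact plan output depends on statistics and on the freshness of the materialized view.

-- Hedged sketch: if rewriting applies, the plan scans hivemv1 instead of hive_parquet
EXPLAIN SELECT country, count(*) FROM hive_parquet GROUP BY country;
-- After data changes in hive_parquet, refresh the view so it remains eligible for rewriting
ALTER MATERIALIZED VIEW hivemv1 REBUILD;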
Materialized view lifecycle:

If the materialized view uses non-transactional tables, we cannot verify whether its contents are outdated; however, we may still want to use automatic rewriting. For such occasions, we can combine a rebuild operation run periodically, e.g., every 5 minutes, with the required freshness of the materialized view data defined using the hive.materializedview.rewriting.time.window configuration parameter, for instance:

SET hive.materializedview.rewriting.time.window=10min;

The parameter value can also be overridden by a concrete materialized view by setting it as a table property when the materialization is created.

Please note: By default, hive.materializedview.rewriting.time.window is set to 0min, which means auto rebuild is disabled. To enable it at the global level, add the property with a specific time interval under Ambari -> Hive config -> Custom hive-site. The change applies only to materialized views created after this change. Past the rewrite window, the state of the materialized view can be validated with `desc formatted mv_name`:

....
Rewrite Enabled: | Yes | NULL
Outdated for Rewriting: | Yes | NULL
....

Example:

CREATE MATERIALIZED VIEW hivemv3 PARTITIONED ON (country) STORED AS ORC TBLPROPERTIES ("hive.materializedview.rewriting.time.window"="10min") AS SELECT * FROM hive_parquet;

Examples with outputs:

Materialized view with sort on a specific column

With DISTRIBUTED ON or SORT BY on the roadmap, to sort the data within a materialized view, create the view with an ORDER BY clause in the SELECT query.

CREATE MATERIALIZED VIEW hivemv14 PARTITIONED ON (country) STORED AS ORC AS SELECT * FROM hive_parquet ORDER BY (devicemake);

The data stored in the ORC file is sorted and can be validated with the command `/usr/bin/hive --orcfiledump -d <location_of_orc_file>`. Refer: attached sample ORC data file.
Description on the MV created: (desc formatted hivemv1)

+----------------------------------+----------------------------------------------------+-----------------------------+
| col_name | data_type | comment |
+----------------------------------+----------------------------------------------------+-----------------------------+
| # col_name | data_type | comment |
| clientid | string | |
| querytime | string | |
| market | string | |
| deviceplatform | string | |
| devicemake | string | |
| devicemodel | string | |
| state | string | |
| querydwelltime | double | |
| sessionid | bigint | |
| sessionpagevieworder | bigint | |
| | NULL | NULL |
| # Partition Information | NULL | NULL |
| # col_name | data_type | comment |
| country | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| OwnerType: | USER | NULL |
| Owner: | anonymous | NULL |
| CreateTime: | Mon Jun 21 11:44:00 UTC 2021 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | abfs://hdillap-2021-06-09t16-52-55-399z@hiverepl.dfs.core.windows.net/hive/warehouse/managed/hivemv1 | NULL |
| Table Type: | MATERIALIZED_VIEW | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | {\"BASIC_STATS\":\"true\"} |
| | bucketing_version | 2 |
| | numFiles | 88 |
| | numPartitions | 88 |
| | numRows | 59793 |
| | rawDataSize | 39334953 |
| | totalSize | 841901 |
| | transient_lastDdlTime | 1624275840 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.ql.io.orc.OrcSerde | NULL |
| InputFormat: | org.apache.hadoop.hive.ql.io.orc.OrcInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| | NULL | NULL |
| # Materialized View Information | NULL | NULL |
| Original Query: | select * from hive_parquet | NULL |
| Expanded Query: | SELECT `clientid`, `querytime`, `market`, `deviceplatform`, `devicemake`, `devicemodel`, `state`, `querydwelltime`, `sessionid`, `sessionpagevieworder`, `country` FROM (select `hive_parquet`.`clientid`, `hive_parquet`.`querytime`, `hive_parquet`.`market`, `hive_parquet`.`deviceplatform`, `hive_parquet`.`devicemake`, `hive_parquet`.`devicemodel`, `hive_parquet`.`state`, `hive_parquet`.`country`, `hive_parquet`.`querydwelltime`, `hive_parquet`.`sessionid`, `hive_parquet`.`sessionpagevieworder` from `default`.`hive_parquet`) `hivemv1` | NULL |
| Rewrite Enabled: | Yes | NULL |
| Outdated for Rewriting: | No | NULL |
+----------------------------------+----------------------------------------------------+-----------------------------+

Data validation on materialized view:

0: jdbc:hive2://zk0-hdilla.xi2kmm3bon0engqedn> select * from hivemv1 limit 5;
INFO : Compiling command(queryId=hive_20210621163758_1a523251-bfd0-40ca-a9fd-ef463120b3d8): select * from hivemv1 limit 5
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:hivemv1.clientid, type:string, comment:null), FieldSchema(name:hivemv1.querytime, type:string, comment:null), FieldSchema(name:hivemv1.market, type:string, comment:null), FieldSchema(name:hivemv1.deviceplatform, type:string, comment:null), FieldSchema(name:hivemv1.devicemake, type:string, comment:null), FieldSchema(name:hivemv1.devicemodel, type:string, comment:null), FieldSchema(name:hivemv1.state, type:string, comment:null), FieldSchema(name:hivemv1.querydwelltime, type:double, comment:null), FieldSchema(name:hivemv1.sessionid, type:bigint, comment:null), FieldSchema(name:hivemv1.sessionpagevieworder, type:bigint, comment:null), FieldSchema(name:hivemv1.country, type:string, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20210621163758_1a523251-bfd0-40ca-a9fd-ef463120b3d8); Time taken: 1.111 seconds
INFO : Executing command(queryId=hive_20210621163758_1a523251-bfd0-40ca-a9fd-ef463120b3d8): select * from hivemv1 limit 5
INFO : Completed executing command(queryId=hive_20210621163758_1a523251-bfd0-40ca-a9fd-ef463120b3d8); Time taken: 0.01 seconds
INFO : OK
+-------------------+--------------------+-----------------+-------------------------+---------------------+----------------------+----------------+-------------------------+--------------------+-------------------------------+----------------------+
| hivemv1.clientid | hivemv1.querytime | hivemv1.market | hivemv1.deviceplatform | hivemv1.devicemake | hivemv1.devicemodel | hivemv1.state | hivemv1.querydwelltime | hivemv1.sessionid | hivemv1.sessionpagevieworder | hivemv1.country |
+-------------------+--------------------+-----------------+-------------------------+---------------------+----------------------+----------------+-------------------------+--------------------+-------------------------------+----------------------+
| 11786 | 22:33:53 | en-US | Android | Motorola | Quench XT5 | Saint John | 7.0328606 | 0 | 1 | Antigua And Barbuda |
| 11786 | 22:30:29 | en-US | Android | Motorola | Quench XT5 | Saint John | 68.3177076 | 0 | 0 | Antigua And Barbuda |
| 11786 | 22:35:02 | en-US | Android | Motorola | Quench XT5 | Saint John | 8.1046491 | 0 | 2 | Antigua And Barbuda |
| 11786 | 22:35:25 | en-US | Android | Motorola | Quench XT5 | Saint John | 26.3155831 | 0 | 3 | Antigua And Barbuda |
| 11786 | 22:36:00 | en-US | Android | Motorola | Quench XT5 | Saint John | 3.8841237 | 0 | 4 | Antigua And Barbuda |
+-------------------+--------------------+-----------------+-------------------------+---------------------+----------------------+----------------+-------------------------+--------------------+-------------------------------+----------------------+
5 rows selected (1.851 seconds)

Compute statistics:

One of the key use cases of statistics is query optimization. Statistics serve as the input to the cost functions of the optimizer so that it can compare different plans and choose among them.
analyze table hivemv1 partition(country) compute statistics for columns;

Table description post stats collection:

+----------------------------------+----------------------------------------------------+----------------------------------------------------+
| col_name | data_type | comment |
+----------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name | data_type | comment |
| clientid | string | |
| querytime | string | |
| market | string | |
| deviceplatform | string | |
| devicemake | string | |
| devicemodel | string | |
| state | string | |
| country | string | |
| querydwelltime | double | |
| sessionid | bigint | |
| sessionpagevieworder | bigint | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| OwnerType: | USER | NULL |
| Owner: | anonymous | NULL |
| CreateTime: | Fri Jun 18 05:48:03 UTC 2021 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | abfs://hdillap-2021-06-09t16-52-55-399z@hiverepl.dfs.core.windows.net/hive/warehouse/managed/hivemv2 | NULL |
| Table Type: | MATERIALIZED_VIEW | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"clientid\":\"true\",\"country\":\"true\",\"devicemake\":\"true\",\"devicemodel\":\"true\",\"deviceplatform\":\"true\",\"market\":\"true\",\"querydwelltime\":\"true\",\"querytime\":\"true\",\"sessionid\":\"true\",\"sessionpagevieworder\":\"true\",\"state\":\"true\"}} |
| | bucketing_version | 2 |
| | numFiles | 1 |
| | numRows | 59793 |
| | rawDataSize | 45057355 |
| | totalSize | 737187 |
| | transient_lastDdlTime | 1623995315 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.ql.io.orc.OrcSerde | NULL |
| InputFormat: | org.apache.hadoop.hive.ql.io.orc.OrcInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| | NULL | NULL |
| # Materialized View Information | NULL | NULL |
| Original Query: | select * from hivesampletable | NULL |
| Expanded Query: | select `hivesampletable`.`clientid`, `hivesampletable`.`querytime`, `hivesampletable`.`market`, `hivesampletable`.`deviceplatform`, `hivesampletable`.`devicemake`, `hivesampletable`.`devicemodel`, `hivesampletable`.`state`, `hivesampletable`.`country`, `hivesampletable`.`querydwelltime`, `hivesampletable`.`sessionid`, `hivesampletable`.`sessionpagevieworder` from `default`.`hivesampletable` | NULL |
| Rewrite Enabled: | Yes | NULL |
| Outdated for Rewriting: | No | NULL |
+----------------------------------+----------------------------------------------------+----------------------------------------------------+

Known limitations:

- Support for defining a CLUSTERED ON/DISTRIBUTED ON + SORTED ON specification for materialized views - HIVE-18842
- Creation of partitioned ACID materialized views: the data movement fails with the error `Write id is not set in the config by open txn task for migration` - HIVE-21678

References:
https://cwiki.apache.org/confluence/display/Hive/Materialized+views
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/materialized-view/content/hive_alter_materialized_view_rebuild.html
https://cwiki.apache.org/confluence/display/Hive/StatsDev
Agile Data Vault 2.0 Projects with Azure DevOps

Having discussed the value of Data Vault 2.0 and the associated architectures in the previous articles of this blog series, this article will focus on the organization and successful execution of Data Vault 2.0 projects using Azure DevOps. It will also discuss the differences between standard Scrum, as used in agile software development, and the Data Vault 2.0 methodology, which is based on Scrum but also includes aspects from other methodologies. Other functions of Azure DevOps, for example the deployment of the data analytics platform, will be discussed in subsequent articles of this ongoing blog series.
Hive Tez AM split computation based on the input format

Introduction:

The performance of a query depends on the size and type of data that each Tez container processes. Split computation plays a major role in deciding the number of containers required to run a specific query. This affects cluster resource utilization, the performance of the specific query, and the performance of concurrent queries.

Problems often encountered:

With formats like Parquet and TextFormat for the data under Hive, the input split computation is straightforward. It is calculated as:

No. of data files = No. of splits

These data files can be combined by the Tez grouping algorithm based on data locality and rack awareness. This is affected by several factors. If TEZ_GROUPING_SPLIT_COUNT is set, its value is used as the initial count; otherwise, the count passed in as a parameter is used. The initial count is then corrected according to TEZ_GROUPING_SPLIT_MIN_SIZE and TEZ_GROUPING_SPLIT_MAX_SIZE: if the initial count causes a too-small grouped split size, it is overridden as total input size/TEZ_GROUPING_SPLIT_MIN_SIZE; if the initial count causes a too-large grouped split size, it is overridden as total input size/TEZ_GROUPING_SPLIT_MAX_SIZE.

// HiveSplitGenerator
2017-02-16 15:56:48,862 [INFO] [InputInitializer {Map 1} #0] |tez.HiveSplitGenerator|: Number of input splits: 5. 3 available slots, 1.7 waves. Input format is: org.apache.hadoop.hive.ql.io.HiveInputFormat

// then grouping logic gets called in HiveSplitGenerator
2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? true
2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false
2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false
2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false
2017-02-16 15:56:48,892 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false
2017-02-16 15:56:48,892 [INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: # Src groups for split generation: 2
2017-02-16 15:56:48,893 [INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Estimated number of tasks: 5 for bucket 1

org.apache.hadoop.mapred.split.TezMapredSplitsGrouper#getGroupedSplits(org.apache.hadoop.conf.Configuration, org.apache.hadoop.mapred.InputSplit[], int, java.lang.String, org.apache.hadoop.mapred.split.SplitSizeEstimator) based on tez.grouping.split-count

2017-02-16 15:56:48,894 [INFO] [InputInitializer {Map 1} #0] |split.TezMapredSplitsGrouper|: Grouping splits in Tez
2017-02-16 15:56:48,894 [INFO] [InputInitializer {Map 1} #0] |split.TezMapredSplitsGrouper|: Using original number of splits: 5 desired splits: 5
2017-02-16 15:56:48,897 [INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Original split size is 5 grouped split size is 5, for bucket: 1
2017-02-16 15:56:48,899 [INFO] [InputInitializer {Map 1} #0] |tez.HiveSplitGenerator|: Number of grouped splits: 5

This could also be addressed from the Hive end with hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat. If we set this parameter in hive-site.xml, it becomes the default Hive InputFormat, provided 'hive.input.format' is not set explicitly before the HQL. The InputFormat in this scenario serves a different function compared to the former one.

Firstly, let's take a glance at 'org.apache.hadoop.mapred.FileInputFormat', which is the base class for all file-based InputFormats. There are three essential methods in this class:

boolean isSplitable(FileSystem fs, Path filename)
InputSplit[] getSplits(JobConf job, int numSplits)
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)

'isSplitable' is self-explanatory: it returns whether the given filename is splitable. This method applies when working with a MapReduce program; for Hive, we can alternatively set 'mapreduce.input.fileinputformat.split.minsize' in hive-site.xml to a very large value to achieve the same effect.

'getSplits' returns an array of InputSplit objects, whose size corresponds to the number of mappers for this HQL task. Every InputSplit contains one or more file chunks in the current file system; the details are discussed later.

'getRecordReader' returns an 'org.apache.hadoop.mapred.RecordReader' object, whose function is to read data record by record from the underlying file system. Its main methods are as follows:

K createKey()
V createValue()
boolean next(K key, V value)
float getProgress()

'createKey', 'createValue' and 'getProgress' are self-explanatory. 'next' populates the key and value parameters from the current read position and returns true; when at EOF, false is returned.

In the former case mentioned above, only the 'getRecordReader' method is used; whereas in the latter case, only the 'getSplits' method is used.

Suppose you have a 200M file. Using the HiveInputFormat split algorithm:

The total file size is 200M, goalSize = 200M/2 = 100M, minSize = 1, splitSize = max {1, min {100M, 128M}} = 100M
200M/100M > 1.1, so the size of the first block is 100M
The remaining file size is 100M, less than 128M, so the second block size is 100M.
Whereas, using the CombineHiveInputFormat split algorithm:

128M < 200M < 128M x 2, so the size of the first block is 128M
The remaining file size is 200M - 128M = 72M, 72M < 128M, so the second block size is 72M

What happens when it's an ORC table, and why ORC:

Split computation and grouping by OrcInputFormat: Firstly, OrcInputFormat reads all stripes from all input ORC files and creates one split per stripe, unless the stripe size is lower than the mapreduce.input.fileinputformat.split.minsize value. If the stripe is smaller than mapreduce.input.fileinputformat.split.minsize, then OrcInputFormat combines multiple stripes into a single input split.

Split grouping by Tez: In addition to OrcInputFormat, the Tez engine can further group the input splits. Initially Tez asks the YARN Resource Manager about the number of available containers, multiplies this number by tez.grouping.split-waves (1.7 by default; for more information about split waves, read Tez Internals #1 – Number of Map Tasks) and gets the desired number of input splits (and tasks). From the Tez Application Master log you can see:

[INFO] [InputInitializer {Map 1} #0] |tez.HiveSplitGenerator|: Number of input splits: 1084. 337 available slots, 1.7 waves.
[INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Estimated number of tasks: 572

1084 is the total number of stripes in all input ORC files, 337 is the total number of available containers, and 337 * 1.7 gives 572 tasks. Tez knows that it wants to run 572 tasks, so it defines the average data size per task:

[INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Grouping splits in Tez
[INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Desired numSplits: 572 lengthPerGroup: 822989185 numLocations: 1 numSplitsPerLocation: 1084 numSplitsInGroup: 1 totalLength: 470749813876 numOriginalSplits: 1084 . Grouping by length: true count: false nodeLocalOnly: false

Tez calculates totalLength, the total volume of input data to process. Note that Tez uses the column statistics from the ORC files, not from the Hive Metastore (!), to get the estimated uncompressed data size. That's why totalLength is 470 GB while the total size of the input ORC files is just 143 GB. Knowing the total data size and the desired number of tasks, Tez gets lengthPerGroup, the desired size of an input split: 470749813876/572 = 822989185. So the desired input split is 822 MB (!), and again this is the uncompressed data size. Similar to OrcInputFormat, Tez also goes through all ORC stripes (actually, the input splits created by OrcInputFormat), but now it deals with the uncompressed data sizes. If an input split is smaller than tez.grouping.min-size, it is combined with another split, trying to create input splits of the lengthPerGroup size. It is hard to hit this exact size, since Tez has to operate with full ORC stripes; it cannot split a single stripe into multiple input splits.

ORC split strategy: There are three available options, HYBRID, ETL and BI:

- HYBRID: Reads the footers for all files if there are fewer files than the expected mapper count, switching over to generating 1 split per file if the average file sizes are smaller than the default HDFS block size.
- BI: Per-file split. It will be faster when the number of files is small.
- ETL: Reads the file footer and then decides the number of splits. There is a SearchArgument passed to the reader which can eliminate ORC stripes/splits based on the filter condition provided in the query. This is used when you can allow ORC split generation to spend time on calculating the splits, and is used when the query is large.
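To tie the split strategy and grouping parameters together, the following is an illustrative sketch (the values are examples only, not recommendations from the original article; tune them for your data layout and cluster) of setting them at the session level before running a query:

-- Hedged sketch: session-level split tuning before a large ORC scan
SET hive.exec.orc.split.strategy=ETL;                         -- or BI/HYBRID, as described above
SET mapreduce.input.fileinputformat.split.minsize=134217728;  -- 128 MB; smaller stripes get combined into one split
SET tez.grouping.min-size=134217728;                          -- 128 MB lower bound for grouped splits
SET tez.grouping.max-size=1073741824;                         -- 1 GB upper bound for grouped splits
SELECT count(*) FROM sample_orc_table WHERE state = 'CA';     -- hypothetical table and filter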
Accelerate the ORC split computation:

Tune the default value of hive.orc.compute.splits.num.threads (10): set the value of this parameter to match the number of available processors. This parameter controls the number of parallel threads involved in computing splits. For Parquet, computing splits is still single-threaded, so split computation can take longer with Parquet and cloud object stores.

With hive.exec.orc.split.strategy=ETL, set hive.orc.splits.include.file.footer=true – this ensures Hive and the Tez AM are aware of the split payload from the footer information.

References:
https://www.slideshare.net/Hadoop_Summit/orc-file-optimizing-your-big-data
https://community.hortonworks.com/articles/68631/optimizing-hive-queries-for-orc-formatted-tables.html