HDInsight - Iceberg Open-Source Table Format
Published Mar 02 2023 05:17 AM 6,548 Views
Microsoft

Author(s): Arun Sethia is a Program Manager in Azure HDInsight Customer Success Engineering (CSE) team.

 

Introduction

In my previous blog, we talked about leveraging Delta Lake on Azure HDInsight; this blog is dedicated to Iceberg open source table format.

 

At this moment, HDInsight does not support Iceberg out of the box, so we will see how someone can leverage Iceberg Open Table Format in this blog. We will also see code examples to use tables from three data sources: Delta, Iceberg, and Spark native parquet files.

 

A quick touch on Apache Iceberg - Apache Iceberg is an open-source table format that provides a transactional and versioned storage layer for big data analytics needs, originally developed to address issues in Apache Hive. It provides an alternative to Spark's default Parquet-based storage. It adds tables to compute engines, including Spark, Trino, Flink, Hive, etc., using a high-performance table format that works like a SQL table. Few silent features:

 

  • Support Schema evolution without any side-effects
  • Evolve the partition layout based on data volume and query pattern.
  • Hidden Partition – There is no need to maintain a partition column (by transformation); it helps users not need to supply partition layout information when querying Iceberg tables.
  • Support of time travel; useful to examine changes over time.
  • Maintaining versions helps users to correct problems by resetting tables to a stable data version.

Apache Iceberg provides an easy way to extend Spark with table specifications. In the next few sections, we will understand how to extend Spark (limited to Spark SQL) and configure the Apache Iceberg catalog with HDInsight.

Spark Extension

 

Before we jump into how to use Iceberg with HDInsight 5.0 and Spark, let us spend some time understanding a few concepts related to Spark extensions. It can help with the why and how part of the Iceberg configuration with HDInsight.

 

Using SparkSessionExtensions provides various extension points to extend the Spark Session. The Spark SQL extension is a custom class to extend the behavior of the Spark SQL. The Spark SQL extension can be configured using the configuration property spark.sql.extensions. You can specify a comma-separated list of fully qualified class names for the extension classes you want to use in Spark SQL. With these extensions, you can extend and customize the behavior of Spark SQL. You can specify a comma-separated list of fully qualified class names for the extension classes you want to use in Spark SQL.

 

Some common Spark SQL extensions are CSV data source, Avro data source, query optimization rules, etc. Such custom application-specific Spark configuration can be made in multiple ways; based on your deployment methods:

 

  • Either by passing the configuration part of the Spark-Submit command
  • Using the configuration magic command from Jupyter
  • or Programmatically from the Spark job code with the SparkSession object.

Spark Catalog

 

Spark Catalog is a centralized metadata repository that provides information on data and metadata in Spark. It stores information about data stored in Spark, tables, databases, partitions, etc. It supports SQL operations on metadata, such as CREATE table, ALTER table, etc.

 

By default, Spark comes with two catalogs, Hive and In-memory. The Hive catalog is used to store metadata in a Hive Metastore, and the in-memory catalog is used to store metadata in-memory. Spark and Hive use independent catalogs to access tables created using Spark SQL or Hive tables. A table created by Spark resides in the Spark catalog. A table created from Hive resides in the Hive catalog.

 

We can change the Spark default catalog to the Hive catalog using the configuration property metastore.catalog.default with the value Hive. In such case, your Spark tables are managed tables in the Hive catalog and must be transaction enabled, or you can have external tables without transaction enabled.

 

The Spark configuration property 'spark.sql.catalog' configures the list of catalogs available in Spark SQL. It allows us to configure multiple catalogs and specify the default catalog in Spark SQL, the configuration property spark.sql.defaultCatalog allows you to set the default catalog. We can create custom catalogs by implementing the Catalog interface in Spark and registering the catalog with Spark using the "spark.sql.catalog.<catalog name>" configuration parameter, where “<catalog name >” is a unique identifier for your custom catalog.

 

In addition to managing metadata, Spark persists Spark SQL data at the warehouse directory configured using spark.sql.warehouse.dir.

Iceberg Extension

 

Iceberg core library components enable integration with compute engines like Spark, Flink, etc. These connectors are maintained in the iceberg repository and they are built for multiple versions. Iceberg provides a runtime connector for different versions of Spark. The runtime jar (iceberg-spark-runtime) is the only addition to the classpath needed to use the Iceberg open-source table format.

 

The Iceberg Spark connector provides an extension for Spark SQL by IcebergSparkSessionExtensions class; it adds support for Iceberg tables. The extension allows users to interact with Iceberg tables in Spark SQL using DataFrame and SQL APIs for Parquet tables.

 

The following table provides Iceberg runtime compatibility matrix w.r.t Spark version:

 

Spark Version

Latest Iceberg Support

Gradle

2.4

1.1.0

org.apache.iceberg:iceberg-spark-runtime-2.4:1.1.0

3.0

1.0.0

org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:1.0.0

3.1

1.1.0

org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0

3.2

1.1.0

org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0

3.3

1.1.0

org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0

 

Using HDInsight, you can configure the connector either part of your uber jar (as dependent jar) or part of the Spark Submit command and SQL extension part of your application code or via Spark Submit.

Configure Iceberg Runtime Connector & Extension

 

Using Spark-Submit

Provide Iceberg runtime connector dependency by supplying a Maven coordinate with –packages and provide Spark SQL extension using spark configuration property.

 

 

spark-submit –packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

 

 

Using Application Code

Provide Iceberg runtime connector as a maven dependency to your application pom.xml file.

 

 

<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-spark-runtime-${spark.major.version}_${scala.major.version}</artifactId>
    <version>1.1.0</version>
</dependency>

 

 

We can use SparkConf to set up the Iceberg Spark SQL extension and Iceberg runtime connector jar.

 

 

val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions",
		"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.jars.packages","org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0")

 

 

Use SparkConf to build the Spark Session.

 

 

val spark = SparkSession
  .builder()
  .config(sparkConf)

 

 

Using Jupyter Notebook

 

 

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
           "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
          }
}

 

 

Iceberg Spark SQL Catalog

 

Iceberg supplies the following Spark SQL catalog implementations:

  • org.apache.iceberg.spark.SparkSessionCatalog – Support for Iceberg tables to Spark’s built-in catalog, and delegates to the built-in catalog for non-Iceberg tables
  • org.apache.iceberg.spark.SparkCatalog – It supports a Hive Metastore or a Hadoop warehouse as a catalog

Spark session Catalog

Iceberg table supports Spark’s built-in catalog; we can configure spark_catalog to use Iceberg’s SparkSessionCatalog. The Iceberg session catalog loads non-Iceberg tables using the spark catalog (built-in catalog).

The catalog type hive defines that use Hive Catalog for Iceberg tables, and non-Iceberg tables will be created in the default spark catalog.

 

 

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

 

 

You can provide these configurations based on your application development and deployment method set by your enterprise:

  • If you use the Spark-Submit command from edge nodes or Livy API to submit a job, then you can use ‘conf’ parameters.
  • From your application code, you can use the SparkConf object to set these configurations and create SparkSession using SparkConf
  • From Jupyter notebook, you can use configure magic command as follows.
    %%configure
    { "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
               "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
               "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
               "spark.sql.catalog.spark_catalog.type":"hive"
              }
    }

For example, if you run this code with the above configuration, you should see `iceberg_table` created in Hive Catalog as an external table and `spark_table` in Spark Catalog as a managed table.

 

 

spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (id string,
               creation_date string,
                 last_update_time string) USING iceberg""")
spark.sql("""CREATE TABLE IF NOT EXISTS spark_table (id string,
                creation_date string,
                last_update_time string)""")

 

 

You can use the following query on HMS to find the catalog name for `iceberg_table` and `spark_table`:

 

 

SELECT t.TBL_NAME,dbc.ctgName 
    FROM TBLS t INNER JOIN 
            (SELECT c.Name as ctgName, d.DB_ID as dbId 
                     FROM CTLGS c , DBS d 
                        WHERE d.CTLG_NAME = c.NAME
            ) dbc ON dbc.dbId=t.DB_ID
WHERE TBL_NAME IN ('iceberg_table','spark_table')

 

 

The output will be:

asethia_0-1677528676761.png

Custom Catalog

Iceberg supports multiple data catalog types such as Hive, Hadoop, JDBC, or custom catalog implementations. These catalogs are configured using the Hadoop configuration property spark.sql.catalog.<<catalogname>> and the type of catalog using spark.sql.catalog.<<catalogname>>.type.

 

Hive Catalog

Hadoop Catalog

spark.sql.catalog.<<catalogname>>=org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<<catalogname>>.type=hive

spark.sql.catalog.<<catalogname>>.uri = thrift://metastore-host:port

spark.sql.catalog.<<catalogname>>.warehouse=abfs://<<warehouse path>>

spark.sql.catalog.iceberg_hadoop =org.apache.iceberg.spark.SparkCatalog

spark.sql.catalog.iceberg_hadoop.type=hadoop

spark.sql.catalog.iceberg_hadoop.warehouse=abfs://<<warehouse path>>

 

 

The catalog.uri is optional; by default, it will use hive.metastore.uris from hive-site.xml (from Ambari -> Hive -> Advanced). The catalog.warehouse is optional; by default, it will use ‘Hive Metastore Warehouse directory’ from hive-site.xml (from Ambari -> Hive -> Advanced).

You can provide these configurations based on your application development and deployment method set by your enterprise:

  • If you use the Spark-Submit command from edge nodes or Livy API to submit a job, then you can use ‘conf’ parameters.
  • From your application code, you can use the SparkConf object to set these configurations and create SparkSession using SparkConf
  • From Jupyter notebook, you can use configure magic command as follows (in this case we are using Hadoop catalog):
    %%configure
    { "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
               "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
               "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
               "spark.sql.catalog.iceberg":"org.apache.iceberg.spark.SparkCatalog",
               "spark.sql.catalog.iceberg.type":"hadoop",
               "spark.sql.catalog.iceberg.warehouse":"/iceberg/warehouse"
              }
    }

Iceberg Hive Tables With Custom Catalog

The current HDInsight supports Apache Hive version 3.1.1; if you want to use Iceberg tables from Hive (catalog type hive), the HiveIcebergStorageHandler and supporting classes need to be made available on Hive’s classpath. These classes are available as part of Iceberg-Hive runtime jar.

 

You can add the Iceberg-Hive runtime jar by including the jar file to Hive’s auxiliary classpath, so it is available by default, or if you want to use Hive shell, then this can be achieved by stating so:

 

 

add jar /<<path>>/iceberg-hive-runtime-1.1.0.jar;

 

 

Iceberg Hive Tables using a custom catalog can be created in two ways:

 

  1. Using Hive DDL Command from hive-cli or beeline
    CREATE EXTERNAL TABLE customer (
         id bigint,
         name string
    ) PARTITIONED BY (
         state string
    ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    TBLPROPERTIES ('iceberg.catalog'='iceberg'); 
    The Table property 'iceberg.catalog'='iceberg' set table catalog to 'iceberg'.
  2. Using Iceberg Hive Catalog Java API – Example code is available at git repository.

 

 

import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{PartitionSpec, TableProperties, Schema => IcebergSchema}
import org.apache.iceberg.CatalogProperties
import org.apache.spark.sql.SparkSession

val catalogName = "iceberg"
val nameSpace = "default"
val tableName = "customer"

def createTableByHiveCatalog(spark: SparkSession): Unit = {
  import scala.collection.JavaConverters._
  // table specification starts
  val schema= new IcebergSchema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.required(2, "name", Types.StringType.get()),
    Types.NestedField.required(3, "state", Types.StringType.get())
  )
  val spec = PartitionSpec.builderFor(schema).bucket("state", 128).build()
  import org.apache.iceberg.catalog.TableIdentifier
  val tableIdentifier: TableIdentifier = TableIdentifier.of(nameSpace,tableName)
  val tblProperties = Map(TableProperties.ENGINE_HIVE_ENABLED->"true","iceberg.catalog"->"iceberg")
  // table specification ends
  val catalog = new HiveCatalog()
  catalog.setConf(spark.sparkContext.hadoopConfiguration)
  val properties = Map(CatalogProperties.WAREHOUSE_LOCATION->"/iceberg/warehouse/")
  catalog.initialize(catalogName, properties.asJava)
  catalog.createTable(tableIdentifier, schema, spec,s"/iceberg/warehouse/${tableName}",tblProperties.asJava)
}

 

 

Example - Delta, Iceberg and Spark Parquet

Let us jump into the example code; in this example code, we will create sample data for Product master, Sales, and Return transactions using Mockneat. These data are stored in three file formats – Spark Parquet, Iceberg Parquet, and Delta Parquet.  The scenario setup is as follows:

iceberg_blog_design_layout.png

 

The sample code for using Jupyter notebook is available at hdinotebookexamples.

 

Limitation: There are limitations; at this moment, Delta Lake and Hudi currently do not support custom catalogs (out of the box). However, since Iceberg supports a custom catalog, the option is only to use spark_catalog to DeltaCatalog and create a custom catalog for Iceberg.

 

References:

 

  1. https://iceberg.apache.org/
  2. https://github.com/apache/hudi/issues/5537

HDInsight Open Table Formats:

 

  1. Hudi on HDInsight - https://murggu.medium.com/apache-hudi-on-hdinsight-8d981269a97a 
  2. Delta Lake on HDInsight - Delta Lake on HDInsight - Microsoft Community Hub
Version history
Last update:
‎Nov 08 2023 09:43 PM
Updated by: