spark
Data Vault 2.0 using Databricks Lakehouse Architecture on Azure
This article is about Data Vault 2.0 using the Databricks Lakehouse architecture on Azure. It is presented in partnership with VaultSpeed and Scalefree, our Microsoft Partner Network partners for Data Warehouse Automation, and is part of a blog series; please see the landing page here for more articles. This article is authored by Jonas De Keuster & Koen Moermans from VaultSpeed and co-authored with Michael Olschimke, co-founder and CEO at Scalefree International GmbH. The technical review was done by Ian Clarke, Naveed Hussain, Irfan Maroof and Hajar Habjaoui, GBBs (Cloud Scale Analytics) for EMEA at Microsoft.
Migration of Apache Spark from HDInsight 5.0 to HDInsight 5.1

Azure HDInsight Spark 5.0 to HDI 5.1 Migration
A new version of HDInsight, 5.1, has been released with Spark 3.3.1. This release improves join query performance via Bloom filters, increases the pandas API coverage with support for popular pandas features such as datetime.timedelta and merge_asof, simplifies the migration from traditional data warehouses by improving ANSI compliance, and supports dozens of new built-in functions. In this article we will discuss the migration of user applications from HDInsight Spark 3.1 to HDInsight Spark 3.3.
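To give a feel for one of the newly covered pandas APIs, here is a minimal, illustrative sketch of merge_asof with the pandas-on-Spark API in Spark 3.3; the data and column names are made up for illustration only.

import pyspark.pandas as ps

# Hypothetical sensor readings and calibration events, both sorted by time
readings = ps.DataFrame({"time": [1, 5, 10], "value": [20.1, 20.7, 21.3]})
calibrations = ps.DataFrame({"time": [2, 8], "offset": [0.5, 0.2]})

# Each reading is matched with the most recent calibration at or before its time
merged = ps.merge_asof(readings, calibrations, on="time")
print(merged.to_pandas())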
Ingest Azure Event Hub Telemetry Data with Apache PySpark Structured Streaming on Databricks

Overview.
Ingesting, storing, and processing millions of telemetry events from a plethora of remote IoT devices and sensors has become commonplace. One of the primary cloud services used to process streaming telemetry events at scale is Azure Event Hub. Most documented implementations of Azure Databricks ingestion from Azure Event Hub data are based on Scala. So, in this post, I outline how to use PySpark on Azure Databricks to ingest and process telemetry data from an Azure Event Hub instance configured without Event Capture. My workflow and architecture design for this use case include IoT sensors as the data source, Azure Event Hub, Azure Databricks, ADLS Gen 2 and Azure Synapse Analytics as output sink targets, and Power BI for data visualization. Orchestration pipelines are built and managed with Azure Data Factory, and secrets/credentials are stored in Azure Key Vault.

Requirements.
An Azure Event Hub service must be provisioned. I will not go into the details of provisioning an Azure Event Hub resource in this post; the steps are well documented on the Azure documentation site. Create an Azure Databricks workspace and provision a Databricks cluster. To match the artifact id requirements of the Apache Spark Event Hub connector, azure-eventhubs-spark_2.12, I have provisioned a Databricks cluster with the 7.5 runtime. To enable Databricks to successfully ingest and transform Event Hub messages, install the Azure Event Hubs Connector for Apache Spark from the Maven repository in the provisioned Databricks cluster. For this post, I have installed version 2.3.18 of the connector, using the following Maven coordinate: "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18". This library is the most current package at the time of this writing.

Architecture.
Azure Event Hub to Azure Databricks architecture.

Configuration and Notebook Code Prep.
Create an Event Hub instance in the previously created Azure Event Hub namespace. Create a new Shared Access Policy in the Event Hub instance and copy the connection string generated with the new policy. Note that this connection string has an "EntityPath" component, unlike the RootManageSharedAccessKey connection string for the Event Hub namespace. Install the Azure Event Hubs Connector for Apache Spark referenced in the Overview section. To authenticate and connect to the Azure Event Hub instance from Azure Databricks, the Event Hub instance connection string is required, and it must contain the EntityPath property. Please note that the Event Hub instance is not the same as the Event Hub namespace; the Event Hub namespace is the scoping container for the Event Hub instance. The connection string located in the RootManageSharedAccessKey associated with the Event Hub namespace does not contain the EntityPath property. It is important to make this distinction, because this property is required to successfully connect to the Hub from Azure Databricks. If the EntityPath property is not present, the connectionStringBuilder object can be used to make a connection string that contains the required components. The connection string (with the EntityPath) can be retrieved from the Azure portal as shown in the following screenshot:
Event Hub connection string location.
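As a side note on the EntityPath point above: if only the namespace-level connection string is at hand, one simple workaround (shown here as a minimal sketch with placeholder values, rather than the connector's connectionStringBuilder) is to append the EntityPath component yourself.

# Minimal sketch with placeholder values: the namespace-level connection string
# (from RootManageSharedAccessKey) lacks EntityPath, so append it manually to
# target a specific Event Hub instance.
namespace_conn_str = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>"
event_hub_name = "telemetry-hub"  # hypothetical Event Hub instance name
connectionString = namespace_conn_str + ";EntityPath=" + event_hub_name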
I recommend storing the Event Hub instance connection string in Azure Key Vault as a secret and retrieving the secret/credential using the Databricks utility, as displayed in the following code snippet:

connectionString = dbutils.secrets.get("myscope", key="eventhubconnstr")

An Event Hub configuration dictionary object that contains the connection string property must be defined. All configurations relating to Event Hubs are configured in this dictionary object. In addition, the configuration dictionary object requires that the connection string property be encrypted.

# Initialize event hub config dictionary with connectionString
ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString

# Add consumer group to the ehConf dictionary
ehConf['eventhubs.consumerGroup'] = "$Default"

# Encrypt ehConf connectionString property
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

Use the PySpark Streaming API to Read Events from the Event Hub.
Now that we have successfully configured the Event Hub dictionary object, we will proceed to use the Structured Streaming readStream API to read the events from the Event Hub, as shown in the following code snippet:

# Read events from the Event Hub
df = spark.readStream.format("eventhubs").options(**ehConf).load()

# Visualize the Dataframe in realtime
display(df)

Using the Databricks display function, we can visualize the structured streaming DataFrame in real time and observe that the actual message events are contained within the "Body" field as binary data. Some transformation will be required to convert and extract this data.

Visualize Events Dataframe in Real time.

The goal is to transform the DataFrame in order to extract the actual events from the "Body" column. To achieve this, we define a schema object that matches the fields/columns in the actual events data, map the schema to the DataFrame query, and convert the Body field to a string column type, as demonstrated in the following snippet:

# Define the events schema and decode the binary Body column into JSON
from pyspark.sql.types import *
import pyspark.sql.functions as F

events_schema = StructType([
  StructField("id", StringType(), True),
  StructField("timestamp", StringType(), True),
  StructField("uv", StringType(), True),
  StructField("temperature", StringType(), True),
  StructField("humidity", StringType(), True)])

decoded_df = df.select(F.from_json(F.col("body").cast("string"), events_schema).alias("Payload"))

# Visualize the transformed df
display(decoded_df)

Further transformation is needed on the DataFrame to flatten the JSON properties into separate columns and write the events to a Data Lake container in JSON file format.

# Flatten the JSON properties into separate columns
df_events = decoded_df.select(decoded_df.Payload.id,
                              decoded_df.Payload.timestamp,
                              decoded_df.Payload.uv,
                              decoded_df.Payload.temperature,
                              decoded_df.Payload.humidity)

# Write stream to Data Lake in JSON file format
df_out = df_events.writeStream\
  .format("json")\
  .outputMode("append")\
  .option("checkpointLocation", "abfss://checkpointcontainer@adlstore.dfs.core.windows.net/checkpointapievents")\
  .start("abfss://api-eventhub@store05.dfs.core.windows.net/writedata")

Fully transformed DataFrame.

Specific business needs will require writing the DataFrame to a Data Lake container and to a table in Azure Synapse Analytics. The downstream data is read by Power BI, and reports can be created to gain business insights into the telemetry stream.
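As one possible extension of the pipeline above (not part of the original notebook), the streaming DataFrame could also be written to a dedicated SQL pool with the Azure Synapse connector that ships with Databricks runtimes; all connection values below are placeholders, and authentication details are omitted for brevity.

# Sketch only: write the flattened events stream to an Azure Synapse table.
# The JDBC URL, storage paths, and table name are placeholders; credentials
# (e.g. user/password in the JDBC URL or AAD options) are intentionally omitted.
df_synapse = df_events.writeStream\
  .format("com.databricks.spark.sqldw")\
  .option("url", "jdbc:sqlserver://myworkspace.sql.azuresynapse.net:1433;database=telemetrydb")\
  .option("tempDir", "abfss://tempdata@adlstore.dfs.core.windows.net/synapse-staging")\
  .option("forwardSparkAzureStorageCredentials", "true")\
  .option("dbTable", "dbo.telemetry_events")\
  .option("checkpointLocation", "abfss://checkpointcontainer@adlstore.dfs.core.windows.net/checkpointsynapse")\
  .start()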
So far in this post, we have outlined manual and interactive steps for reading and transforming data from Azure Event Hub in a Databricks notebook. To productionize and operationalize these steps, we will have to:

1. Automate cluster creation via the Databricks Jobs REST API.
2. Automate the installation of the Maven package.
3. Perhaps execute the job on a schedule, or run it continuously (this might require configuring Data Lake Event Capture on the Event Hub).

To achieve the above-mentioned requirements, we will need to integrate with Azure Data Factory, a cloud-based orchestration and scheduling service. As time permits, I hope to follow up with a post that demonstrates how to build a Data Factory orchestration pipeline that productionizes these interactive steps. We could use a Data Factory notebook activity or trigger a custom Python function that makes REST API calls to the Databricks Jobs API. The complete PySpark notebook is available here.
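As a rough illustration of that last option, a custom Python function could call the Jobs REST API's run-now endpoint to trigger an existing Databricks job; the workspace URL, token source, and job id below are placeholders.

import os
import requests

# Sketch: trigger an existing Databricks job through the Jobs REST API,
# e.g. from a custom Python function invoked by a Data Factory pipeline.
workspace_url = "https://adb-1234567890123456.7.azuredatabricks.net"  # hypothetical workspace URL
token = os.environ["DATABRICKS_TOKEN"]                                # personal access token

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {token}"},
    json={"job_id": 123},  # hypothetical job id
)
response.raise_for_status()
print(response.json())  # the response includes the run_id of the triggered run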
Rediscover your data's value by skilling up on Microsoft Fabric

Check out the latest Microsoft Fabric learning modules, skilling exercises, training offers, and live workshops with Azure experts to accelerate your enterprise organization's data potential with Microsoft Fabric.
HDInsight - Iceberg Open-Source Table Format

Author(s): Arun Sethia is a Program Manager in the Azure HDInsight Customer Success Engineering (CSE) team.

Introduction
In my previous blog, we talked about leveraging Delta Lake on Azure HDInsight; this blog is dedicated to the Iceberg open-source table format. At this moment, HDInsight does not support Iceberg out of the box, so in this blog we will see how to leverage the Iceberg open table format on HDInsight. We will also see code examples that use tables from three data sources: Delta, Iceberg, and Spark native Parquet files.

A quick touch on Apache Iceberg: Apache Iceberg is an open-source table format that provides a transactional and versioned storage layer for big data analytics needs, originally developed to address issues in Apache Hive. It provides an alternative to Spark's default Parquet-based storage. It adds tables to compute engines, including Spark, Trino, Flink, Hive, etc., using a high-performance table format that works like a SQL table. A few salient features:

- Schema evolution without any side effects.
- Partition layout evolution based on data volume and query pattern.
- Hidden partitioning: there is no need to maintain a partition column (by transformation), and users do not need to supply partition layout information when querying Iceberg tables.
- Time travel, which is useful to examine changes over time.
- Version maintenance, which helps users correct problems by resetting tables to a stable data version.

Apache Iceberg provides an easy way to extend Spark with table specifications. In the next few sections, we will understand how to extend Spark (limited to Spark SQL) and configure the Apache Iceberg catalog with HDInsight.

Spark Extension
Before we jump into how to use Iceberg with HDInsight 5.0 and Spark, let us spend some time understanding a few concepts related to Spark extensions; they help with the why and how of the Iceberg configuration on HDInsight. SparkSessionExtensions provides various extension points to extend the Spark session. A Spark SQL extension is a custom class that extends the behavior of Spark SQL and is configured using the configuration property spark.sql.extensions, which accepts a comma-separated list of fully qualified class names for the extension classes you want to use. With these extensions, you can extend and customize the behavior of Spark SQL. Some common Spark SQL extensions are CSV data sources, Avro data sources, query optimization rules, etc. Such custom, application-specific Spark configuration can be made in multiple ways, based on your deployment method (a short sketch of the last option follows this list):

- By passing the configuration as part of the Spark-Submit command.
- By using the configure magic command from Jupyter.
- Programmatically from the Spark job code with the SparkSession object.
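As a small illustration of the programmatic option, an extension can be registered when the session is built. This PySpark sketch uses the Iceberg extension class and connector package discussed later in this article; the application name is arbitrary.

from pyspark.sql import SparkSession

# Sketch: register a Spark SQL extension and its runtime package at session build time
spark = (SparkSession.builder
         .appName("iceberg-extension-demo")
         .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .getOrCreate())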
Spark Catalog
The Spark catalog is a centralized metadata repository that provides information on data and metadata in Spark. It stores information about data stored in Spark, tables, databases, partitions, etc., and supports SQL operations on metadata, such as CREATE TABLE, ALTER TABLE, etc. By default, Spark comes with two catalogs, Hive and in-memory. The Hive catalog is used to store metadata in a Hive Metastore, and the in-memory catalog is used to store metadata in memory. Spark and Hive use independent catalogs to access tables created using Spark SQL or Hive tables.

A table created by Spark resides in the Spark catalog. A table created from Hive resides in the Hive catalog. We can change the Spark default catalog to the Hive catalog using the configuration property metastore.catalog.default with the value hive. In that case, your Spark tables are managed tables in the Hive catalog and must be transaction enabled, or you can have external tables without transactions enabled.

The Spark configuration properties under spark.sql.catalog configure the list of catalogs available in Spark SQL and allow us to configure multiple catalogs, while the configuration property spark.sql.defaultCatalog allows you to set the default catalog. We can create custom catalogs by implementing the Catalog interface in Spark and registering the catalog with Spark using the spark.sql.catalog.<catalog name> configuration parameter, where <catalog name> is a unique identifier for your custom catalog. In addition to managing metadata, Spark persists Spark SQL data in the warehouse directory configured using spark.sql.warehouse.dir.

Iceberg Extension
Iceberg core library components enable integration with compute engines like Spark, Flink, etc. These connectors are maintained in the Iceberg repository and are built for multiple versions. Iceberg provides a runtime connector for different versions of Spark. The runtime jar (iceberg-spark-runtime) is the only addition to the classpath needed to use the Iceberg open-source table format. The Iceberg Spark connector provides an extension for Spark SQL via the IcebergSparkSessionExtensions class, which adds support for Iceberg tables. The extension allows users to interact with Iceberg tables in Spark SQL using the DataFrame and SQL APIs, just like Parquet tables. The following table provides the Iceberg runtime compatibility matrix with respect to the Spark version:

Spark Version | Latest Iceberg Support | Gradle
2.4 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-2.4:1.1.0
3.0 | 1.0.0 | org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:1.0.0
3.1 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0
3.2 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0
3.3 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0

With HDInsight, you can configure the connector either as part of your uber jar (as a dependent jar) or as part of the Spark-Submit command, and the SQL extension either as part of your application code or via Spark-Submit.

Configure Iceberg Runtime Connector & Extension

Using Spark-Submit
Provide the Iceberg runtime connector dependency by supplying a Maven coordinate with --packages, and provide the Spark SQL extension using the Spark configuration property:

spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Using Application Code
Provide the Iceberg runtime connector as a Maven dependency in your application pom.xml file:

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-spark-runtime-${spark.major.version}_${scala.major.version}</artifactId>
  <version>1.1.0</version>
</dependency>

We can use SparkConf to set up the Iceberg Spark SQL extension and the Iceberg runtime connector jar:

val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0")

Use SparkConf to build the Spark session:
val spark = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()

Using Jupyter Notebook
%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
           "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" }
}

Iceberg Spark SQL Catalog
Iceberg supplies the following Spark SQL catalog implementations:

- org.apache.iceberg.spark.SparkSessionCatalog - adds support for Iceberg tables to Spark's built-in catalog and delegates to the built-in catalog for non-Iceberg tables.
- org.apache.iceberg.spark.SparkCatalog - supports a Hive Metastore or a Hadoop warehouse as a catalog.

Spark Session Catalog
Iceberg tables support Spark's built-in catalog; we can configure spark_catalog to use Iceberg's SparkSessionCatalog. The Iceberg session catalog loads non-Iceberg tables using the Spark (built-in) catalog. The catalog type hive means that the Hive catalog is used for Iceberg tables, while non-Iceberg tables are created in the default Spark catalog.

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

You can provide these configurations based on the application development and deployment method set by your enterprise:

- If you use the Spark-Submit command from edge nodes or the Livy API to submit a job, you can use 'conf' parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession from it.
- From a Jupyter notebook, you can use the configure magic command as follows:

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
           "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
           "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
           "spark.sql.catalog.spark_catalog.type": "hive" }
}

For example, if you run the following code with the above configuration, you should see iceberg_table created in the Hive catalog as an external table and spark_table created in the Spark catalog as a managed table.

spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (id string, creation_date string, last_update_time string) USING iceberg""")
spark.sql("""CREATE TABLE IF NOT EXISTS spark_table (id string, creation_date string, last_update_time string)""")

You can use the following query on the HMS database to find the catalog name for iceberg_table and spark_table:

SELECT t.TBL_NAME, dbc.ctgName
FROM TBLS t
INNER JOIN (SELECT c.NAME as ctgName, d.DB_ID as dbId
            FROM CTLGS c, DBS d
            WHERE d.CTLG_NAME = c.NAME) dbc ON dbc.dbId = t.DB_ID
WHERE t.TBL_NAME IN ('iceberg_table', 'spark_table')

The output lists iceberg_table under the hive catalog and spark_table under the spark catalog.

Custom Catalog
Iceberg supports multiple data catalog types such as Hive, Hadoop, JDBC, or custom catalog implementations. These catalogs are configured using the configuration property spark.sql.catalog.<<catalogname>> and the type of catalog using spark.sql.catalog.<<catalogname>>.type.
Hive Catalog:
spark.sql.catalog.<<catalogname>>=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.<<catalogname>>.type=hive
spark.sql.catalog.<<catalogname>>.uri=thrift://metastore-host:port
spark.sql.catalog.<<catalogname>>.warehouse=abfs://<<warehouse path>>

Hadoop Catalog:
spark.sql.catalog.iceberg_hadoop=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_hadoop.type=hadoop
spark.sql.catalog.iceberg_hadoop.warehouse=abfs://<<warehouse path>>

The catalog uri property is optional; by default, it will use hive.metastore.uris from hive-site.xml (Ambari -> Hive -> Advanced). The catalog warehouse property is optional; by default, it will use the 'Hive Metastore Warehouse directory' from hive-site.xml (Ambari -> Hive -> Advanced).

You can provide these configurations based on the application development and deployment method set by your enterprise:

- If you use the Spark-Submit command from edge nodes or the Livy API to submit a job, you can use 'conf' parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession from it.
- From a Jupyter notebook, you can use the configure magic command as follows (in this case we are using the Hadoop catalog):

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
           "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
           "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
           "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
           "spark.sql.catalog.iceberg.type": "hadoop",
           "spark.sql.catalog.iceberg.warehouse": "/iceberg/warehouse" }
}

Iceberg Hive Tables With Custom Catalog
The current HDInsight supports Apache Hive version 3.1.1; if you want to use Iceberg tables from Hive (catalog type hive), the HiveIcebergStorageHandler and supporting classes need to be made available on Hive's classpath. These classes are available as part of the Iceberg-Hive runtime jar. You can add the Iceberg-Hive runtime jar by including the jar file in Hive's auxiliary classpath, so it is available by default, or, if you are using the Hive shell, by adding it explicitly:

add jar /<<path>>/iceberg-hive-runtime-1.1.0.jar;

Iceberg Hive tables using a custom catalog can be created in two ways:

1. Using a Hive DDL command from hive-cli or Beeline:

CREATE EXTERNAL TABLE customer (
  id bigint,
  name string
)
PARTITIONED BY (
  state string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES ('iceberg.catalog'='iceberg');

The table property 'iceberg.catalog'='iceberg' sets the table catalog to 'iceberg'.

2. Using the Iceberg Hive Catalog Java API - example code is available in the git repository:
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{PartitionSpec, TableProperties, Schema => IcebergSchema}
import org.apache.iceberg.CatalogProperties
import org.apache.spark.sql.SparkSession

val catalogName = "iceberg"
val nameSpace = "default"
val tableName = "customer"

def createTableByHiveCatalog(spark: SparkSession): Unit = {
  import scala.collection.JavaConverters._
  // table specification starts
  val schema = new IcebergSchema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.required(2, "name", Types.StringType.get()),
    Types.NestedField.required(3, "state", Types.StringType.get())
  )
  val spec = PartitionSpec.builderFor(schema).bucket("state", 128).build()
  import org.apache.iceberg.catalog.TableIdentifier
  val tableIdentifier: TableIdentifier = TableIdentifier.of(nameSpace, tableName)
  val tblProperties = Map(TableProperties.ENGINE_HIVE_ENABLED -> "true", "iceberg.catalog" -> "iceberg")
  // table specification ends
  val catalog = new HiveCatalog()
  catalog.setConf(spark.sparkContext.hadoopConfiguration)
  val properties = Map(CatalogProperties.WAREHOUSE_LOCATION -> "/iceberg/warehouse/")
  catalog.initialize(catalogName, properties.asJava)
  catalog.createTable(tableIdentifier, schema, spec, s"/iceberg/warehouse/${tableName}", tblProperties.asJava)
}

Example - Delta, Iceberg and Spark Parquet
Let us jump into the example code. In this example, we will create sample data for the product master, sales, and return transactions using Mockneat. These data are stored in three file formats: Spark Parquet, Iceberg Parquet, and Delta Parquet. The sample code for use in a Jupyter notebook is available at hdinotebookexamples.

Limitation: At this moment, Delta Lake and Hudi do not support custom catalogs out of the box. However, since Iceberg supports a custom catalog, the only option is to set spark_catalog to DeltaCatalog and create a custom catalog for Iceberg; a combined configuration along these lines is sketched after the references below.

References:
- https://iceberg.apache.org/
- https://github.com/apache/hudi/issues/5537
- HDInsight open table formats: Hudi on HDInsight (https://murggu.medium.com/apache-hudi-on-hdinsight-8d981269a97a) and Delta Lake on HDInsight (Microsoft Community Hub)
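To make the limitation above concrete, here is a hedged PySpark sketch of a combined session configuration: spark_catalog is handed to Delta Lake while Iceberg tables live in a separate Hadoop-type catalog named "iceberg". The package versions and warehouse path are assumptions and should be aligned with your HDInsight Spark version.

from pyspark.sql import SparkSession

# Sketch: Delta Lake takes over spark_catalog, Iceberg gets its own custom catalog.
# Package versions below assume Spark 3.1; adjust them for your cluster.
spark = (SparkSession.builder
         .appName("delta-iceberg-parquet")
         .config("spark.jars.packages",
                 "io.delta:delta-core_2.12:1.0.1,"
                 "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0")
         .config("spark.sql.extensions",
                 "io.delta.sql.DeltaSparkSessionExtension,"
                 "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.iceberg.type", "hadoop")
         .config("spark.sql.catalog.iceberg.warehouse", "/iceberg/warehouse")
         .getOrCreate())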