Spark
Secure Medallion Architecture Pattern on Azure Databricks (Part I)
This article presents a security-first pattern for Azure Databricks: a Medallion Architecture where Bronze, Silver and Gold each run as their own Lakeflow Job and cluster, orchestrated by a parent job. Run-as identities are Microsoft Entra service principals; storage access is governed via Unity Catalog External Locations backed by the Access Connector's managed identity. Least privilege is enforced with cluster policies and UC grants. Prefer managed tables to unlock Predictive Optimization, Automatic liquid clustering and Automatic statistics. Secrets live in Azure Key Vault and are read at runtime. Monitor reliability and cost with system tables and the Jobs UI. Part II covers more low-level concepts and CI/CD.

Azure Databricks Cost Optimization: A Practical Guide
Co-authored by: Sanjeev Nair and Rafia Aqil

This guide walks through a proven approach to Databricks cost optimization, structured in three phases: Discovery, Cluster/Data/Code Best Practices, and Team Alignment & Next Steps.

Phase 1: Discovery

Assessing Your Current State
The following questions are designed to guide your initial assessment and help you identify areas for improvement. Documenting answers to each will provide a baseline for optimization and inform the next phases of your cost management strategy.

Environment & Organization
- What is the current scale of your Databricks environment? How many workspaces do you have? How are your workspaces organized (e.g., by environment type, region, use case)? How many clusters are deployed? How many users are active?
- What are the primary use cases for Databricks in your organization? Data engineering, data science, machine learning, business intelligence.

Cluster Management
- How are clusters currently managed? Manual configuration, automated scripts, Databricks REST API, cluster policies.
- What is the average cluster uptime (hours per day, days per week)?
- What is the average cluster utilization rate (CPU usage, memory usage)?

Cost Optimization
- What is the current monthly spend on Databricks? Total cost, breakdown by workspace, breakdown by cluster.
- What cost management tools are currently in use? Azure Cost Management, third-party tools.
- Are there any existing cost optimization strategies in place? Reserved instances, spot instances, cluster auto-scaling.

Data Management
- What is the current data storage strategy? Data lake, data warehouse, hybrid.
- What is the average data ingestion rate (GB per day, number of files)?
- What is the average data processing time (ETL jobs, machine learning models)?
- What types of data formats are used in your environment? Delta Lake, Parquet, JSON, CSV, other formats relevant to your workloads.

Performance Monitoring
- What performance monitoring tools are currently in use? Databricks Ganglia, Azure Monitor, third-party tools.
- What are the key performance metrics tracked? Job execution time, cluster performance, data processing speed.

Future Planning
- Are there any planned expansions or changes to the Databricks environment? New use cases, increased data volume, additional users.
- What are the long-term goals for Databricks cost optimization? Reducing overall spend, improving resource utilization and cost attribution, enhancing performance.

Understanding Databricks Cost Structure
Total Cost = Cloud Cost + DBU Cost
- Cloud Cost: Compute (VMs, networking, IP addresses), storage (ADLS, MLflow artifacts), other services (firewalls), cluster type (serverless compute, classic compute).
- DBU Cost: Workload size, cluster/warehouse size, Photon acceleration, compute runtime, workspace tier, SKU type (Jobs, Delta Live Tables, All-Purpose Clusters, Serverless), model serving, queries per second, model execution time.

Diagnose Cost and Issues
Effectively diagnosing cost and performance issues in Databricks requires a structured approach. Use the following steps and metrics to gain visibility into your environment and uncover actionable insights.

1. Identify Costly Workloads
- Account Console Usage Reports: Review usage reports to identify usage breakdowns by product, SKU name, and custom tags.
- Usage Breakdown by Product and SKU: Helps you understand which services and compute types (clusters, SQL warehouses, serverless options) are consuming the most resources.
- Custom Tags for Attribution: Tags allow you to attribute costs to teams, projects, or departments, making it easier to identify high-cost areas.
- Workflow and Job Analysis: By correlating usage data with workflows and jobs, you can pinpoint long-running or resource-heavy workloads that drive costs.
- Focus on Long-Running Workloads: Examine workloads with extended runtimes or high resource utilization.
- Key Question: Which pipelines or workloads are driving the majority of your costs? A sample query against the billing system tables is sketched below.
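As a hedged illustration of this step, the sketch below aggregates the last 30 days of DBU usage by SKU, job and cluster so the heaviest workloads surface first. It assumes Unity Catalog system tables are enabled and that the system.billing.usage schema matches recent Databricks releases (columns such as usage_date, sku_name, usage_quantity and the usage_metadata struct); adjust names to your workspace if they differ. This is a starting point, not the guide's own tooling.

// Minimal sketch: surface the most expensive workloads from the billing system table.
// Assumes the schema of recent releases; adjust column names (e.g. usage_metadata.job_id) if they differ.
val topWorkloads = spark.sql("""
  SELECT
    sku_name,
    usage_metadata.job_id     AS job_id,
    usage_metadata.cluster_id AS cluster_id,
    SUM(usage_quantity)       AS dbus_last_30_days
  FROM system.billing.usage
  WHERE usage_date >= date_sub(current_date(), 30)
  GROUP BY sku_name, usage_metadata.job_id, usage_metadata.cluster_id
  ORDER BY dbus_last_30_days DESC
  LIMIT 20
""")

topWorkloads.show(truncate = false)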
Now That You've Identified Long-Running Workloads, Review These Key Areas:

2. Review Cluster Metrics
- CPU Utilization: Track guest, iowait, idle, irq, nice, softirq, steal, system, and user times to understand how compute resources are being used.
- Memory Utilization: Monitor used, free, buffer, and cached memory to identify over- or under-utilization.
- Key Question: Is your cluster over- or under-utilized? Are resources being wasted or stretched too thin?

3. Review SQL Warehouse Metrics
- Live Statistics: Monitor warehouse status, running/queued queries, and current cluster count.
- Time Scale Filter: Analyze query and cluster activity over different time frames (8 hours, 24 hours, 7 days, 14 days).
- Peak Query Count Chart: Identify periods of high concurrency.
- Completed Query Count Chart: Track throughput and query success/failure rates.
- Running Clusters Chart: Observe cluster allocation and recycling events.
- Query History Table: Filter and analyze queries by user, duration, status, and statement type.
- Key Question: Is your SQL Warehouse over- or under-utilized? Are resources being wasted or stretched too thin?

4. Review Spark UI
- Stages Tab: Look for skewed data, high input/output, and shuffle times. Uneven task durations may indicate data skew or inefficient data handling.
- Jobs Timeline: Identify long-running jobs or stages that consume excessive resources.
- Stage Analysis: Determine if stages are I/O bound or suffering from data skew/spill.
- Executor Metrics: Monitor memory usage, CPU utilization, and disk I/O. Frequent garbage collection or high memory usage may signal the need for better resource allocation.

4.1. Spark UI: Storage & Jobs Tab
- Storage Level: Check if data is stored in memory, on disk, or both.
- Size: Assess the size of cached data.
- Job Analysis: Investigate jobs that dominate the timeline or have unusually long durations. Look for gaps caused by complex execution plans, non-Spark code, driver overload, or cluster malfunction.

4.2. Spark UI: Executor Tab
- Storage Memory: Compare used vs. available memory.
- Task Time (Garbage Collection): Review long tasks and garbage collection times.
- Shuffle Read/Write: Measure data transferred between stages.

5. Additional Diagnostic Methods
- System Tables in Unity Catalog: Query system tables for cost attribution and resource usage trends.
- Cost Observability Queries and Tagging Analysis: Use tags to identify which teams or projects consume the most resources (see the sketch after this list).
- Dashboards & Alerts: Set up cost dashboards and budget alerts for proactive monitoring.
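To make the tagging analysis concrete, here is a minimal, hedged sketch that attributes DBU usage to a custom tag. The tag key "team" is a hypothetical placeholder for whatever convention your organization enforces, and the query again assumes the system.billing.usage schema of recent releases.

// Minimal sketch: attribute usage to teams via custom tags.
// "team" is a hypothetical tag key; replace it with the tag your cluster policies enforce.
val usageByTeam = spark.sql("""
  SELECT
    COALESCE(custom_tags['team'], 'untagged') AS team,
    SUM(usage_quantity)                       AS dbus_last_30_days
  FROM system.billing.usage
  WHERE usage_date >= date_sub(current_date(), 30)
  GROUP BY COALESCE(custom_tags['team'], 'untagged')
  ORDER BY dbus_last_30_days DESC
""")

usageByTeam.show(truncate = false)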
Phase 2: Cluster/Code/Data Best Practices Alignment

Cluster UI Configuration and Cost Attribution
Effectively configuring clusters and workloads in Databricks is essential for balancing performance, scalability, and cost. Tuning settings and features strategically can help organizations maximize resource efficiency and minimize unnecessary spending.

Key Configuration Strategies
1. Reduce Idle Time: Clusters continue to incur costs even when not actively processing workloads. Avoid paying for unused resources.
2. Enable Auto-Terminate: Set clusters to automatically shut down after a period of inactivity. This simple setting can significantly reduce wasted spending.
3. Enable Autoscaling: Workloads fluctuate in size and complexity. Autoscaling allows clusters to dynamically adjust the number of nodes based on demand, scaling up for heavy jobs and down for lighter loads so you only pay for what you use; this significantly enhances cost efficiency and overall performance. For serverless and streaming workloads, Delta Live Tables with autoscaling is recommended, which leads to better resource management and reliability.
4. Use Spot Instances: For batch processing and non-critical workloads, spot instances offer substantial cost savings. Spot instances are typically much cheaper than standard VMs, but they are not recommended for jobs requiring constant uptime due to potential interruptions. Considerations: Azure Spot VMs are intended for non-critical, fault-tolerant tasks. They can be evicted without notice, risking production stability, and no SLA guarantees mean potential downtime for critical applications. Using Spot VMs could lead to reliability issues in production environments.
5. Leverage Photon Engine: Photon is Databricks' high-performance, vectorized query engine. It can dramatically reduce runtime for compute-intensive tasks, improving both speed and cost efficiency.
6. Keep Runtimes Up to Date: Using the latest Databricks runtime ensures optimal performance and security. Regular updates include performance enhancements, bug fixes, and new features.
7. Apply Cluster Policies: Cluster policies help standardize configurations and enforce cost controls across teams. Policies can restrict certain settings, enforce tagging, and ensure clusters are created with cost-effective defaults.
8. Optimize Storage: Storage type impacts both performance and cost. Switching from HDDs to SSDs provides faster caching and shuffle operations, which can improve job efficiency and reduce runtime.
9. Tag Clusters for Cost Attribution: Tagging clusters enables granular tracking and reporting. Use tags to attribute costs to specific teams, projects, or environments, supporting better budgeting and chargeback processes.
10. Select the Right Cluster Type: Different workloads require different cluster types; see the table below for Serverless vs Classic Compute.

Feature | Classic Compute | Serverless Compute
Control | Full control over config & network | Minimal control, fully managed by Databricks
Startup Time | Slower (unless pre-warmed) | Instant
Cost Model | Hourly, supports reservations | Pay-per-use, elastic scaling
Security | VNet injection, private endpoints | NCC-based private connectivity
Best For | Heavy ETL, ML, compliance workloads | Interactive queries, unpredictable demand

- Job Clusters: Ideal for scheduled jobs and Delta Live Tables.
- All-Purpose Clusters: Suited for ad-hoc analysis and collaborative work.
- Single-Node Clusters: Efficient for simple exploratory data analysis or pure Python tasks.
- Serverless Compute: Scalable, managed workloads with automatic resource management.

11. Monitor and Adjust Regularly: Review cluster metrics and query history. Use built-in dashboards to monitor usage, identify bottlenecks, and adjust cluster size or configuration as needed.
Code Best Practices
- Avoid Reprocessing Large Tables: Use a CDC (Change Data Capture) architecture with Delta Live Tables (DLT) to process only new or changed data, minimizing unnecessary computation (a brief sketch follows the table below).
- Ensure Code Parallelizes Well: Write Spark code that leverages parallel processing. Avoid loops, deeply nested structures, and inefficient user-defined functions (UDFs) that can hinder scalability.
- Reduce Memory Consumption: Tweak Spark configurations to minimize memory overhead. Clean out legacy or unnecessary settings that may have carried over from previous Spark versions.
- Prefer SQL Over Complex Python: Use SQL (a declarative language) for Spark jobs whenever possible. SQL queries are typically more efficient and easier to optimize than complex Python logic.
- Modularize Notebooks: Use %run to split large notebooks into smaller, reusable modules. This improves maintainability.
- Use LIMIT in Exploratory Queries: When exploring data, always use the LIMIT clause to avoid scanning large datasets unnecessarily.
- Monitor Job Performance: Regularly review the Spark UI to detect inefficiencies such as high shuffle, input, or output. Review the following resource for optimization opportunities: Spark stage high I/O - Azure Databricks | Microsoft Learn

Databricks Code Performance Enhancements & Data Engineering Best Practices
By enabling the features below and applying best practices, you can significantly lower costs, accelerate job execution, and build Databricks pipelines that are both scalable and highly reliable. For more guidance review: Comprehensive Guide to Optimize Data Workloads | Databricks.

Feature / Technique | Purpose / Benefit | How to Use / Enable / Key Notes
Disk Caching | Accelerates repeated reads of Parquet files | Set spark.databricks.io.cache.enabled = true
Dynamic File Pruning (DFP) | Skips irrelevant data files during queries, improves query performance | Enabled by default in Databricks
Low Shuffle Merge | Reduces data rewriting during MERGE operations, less need to recalculate ZORDER | Use a Databricks runtime with the feature enabled
Adaptive Query Execution (AQE) | Dynamically optimizes query plans based on runtime statistics | Available in Spark 3.0+, enabled by default
Deletion Vectors | Efficient row removal/change without rewriting the entire Parquet file | Enable in workspace settings, use with Delta Lake
Materialized Views | Faster BI queries, reduced compute for frequently accessed data | Create in Databricks SQL
Optimize | Compacts Delta Lake files, improves query performance | Run regularly, combine with ZORDER on high-cardinality columns
ZORDER | Physically sorts/co-locates data by chosen columns for faster queries | Use with OPTIMIZE, select columns frequently used in filters/joins
Auto Optimize | Automatically compacts small files during writes | Enable the optimizeWrite and autoCompact table properties
Liquid Clustering | Simplifies data layout, replaces partitioning/ZORDER, flexible clustering keys | Recommended for new Delta tables, enables easy redefinition of clustering keys
File Size Tuning | Achieve optimal file size for performance and cost | Set the delta.targetFileSize table property
Broadcast Hash Join | Optimizes joins by broadcasting smaller tables | Adjust spark.sql.autoBroadcastJoinThreshold and spark.databricks.adaptive.autoBroadcastJoinThreshold
Shuffle Hash Join | Faster join alternative to sort-merge join | Prefer over sort-merge join when broadcasting isn't possible; the Photon engine can help
Cost-Based Optimizer (CBO) | Improves query plans for complex joins | Enabled by default, collect column/table statistics with ANALYZE TABLE
Data Spilling & Skew | Handles uneven data distribution and excessive shuffle | Use AQE, set spark.sql.shuffle.partitions=auto, optimize partitioning
Data Explosion Management | Controls partition sizes after transformations (e.g., explode, join) | Adjust spark.sql.files.maxPartitionBytes, use repartition() after reads
Delta Merge | Efficient upserts and CDC (Change Data Capture) | Use the MERGE operation in Delta Lake, combine with a CDC architecture
Data Purging (Vacuum) | Removes stale data files, maintains storage efficiency | Run VACUUM regularly based on transaction frequency
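The hedged sketch below pulls a few of these techniques together in one place: Databricks disk caching, auto-tuned shuffle partitions, a MERGE-based CDC upsert, and OPTIMIZE/ZORDER/VACUUM maintenance. The table names (sales_silver, sales_updates) and columns (order_id, customer_id) are hypothetical placeholders; treat this as an illustration of the practices above, not a prescribed pipeline.

// Minimal sketch of several practices from the list and table above; names are illustrative only.
// Enable Databricks disk caching and let the engine pick shuffle partition counts.
spark.conf.set("spark.databricks.io.cache.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "auto")

// CDC-style upsert: apply only new or changed rows instead of reprocessing the full table.
spark.sql("""
  MERGE INTO sales_silver AS t
  USING sales_updates AS s
    ON t.order_id = s.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")

// Compact small files and co-locate data on a frequently filtered column.
spark.sql("OPTIMIZE sales_silver ZORDER BY (customer_id)")

// Remove stale files no longer referenced by the Delta log (default retention is 7 days).
spark.sql("VACUUM sales_silver RETAIN 168 HOURS")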
Phase 3: Team Alignment and Next Steps

Implementing Cost Observability and Taking Action
Effective cost management in Databricks goes beyond configuration and code; it requires robust observability, granular tracking, and proactive measures. The sections below outline how your teams can achieve this using system tables, tagging, dashboards, and actionable scripts.

1. Cost Observability with System Tables
Databricks Unity Catalog provides system tables that store operational data for your account. These tables enable historical cost observability and empower FinOps teams to analyze spend independently.
- System Tables Location: Found inside Unity Catalog under the "system" schema.
- Key Benefits: Structured data for querying, historical analysis, and cost attribution.
- Action: Assign permissions to FinOps teams so they can access and analyze dedicated cost tables.

2. Enable Tags for Granular Tracking
Tagging is a powerful feature for tracking, reporting, and budgeting at a granular level.
- Classic Compute: Manually add key/value pairs when creating clusters, jobs, SQL Warehouses, or Model Serving endpoints. Use cluster policies to enforce custom tags.
- Serverless Compute: Create budget policies and assign permissions to teams or members for serverless workloads.
- Action: Tag all compute resources to enable detailed cost attribution and reporting.

3. Track Costs with Dashboards and Alerts
Databricks offers prebuilt dashboards and queries for cost forecasting and usage analysis.
- Dashboards: Visualize spend, usage trends, and forecast future costs.
- Prebuilt Queries: Use top queries with system tables to answer meaningful cost questions.
- Budget Alerts: Set up alerts in the Account Console (Usage > Budget) to receive notifications when spend approaches defined thresholds.

4. Build a Culture of Efficiency
To go beyond technical fixes and build a culture of efficiency, focus on the following strategic actions:
- Collaborate with Internal Engineers: Spend time with engineering teams to understand workload patterns and optimization opportunities.
- Peer Reviews and Code Audits: Conduct regular code review sessions and peer reviews to ensure best practices are followed for Spark jobs, data pipelines, and cluster configurations.
- Create Internal Best Practice Documentation: Develop clear guidelines for writing optimized code, managing data, and maintaining clusters. Make these resources easily accessible for all teams.
- Implement Observability Dashboards: Use Databricks' built-in features to create dashboards that track spend, monitor resource utilization, and highlight anomalies.
- Set Alerts and Budgets: Configure alerts for long-running workloads and establish budgets using prebuilt Databricks capabilities to prevent cost overruns (a sample spend query is sketched below).
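As a hedged companion to the dashboard and budget-alert guidance above, the sketch below estimates daily spend by joining usage with list prices. It assumes the system.billing.usage and system.billing.list_prices system tables with the schemas of recent Databricks releases (column names and the pricing struct may differ in your workspace), and list prices only approximate your negotiated rates.

// Minimal sketch: approximate daily spend for a dashboard or budget alert.
// Assumes recent system table schemas; list prices are an approximation of actual cost.
val dailySpend = spark.sql("""
  SELECT
    u.usage_date,
    SUM(u.usage_quantity * p.pricing.default) AS estimated_cost
  FROM system.billing.usage u
  JOIN system.billing.list_prices p
    ON  u.sku_name = p.sku_name
    AND u.usage_unit = p.usage_unit
    AND u.usage_start_time >= p.price_start_time
    AND (p.price_end_time IS NULL OR u.usage_start_time < p.price_end_time)
  WHERE u.usage_date >= date_sub(current_date(), 90)
  GROUP BY u.usage_date
  ORDER BY u.usage_date
""")

dailySpend.show(100, truncate = false)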
5. Azure Reservations and Azure Savings Plan
When optimizing Databricks costs on Azure, it's important to understand the two main commitment-based savings options: Azure Reservations and Azure Savings Plans. Both can help you reduce compute costs, but they differ in flexibility and how savings are applied.

Which Should You Choose?
- Reservations are ideal if you have stable, predictable Databricks workloads and want maximum savings.
- Savings Plans are better if you expect your compute needs to change, or if you want a simpler, more flexible way to save across multiple services.
- Pro Tip: You can combine both options: use Reservations for your baseline, always-on Databricks clusters, and Savings Plans for bursty, variable, or new workloads.

Summary Table: Action Steps
It's critical to monitor costs continuously and align your teams with established best practices, while scheduling regular code review sessions to ensure efficiency and consistency.

Area | Best Practice / Action
System Tables | Use for historical cost analysis and attribution
Tagging | Apply to all compute resources for granular tracking
Dashboards | Visualize spend, usage, and forecasts
Alerts | Set budget alerts for proactive cost management
Scripts/Queries | Build custom analysis tools for deep insights
Cluster/Data/Code Review & Align | Regularly review best practices, share findings, and align teams on optimization
Save on your Usage | Consider Azure Reservations and Azure Savings Plan

External Data Sharing With Microsoft Fabric
Demand for data for external analytics consumption is growing rapidly. There are many options to share data externally, and the field is very dynamic. One of the most frictionless and easy-to-onboard options for external data sharing, which we explore here, is Microsoft Fabric. External data sharing allows users to share data from their tenant with users in another Microsoft Fabric tenant.

HDInsight - Iceberg Open-Source Table Format
Author(s): Arun Sethia is a Program Manager in the Azure HDInsight Customer Success Engineering (CSE) team.

Introduction
In my previous blog, we talked about leveraging Delta Lake on Azure HDInsight; this blog is dedicated to the Iceberg open-source table format. At this moment, HDInsight does not support Iceberg out of the box, so in this blog we will see how to leverage the Iceberg Open Table Format. We will also see code examples that use tables from three data sources: Delta, Iceberg, and Spark native Parquet files.

A quick touch on Apache Iceberg - Apache Iceberg is an open-source table format that provides a transactional and versioned storage layer for big data analytics needs, originally developed to address issues in Apache Hive. It provides an alternative to Spark's default Parquet-based storage. It adds tables to compute engines, including Spark, Trino, Flink, Hive, etc., using a high-performance table format that works like a SQL table.

A few salient features:
- Supports schema evolution without side effects.
- Evolve the partition layout based on data volume and query pattern.
- Hidden partitioning – there is no need to maintain a partition column (by transformation); users do not need to supply partition layout information when querying Iceberg tables.
- Support for time travel; useful to examine changes over time. Maintaining versions helps users correct problems by resetting tables to a stable data version.

Apache Iceberg provides an easy way to extend Spark with table specifications. In the next few sections, we will understand how to extend Spark (limited to Spark SQL) and configure the Apache Iceberg catalog with HDInsight.

Spark Extension
Before we jump into how to use Iceberg with HDInsight 5.0 and Spark, let us spend some time understanding a few concepts related to Spark extensions. It helps with the why and how of the Iceberg configuration with HDInsight.

SparkSessionExtensions provides various extension points to extend the Spark session. A Spark SQL extension is a custom class that extends the behavior of Spark SQL. Extensions are configured using the configuration property spark.sql.extensions, which takes a comma-separated list of fully qualified class names for the extension classes you want to use in Spark SQL. With these extensions, you can extend and customize the behavior of Spark SQL. Some common Spark SQL extensions are CSV data source, Avro data source, query optimization rules, etc.

Such custom, application-specific Spark configuration can be supplied in multiple ways, based on your deployment method:
- By passing the configuration as part of the Spark-Submit command
- Using the configure magic command from Jupyter
- Programmatically from the Spark job code with the SparkSession object (a minimal sketch follows this list)
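As a minimal sketch of the programmatic option, using the Iceberg extension class that the following sections configure in more detail (the extension jar must already be on the classpath; see the runtime connector discussion below):

// Minimal sketch: activate a Spark SQL extension when building the session.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("extension-example")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()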
Spark Catalog
The Spark Catalog is a centralized metadata repository that provides information on data and metadata in Spark. It stores information about data stored in Spark: tables, databases, partitions, etc. It supports SQL operations on metadata, such as CREATE TABLE, ALTER TABLE, etc. By default, Spark comes with two catalogs, Hive and in-memory. The Hive catalog is used to store metadata in a Hive Metastore, and the in-memory catalog stores metadata in memory. Spark and Hive use independent catalogs to access tables created using Spark SQL or Hive tables.

A table created by Spark resides in the Spark catalog. A table created from Hive resides in the Hive catalog. We can change the Spark default catalog to the Hive catalog using the configuration property metastore.catalog.default with the value hive. In that case, your Spark tables are managed tables in the Hive catalog and must be transaction enabled, or you can have external tables without transactions enabled.

The Spark configuration property spark.sql.catalog configures the list of catalogs available in Spark SQL. It allows us to configure multiple catalogs, and the configuration property spark.sql.defaultCatalog allows you to set the default catalog. We can create custom catalogs by implementing the Catalog interface in Spark and registering the catalog with Spark using the "spark.sql.catalog.<catalog name>" configuration parameter, where "<catalog name>" is a unique identifier for your custom catalog. In addition to managing metadata, Spark persists Spark SQL data at the warehouse directory configured using spark.sql.warehouse.dir.

Iceberg Extension
Iceberg core library components enable integration with compute engines like Spark, Flink, etc. These connectors are maintained in the Iceberg repository and are built for multiple versions. Iceberg provides a runtime connector for different versions of Spark. The runtime jar (iceberg-spark-runtime) is the only addition to the classpath needed to use the Iceberg open-source table format. The Iceberg Spark connector provides an extension for Spark SQL via the IcebergSparkSessionExtensions class; it adds support for Iceberg tables. The extension allows users to interact with Iceberg tables in Spark SQL using the DataFrame and SQL APIs, just as with Parquet tables.

The following table provides the Iceberg runtime compatibility matrix with respect to the Spark version:

Spark Version | Latest Iceberg Support | Gradle
2.4 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-2.4:1.1.0
3.0 | 1.0.0 | org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:1.0.0
3.1 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0
3.2 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0
3.3 | 1.1.0 | org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0

Using HDInsight, you can configure the runtime connector either as part of your uber jar (as a dependent jar) or as part of the Spark-Submit command, and the SQL extension either in your application code or via Spark-Submit.

Configure Iceberg Runtime Connector & Extension

Using Spark-Submit
Provide the Iceberg runtime connector dependency by supplying a Maven coordinate with --packages, and provide the Spark SQL extension using the Spark configuration property.

spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Using Application Code
Provide the Iceberg runtime connector as a Maven dependency in your application pom.xml file.

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-spark-runtime-${spark.major.version}_${scala.major.version}</artifactId>
  <version>1.1.0</version>
</dependency>

We can use SparkConf to set up the Iceberg Spark SQL extension and the Iceberg runtime connector jar.

val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0")

Use SparkConf to build the Spark session.

val spark = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()
Using Jupyter Notebook

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" }
}

Iceberg Spark SQL Catalog
Iceberg supplies the following Spark SQL catalog implementations:
- org.apache.iceberg.spark.SparkSessionCatalog – Adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables.
- org.apache.iceberg.spark.SparkCatalog – Supports a Hive Metastore or a Hadoop warehouse as a catalog.

Spark Session Catalog
Iceberg tables support Spark's built-in catalog; we can configure spark_catalog to use Iceberg's SparkSessionCatalog. The Iceberg session catalog loads non-Iceberg tables using the built-in Spark catalog. The catalog type hive means the Hive Catalog is used for Iceberg tables, while non-Iceberg tables will be created in the default Spark catalog.

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

You can provide these configurations based on the application development and deployment method set by your enterprise:
- If you use the Spark-Submit command from edge nodes or the Livy API to submit a job, you can use 'conf' parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession using SparkConf.
- From a Jupyter notebook, you can use the configure magic command as follows.

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
"spark.sql.catalog.spark_catalog.type":"hive" }
}

For example, if you run this code with the above configuration, you should see `iceberg_table` created in the Hive Catalog as an external table and `spark_table` in the Spark Catalog as a managed table.

spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (id string, creation_date string, last_update_time string) USING iceberg""")
spark.sql("""CREATE TABLE IF NOT EXISTS spark_table (id string, creation_date string, last_update_time string)""")

You can use the following query on the HMS to find the catalog name for `iceberg_table` and `spark_table`:

SELECT t.TBL_NAME, dbc.ctgName
FROM TBLS t
INNER JOIN (SELECT c.Name as ctgName, d.DB_ID as dbId FROM CTLGS c, DBS d WHERE d.CTLG_NAME = c.NAME) dbc
  ON dbc.dbId = t.DB_ID
WHERE TBL_NAME IN ('iceberg_table','spark_table')

The output shows `iceberg_table` under the hive catalog and `spark_table` under the spark catalog.
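As a brief, hedged usage sketch for the `iceberg_table` created above (the inserted values are illustrative only, and the exact time-travel syntax depends on your Spark and Iceberg versions; on Spark 3.1 the DataFrame read option shown is the usual route):

// Minimal usage sketch for iceberg_table; values and snapshot id are placeholders.
spark.sql("""INSERT INTO iceberg_table VALUES ('1', '2023-01-01', '2023-01-01')""")
spark.sql("SELECT * FROM iceberg_table").show()

// Iceberg exposes metadata tables; snapshot ids can be used for time travel.
spark.sql("SELECT snapshot_id, committed_at FROM iceberg_table.snapshots").show(truncate = false)

// Time travel by snapshot id via the DataFrame reader (substitute a snapshot id from the query above).
// val oldData = spark.read.format("iceberg").option("snapshot-id", 1234567890L).load("default.iceberg_table")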
Custom Catalog
Iceberg supports multiple data catalog types such as Hive, Hadoop, JDBC, or custom catalog implementations. These catalogs are configured using the configuration property spark.sql.catalog.<<catalogname>>, and the type of catalog using spark.sql.catalog.<<catalogname>>.type.

Hive Catalog:
spark.sql.catalog.<<catalogname>>=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.<<catalogname>>.type=hive
spark.sql.catalog.<<catalogname>>.uri=thrift://metastore-host:port
spark.sql.catalog.<<catalogname>>.warehouse=abfs://<<warehouse path>>

Hadoop Catalog:
spark.sql.catalog.iceberg_hadoop=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_hadoop.type=hadoop
spark.sql.catalog.iceberg_hadoop.warehouse=abfs://<<warehouse path>>

The catalog.uri is optional; by default, it will use hive.metastore.uris from hive-site.xml (from Ambari -> Hive -> Advanced). The catalog.warehouse is optional; by default, it will use the 'Hive Metastore Warehouse directory' from hive-site.xml (from Ambari -> Hive -> Advanced).

You can provide these configurations based on the application development and deployment method set by your enterprise:
- If you use the Spark-Submit command from edge nodes or the Livy API to submit a job, you can use 'conf' parameters.
- From your application code, you can use the SparkConf object to set these configurations and create the SparkSession using SparkConf.
- From a Jupyter notebook, you can use the configure magic command as follows (in this case we are using the Hadoop catalog):

%%configure
{ "conf": {"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.1.0",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
"spark.sql.catalog.iceberg":"org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.iceberg.type":"hadoop",
"spark.sql.catalog.iceberg.warehouse":"/iceberg/warehouse" }
}

Iceberg Hive Tables With Custom Catalog
The current HDInsight supports Apache Hive version 3.1.1; if you want to use Iceberg tables from Hive (catalog type hive), the HiveIcebergStorageHandler and supporting classes need to be made available on Hive's classpath. These classes are available as part of the Iceberg-Hive runtime jar. You can add the Iceberg-Hive runtime jar by including the jar file in Hive's auxiliary classpath, so it is available by default, or, if you want to use the Hive shell, by running:

add jar /<<path>>/iceberg-hive-runtime-1.1.0.jar;

Iceberg Hive tables using a custom catalog can be created in two ways:

1. Using the Hive DDL command from hive-cli or Beeline:

CREATE EXTERNAL TABLE customer (
  id bigint,
  name string
)
PARTITIONED BY (
  state string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES ('iceberg.catalog'='iceberg');

The table property 'iceberg.catalog'='iceberg' sets the table catalog to 'iceberg'.

2. Using the Iceberg Hive Catalog Java API – example code is available at the git repository.
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{PartitionSpec, TableProperties, Schema => IcebergSchema}
import org.apache.iceberg.CatalogProperties
import org.apache.spark.sql.SparkSession

val catalogName = "iceberg"
val nameSpace = "default"
val tableName = "customer"

def createTableByHiveCatalog(spark: SparkSession): Unit = {
  import scala.collection.JavaConverters._
  // table specification starts
  val schema = new IcebergSchema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.required(2, "name", Types.StringType.get()),
    Types.NestedField.required(3, "state", Types.StringType.get())
  )
  val spec = PartitionSpec.builderFor(schema).bucket("state", 128).build()
  import org.apache.iceberg.catalog.TableIdentifier
  val tableIdentifier: TableIdentifier = TableIdentifier.of(nameSpace, tableName)
  val tblProperties = Map(TableProperties.ENGINE_HIVE_ENABLED -> "true", "iceberg.catalog" -> "iceberg")
  // table specification ends
  val catalog = new HiveCatalog()
  catalog.setConf(spark.sparkContext.hadoopConfiguration)
  val properties = Map(CatalogProperties.WAREHOUSE_LOCATION -> "/iceberg/warehouse/")
  catalog.initialize(catalogName, properties.asJava)
  catalog.createTable(tableIdentifier, schema, spec, s"/iceberg/warehouse/${tableName}", tblProperties.asJava)
}

Example - Delta, Iceberg and Spark Parquet
Let us jump into the example code; in this example, we will create sample data for Product master, Sales, and Return transactions using Mockneat. These data are stored in three file formats – Spark Parquet, Iceberg Parquet, and Delta Parquet. In the scenario setup, the sample data is generated once and written to each of the three formats. The sample code for using a Jupyter notebook is available at hdinotebookexamples.

Limitation: There are limitations; at this moment, Delta Lake and Hudi do not support custom catalogs out of the box. However, since Iceberg supports a custom catalog, the option is to set spark_catalog to DeltaCatalog and create a custom catalog for Iceberg.

References:
- https://iceberg.apache.org/
- https://github.com/apache/hudi/issues/5537
- HDInsight Open Table Formats: Hudi on HDInsight - https://murggu.medium.com/apache-hudi-on-hdinsight-8d981269a97a
- Delta Lake on HDInsight - Microsoft Community Hub

Delta Lake on HDInsight
Introduction
Azure HDInsight is a managed, full-spectrum, open-source analytics service in the cloud for enterprises. An HDInsight Apache Spark cluster is a parallel processing framework that supports in-memory processing; it is based on open-source Apache Spark. Apache Spark is evolving; its efficiency and ease of use make it a preferred big data tool among big data engineers and data scientists. A few essential features are missing from Spark, one of them being ACID (Atomicity, Consistency, Isolation, Durability) transactions. The majority of databases support ACID out of the box; when it comes to the storage layer (ADLS Gen2), it is hard to support the same level of ACID guarantees provided by databases.

Delta Lake is a storage layer that brings ACID transactions to Apache Spark and big data workloads - for both streaming and batch operations. Delta Lake uses versioned Parquet files to store your data in your cloud storage. Apart from the versions, Delta Lake also stores a transaction log to keep track of all the commits made to the table or blob store directory to provide ACID transactions.

This blog is not about Delta Lake itself; we will talk about how you can leverage Delta with an HDInsight Spark cluster, with a few code snippets and the required configurations. Before we jump into code and configurations, it is good to check your Spark version from the Ambari user interface of the HDI cluster. You need to pick the right Delta Lake version based on your cluster's Spark version. The following table lists Delta Lake versions and their compatible Apache Spark versions:

HDI Version | Spark Version | Delta Lake Version | API URL
4.0 | Spark 2.4.4 | < 0.7.0 | 0.6.1 API Doc
5.0 | Spark 3.1.2 | 1.0.x | 1.0.1 API Doc

HDInsight - Delta Lake Configuration
Before we jump into code and configurations, we need to look into the following extensibility configurations provided by Spark:
- spark.sql.extensions – Used to configure Spark session extensions by providing the name of the extension class.
- spark.sql.catalog.spark_catalog – This plugin configuration is used to configure a custom catalog implementation. You can find the current catalog implementation from the CatalogManager: spark.sessionState.catalogManager.currentCatalog. Spark 3.x uses SessionCatalog as the default catalog.

When you would like to use Delta Lake on Spark 3.x on HDI 5.0, you need to configure the SQL extensions and the Delta Lake catalog with the following values:

Configuration Property | Delta Lake Value | Description
spark.sql.extensions | io.delta.sql.DeltaSparkSessionExtension | An extension for Spark SQL that activates the Delta SQL parser to support Delta SQL grammar.
spark.sql.catalog.spark_catalog | org.apache.spark.sql.delta.catalog.DeltaCatalog | Replaces Spark's default catalog with Delta Lake's DeltaCatalog.

The above configurations need to be provided as part of the Spark configuration before any Spark session is created. Apart from the above Spark configurations, the Spark application uber jar should provide the Delta Lake dependency. Working with Spark 2.4.x on HDI 4.0, we just need to supply the Delta Lake dependency; no additional Spark configurations are required. To avoid class loading conflicts due to duplicate classes on the cluster classpath, we need to use the maven-shade-plugin to create an uber-jar with jackson dependencies.
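As a minimal, hedged sketch (not the GitHub example itself) of what a Spark 3.x / HDI 5.0 application looks like with these settings, assuming the Delta Lake 1.0.x dependency is packaged in the uber jar and an illustrative ADLS Gen2 path:

// Minimal sketch for HDI 5.0 / Spark 3.1 with Delta Lake 1.0.x; the table path is illustrative only.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-on-hdinsight")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

val deltaPath = "abfs://container@account.dfs.core.windows.net/delta/events"  // hypothetical path

// Write a small DataFrame as a Delta table, then read it back.
import spark.implicits._
Seq((1, "open"), (2, "close")).toDF("id", "action")
  .write.format("delta").mode("overwrite").save(deltaPath)

spark.read.format("delta").load(deltaPath).show()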
Example Code
You can clone the example code from GitHub; the code is written in Scala. You can run the example code using any one of these options: copy the application jar to the Azure Storage blob associated with the cluster and then either SSH to the headnode and run Spark-Submit from the headnode, use the Livy API, or use the Azure Toolkit for IntelliJ.

The example application will generate stdout logs and Delta Lake Parquet files with commit logs. The output examples are listed on GitHub.

Summary
Delta Lake is an open-source storage framework that extends Parquet data files with a file-based transaction log for ACID transactions and scalable metadata handling. Delta Lake is fully compatible with Apache Spark APIs. Since the HDInsight Spark cluster is an installation of the Apache Spark library onto an HDInsight Hadoop cluster, you can use compatible Delta Lake versions to take advantage of Delta Lake on HDInsight.

Closing the loop: Interactive write-back from Power BI to Azure Databricks
This is a collaborative post from Microsoft and Databricks. We thank Toussaint Webb, Product Manager at Databricks, for his contributions.

We're excited to announce that the Azure Databricks connector for Power Platform is now Generally Available. With this integration, organizations can seamlessly build Power Apps, Power Automate flows, and Copilot Studio agents with secure, governed data and no data duplication. A key functionality unlocked by this connector is the ability to write data back from Power BI to Azure Databricks. Many organizations want to not only analyze data but also act on insights quickly and efficiently. Power BI users, in particular, have been seeking a straightforward way to "close the loop" by writing data back from Power BI into Azure Databricks. This capability is now here - real-time updates and streamlined operational workflows with the new Azure Databricks connector for Power Platform. With this connector, users can now read from and write to Azure Databricks data warehouses in real time, all from within familiar interfaces: no custom connectors, no data duplication, and no loss of governance.

How It Works: Write-backs from Power BI through Power Apps
Enabling write-backs from Power BI to Azure Databricks is seamless. Follow these steps:
1. Open Power Apps and create a connection to Azure Databricks (documentation).
2. In Power BI (desktop or service), add a Power Apps visual to your report (purple Power Apps icon).
3. Add data to connect to your Power App via the visualization pane.
4. Create a new Power App directly from the Power BI interface, or choose an existing app to embed.
5. Start writing records to Azure Databricks!

With this integration, users can make real-time updates directly within Power BI using the embedded Power App, instantly writing changes back to Azure Databricks. Think of all the workflows this can unlock, such as warehouse managers monitoring performance and flagging issues on the spot, or store owners reviewing and adjusting inventory levels as needed. The seamless connection between Azure Databricks, Power Apps, and Power BI lets you close the loop on critical processes by uniting reporting and action in one place.

Try It Out: Get Started with the Azure Databricks Power Platform Connector
The Power Platform connector is now Generally Available for all Azure Databricks customers. Explore more in the deep dive blog here, and to get started, check out our technical documentation. Coming soon, we will add the ability to execute existing Azure Databricks Jobs via Power Automate. If your organization is looking for an even more customizable end-to-end solution, check out Databricks Apps in Azure Databricks! No extra services or licenses required.

Enhanced autoscale capabilities in HDInsight clusters
HDInsight autoscale now has enhanced capabilities, including improved latency and a feedback loop, alongside support for recommissioning NodeManagers in load-aware autoscale, which improves cluster utilization massively and lowers the total cost of ownership significantly.