Scale Real-Time Streams to Delta Lakehouse with Apache Flink on Azure HDInsight on AKS
Published May 15 2024 02:37 AM 1,822 Views
Microsoft

This post is co-authored with Keshav Singh, Principal Engineering Lead, Microsoft Purview Data Governance

 

In this blog, we turn the page and learn about enabling delta format as source and sink for stream processing with Apache Flink. Delta has become a DeFacto ACID compliant Lakehouse format for ecosystem enabling Petabyte scale processing while turning it a single source of truth, it becomes essential to bring it all together on top of Microsoft Fabric. Data engineering in delta format unifies diverse data sources into singular mode for analytics. Lastly as technologies such as Fabric endpoint, Synapse Serverless SQL will get efficient by the day, direct mode delta access will get cheaper and faster with no real need for an edge copy analytics.

 

Streaming Events can now be unified in Delta format as a sink for enabling Realtime analytics.

 

sairamyeturi_0-1715746717814.png

 

Let us consider a Sales Event Scenario, the event has demonstrated structure.

 

Sales Source EventSales Source Event

 

The Sales Source Event is stored in Delta Format on ADLS Gen2.

 

Delta Storage for SaleSource EventsDelta Storage for SaleSource Events

 

About 20 Million EventsAbout 20 Million Events

HDInsight on AKS Cluster Pool

 

Create a cluster pool to host a set of clusters, these could be Spark, Trino, Flink clusters. With a cluster pool as a concept, and a Platform as a service offering, HDInsight on AKS allows developers to quickly build up a data estate with all their favorite open source workloads, with full configurability and SKU sizing of their choice. 

 

Let's Provision the Pool.

 

HDInsight on AKS Cluster PoolHDInsight on AKS Cluster Pool

 

Next, provision a Flink Cluster, we went with a Session cluster. 

 

In nutshell a session cluster can share resources amongst multiple jobs while an Application Cluster will be resource dedicated towards a particular application.

 

Flink Session ClusterFlink Session Cluster

 

Once the cluster is provisioned update the flink-configs to add/load the Hadoop class path and ensure to load the cluster’s native class loaders.

 

Setting Hadoop Class PathSetting Hadoop Class Path

 

 

Upon applying the changes the cluster will restart, click on the Flink Dashboard and review its available. This is one point for DAG, execution logs and stream processing details.

 

sairamyeturi_1-1715762202018.png

 

Application Code

Here is our code for SteamProcessingJob.

This code simply reads the data from a Delta source and stream processes it to Delta Sinks.

 

 

 

package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.hadoop.conf.Configuration;
import java.util.ArrayList;
import java.util.Arrays;

public class StreamProcessingJob {

    public static final RowType ROW_TYPE = new RowType(Arrays.asList(
            new RowType.RowField("SalesId", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("ProductName", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("SalesDateTime", new TimestampType()),
            new RowType.RowField("SalesAmount", new IntType()),
            new RowType.RowField("EventProcessingTime", new TimestampType())
    ));

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSink";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSource";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, ROW_TYPE);
        // Execute the Flink job
        env.execute("FlinkDeltaSourceSinkExample");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "deltaSource");
    }
        public static DataStream<RowData> createDeltaSink(
                DataStream<RowData> stream,
                String deltaTablePath,
                RowType rowType) {
            DeltaSink<RowData> deltaSink = DeltaSink
                    .forRowData(
                            new Path(deltaTablePath),
                            new Configuration(),
                            rowType)
                    .build();
            stream.sinkTo(deltaSink);
            return stream;
        }
}

 

 

 

 

 Here is the POM.xml for the JAVA project

 

 

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>org.example</groupId>
 <artifactId>deltaflinkproject</artifactId>
 <version>1.0-SNAPSHOT</version>

 <properties>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <flink.version>1.17.0</flink.version>
  <java.version>1.8</java.version>
  <scala.binary.version>2.12</scala.binary.version>
  <hadoop-version>3.4.0</hadoop-version>
 </properties>
 <dependencies>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>io.delta</groupId>
   <artifactId>delta-standalone_2.12</artifactId>
   <version>3.0.0</version>
  </dependency>
  <dependency>
   <groupId>io.delta</groupId>
   <artifactId>delta-flink</artifactId>
   <version>3.0.0</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-parquet</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>${hadoop-version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-runtime</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
     <appendAssemblyId>false</appendAssemblyId>
     <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
     </descriptorRefs>
    </configuration>
    <executions>
     <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
       <goal>single</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
  </plugins>
 </build>
</project>

 

 

 

 

 

 

Uploading JARUploading JAR

 

 

We build a JAR and upload to a convenient directory on ADLS Gen2.

 

sairamyeturi_3-1715764773363.png

 

At this point we are ready to submit the StreamProcessingJob on the Flink Cluster. Point to the jar location on the storage and provide the entry class details.

Submit Job on Flink ClusterSubmit Job on Flink Cluster

 

JobJob

 

Flink Processing Delta Source:SinkFlink Processing Delta Source:SinkFlink Processing Delta Source:SinkFlink Processing Delta Source:Sink

 

sairamyeturi_8-1715765120834.png

 

 

The Job processed the streams based on the logic defined in the JAVA jar code bits.

 

Callouts and Validations

 

Flink periodically commits into delta based on the configured checkpointing. env.enableCheckpointing(10000); In our case we issuing commits every 10 seconds.

NOTE : Now one of the critical observations to keep in mind for the initial dataset, incase your JobDuration < Checkpoint duration meaning, you have only 10 records and the job completes much before the first Checkpoint, you will observe only parquet files with no _delta_log directory since the first delta commit was never issued. This is an edge case worth calling out to remember the streaming the (unbounded )semantics are much different than (bounded) batch processing semantics.

The below screenshots depict a periodic processing of this data —

The initial run creates the parquet files but is yet to issue the first delta commit hence we observe only parquet files.

Initial processingInitial processing

 

 

Upon the first commit the delta is initialized and continues to process the streams based on the checkpoint duration.

 

Stream Processing ContinuesStream Processing Continues

 

Upon completion and checkpoints all in-progress files are committed.

 

Stream Processing Complete.Stream Processing Complete.

 

Checkpointing is a crucial feature in distributed stream processing frameworks like Flink to ensure fault tolerance and exactly-once semantics.

 

Relevance ofenv.enableCheckpointing(10000):

  1. Ensures Fault Tolerance: Checkpointing allows Flink to take snapshots of the state of the streaming application at regular intervals. In case of failures, Flink can use these snapshots to restore the state of the application and continue processing from the last successful checkpoint. This ensures fault tolerance and resilience against failures such as machine crashes, network issues, or software bugs.
  2. Consistent State: Checkpointing helps in maintaining consistent state in the face of failures. By periodically saving the state of the application, Flink guarantees that even if failures occur, the state can be recovered to a consistent point.
  3. Exactly-once Processing: Checkpointing, combined with Flink’s processing model, enables exactly-once semantics. With exactly-once processing, each record in the input streams is processed exactly once, even in the presence of failures and restarts. This is crucial for applications where data correctness is paramount, such as financial transactions or real-time analytics.
  4. Performance Considerations: The checkpointing interval (in this case, 10000 milliseconds or 10 seconds) is a trade-off between fault tolerance and performance. Shorter intervals provide better fault tolerance but can impact performance due to the overhead of taking and managing checkpoints. Longer intervals reduce this overhead but increase the potential amount of data loss in case of failures. Choosing an appropriate interval depends on the specific requirements of the application.
  5. Configuration Flexibility: Flink provides flexibility in configuring checkpointing behavior. Developers can tune various parameters such as checkpointing interval, checkpointing mode (e.g., exactly-once, at-least-once), state backend, and storage options based on the specific needs of their application and the underlying infrastructure.

Finally lets validate the delta sink for our processed streams.

 

Stream Sink ValidationsStream Sink Validations

 

Architectural Considerations

In contrast with other streaming offerings, 

 

  • Flink offers built-in support for managing complex stateful computations efficiently. It provides a unified runtime for both batch and stream processing, allowing seamless integration of stateful operations into streaming jobs. Flink’s state management capabilities include fault tolerance, exactly-once semantics, and flexible state backend options (e.g., memory, RocksDB).
  • Flink provides strong consistency guarantees with exactly-once processing semantics out of the box. It ensures that each event is processed exactly once, even in the presence of failures or restarts, making it suitable for mission-critical applications.
  • Flink is designed for high throughput and low-latency processing at scale. It supports fine-grained control over resource allocation and dynamic scaling, allowing efficient utilization of cluster resources. Flink’s pipelined execution model and advanced optimizations contribute to its superior performance.
  • Flink integrates seamlessly with other components of the Apache ecosystem, such as Apache Kafka, Apache Hadoop, and Apache Hive. It also provides connectors for integrating with various cloud platforms and data sources.

 

Flink excels in handling complex stateful workloads with its advanced state management, processing guarantees, scalability, and performance optimizations.

 
 

Conclusion

 

We have introduced Azure HDI on AKS (Flink) and emphasized the Delta Lakehouse story.

 

This blog is for those passionate data engineers who are data native and love to design a system, resilient/frugal/precise, write those hard lines of code, control their destiny and are curious to understand what lies under the hood. We dedicate this blog to all such, and extend you a warm welcome to a fully managed Apache Flink on Azure, with Azure HDInsight on AKS!

 

Get started today - Microsoft Azure
Read our documentation - What is Apache Flink® in Azure HDInsight on AKS? (Preview) - Azure HDInsight on AKS | Microsoft Lear...
Questions? Please reach out to us on aka.ms/askhdinsight

Co-Authors
Version history
Last update:
‎May 15 2024 02:46 AM
Updated by: