This post is co-authored with Keshav Singh, Principal Engineering Lead, Microsoft Purview Data Governance
In this blog, we turn the page and learn how to enable the Delta format as a source and sink for stream processing with Apache Flink. Delta has become the de facto ACID-compliant Lakehouse format for the ecosystem, enabling petabyte-scale processing while serving as a single source of truth, so it becomes essential to bring it all together on top of Microsoft Fabric. Data engineering in the Delta format unifies diverse data sources into a single model for analytics. Lastly, as technologies such as the Fabric endpoint and Synapse serverless SQL become more efficient by the day, direct-mode Delta access will get cheaper and faster, with no real need for an edge copy for analytics.
Streaming events can now be unified in the Delta format as a sink, enabling real-time analytics.
Let us consider a sales event scenario; the event has the structure shown below.
The sales source events are stored in Delta format on ADLS Gen2.
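To make the schema concrete, the sketch below constructs one such event as a Flink RowData record. The field names mirror the ROW_TYPE used later in the job; the values (and the SampleSalesEvent class itself) are purely hypothetical.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

public class SampleSalesEvent {
    // Builds a single illustrative sales event:
    // SalesId, ProductName, SalesDateTime, SalesAmount, EventProcessingTime
    public static RowData sample() {
        GenericRowData row = new GenericRowData(5);
        row.setField(0, StringData.fromString("S-0001"));               // SalesId (string)
        row.setField(1, StringData.fromString("Contoso Widget"));       // ProductName (string)
        row.setField(2, TimestampData.fromEpochMillis(1700000000000L)); // SalesDateTime (timestamp)
        row.setField(3, 1299);                                          // SalesAmount (int)
        row.setField(4, TimestampData.fromEpochMillis(1700000005000L)); // EventProcessingTime (timestamp)
        return row;
    }
}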
Create a cluster pool to host a set of clusters; these could be Spark, Trino, or Flink clusters. With the cluster pool concept and a platform-as-a-service offering, HDInsight on AKS allows developers to quickly build up a data estate with all their favorite open-source workloads, with full configurability and SKU sizing of their choice.
Let's provision the pool.
Next, provision a Flink cluster; we went with a session cluster.
In a nutshell, a session cluster can share resources amongst multiple jobs, while an application cluster dedicates its resources to a particular application.
Once the cluster is provisioned, update the Flink configs to add/load the Hadoop classpath and ensure the cluster's native class loaders are used.
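For reference, the key setting involved is Flink's classloader resolution order; a minimal flink-conf.yaml-style sketch follows. The exact keys exposed in the cluster's configuration management blade may differ, so treat this as an assumption rather than the definitive list.

classloader.resolve-order: parent-first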
Upon applying the changes, the cluster will restart. Click on the Flink Dashboard and verify that it is available; this is the single entry point for the job DAG, execution logs, and stream-processing details.
Here is our code for StreamProcessingJob.
This code simply reads the data from a Delta source and stream-processes it into a Delta sink.
package org.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;

public class StreamProcessingJob {

    public static final RowType ROW_TYPE = new RowType(Arrays.asList(
            new RowType.RowField("SalesId", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("ProductName", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("SalesDateTime", new TimestampType()),
            new RowType.RowField("SalesAmount", new IntType()),
            new RowType.RowField("EventProcessingTime", new TimestampType())
    ));

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSink";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSource";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, ROW_TYPE);

        // Execute the Flink job
        env.execute("FlinkDeltaSourceSinkExample");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {
        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();
        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "deltaSource");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}
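The job above uses a bounded Delta source, which reads the table's current snapshot and then finishes. The delta-flink connector also exposes a continuous mode that keeps monitoring the Delta log for new commits; a minimal sketch of such a source, reusing the same imports and table path convention as the job above, might look like this (illustrative, not part of the original job):

    public static DataStream<RowData> createContinuousDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {
        // Continuous (unbounded) source: keeps polling the Delta log for newly committed versions
        DeltaSource<RowData> deltaSource = DeltaSource
                .forContinuousRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();
        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "deltaSource");
    }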
Here is the pom.xml for the Java project.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>deltaflinkproject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.4.0</hadoop-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
We build a JAR and upload it to a convenient directory on ADLS Gen2.
At this point we are ready to submit the StreamProcessingJob to the Flink cluster. Point to the JAR location on storage and provide the entry class details.
The job processes the stream based on the logic defined in the Java code packaged in the JAR.
Callouts and Validations
Flink periodically commits to the Delta table based on the configured checkpointing: env.enableCheckpointing(10000); in our case, commits are issued every 10 seconds.
NOTE: One critical observation to keep in mind for the initial dataset: if your job duration is shorter than the checkpoint interval (say you have only 10 records and the job completes well before the first checkpoint), you will observe only Parquet files and no _delta_log directory, since the first Delta commit was never issued. This edge case is worth calling out as a reminder that streaming (unbounded) semantics differ considerably from batch (bounded) processing semantics.
The screenshots below depict the periodic processing of this data:
The initial run creates the Parquet files but has yet to issue the first Delta commit, hence we observe only Parquet files.
Upon the first commit, the Delta log is initialized, and the job continues to process the stream at each checkpoint interval.
Upon job completion and at checkpoints, all in-progress files are committed.
Checkpointing is a crucial feature in distributed stream processing frameworks like Flink to ensure fault tolerance and exactly-once semantics.
Relevance of env.enableCheckpointing(10000): it sets the checkpoint interval to 10,000 ms, so the job state is snapshotted and the Delta sink issues a commit roughly every 10 seconds.
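For completeness, the checkpoint behaviour can be tuned further through Flink's CheckpointConfig; the sketch below keeps the 10-second interval from the job and adds a few standard options with illustrative values (these were not part of the original job).

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint (and hence Delta commit) every 10 seconds with exactly-once semantics
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

        // Illustrative tuning knobs; values are examples rather than recommendations
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // breathing room between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(60000);        // fail checkpoints stuck for over 60 s
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);     // only one checkpoint in flight
    }
}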
Finally, let's validate the Delta sink for our processed streams.
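One lightweight way to do this from Flink itself (besides browsing the storage account or querying the table from Spark or Fabric) is to read the sink table back with the same bounded Delta source helper and print the rows; a quick sketch, assuming it runs inside a job that has access to createBoundedDeltaSourceAllColumns from above:

        // Read the sink table back and print the committed rows for a quick sanity check
        String deltaTablePath_sink = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSink";
        DataStream<RowData> verification = createBoundedDeltaSourceAllColumns(env, deltaTablePath_sink);
        verification.print();
        env.execute("ValidateDeltaSink");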
In contrast with other streaming offerings, Flink excels at handling complex stateful workloads with its advanced state management, processing guarantees, scalability, and performance optimizations.
We have introduced Azure HDInsight on AKS (Flink) and emphasized the Delta Lakehouse story.
This blog is for those passionate data engineers who are data natives and love to design a system that is resilient, frugal, and precise, write those hard lines of code, control their destiny, and are curious to understand what lies under the hood. We dedicate this blog to all such engineers, and extend you a warm welcome to fully managed Apache Flink on Azure, with Azure HDInsight on AKS!
Get started today - Microsoft Azure
Read our documentation - What is Apache Flink® in Azure HDInsight on AKS? (Preview) - Azure HDInsight on AKS | Microsoft Lear...
Questions? Please reach out to us on aka.ms/askhdinsight