In this blog post, we will explore the use of the SAP CDC connector within Azure Data Factory/Azure Synapse Analytics to extract data from different SAP frameworks by leveraging metadata tables. In the following example, we will extract data from an SAP S/4HANA Finance system and save it as a Delta table in ADLS Gen2. To read and report on the Delta table, we will explore the capabilities of Microsoft Fabric.
Table of Contents
Introduction to SAP CDC Connector
Different SAP Frameworks and SAP CDC connector support
Demo: Metadata-driven pipeline
Creation of Metadata-driven Pipeline
cachedSinkColumnSelection sink
(Optional) - derivedColumn1ChangeEvent
Visualization of the Data in Microsoft Fabric
Attaching the Delta tables to a Microsoft Fabric Lakehouse
The SAP CDC connector enables connecting SAP applications to various Azure services. The connector is part of Azure Data Factory and Azure Synapse Analytics.
While working with customers on the previously available SAP data extraction connectors in ADF/Synapse, we constantly heard feedback that there should be an easy way to extract only the incremental/recently loaded data from SAP sources into Azure data services without extra maintenance and overhead.
You can use a manual, limited workaround to extract mostly new or updated records. In a process called watermarking, extraction requires a timestamp column (or a column with monotonically increasing values) and continuous tracking of the highest value seen since the last extraction. But some tables don't have a column that you can use for watermarking, and this process also doesn't identify a deleted record as a change in the dataset.
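For comparison, here is a minimal T-SQL sketch of that watermarking pattern. The source table dbo.SalesOrders, its LastChangeDateTime column, and the control table dbo.tbl_watermark are all invented for this illustration, and note that deletes still go undetected:

-- Read the high-water mark recorded by the previous run.
DECLARE @lastWatermark datetime2 =
    (SELECT lastValue FROM dbo.tbl_watermark WHERE tableName = 'SalesOrders');

-- Extract only the rows changed since the previous run.
SELECT *
FROM dbo.SalesOrders
WHERE LastChangeDateTime > @lastWatermark;

-- Persist the new high-water mark for the next run.
UPDATE dbo.tbl_watermark
SET lastValue = (SELECT MAX(LastChangeDateTime) FROM dbo.SalesOrders)
WHERE tableName = 'SalesOrders';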
To address these challenges, the SAP CDC connector comes in handy: it helps us maintain connectivity to our SAP sources and also extract the delta changes in a much easier way.
The SAP CDC connector works on the SAP Operational Data Provisioning (ODP) framework and supports all SAP systems (ECC, S/4HANA, BW, BW/4HANA, HANA, etc.), irrespective of location (on-premises, multi-cloud, RISE, etc.).
Change data capture (CDC) | SAP Help Portal
The CDC process of data extraction uses the ODP framework to replicate the delta in an SAP source dataset.
Data extraction via ODP requires a properly configured user on SAP systems. The user must be authorized for ODP API invocations over Remote Function Call (RFC) modules. The user configuration is the same configuration that's required for data extractions via ODP from SAP source systems into BW or BW/4HANA.
Delta tracking (the CDC itself) for SAP CDC pipeline runs is governed solely by the checkpoint key. The checkpoint key is therefore particularly important for incremental delta extraction and for tracking the extraction history of each ODP object. Please be very careful when selecting the checkpoint key, and use a unique checkpoint key for each ODP object. In this demo we will parameterize the checkpoint key for each ODP object.
SAP CDC advanced topics - Azure Data Factory | Microsoft Learn
The change type indicator denotes the type of operation that happened to the data: insert, update, upsert, or delete. To track the type of operation that happened to our source data, we will use a derived column transformation in our dataflow and inspect those results.
Staging location
When creating a pipeline for the SAP CDC extraction process, you have to specify a staging location, for example an ADLS Gen2 storage account. It serves as an intermediate layer for the data extracted from the SAP source, where any optimizations or transformations can be performed. Please be mindful not to use this staging location for any other purposes, such as copy activities or other data flows.
Run Mode
The CDC connector supports three run modes:
- Full on every run (fullLoad)
- Full on the first run, then incremental (fullAndIncrementalLoad)
- Incremental changes only (incrementalLoad)
Depending upon your business needs, you can set this to the most appropriate one for you.
Read more about this here: SAP CDC advanced topics - Azure Data Factory | Microsoft Learn
In this demo we will showcase two of these behaviors: ‘Full on the first run, then incremental’ and ‘Full on every run’.
In this example, we are going to explore the following.
We have detailed documentation (here) regarding the prerequisites that we need before using the SAP CDC connector. Here are some high-level prerequisites.
SAP Pre-requisites:
Minimum SAP system requirements:
For ODP-based extraction scenarios:
For ABAP CDS-based extraction scenarios:
For SLT-based scenarios:
Configure Integration User:
To connect the SAP system (Source System) to ADF, we would require a user on the SAP system that can be used in the linked service configuration.
As noted earlier, the user must be authorized for ODP API invocations over Remote Function Call (RFC) modules, which is the same configuration required for data extractions via ODP from SAP source systems into BW or BW/4HANA.
In this demo, we will use an SAP S/4HANA 2021 (CAL) system as the source and load the data in Delta format into an ADLS Gen2 storage account. We will eventually extend this to Microsoft Fabric using the OneLake shortcut functionality and demonstrate reporting on the extracted data.
Step-by-step guidance is mentioned here.
It's essential to emphasize that the subscriber name must be unique, as the SAP system identifies the connector based on this name. This uniqueness greatly simplifies monitoring and troubleshooting of the connector within the SAP environment, particularly within the operational delta queue monitor (transaction ODQMON).
In this setup, the SAP servers are hosted on Azure VMs inside an Azure Virtual Network; hence, we provide the private IP address as the server name.
In this case, we need two metadata tables: dbo.tbl_metadata, which drives the extraction per ODP object, and dbo.tbl_columnSelectionAndMapping, which holds the column selection and renaming rules.
IF OBJECT_ID(N'[dbo].[tbl_metadata]', 'U') IS NOT NULL
    DROP TABLE [dbo].[tbl_metadata];
GO
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[tbl_metadata]') AND type IN (N'U'))
CREATE TABLE [dbo].[tbl_metadata] (
    checkPointKey varchar(64) NOT NULL,
    sapContext varchar(64) NOT NULL,
    sapObjectName varchar(64) NOT NULL,
    sapRunMode varchar(255) NOT NULL,
    sapKeyColumns varchar(255) NOT NULL,
    sapPartitions varchar(255) NULL,
    destdeltaContainer varchar(50) NOT NULL,
    deltaKeyColumns varchar(255) NOT NULL,
    destdeltaFolder varchar(50) NOT NULL,
    deltaPartition varchar(250) NULL,
    stagingStorageFolder varchar(255),
    columnSelectionAndMappingIdentifier varchar(50)
);
GO
TRUNCATE TABLE dbo.tbl_metadata;
GO
INSERT INTO dbo.tbl_metadata
SELECT
    'CheckPointFor_CMMPOITMDX$F-1' AS checkPointKey,
    'ABAP_CDS' AS sapContext,
    'CMMPOITMDX$F' AS sapObjectName,
    'fullLoad' AS sapRunMode,
    '["PURCHASEORDER", "PURCHASEORDERITEM"]' AS sapKeyColumns,
    '[[{ "fieldName": "PURCHASEORDER", "sign": "I", "option": "BT", "low": "4500000001", "high": "4500001000" }]]' AS sapPartitions,
    'datafs' AS destdeltaContainer,
    '["PURCHASEORDER", "PURCHASEORDERITEM"]' AS deltaKeyColumns,
    'dest/delta_tables/CMMPOITMDX_F_sel' AS destdeltaFolder,
    'PURCHASEORDER' AS deltaPartition,
    'adf_staging/CMMPOITMDX_F_sel' AS stagingStorageFolder,
    'CMMPOITMDX_F_sel' AS columnSelectionAndMappingIdentifier;
GO
INSERT INTO dbo.tbl_metadata
SELECT
    'CheckPointFor_CSDSLSDOCITMDX1$' AS checkPointKey,
    'ABAP_CDS' AS sapContext,
    'CSDSLSDOCITMDX1$F' AS sapObjectName,
    'fullAndIncrementalLoad' AS sapRunMode,
    '["SALESDOCUMENT", "SALESDOCUMENTITEM"]' AS sapKeyColumns,
    '' AS sapPartitions,
    'datafs' AS destdeltaContainer,
    '["SALESDOCUMENT", "SALESDOCUMENTITEM"]' AS deltaKeyColumns,
    'dest/delta_tables/CSDSLSDOCITMDX1_F_sel' AS destdeltaFolder,
    '' AS deltaPartition,
    'adf_staging/CSDSLSDOCITMDX1_F_sel' AS stagingStorageFolder,
    'CSDSLSDOCITMDX1_F_sel' AS columnSelectionAndMappingIdentifier;
GO
IF OBJECT_ID(N'[dbo].[tbl_columnSelectionAndMapping]', 'U') IS NOT NULL
    DROP TABLE [dbo].[tbl_columnSelectionAndMapping];
GO
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[tbl_columnSelectionAndMapping]') AND type IN (N'U'))
CREATE TABLE [dbo].[tbl_columnSelectionAndMapping] (
    columnSelectionAndMappingIdentifier varchar(50),
    prevColumnName varchar(50),
    newColumnName varchar(50),
    PRIMARY KEY (columnSelectionAndMappingIdentifier, prevColumnName)
);
GO
/*
Inserting the column names and mappings.
*/
TRUNCATE TABLE tbl_columnSelectionAndMapping;
GO
INSERT INTO tbl_columnSelectionAndMapping
SELECT 'CSDSLSDOCITMDX1_F_sel', 'SALESDOCUMENT', 'SALESDOCUMENT'
UNION ALL
SELECT 'CSDSLSDOCITMDX1_F_sel', 'SALESDOCUMENTITEM', 'SALESDOCUMENTITEM'
UNION ALL
SELECT 'CSDSLSDOCITMDX1_F_sel', 'SDDOCUMENTCATEGORY', 'CATEGORY';
GO
INSERT INTO tbl_columnSelectionAndMapping
SELECT 'CMMPOITMDX_F_sel', 'PURCHASEORDER', 'PURCHASEORDER'
UNION ALL
SELECT 'CMMPOITMDX_F_sel', 'PURCHASEORDERITEM', 'PURCHASEORDERITEM'
UNION ALL
SELECT 'CMMPOITMDX_F_sel', 'PURCHASINGORGANIZATION', 'ORG';
GO
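Before wiring up the pipeline, two optional sanity checks on the metadata can save debugging time. The first query lists, per ODP object, which columns will be selected and how they will be renamed; the second verifies that no checkpoint key is accidentally shared between ODP objects:

-- Which columns are selected and renamed for each ODP object?
SELECT m.sapObjectName, c.prevColumnName, c.newColumnName
FROM dbo.tbl_metadata AS m
JOIN dbo.tbl_columnSelectionAndMapping AS c
    ON c.columnSelectionAndMappingIdentifier = m.columnSelectionAndMappingIdentifier
ORDER BY m.sapObjectName, c.prevColumnName;

-- Is any checkpoint key used by more than one ODP object?
SELECT checkPointKey, COUNT(*) AS usageCount
FROM dbo.tbl_metadata
GROUP BY checkPointKey
HAVING COUNT(*) > 1;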
Here is the pipeline that we are going to create for this use case:
The Lookup activity reads the rows from tbl_metadata and supplies the values to the ForEach activity.
Here are the DataFlow top-level Settings from the pipeline view.
The parameters are declared in the DataFlow pane, and the values are assigned from the pipeline view; the parameter values come from the ForEach item values.
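For reference, the wiring between the activities uses standard ADF pipeline expressions; the activity name 'Lookup Metadata' below is a placeholder for whatever you named your Lookup activity:
ForEach Items: @activity('Lookup Metadata').output.value
Dataflow parameter value (for example, checkPointKey): @item().checkPointKey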
Here is a view of the different components of the dataflow.
Please note that this data flow has two sinks.
This source is used to read the data from the other metadata table that we have for the column mapping and selection.
We cache the output of the mapping table so that we can use it in the main dataflow activities.
We are reading the source data from the SAP Finance system here. More detail about the parametrization of the fields can be found here.
Using the select transformation, we select the required columns from the source data and perform the column renaming/mapping before saving the data to the sink.
In the expression builder of the rule-based mapping, use the following expressions:
Matching condition: !isNull(cachedSinkColumnSelection#lookup(name).prevColumnName)
Output column name expression: cachedSinkColumnSelection#lookup($$).newColumnName
The data from select1 is merged (insert/update) into the target Delta table, using the key columns supplied from the metadata table (deltaKeyColumns).
In various situations, we may opt to develop custom data transformation logic by leveraging change events (update, insert, delete events). For instance, if we aim to establish a Slowly Changing Dimension Type 2 (SCD Type 2) in our data model, the standard merging process in the ADF mapping dataflow may not suffice, requiring us to create our own logic. During the development of the data ingestion process, it becomes crucial to understand how the alterations occurred in the SAP system; these change events enable us to replicate those events effectively on the Azure end. This is precisely where the changeEvent derived column becomes useful. Here is the expression for the changeEvent derived column: case(isInsert(), "I", isDelete(), "D", isUpdate(), "U", isUpsert(), "U")
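To make the SCD Type 2 idea concrete, here is a minimal Spark SQL sketch that consumes such change events; it is not part of the demo pipeline. The view staged_changes (carrying the ChangeEvent column from above) and the target table dim_sales_item with columns (SALESDOCUMENT, SALESDOCUMENTITEM, IsCurrent, ValidFrom, ValidTo) are invented for this illustration:

-- Close the current version of every item that was changed or deleted.
MERGE INTO dim_sales_item AS t
USING staged_changes AS s
ON t.SALESDOCUMENT = s.SALESDOCUMENT
   AND t.SALESDOCUMENTITEM = s.SALESDOCUMENTITEM
   AND t.IsCurrent = true
WHEN MATCHED THEN
    UPDATE SET t.IsCurrent = false, t.ValidTo = current_timestamp();

-- Open a new current version for inserts and updates (deletes only close).
INSERT INTO dim_sales_item
SELECT s.SALESDOCUMENT, s.SALESDOCUMENTITEM,
       true, current_timestamp(), CAST(NULL AS TIMESTAMP)
FROM staged_changes AS s
WHERE s.ChangeEvent IN ('I', 'U');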
File name expression for the CSV sink that captures the change events: concat($sapObjectName, '-', toString(currentTimestamp(), 'yyyy-MM-ddHHmmss'), '.csv')
Note: When you run a pipeline, it's good practice to trigger it rather than run it with the debug option. With the debug option there is a dependency on the UI session for the subscriber process: the checkpoint key used for extraction in debug mode changes whenever the UI session switches, which resets the delta tracking of the extraction process. You can read more about it here.
Full Extraction.
We will now execute the pipeline for the first time. For this example, we are retrieving the data from two tables; the first run is going to be a full extract.
The run mode in the metadata table is kept as fullAndIncrementalLoad for the table CSDSLSDOCITMDX1$F, and fullLoad for CMMPOITMDX$F.
Delta Queues Monitoring (ODQMON):
Here we can see the first load marked as the “Initial Load”, and the count of 6499 matches the count that we got in the dataflow in the previous screenshot.
Incremental/Delta Extraction.
Now, we change a record in the sales order table. The data can also be changed from the SAP frontend (GUI or WebGUI) by altering the transaction data.
Once we run the ADF pipeline, we can see that for the purchase order item table it's a full load again (as per the metadata table), while for the sales table we have an incremental load.
From the ODQMON, let’s look at the last transaction.
So, if we look at the flat file, we can see the same changes together with the change event indicator.
Since the data is in Delta format in ADLS Gen2, we just need to create a shortcut to the storage in a Lakehouse in Microsoft Fabric. Once this is done, we can explore the data using the Microsoft Fabric Spark engine or SQL engine.
How to create a shortcut to a Delta table.
Please note that in order for the data to refresh automatically once Azure Data Factory completes writing, you need to create a table shortcut rather than a managed table created from a file shortcut.
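Once the shortcut is in place, the data can be queried right away, for example from a Fabric notebook using Spark SQL. As a quick illustration (the shortcut name purchase_order_items is invented; use whatever name you gave your shortcut):

-- Top 10 purchase orders by number of items in the extracted Delta table.
SELECT PURCHASEORDER, COUNT(*) AS item_count
FROM purchase_order_items
GROUP BY PURCHASEORDER
ORDER BY item_count DESC
LIMIT 10;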
In this comprehensive demonstration, we illustrated the process of extracting data from SAP through a metadata-driven pipeline. While we didn't delve deeply into data transformation here, you have the flexibility to use DataFlow transformation activities or a Spark notebook to implement complex transformation logic before loading the data into a Delta table. If your use case aligns with the Extraction (E) – Load (L) – Transformation (T) workflow, you can certainly follow the aforementioned tutorial and then use a Spark notebook to create your business logic on top of the Delta table, thereby preparing it for the serving layer or presentation layer. We trust that this blog post will serve as a valuable starting point for constructing your pipeline to import SAP data into Azure.