Automating Azure Synapse Analytics and Azure Analysis Services Tabular, or Power BI Dataset

Published Feb 22 2021 03:13 PM
Microsoft

Writer: Derek Daniels, Senior Business Intelligence Consultant

Technical Reviewers:

  • Mark Kromer, Principal Program Manager, Azure Data Factory (Microsoft)
  • Patrick LeBlanc, Principal Program Manager, Power BI Customer Account Team (Microsoft)
  • Phil Bennett, IT Software Development Engineer (Microsoft)

 

Published: January 2021

Applies to: SQL Server Integration Services (on-premises), Azure Data Factory, Azure Synapse Analytics workspace (formerly Azure SQL DW), Azure Analysis Services and Azure Logic Apps

 

Summary

This article describes an example of how to automate an ELT (extract, load, transform) process for your data warehouse and tabular model reporting solutions (an AAS (Azure Analysis Services) model or a Power BI (PBI) dataset) within an Azure Synapse Analytics workspace (Synapse Studio). The key differentiator of this paper is that the data sits on-premises, and Azure Data Factory is blocked from accessing it, but SSIS on-premises can access it.

 

Introduction

If you’re looking at moving your on-premises BI solution to Azure to leverage the power of Azure Synapse Analytics, but your data is behind a firewall that won’t allow Azure Data Factory access, then this solution could help you. This article provides one possible Azure solution architecture for that scenario.

 

This article was created when I began my journey of moving my client’s data from an on-premises SQL Server 2016 instance to Azure. At that time, no single article showed a possible end-to-end solution. Here I’ve captured many of the steps I took to craft my final automated Azure self-serve BI solution.

 

This article assumes you’ve already successfully set up and configured the following:

  1. A self-hosted IR (integration runtime)
  2. An Azure Synapse Analytics workspace. If not, you can start with the Microsoft Docs article Get Started with Azure Synapse Analytics. Make sure database permissions are set up correctly under Manage > Security > Access Control (Secure your Synapse workspace (preview)).
    1. If you prefer YouTube videos, Advancing Analytics - The Synapse Sessions might be a good start too.
  3. A working SSIS (SQL Server Integration Services) environment set up for deployment.
  4. A built and deployed tabular model. If not, there are numerous resources to assist you with this, including the Microsoft Docs tutorial Lesson 1: Create a New Tabular Model Project.

 

Setup

First (if you haven’t already), download and install the Azure Feature Pack for Integration Services (SSIS) for your version of SSIS. Within this feature pack you’ll want to use the Flexible File Destination component to save the data in Parquet format to ADLS Gen. 2 (Azure Data Lake Storage Gen. 2/ Blob). The caveat with Parquet is that SSIS depends on Java to write it. This created quite the hiccup for me: even though I used Oracle's Java SE Runtime Environment, I still had to make some changes in System Properties on the machine where the SSIS environment sits. I found the 11-minute YouTube video How to Install Java JDK on Windows 10 (with JAVA_HOME) (March 2020) quite helpful in setting up and validating the setup.
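If you want a quick way to confirm the Java setup before opening SSIS, the Python sketch below checks that a JAVA_HOME environment variable exists and points at a folder containing bin\java.exe (or bin/java on other systems). It assumes the runtime is located through JAVA_HOME, as in the video above; the function name find_java_executable is hypothetical and is not part of SSIS or the Feature Pack.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_java_executable(env: Mapping[str, str], is_windows: bool = True) -> Optional[Path]:
    """Return the java executable under JAVA_HOME, or None if it can't be found.

    Hypothetical helper: assumes the runtime is located via the JAVA_HOME variable.
    """
    java_home = env.get("JAVA_HOME")
    if not java_home:
        return None  # JAVA_HOME is not set for this process
    exe_name = "java.exe" if is_windows else "java"
    candidate = Path(java_home) / "bin" / exe_name
    return candidate if candidate.is_file() else None


if __name__ == "__main__":
    java = find_java_executable(os.environ, is_windows=(os.name == "nt"))
    if java:
        print(f"Java found at {java}")
    else:
        print("JAVA_HOME is missing or does not contain bin/java")
```

Run it from a new command prompt after editing System Properties, because processes that were already running keep their old environment variables.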

 

Within your new SSIS project, you’ll need to create connection managers for your source system(s) and an Azure Storage connection manager.

 

Extract Load Transform (ELT)

The advantage of landing the data directly from the source in Blob Storage is that it avoids metadata (data type) mismatch issues. Once the data has landed, it is transformed into a star schema to be consumed later by AAS.

There are three primary ways to connect to a server (database engine):

  1. Windows Authentication
  2. SQL Server Authentication
  3. Azure Active Directory – Integrated

If your Data Factory can create a linked service to your data source via Windows Authentication or SQL Server Authentication, then you should be able to leverage Azure Data Factory Managed Virtual Network (preview). If your data sources require you to use a service principal, and you can use Spark (Azure Databricks or an Azure Synapse Analytics workspace notebook), then you should consider PySpark.

 

Unfortunately, my situation only allows for Azure Active Directory – Integrated, and at the time of publication of this paper, ADF doesn’t support Active Directory Integrated authentication, which is why I used SSIS to pull my fact table. However, my dimension tables are located in ADLS Gen. 2, so I’m able to leverage PySpark.

 

Extract Load

My source dataset is a destructive load (it is fully replaced each time), so my copy of it must be a destructive load too. This step pulls from two different data sources (on-premises and Azure) and lets me run two activities simultaneously (General – Stored procedure, and Synapse – Notebook).

 

Extract Load with SSIS

I chose to use an SSIS Sequence Container for each of my data sources, and then created the Data Flow Tasks within them. That way, if there is an issue with a source system, it’s easier to identify.

 

The General – Stored procedure activity runs an on-premises SSIS 2016 Data Flow Task that copies data from the on-premises SQL Server (which Azure Data Factory can’t reach) and writes it to a secure ADLS Gen. 2 repository.

 

Here you have to explicitly state that you want the Flexible File Destination component to save the data as a Parquet file. If Java isn’t set up correctly (System Properties), you won’t be able to perform this task successfully.

  • Location
    • Folder Path – this is Hadoop (/Linux) based, and therefore case-sensitive
    • File Name – needs to contain both the file name and the file type extension (in this case “.parquet”)
  • Format
    • File Format – select Parquet from the drop down

Once I deployed the package, I used Tim Mitchell’s article A Better Way To Execute SSIS Packages With T-SQL (November 2016) (GitHub) to create a stored procedure (SP) that starts the SSIS package and waits for it to complete. This SP is used in the automated data orchestration.
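The stored procedure itself comes from Tim Mitchell’s article. To illustrate the underlying mechanism, here is a minimal Python sketch (not his code) that generates a T-SQL batch using the SSISDB catalog procedures catalog.create_execution, catalog.set_execution_parameter_value and catalog.start_execution. Setting the SYNCHRONIZED system parameter to 1 makes start_execution wait until the package finishes, and checking catalog.executions afterwards (status 7 means succeeded) lets the calling activity fail when the package fails. The folder, project and package names in the example are hypothetical.

```python
def tsql_literal(value: str) -> str:
    """Quote a value as an N'...' T-SQL literal, doubling any single quotes."""
    return "N'" + value.replace("'", "''") + "'"


def build_synchronous_ssis_call(folder: str, project: str, package: str) -> str:
    """Return a T-SQL batch that runs an SSISDB package and waits for it to finish."""
    return "\n".join([
        "DECLARE @execution_id BIGINT;",
        "EXEC SSISDB.catalog.create_execution",
        f"    @folder_name = {tsql_literal(folder)},",
        f"    @project_name = {tsql_literal(project)},",
        f"    @package_name = {tsql_literal(package)},",
        "    @use32bitruntime = 0,",
        "    @execution_id = @execution_id OUTPUT;",
        "-- SYNCHRONIZED = 1 makes start_execution block until the package completes",
        "EXEC SSISDB.catalog.set_execution_parameter_value",
        "    @execution_id = @execution_id, @object_type = 50,",
        "    @parameter_name = N'SYNCHRONIZED', @parameter_value = 1;",
        "EXEC SSISDB.catalog.start_execution @execution_id = @execution_id;",
        "-- catalog.executions status 7 = succeeded; raise an error otherwise",
        "IF (SELECT [status] FROM SSISDB.catalog.executions WHERE execution_id = @execution_id) <> 7",
        "    RAISERROR('SSIS package execution did not succeed.', 16, 1);",
    ])


if __name__ == "__main__":
    # Hypothetical folder, project and package names
    print(build_synchronous_ssis_call("ELT", "OnPremToAdls", "LoadFactSales.dtsx"))
```

Wrapping a batch like this in a stored procedure gives the Data Factory Stored procedure activity a single call that only returns once the package is done.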

 

Extract Load with PySpark

Appendix A – PySpark (Python) Destructive Load contains most of the code I used to pull data from ADLS Gen 2 via PySpark. Connecting to ADLS Gen 2 has two options, depending on how the source data has been configured; the code in Appendix A authenticates with a service principal (OAuth).

The next part stumped me for several weeks. Each time the PySpark notebook runs, the Parquet file names in the folder path change, so it seemed a dynamic naming convention was needed. However, once I realized I could create an external table over a folder location (not a file path), the issue was solved. Here’s how I created my external table for DIM Currency.

 

-- LOCATION points to a folder, so every Parquet file in it is read, whatever its name
CREATE EXTERNAL TABLE ext.DIM_Currency (
    [DIM_CurrencyId] int,
    [CurrencyCode] varchar(8000),
    [CurrencyName] varchar(8000)
)
WITH (
    LOCATION = '<file path>/DIM_Currency/',
    DATA_SOURCE = [dfs_core_windows_net],
    FILE_FORMAT = [SynapseParquetFormat],
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 0
)
GO
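Because each external table points at a folder, the same DDL shape repeats for every table the notebook writes. The Python sketch below generates that statement from a table name and a column list, so further tables (for example DIM_Date or DIM_BusinessHierarchy from Appendix A) can be added consistently. The function name and its defaults are assumptions that mirror the DIM_Currency example; the column types still have to match the Parquet schema.

```python
from typing import List, Tuple


def build_external_table_ddl(
    table: str,
    columns: List[Tuple[str, str]],
    base_path: str,
    schema: str = "ext",
    data_source: str = "dfs_core_windows_net",
    file_format: str = "SynapseParquetFormat",
) -> str:
    """Return CREATE EXTERNAL TABLE DDL whose LOCATION is a folder, not a single file."""
    # The trailing slash makes LOCATION a folder, so changing part-file names don't matter
    folder = base_path.rstrip("/") + "/" + table + "/"
    column_lines = ",\n".join(f"    [{name}] {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE EXTERNAL TABLE {schema}.{table} (\n"
        f"{column_lines}\n"
        ")\n"
        "WITH (\n"
        f"    LOCATION = '{folder}',\n"
        f"    DATA_SOURCE = [{data_source}],\n"
        f"    FILE_FORMAT = [{file_format}],\n"
        "    REJECT_TYPE = VALUE,\n"
        "    REJECT_VALUE = 0\n"
        ")"
    )


if __name__ == "__main__":
    print(build_external_table_ddl(
        "DIM_Currency",
        [("DIM_CurrencyId", "int"), ("CurrencyCode", "varchar(8000)"), ("CurrencyName", "varchar(8000)")],
        "<file path>",
    ))
```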

 

Extract data with Scala

Appendix B – Spark (Scala) Destructive Load contains most of the code I used to pull data from Azure SQL Database (a SQL script with table joins and filtering) and write the results to the ADLS Gen 2 environment. I connected the new Parquet files to Synapse the same way I did with the PySpark output: with an external table over the folder.

 

Extract data with Scala - Update (2/22/2021)

The data source for one of my Parquet outputs had a changing column count (columns would be added or removed with no notice to downstream consumers), which broke my stored procedure process every time. The best way I found to solve this issue was to select an explicit list of columns and write the result to a new file path.

 

To get this option up and running, I had to update the permissions on the tmp/hive folder for the “Other” group. I did this in Microsoft Azure Storage Explorer by drilling down into the tmp/hive folder within the applicable Azure Storage Blob Container and granting the “Other” group Read, Write and Execute permissions.

 

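# Cell created to keep the column count consistent
# Read every Parquet file in the source folder
df = spark.read.parquet("abfss://___@___.dfs.core.windows.net/FilePath/")
# Keep only an explicit, fixed list of columns, e.g. df.select("ColumnA", "ColumnB")
dfNew = df.select("<<Individual Field Names>>")
# Overwrite the new, stable output folder with the fixed set of columns
dfNew.write.mode('overwrite').parquet("abfss://___@___.dfs.core.windows.net/NewFilePath/")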
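An explicit select drops any newly added columns, but it still fails if an expected column disappears. One way to handle both directions, sketched here in plain Python under the assumption that you keep a fixed list of expected column names, is to compare the incoming columns with that list before writing. The helper plan_projection and the example column names are hypothetical.

```python
from typing import List, Tuple


def plan_projection(actual: List[str], expected: List[str]) -> Tuple[List[Tuple[str, bool]], List[str]]:
    """Compare the source's columns with a fixed, expected column list.

    Returns (plan, dropped):
      plan    - one (name, present) pair per expected column, in expected order
      dropped - source columns that are not expected and will be left out
    Names are compared case-insensitively, matching Spark's default resolution.
    """
    actual_lower = {name.lower() for name in actual}
    expected_lower = {name.lower() for name in expected}
    plan = [(name, name.lower() in actual_lower) for name in expected]
    dropped = [name for name in actual if name.lower() not in expected_lower]
    return plan, dropped


if __name__ == "__main__":
    expected = ["ProductId", "ProductName", "ListPrice"]  # hypothetical fixed schema
    incoming = ["ProductId", "productname", "NewVendorColumn"]  # ListPrice went missing
    plan, dropped = plan_projection(incoming, expected)
    print(plan)     # [('ProductId', True), ('ProductName', True), ('ListPrice', False)]
    print(dropped)  # ['NewVendorColumn']
```

In the notebook, each present column would become F.col(name) and each missing one F.lit(None).cast(<type>).alias(name) (from pyspark.sql.functions), so the Parquet written to the new file path always has the same columns.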

 

Transform

This part requires some more setup: mapping the Parquet files in ADLS as external tables within Synapse (Microsoft Docs: Use external tables with Synapse SQL). This needs to be done for both the Synapse SQL pool and SQL on-demand. A shortcut is to go to the Data hub and select the “Linked” tab, drill down to where one of your Parquet files is located, right-click the file, select New SQL Script > Create external table, select the SQL pool and the database, and type the table schema and table name in the text box (I created a “stg” schema for all of my staging tables).

 

Once I created all of the necessary external tables, I wrote two destructive load stored procedures in the SQL pool: one for the fact tables and one for the dimension tables. At this time, all of my tables (fact and dimension) are destructive loads. In my environment I loaded the transformed (star schema) tables into the “dbo” schema. From there I wrote SQL views in the “bi” schema that read the dbo tables with the “(NOLOCK)” table hint. If you’re not familiar with the NOLOCK table hint, MSSQLTips explains it in Understanding the SQL Server NOLOCK hint (April 2019). Having the “bi” schema also helps me limit the access I grant to users, as I may not always want to give them access to the underlying tables.

 

Publish

Create Tabular Model (AAS or PBI)

Once your data is properly prepped in Synapse Analytics, you’ll need to build a tabular model in either SSDT (SQL Server Data Tools/ Visual Studio) or Tabular Editor and deploy it to your AAS server. Alternatively, you can build it in Power BI Desktop and publish it.

 

Both options can leverage DirectQuery, but there are some caveats to be mindful of; they are described in DirectQuery model guidance in Power BI. DirectQuery matters here because, at the time of writing, it is a requirement for using Materialized Views.

 

Additionally, at the time of writing, Materialized Views cannot be created on existing views, and they don't support row-level security (RLS).

 

Automation with Synapse Data Factory (Orchestration)

My Synapse Data Factory solution has several parts, largely divided into three segments. The first is ELT, the second clears the tabular model’s data (clearValues), and the third performs a full refresh of the tabular model. A constraint I have is that all my tables are destructive loads (non-incremental); I would prefer to have the option to perform an incremental (partition) refresh.

 

Using the SQLBI VertiPaq Analyzer, I quickly discovered my model was just over 10 GB. After reviewing the AAS pricing tiers, one would expect the S1 tier to be sufficient to perform a full refresh. Unfortunately, my experience proved otherwise: I got an ambiguous Azure Data Factory error code 2108, which turned out to indicate insufficient memory. The workaround I found was to run a “clearValues” refresh first and then a full refresh. This workaround also kept me from spending more money than I needed to.

 

Microsoft’s Mark Kromer helped me automate the AAS portion by pointing me to James Coulter’s article Process Azure Analysis Services Models with Azure Data Factory v2 (March 2020), which also shows how to refresh table partitions. From there I created two Data Factory pipelines (clear values and full). James’s approach calls AAS and waits for the correct response.
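As a rough illustration of what those pipelines send, the Python sketch below builds the endpoint and JSON body for the Azure Analysis Services asynchronous refresh REST API and polls a status function until the refresh reaches a terminal state. It assumes the request shape described in the AAS REST API documentation (a POST to .../servers/<server>/models/<model>/refreshes with a Type such as clearValues or full), so verify the field names against the current docs. Authentication and the HTTP calls are left out; get_status is a hypothetical callback you would implement, and the server and model names are placeholders.

```python
import json
import time
from typing import Callable
from urllib.parse import urlparse

REFRESH_TYPES = {"full", "clearValues", "calculate", "dataOnly", "automatic", "defragment"}
TERMINAL_STATES = {"succeeded", "failed", "cancelled", "timedOut"}


def build_refresh_url(server_uri: str, model: str) -> str:
    """Map asazure://<region>.asazure.windows.net/<server> to the model's refreshes endpoint."""
    parsed = urlparse(server_uri)
    server = parsed.path.strip("/")
    return f"https://{parsed.netloc}/servers/{server}/models/{model}/refreshes"


def build_refresh_body(refresh_type: str, max_parallelism: int = 2, retry_count: int = 2) -> str:
    """JSON body for a whole-model refresh (no Objects list, so the entire model is refreshed)."""
    if refresh_type not in REFRESH_TYPES:
        raise ValueError(f"Unsupported refresh type: {refresh_type}")
    return json.dumps({
        "Type": refresh_type,
        "CommitMode": "transactional",
        "MaxParallelism": max_parallelism,
        "RetryCount": retry_count,
    })


def wait_for_refresh(get_status: Callable[[], str], poll_seconds: float = 30,
                     sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the refresh reaches a terminal state and return that state."""
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        sleep(poll_seconds)


if __name__ == "__main__":
    url = build_refresh_url("asazure://westus.asazure.windows.net/myserver", "SalesModel")
    # Clear values first to free memory, then run the full refresh (two separate requests)
    for refresh_type in ("clearValues", "full"):
        print("POST", url, build_refresh_body(refresh_type))
```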

 

If your client wishes to use a Power BI dataset instead of AAS, Meagan Longoria wrote Refreshing a Power BI Dataset in Azure Data Factory (July 2020). In May 2018, incremental refresh became available in Power BI Premium, and in 2020 it became available for Power BI Pro licenses too. The YouTube channel “Guy in a Cube” has covered this topic.
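For the Power BI route, the Data Factory Web activity calls the Power BI REST API instead. The minimal Python sketch below assumes the documented Refresh Dataset In Group endpoint (POST .../groups/{groupId}/datasets/{datasetId}/refreshes) and the Get Refresh History In Group response, in which a status of "Unknown" indicates a refresh that is still running. The workspace and dataset IDs are placeholders, the helper names are hypothetical, and authentication is omitted.

```python
from typing import Any, Dict

API_ROOT = "https://api.powerbi.com/v1.0/myorg"


def build_dataset_refresh_url(group_id: str, dataset_id: str) -> str:
    """Endpoint for POST (start a refresh) and GET (refresh history) on a workspace dataset."""
    return f"{API_ROOT}/groups/{group_id}/datasets/{dataset_id}/refreshes"


def latest_refresh_state(history: Dict[str, Any]) -> str:
    """Interpret a refresh-history response body, e.g. from GET ...refreshes?$top=1."""
    entries = history.get("value", [])
    if not entries:
        return "NoHistory"
    status = entries[0].get("status", "Unknown")
    # "Unknown" is reported while a refresh is still in progress
    return "InProgress" if status == "Unknown" else status


if __name__ == "__main__":
    url = build_dataset_refresh_url("<workspace-id>", "<dataset-id>")
    print("POST", url, '{"notifyOption": "NoNotification"}')
    print(latest_refresh_state({"value": [{"status": "Completed"}]}))
```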

 

My production Synapse orchestration pipeline has the following sequential tasks:

  1. Stored procedure activity
    1. Calls the SSIS SP
  2. SQL pool stored procedure activity (dimension tables)
    1. Calls the destructive load of the dimension tables within Synapse
  3. SQL pool stored procedure activity (fact tables)
    1. Calls the destructive load of the fact tables within Synapse
  4. Execute Pipeline
    1. AAS
      1. clear values
      2. full
    2. PBI
  5. Web activity (Azure Logic App call)
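The ordering above is a fail-fast chain: each activity starts only when the previous one succeeds, and the Logic App call sends the notification. The Python sketch below models that control flow with placeholder callables standing in for the activities. It illustrates the dependency logic only and is not Data Factory code; the step names are hypothetical, and notifying on failure as well as success is an assumption in the spirit of the error-handling advice later in this article.

```python
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[], None]]


def run_pipeline(steps: List[Step], notify: Callable[[str, str], None]) -> bool:
    """Run steps in order, stop at the first failure, and always send one notification."""
    for name, action in steps:
        try:
            action()
        except Exception as exc:
            # Later steps depend on this one, so nothing else runs after a failure
            notify("Failed", f"{name}: {exc}")
            return False
    notify("Succeeded", f"All {len(steps)} step(s) completed")
    return True


if __name__ == "__main__":
    def ok() -> None:
        pass

    steps = [
        ("Run SSIS SP", ok),                # placeholder for the on-premises SSIS call
        ("Load dimension tables", ok),      # placeholder for the SQL pool SP
        ("Load fact tables", ok),           # placeholder for the SQL pool SP
        ("AAS clearValues then full", ok),  # placeholder for the Execute Pipeline activity
    ]
    run_pipeline(steps, lambda status, message: print(status, "|", message))
```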
 

Update 3/16/2021

I was encountering timeout errors. After a discussion with Azure Support, I found an article by Chris Webb titled Advanced Options For Loading Data From SQL Server With Power Query (September 2014). It explains how to set the “CommandTimeout” option in the Power Query M code.

  • CommandTimeout=#duration(1,2,3,4), where:
    • 1 = days
    • 2 = hours
    • 3 = minutes
    • 4 = seconds
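As a quick check of the units, the four #duration arguments add up the same way as a Python timedelta, so #duration(0,0,30,0) is a 30-minute timeout (1,800 seconds) and #duration(1,2,3,4) is 93,784 seconds. The helper below only illustrates that arithmetic; it is not Power Query code, and its name is hypothetical.

```python
from datetime import timedelta


def duration_seconds(days: int, hours: int, minutes: int, seconds: int) -> int:
    """Total seconds for the same arguments as Power Query's #duration(days, hours, minutes, seconds)."""
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds


if __name__ == "__main__":
    print(duration_seconds(0, 0, 30, 0))  # 1800
    print(duration_seconds(1, 2, 3, 4))   # 93784
    # Same result via the standard library
    print(int(timedelta(days=1, hours=2, minutes=3, seconds=4).total_seconds()))  # 93784
```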

The Power Query (M) expression in the “Table Properties” window now looks like this:

Table Properties window


 

Code

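let
    Source = #"SQL/<<ServerName>> database windows net;<<DB Name>>",
    // A let step only has an effect if a later step uses it. Chris Webb's article passes the timeout
    // in the options record of Sql.Database instead, for example:
    // Sql.Database("<<ServerName>>.database.windows.net", "<<DB Name>>", [CommandTimeout = #duration(0,0,30,0)])
    CommandTimeout = #duration(0,0,30,0),
    BI_vwDimCalendar = Source{[Schema="BI",Item="vwDimCalendar"]}[Data]
in
    BI_vwDimCalendar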

 

This change seems to have alleviated my issues with ambiguous error code 2108.

Create Azure Logic Apps for messaging

Adam Marczak created a great 30-minute YouTube video titled Azure Data Factory Custom Email Notifications Tutorial (June 2020). It provides a detailed step-by-step process to create an Azure Logic App that sends a custom email. As a best practice, he suggests creating a master pipeline that calls the pipeline doing all of the work, to help with error handling.
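In that pattern, the master pipeline’s Web activity POSTs a small JSON document to the Logic App’s HTTP trigger, and the Logic App turns it into an email. The Python sketch below builds such a body. The field names (pipelineName, status, message, emailTo, timestampUtc) and the function name are hypothetical; they only need to match whatever request body schema you define on the trigger.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_notification(pipeline_name: str, succeeded: bool, message: str, email_to: str,
                       when: Optional[datetime] = None) -> str:
    """JSON body for a Logic App HTTP trigger; all field names are hypothetical."""
    when = when or datetime.now(timezone.utc)
    return json.dumps({
        "pipelineName": pipeline_name,
        "status": "Succeeded" if succeeded else "Failed",
        "message": message,
        "emailTo": email_to,
        "timestampUtc": when.isoformat(),
    })


if __name__ == "__main__":
    print(build_notification("PL_Master", False, "Fact load failed", "bi-team@example.com"))
```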

 

Appendix A – PySpark (Python) Destructive Load

%%spark
// Scala cell: authenticate to ADLS Gen 2 (ABFS) with a service principal using OAuth client credentials
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "") // Service principal application (client) ID
spark.conf.set("fs.azure.account.oauth2.client.secret", "") // Client secret
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-tenant-id>/oauth2/token") // Directory (tenant) ID
// YouTube: Azure Databricks - Accessing Data Lake - Using a Service Principal (April 2020) by Dinesh Priyankara
// https://www.youtube.com/watch?v=odlf3wXBSpY&list=PLEebLi3cHr6SsEJfsrxNze4-Ksju3cQyI&index=7&t=0s

%%pyspark
# PySpark cell: copy each table folder from the source container to the destination container (destructive load)
from typing import List, Optional

# pyspark.sql.SparkSession: main entry point for DataFrame and SQL functionality
from pyspark.sql import SparkSession

# Folder layout: abfss://<container>@<account>.dfs.core.windows.net/Layer01/Layer02/<table>/
BASE_FILE_PATH = "abfss://{}@{}.dfs.core.windows.net/Layer01/Layer02/{}/"


# Build the source Uniform Resource Identifier (URI); the "{*}" glob matches every file in the folder
def build_file_uri(accountName: str, blobName: str, dirName: str) -> str:
    return BASE_FILE_PATH.format(blobName, accountName, dirName) + "{*}"


# Build the destination URI: a folder, into which Spark writes its own part-file names
def build_file_uri_alt(accountName: str, blobName: str, dirName: str) -> str:
    return BASE_FILE_PATH.format(blobName, accountName, dirName)


def build_file_uris(accountName: str, blobName: str, dirNames: List[str]) -> List[str]:
    return [build_file_uri(accountName, blobName, name) for name in dirNames]


def build_file_uris_alt(accountName: str, blobName: str, dirNames: List[str]) -> List[str]:
    return [build_file_uri_alt(accountName, blobName, name) for name in dirNames]


# Copy one folder; writeMode "overwrite" replaces the destination contents (destructive load)
def copy(inputFilePath: str, outputFilePath: str, writeMode: str = "overwrite",
         sparkSession: Optional[SparkSession] = None) -> bool:
    # Reuse the session that was passed in; otherwise get (or create) the active one
    spark = sparkSession if sparkSession is not None else SparkSession.builder.getOrCreate()
    df = spark.read.parquet(inputFilePath)
    # df = df.repartition(1)  # uncomment to write a single consolidated file (not partitioned)
    df.write.mode(writeMode).parquet(outputFilePath)
    return True


# Copy many folders, pairing each source URI with its destination URI
def copy_many(sourceAccount: str, destinationAccount: str, sourceBlob: str,
              destinationBlob: str, tableNames: List[str]) -> None:
    sourceUris = build_file_uris(sourceAccount, sourceBlob, tableNames)
    destinationUris = build_file_uris_alt(destinationAccount, destinationBlob, tableNames)
    spark = SparkSession.builder.getOrCreate()
    for sourcePath, destPath in zip(sourceUris, destinationUris):
        copy(sourcePath, destPath, sparkSession=spark)
        print("copied | {} | to | {} |".format(sourcePath, destPath))


sourceAccount = "<Storage Account Name>"  # Source storage account name
destinationAccount = "<Storage Account Name>"  # Destination storage account name
sourceBlob = "<Blob Container Name>"  # Source blob container name
destinationBlob = "<Blob Container Name>"  # Destination blob container name

tables = ["DIM_Date", "DIM_BusinessHierarchy"]

copy_many(sourceAccount, destinationAccount, sourceBlob, destinationBlob, tables)

 

Appendix B – Spark (Scala) Destructive Load

// Use HDInsight Spark cluster to read and write data to Azure SQL Database
// https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-connect-to-sql-database
import java.util.Properties

// Declare the values for your database
val jdbcUsername = "<UserName>"
val jdbcPassword = "<Password>"
val jdbcHostname = "<Name>.database.windows.net" // typically in the form servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase = "<DatabaseName>"

// Create the connection string and connection properties
val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
val connectionProperties = new Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)

// SQL Databases using JDBC (https://docs.databricks.com/data/data-sources/sql-databases.html)
// Push down a query to the database engine (https://docs.databricks.com/data/data-sources/sql-databases.html#push-down-a-query-to-the-database-e...)
// Note: the parentheses and the trailing alias (EA_PriceList) are required, because the query is used as a table.
// DATEFROMPARTS builds the first day of the current month without depending on the session's date format.
val pushdown_query = """(SELECT DISTINCT PFam.*, PO.*
FROM dbo.PriceListItems PLI
    JOIN dbo.Products Prod ON PLI.ProductSKID = Prod.ProductSKID
    JOIN dbo.ProductFamilies PFam ON Prod.ProductFamilySKID = PFam.ProductFamilySKID
    JOIN dbo.ProgramOfferings PO ON PLI.ProgramOfferingSKID = PO.ProgramOfferingSKID
WHERE PLI.ValidTo >= DATEADD(MONTH, -6, DATEFROMPARTS(YEAR(SYSUTCDATETIME()), MONTH(SYSUTCDATETIME()), 1))
AND PO.ProgramName LIKE '%Enterprise%') EA_PriceList"""

val dfPriceListEA = spark.read.jdbc(url = jdbc_url, table = pushdown_query, properties = connectionProperties)
// display(dfPriceListEA)
// dfPriceListEA.printSchema

// Overwrite the destination folder with the query results (destructive load)
val filePath = "abfss://<BlobContainerName>@<StorageAccountName>.dfs.core.windows.net/<FilePath>"
dfPriceListEA.write.mode("overwrite").parquet(filePath)

 

 

Conclusion

Creating this fully automated data refresh solution allowed me to focus my efforts elsewhere, such as meaningful data visualization with Power BI.  Hopefully this article will help you reap similar rewards.

 

For more information:

Azure Web site

Azure Synapse TechCenter

Last update: Sep 15 2021 02:00 PM