ADX Continuous Export to Delta Table - Public Preview
We're excited to announce that continuous export to Delta table is now available in Preview. Continuous export in ADX allows you to export data from Kusto to an external table with a periodically run query. The results are stored in the external table, which defines the destination, such as Azure Blob Storage, and the schema of the exported data. This process guarantees that all records are exported "exactly once", with some exceptions. Continuous export previously supported CSV, TSV, JSON and Parquet formats.

Need Help: Accessing 'Shared' DriveItem Property in SharePoint Without Delta API
Hello everyone!

I am attempting to retrieve all root drive items from a folder, including the 'Shared' property of each drive item. However, even when I explicitly include the 'shared' attribute in my QueryOption object, it does not seem to reflect any changes. It always returns an object with the "Scope" property set to "user", even if the default permissions have not been altered for the item.

Interestingly, when utilizing the Delta API, the behavior observed is that only items with set permissions (beyond the default ones) return a valid, non-null "Shared" object with the "Scope" property set to 'user'. Conversely, items without set permissions (i.e., default permissions only) return a null "Shared" object. This difference effectively helps in determining whether each item was actually shared.

The reason I need to specifically fetch a list of ROOT drive items for a folder, rather than utilizing the Delta API to track changes (which could potentially traverse through sub-folders and their children/grandchildren on an initial query), is the requirement to perform a clean fetch of all items for specific SharePoint site libraries. These libraries often contain thousands of subfolders and files, while we are only interested in the root items of a particular folder for caching purposes. The Delta API variant could potentially take over an hour per folder to traverse, which is not feasible for our needs. Unless there's a way to restrict Delta API calls to only directly access a root folder's children, I'm seeking alternative solutions.

Here is my Select query option (C#):

    new QueryOption("select", " id, name, createdDateTime, lastModifiedBy, lastModifiedDateTime, size, parentReference, permissions, sharepointIds, file, eTag, webDavUrl, deleted, folder, shared")

As you can see, I've included the 'shared' attribute in this query.

My Graph API request (C#):

    driveItemRequest.ItemWithPath(formattedPath).Children.Request(options)

Is there any way I can obtain the desired "Shared" property without resorting to the Delta API for all root items in a folder? Alternatively, is there a method involving Permissions endpoints that I can use to achieve a similar result? At this stage, I only need a boolean flag indicating whether permissions have been modified or not, as mentioned earlier.

Thanks in advance to anyone who can offer any guidance on this matter!

Kind regards,
Facundo.

Understanding the Notebooks in the Oversharing Template v2 (Microsoft Fabric)
Introduction

The Microsoft Graph Data Connect for SharePoint team published two notebooks used with Microsoft Fabric in the Information Oversharing v2 template. This blog explains what each code block inside these notebooks does, so that you can understand how they work.

Note that this document was written with help from Copilot, using simple prompts like “Analyze each section of this Jupyter notebook with PySpark and Scala code. Describe what each section does.”

Notebook 1: Read Last Snapshot Dates

This first notebook runs right as the pipeline starts. It checks the environment, verifies whether the Sites and Permissions tables exist in the Lakehouse, checks the last day data was gathered from MGDC and calculates the start and end date to use. It also cleans the staging tables and stores a few commands that are used in later steps.

Section 0 – Set the Default Lakehouse for Notebook to Run

    %%configure
    {
        "defaultLakehouse": {
            "name": {
                "parameterName": "lakehouseName",
                "defaultValue": "defaultlakehousename"
            }
        }
    }

This section uses the %%configure magic command to set a JSON configuration that defines a parameter (lakehouseName) with the default value "defaultlakehousename". This setting ensures that when the notebook is launched through a pipeline, it dynamically selects the target Lakehouse.

Section 1 – Initialize Parameters

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit
    import java.util.UUID
    import java.text.SimpleDateFormat
    import java.time.{LocalDate, LocalDateTime, Period}
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit
    import java.util.Calendar
    import java.sql.Timestamp

    val runId = "00000000-0000-0000-0000-000000000000"
    val workspaceId = spark.conf.get("trident.workspace.id")
    val workspaceName = "LakeHouseTesting"
    val lakehouseId = spark.conf.get("trident.lakehouse.id")
    val lakehouseName = "IMAXDefault"
    val sitesStagingTableName = "Sites_Staging"
    val sitesFinalTableName = "Sites"
    val permissionsStagingTableName = "Permissions_Staging"
    val permissionsFinalTableName = "Permissions"
    val endTime = "2024-11-15T00:00:00Z"

    spark.conf.set("spark.sql.caseSensitive", true)

This section imports various libraries for date/time handling and initializes key parameters for the ETL process. These include a run identifier (runId), workspace and Lakehouse information (with some values coming from Spark configuration), table names for staging and final datasets, and a fallback endTime. It also enforces case sensitivity in Spark SQL.

Section 2 – Checking Required Final Tables Exist or Not

    val lakehouse = mssparkutils.lakehouse.get(lakehouseName)
    val lakehouseId = lakehouse.id
    val workspaceName = notebookutils.runtime.context("currentWorkspaceName")

    val permissionsStagingLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${permissionsStagingTableName}"
    val sitesStagingLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${sitesStagingTableName}"
    val sitesFinalLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${sitesFinalTableName}"
    val permissionsFinalLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${permissionsFinalTableName}"

    val tables = spark.catalog.listTables()
    val siteTableCount = tables.filter(col("name") === lit(sitesFinalTableName) and array_contains(col("namespace"), lakehouseName)).count()
    val permissionsTableCount = tables.filter(col("name") === lit(permissionsFinalTableName) and array_contains(col("namespace"), lakehouseName)).count()
    val siteStagingTableCount = tables.filter(col("name") === lit(sitesStagingTableName) and array_contains(col("namespace"), lakehouseName)).count()
    val permissionsStagingTableCount = tables.filter(col("name") === lit(permissionsStagingTableName) and array_contains(col("namespace"), lakehouseName)).count()

This section retrieves the Lakehouse object and uses it to construct ABFS paths for both staging and final tables (for Sites and Permissions). It then checks for the existence of these tables by listing them in Spark’s catalog and filtering by name and namespace. Each count is 1 when the table exists in the Lakehouse and 0 otherwise.
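
The four location strings above share one shape, so the path construction can be illustrated with a small helper. This is only a sketch: the object name OneLakePathSketch and the function tableLocation are hypothetical and not part of the published notebook; the URI layout is copied from the section above.

```scala
object OneLakePathSketch {
  // Hypothetical helper: builds the same abfss URI shape the notebook uses
  // for a table stored in a Fabric Lakehouse.
  def tableLocation(workspaceName: String, lakehouseName: String, tableName: String): String =
    s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${tableName}"
}
```

Calling OneLakePathSketch.tableLocation(workspaceName, lakehouseName, sitesFinalTableName) would yield the same string as sitesFinalLocation, which may make it easier to add more tables if you customize the template.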

Section 3 – Getting Snapshot Dates from Last Successful Extracts

    import org.apache.spark.sql.functions.{col, _}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.storage.StorageLevel

    val dtCurrentDateFormatt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")
    val dtRequiredtDateFormatt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
    var siteDataExists: Boolean = false
    var permissionsDataExists: Boolean = false

    val siteSnapshotDate = {
      if (siteTableCount == 1) {
        val dfSites = spark.sql(s"SELECT MAX(SnapshotDate) AS SnapshotDate FROM ${lakehouseName}.${sitesFinalTableName} ")
        val rowSites: Row = dfSites.select("SnapshotDate").head(1)(0)
        if (rowSites.get(0) == null)
          endTime
        else {
          siteDataExists = true
          println(s"Sites data Exists: ${siteDataExists}")
          LocalDateTime.parse(rowSites.get(0).toString(), dtCurrentDateFormatt)
            .format(dtRequiredtDateFormatt)
        }
      } else {
        endTime
      }
    }

    val permissionsSnapshotDate = {
      if (permissionsTableCount == 1) {
        val dfPermissions = spark.sql(s"SELECT MAX(SnapshotDate) AS SnapshotDate FROM ${lakehouseName}.${permissionsFinalTableName} ")
        val rowPermissions: Row = dfPermissions.select("SnapshotDate").head(1)(0)
        if (rowPermissions.get(0) == null)
          endTime
        else {
          permissionsDataExists = true
          println(s"Permissions data Exists: ${permissionsDataExists}")
          LocalDateTime.parse(rowPermissions.get(0).toString(), dtCurrentDateFormatt)
            .format(dtRequiredtDateFormatt)
        }
      } else {
        endTime
      }
    }

This section queries the final tables to retrieve the latest SnapshotDate for both Sites and Permissions. It then reformats the date into an ISO-compliant format. If the table does not exist or contains no snapshot date, the value defaults to the predefined endTime. The two boolean flags (siteDataExists and permissionsDataExists) are set to true only when a snapshot date is found.
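
The date handling in this section can be tried outside Spark. The sketch below reuses the two format patterns from the notebook; the object name SnapshotDateSketch, the function resolveSnapshotDate, and the use of Option to model a NULL result or a missing table are assumptions made for illustration.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object SnapshotDateSketch {
  // Same patterns as the notebook: timestamp text in, ISO-style UTC text out.
  private val currentFormat  = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")
  private val requiredFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")

  // maxSnapshot stands in for the MAX(SnapshotDate) value as text;
  // None models a NULL result or a table that does not exist yet.
  // Returns the date to hand back to the pipeline and the "data exists" flag.
  def resolveSnapshotDate(maxSnapshot: Option[String], endTime: String): (String, Boolean) =
    maxSnapshot match {
      case Some(raw) => (LocalDateTime.parse(raw, currentFormat).format(requiredFormat), true)
      case None      => (endTime, false)
    }
}
```

For example, resolveSnapshotDate(Some("2024-11-14 00:00:00.0"), endTime) returns ("2024-11-14T00:00:00Z", true). The single S in the input pattern accepts exactly one fractional digit, so this approach appears to rely on snapshot dates having no sub-second component.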

Section 4 – Generate View Script for Sites

    val sitesView: String = s"""
      CREATE OR ALTER VIEW vw${sitesFinalTableName}
      AS
      SELECT *,
        [StorageQuotaFriendly] = (case
          when StorageQuota < 1048576 then concat(ceiling(StorageQuota / 1024.0), ' KB')
          when StorageQuota < 1073741824 then concat(ceiling(StorageQuota / 1048576.0), ' MB')
          when StorageQuota < 1099511627776 then concat(ceiling(StorageQuota / 1073741824.0), ' GB')
          when StorageQuota < 1125899906842624 then concat(ceiling(StorageQuota / 1099511627776.0), ' TB')
          else concat(ceiling(StorageQuota / 1125899906842624.0), ' PB')
        end ),
        [StorageUsedFriendly] = (case
          when StorageUsed < 1048576 then concat(ceiling(StorageUsed / 1024.0), ' KB')
          when StorageUsed < 1073741824 then concat(ceiling(StorageUsed / 1048576.0), ' MB')
          when StorageUsed < 1099511627776 then concat(ceiling(StorageUsed / 1073741824.0), ' GB')
          when StorageUsed < 1125899906842624 then concat(ceiling(StorageUsed / 1099511627776.0), ' TB')
          else concat(ceiling(StorageUsed / 1125899906842624.0), ' PB')
        end )
      FROM ${sitesFinalTableName}
    """.stripMargin.replaceAll("[\n\r]"," ")

    println(sitesView)

Here a SQL view (vwSites) is dynamically generated for the Sites final table. The view adds two computed columns (StorageQuotaFriendly and StorageUsedFriendly) that convert byte values into more digestible units such as KB, MB, GB, etc. The line breaks are replaced with spaces so that the script is a single line. This script will be stored and executed later.
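
To see how the thresholds in the CASE expression behave, the same logic can be written as a plain function. This is a sketch that mirrors the thresholds and the round-up step of the view; the object name FriendlySizeSketch and the function friendly are hypothetical, and the exact text produced by the SQL engine for the rounded number may differ.

```scala
object FriendlySizeSketch {
  private val KB = 1024L
  private val MB = KB * 1024
  private val GB = MB * 1024
  private val TB = GB * 1024
  private val PB = TB * 1024

  // Picks the unit with the same thresholds as the view, then rounds up,
  // as ceiling(...) does in the SQL.
  def friendly(bytes: Long): String = {
    val (divisor, unit) =
      if (bytes < MB) (KB, "KB")
      else if (bytes < GB) (MB, "MB")
      else if (bytes < TB) (GB, "GB")
      else if (bytes < PB) (TB, "TB")
      else (PB, "PB")
    s"${math.ceil(bytes.toDouble / divisor).toLong} $unit"
  }
}
```

Note that anything below 1 MB is reported in KB and that values are always rounded up, so 1,500 bytes becomes "2 KB".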

Section 5 – Generate View Script for Permissions

    val permissionsView: String = s"""
      CREATE OR ALTER VIEW vw${permissionsFinalTableName}
      AS
      SELECT *,
        ShareeDomain = CASE
          WHEN CHARINDEX('@', SharedWith_Email) > 0 AND CHARINDEX('.', SharedWith_Email) > 0
          THEN SUBSTRING(SharedWith_Email,CHARINDEX('@', SharedWith_Email)+1,LEN(SharedWith_Email))
          ELSE ''
        END,
        ShareeEMail = CASE
          WHEN CHARINDEX('@', SharedWith_Email) > 0 THEN SharedWith_Email
          ELSE ''
        END,
        PermissionsUniqueKey = CONCAT(SiteId,'_',RoleDefinition,'_',ScopeId,'_',COALESCE(LinkId,'00000000-0000-0000-0000-000000000000')),
        EEEUPermissionsCount = SUM(CASE WHEN SharedWith_Name LIKE 'Everyone except external users' THEN 1 ELSE NULL END )
          OVER( PARTITION BY CONCAT(SiteId,'_',RoleDefinition,'_',ScopeId,'_',COALESCE(LinkId,'00000000-0000-0000-0000-000000000000'),SharedWith_Name) ),
        ExternalUserCount = SUM(CASE WHEN SharedWith_TypeV2 LIKE 'External' THEN 1 ELSE NULL END )
          OVER( PARTITION BY CONCAT(SiteId,'_',RoleDefinition,'_',ScopeId,'_',COALESCE(LinkId,'00000000-0000-0000-0000-000000000000'),SharedWith_Name) ),
        B2BUserCount = SUM(CASE WHEN SharedWith_TypeV2 LIKE 'B2BUser' THEN 1 ELSE NULL END )
          OVER( PARTITION BY CONCAT(SiteId,'_',RoleDefinition,'_',ScopeId,'_',COALESCE(LinkId,'00000000-0000-0000-0000-000000000000'),SharedWith_Name) )
      FROM ${permissionsFinalTableName}
    """.stripMargin.replaceAll("[\n\r]"," ")

    println(permissionsView)

This section builds a SQL view (vwPermissions) for the Permissions final table. It derives additional columns like ShareeDomain, ShareeEMail, and a composite key (PermissionsUniqueKey) while applying window functions to compute counts (e.g., for external or B2B users). This script will also be stored and executed later.
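
The derived columns are easier to reason about as small functions. The sketch below restates the ShareeDomain, ShareeEMail and PermissionsUniqueKey rules from the view; the object and function names are hypothetical, and Option is used as a stand-in for a nullable SQL column.

```scala
object ShareeSketch {
  private val EmptyLinkId = "00000000-0000-0000-0000-000000000000"

  // ShareeDomain: text after the first '@', but only when the value also
  // contains a '.' somewhere; otherwise the empty string.
  def shareeDomain(email: Option[String]): String =
    email
      .filter(e => e.contains("@") && e.contains("."))
      .map(e => e.substring(e.indexOf('@') + 1))
      .getOrElse("")

  // ShareeEMail: the value itself when it contains '@', otherwise the empty string.
  def shareeEmail(email: Option[String]): String =
    email.filter(_.contains("@")).getOrElse("")

  // PermissionsUniqueKey: the four parts joined with '_', with a missing
  // LinkId replaced by the all-zero GUID, as COALESCE does in the view.
  def permissionsUniqueKey(siteId: String, roleDefinition: String, scopeId: String, linkId: Option[String]): String =
    Seq(siteId, roleDefinition, scopeId, linkId.getOrElse(EmptyLinkId)).mkString("_")
}
```

One consequence of these rules is that sharees without an email address, such as groups, end up with an empty ShareeDomain rather than a NULL.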

Section 6 – Truncate the Staging Tables from Previous Runs

    if (siteStagingTableCount == 1) {
      spark.sql(s"DELETE FROM ${lakehouseName}.${sitesStagingTableName} ")
      println(s"Staging table deleted: ${lakehouseName}.${sitesStagingTableName}")
    } else {
      println(s"Staging table ${lakehouseName}.${sitesStagingTableName} not found")
    }

    if (permissionsStagingTableCount == 1) {
      spark.sql(s"DELETE FROM ${lakehouseName}.${permissionsStagingTableName} ")
      println(s"Staging table deleted: ${lakehouseName}.${permissionsStagingTableName}")
    } else {
      println(s"Staging table ${lakehouseName}.${permissionsStagingTableName} not found")
    }

This section checks if the staging tables exist (by count) and, if found, issues a SQL DELETE command to remove all existing rows so that new data can be loaded. The tables themselves are kept. It prints messages indicating the action taken.

Section 7 – Return Snapshot Dates Back to Pipeline

    import mssparkutils.notebook

    val returnData = s"""{\"LakehouseId\": \"${lakehouseId}\", \"SitesStagingTableName\": \"${sitesStagingTableName}\", \"SitesFinalTableName\": \"${sitesFinalTableName}\", \"SitesSnapshotDate\": \"${siteSnapshotDate}\", \"SitesDataExists\": ${siteDataExists}, \"SitesView\": \"${sitesView}\", \"PermissionsStagingTableName\": \"${permissionsStagingTableName}\", \"PermissionsFinalTableName\": \"${permissionsFinalTableName}\", \"PermissionsSnapshotDate\": \"${permissionsSnapshotDate}\", \"EndSnapshotDate\": \"${endTime}\", \"PermissionsDataExists\": ${permissionsDataExists}, \"PermissionsView\": \"${permissionsView}\"}"""

    println(returnData)
    mssparkutils.notebook.exit(returnData)

This concluding section aggregates the key metadata (Lakehouse information, table names, snapshot dates, existence flags, and the generated view scripts) into a JSON string. It then exits the notebook by returning that JSON to the pipeline.
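
Because the JSON is assembled by string interpolation, every embedded value has to be safe inside a JSON string, which is presumably why the view scripts have their line breaks replaced with spaces earlier. If you customize the views so that they contain double quotes or backslashes, a small escaping step could help. The sketch below is a minimal illustration, not a complete JSON encoder; the object name ReturnPayloadSketch and both functions are hypothetical.

```scala
object ReturnPayloadSketch {
  // Escapes the characters most likely to break a hand-built JSON string value.
  // Assumption: other control characters do not occur in the values.
  def escapeJson(value: String): String = {
    val sb = new StringBuilder
    value.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)
    }
    sb.toString
  }

  // Renders one "name": "value" pair with both sides escaped.
  def field(name: String, value: String): String =
    "\"" + escapeJson(name) + "\": \"" + escapeJson(value) + "\""
}
```

With a helper like this, field("SitesView", sitesView) could replace the hand-written fragment for the view script, at the cost of the pipeline receiving escaped line breaks instead of spaces.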

Notebook 2: Merge Sites and Permissions to Final Table

This notebook runs after the Sites and Permissions data from MGDC has been collected successfully into the staging tables. If this is the first collection, it handles them as full datasets, storing the data directly in the final tables. If this is using MGDC for SharePoint delta datasets, it merges the new, updated or deleted objects from the staging tables into the final tables.

Note: The word "Delta" here might refer to Delta Parquet (an efficient data storage format used by tables in a Microsoft Fabric Lakehouse) or to the MGDC for SharePoint Delta datasets (how MGDC can return only the objects that are new, updated or deleted between two dates). It can be a bit confusing, so be aware of the two interpretations of the word.

Section 0 – Set the Default Lakehouse for Notebook to Run

    %%configure
    {
        "defaultLakehouse": {
            "name": {
                "parameterName": "lakehouseName",
                "defaultValue": "defaultlakehousename"
            }
        }
    }

This section uses the same Lakehouse configuration as in Notebook 1. It sets the default Lakehouse through a parameter (lakehouseName) to support dynamic running of the notebook in different environments.

Section 1 – Initialize Parameters

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit
    import java.util.UUID
    import java.text.SimpleDateFormat
    import java.time.{LocalDate, LocalDateTime, Period}
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit
    import java.util.Calendar

    val runId = "00000000-0000-0000-0000-000000000000"
    val workspaceId = spark.conf.get("trident.workspace.id")
    val workspaceName = "LakeHouseTesting"
    val lakehouseId = spark.conf.get("trident.lakehouse.id")
    val lakehouseName = spark.conf.get("trident.lakehouse.name")
    val sitesStagingTableName = "Sites_Staging"
    val sitesFinalTableName = "Sites"
    val permissionsStagingTableName = "Permissions_Staging"
    val permissionsFinalTableName = "Permissions"

    spark.conf.set("spark.sql.caseSensitive", true)

This section is like Notebook 1’s Section 1, but here lakehouseName is retrieved from the Spark configuration instead of being hardcoded. It initializes variables needed for merging, such as run IDs, workspace/Lakehouse identifiers, and table names.

Section 2 – Read Sites Dataset from Staging Table

    val lakehouse = mssparkutils.lakehouse.get(lakehouseName)
    val lakehouseId = lakehouse.id
    val workspaceName = notebookutils.runtime.context("currentWorkspaceName")

    println("Started reading Sites dataset")
    val sitesStagingLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${sitesStagingTableName}"
    val dfSitesStaging = spark.read.format("delta").load(sitesStagingLocation)
    println("Completed reading Sites dataset")

This section constructs the ABFS path for the Sites staging table and reads the dataset into a DataFrame using the Delta Parquet format. It includes print statements to track progress.

Section 3 – Read Permissions Dataset from Staging Table

    println("Started reading Permissions dataset")
    val permissionsStagingLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${permissionsStagingTableName}"
    val dfPermissionsStaging = spark.read.format("delta").load(permissionsStagingLocation)
    println("Completed reading Permissions dataset")

This section performs the analogous operation for the Permissions staging table, loading the dataset into a DataFrame and providing console output for monitoring.

Section 4 – Check Final Tables Exist or Not

    import io.delta.tables.DeltaTable

    val sitesFinalLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${sitesFinalTableName}"
    val permissionsFinalLocation = s"abfss://${workspaceName}@onelake.dfs.fabric.microsoft.com/${lakehouseName}.Lakehouse/Tables/${permissionsFinalTableName}"

    val sitesFinalTableExists = DeltaTable.isDeltaTable(spark, sitesFinalLocation)
    if (!sitesFinalTableExists) {
      println("Final Sites table not exists. Creating final Sites table with schema only")
      dfSitesStaging.filter("1=2").write.format("delta").mode("overwrite").save(sitesFinalLocation)
      println("Final Sites table created")
    } else {
      println("Final Sites table exists already")
    }

    val permissionsFinalTableExists = DeltaTable.isDeltaTable(spark, permissionsFinalLocation)
    if (!permissionsFinalTableExists) {
      println("Final Permissions table not exists. Creating final Permissions table with schema only")
      dfPermissionsStaging.filter("1=2").write.format("delta").mode("overwrite").save(permissionsFinalLocation)
      println("Final Permissions table created")
    } else {
      println("Final Permissions table exists already")
    }

This section checks whether the final tables for Sites and Permissions exist. If a table does not exist, it creates an empty table (schema only) from the staging DataFrame. The filter("1=2") condition is never true, so no rows are written but the schema is preserved.

Section 5 – Merge Sites Data from Staging Table to Final Table

    import io.delta.tables._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.{Window, WindowSpec}
    import org.apache.spark.sql.functions.{coalesce, lit, sum, col, _}
    import org.apache.spark.sql.types.{StructField, _}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.storage.StorageLevel

    val deltaTableSource = DeltaTable.forPath(spark, sitesStagingLocation)
    val deltaTableTarget = DeltaTable.forPath(spark, sitesFinalLocation)

    import spark.implicits._
    val dfSource = deltaTableSource.toDF

    //Delete records that have Operation as Deleted
    println("Merging Sites dataset from current staging table")
    deltaTableTarget
      .as("target")
      .merge(
        dfSource.as("source"),
        "source.Id = target.Id")
      .whenMatched("source.Operation = 'Deleted'")
      .delete()
      .whenMatched("source.Operation != 'Deleted'")
      .updateAll()
      .whenNotMatched("source.Operation != 'Deleted'")
      .insertAll()
      .execute()
    println("Merging of Sites dataset completed")

This section performs a Delta Lake merge (upsert) operation on the Sites data, matching source and target rows on Id. The merge logic deletes matching records when the source’s Operation is 'Deleted', updates other matching records, and inserts new records that are not marked as 'Deleted'. Source rows marked as 'Deleted' that have no match in the target are ignored.
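
The effect of the three merge clauses can be modeled on an in-memory map, which may help when reasoning about a specific staging batch. This is a sketch only: SiteRow, its url field, and the operation values "Created" and "Updated" are hypothetical, and unlike a Delta Lake merge this model simply applies source rows in order instead of rejecting multiple source rows that match the same target row.

```scala
object MergeSketch {
  // Hypothetical, reduced row shape: only Id and Operation come from the notebook.
  final case class SiteRow(id: String, operation: String, url: String)

  // Applies the same rules as the merge, keyed by Id:
  //   matched   + Deleted     => delete
  //   matched   + not Deleted => update
  //   unmatched + not Deleted => insert
  //   unmatched + Deleted     => ignored
  def merge(target: Map[String, SiteRow], source: Seq[SiteRow]): Map[String, SiteRow] =
    source.foldLeft(target) { (acc, row) =>
      if (row.operation == "Deleted") acc - row.id // removing a missing key is a no-op
      else acc + (row.id -> row)                   // covers both update and insert
    }
}
```

Starting from a target with Ids 1 and 2, a source batch that deletes 1, updates 2, creates 3 and deletes an unknown 4 leaves the target with Ids 2 and 3.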

Section 6 – Merge Permissions Data from Staging Table to Final Table

    import io.delta.tables._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.{Window, WindowSpec}
    import org.apache.spark.sql.functions.{coalesce, lit, sum, col, _}
    import org.apache.spark.sql.types.{StructField, _}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.storage.StorageLevel

    val deltaTablePermissionsSource = DeltaTable.forPath(spark, permissionsStagingLocation)
    val deltaTablePermissionsTarget = DeltaTable.forPath(spark, permissionsFinalLocation)

    import spark.implicits._
    val dfPermissionsSource = deltaTablePermissionsSource.toDF

    //Delete records that have Operation as Deleted
    println("Merging Permissions dataset from current staging table")
    deltaTablePermissionsTarget
      .as("target")
      .merge(
        dfPermissionsSource.as("source"),
        """source.SiteId = target.SiteId
           and source.ScopeId = target.ScopeId
           and source.LinkId = target.LinkId
           and source.RoleDefinition = target.RoleDefinition
           and coalesce(source.SharedWith_Name,"") = coalesce(target.SharedWith_Name,"")
           and coalesce(source.SharedWith_TypeV2,"") = coalesce(target.SharedWith_TypeV2,"")
           and coalesce(source.SharedWith_Email,"") = coalesce(target.SharedWith_Email,"")
           and coalesce(source.SharedWith_AADObjectId,"") = coalesce(target.SharedWith_AADObjectId,"")
        """)
      .whenMatched("source.Operation = 'Deleted'")
      .delete()
      .whenMatched("source.Operation != 'Deleted'")
      .updateAll()
      .whenNotMatched("source.Operation != 'Deleted'")
      .insertAll()
      .execute()
    println("Merging of Permissions dataset completed")

This section performs a merge operation on the Permissions data. The merge condition is more complex, comparing multiple columns (including handling nulls with coalesce) to identify matching records. The operation applies deletion for rows marked as 'Deleted', updates others, and inserts records where no match exists.
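
The reason for the coalesce calls is that, in SQL, comparing NULL with NULL using = does not produce a match. The sketch below models both comparisons with Option standing in for a nullable column; the object and function names are hypothetical.

```scala
object NullSafeMatchSketch {
  // Models SQL `a = b`: the result is unknown, and therefore not a match,
  // whenever either side is NULL.
  def sqlEquals(a: Option[String], b: Option[String]): Boolean =
    (a, b) match {
      case (Some(x), Some(y)) => x == y
      case _                  => false
    }

  // Models coalesce(a, "") = coalesce(b, ""): NULL and the empty string
  // are treated as the same value.
  def coalescedEquals(a: Option[String], b: Option[String]): Boolean =
    a.getOrElse("") == b.getOrElse("")
}
```

In the merge condition above, LinkId is compared with plain equality, while the view in Notebook 1 applies COALESCE to LinkId, which suggests the column can be NULL. If that is the case in your data, rows with a NULL LinkId would never match and would be inserted rather than updated or deleted, so it may be worth verifying. Spark SQL also offers the null-safe equality operator <=> as an alternative to coalesce.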

Section 7 – Read and Display Sample TOP 10 Rows

    var sqlQuery = s"SELECT * FROM ${lakehouseName}.${sitesFinalTableName} order by SnapshotDate DESC LIMIT 10"
    val dfSitesAll = spark.sql(sqlQuery)
    display(dfSitesAll)

    sqlQuery = s"SELECT * FROM ${lakehouseName}.${permissionsFinalTableName} order by SnapshotDate DESC LIMIT 10"
    val dfPermissionsAll = spark.sql(sqlQuery)
    display(dfPermissionsAll)

This final section executes SQL queries to retrieve and display the top 10 rows from both the Sites and Permissions final tables. The rows are ordered by SnapshotDate in descending order. This is typically used for sample or debugging purposes.

Conclusion

I hope this article helped you understand the notebooks included in the template, which should make it easier to customize them later. These templates are intended as starting points for your work with many scenarios. Read more about MGDC for SharePoint at https://aka.ms/SharePointData.