azure data factory
398 TopicsWeb activity failure due to Invoking endpoint failed with HttpStatusCode - 403 -- help?
Hi, I have an Azure Data Factory (ADF) instance that I am using to create a Pipeline to ingest external (cloud based) 3rd party data into my Azure SQL Server database. I am a novice with ADF and have only used it to ingest some external SQL data into my SQL database - it did work. The external source I'm attempting to extract from uses an OAuth 2.0 API and an API is something I've not used before. Using Postman (never used this software before this attempt), I have passed the external source's base_url, client_id, and client_secret, and in return successfully received an access token. This tells me that the base_url, client_id, and client_secret values I passed are correct and accepted by the target source/application. Feeling encourage to implement the same values into ADF, I first created a Linked Service which with a successful test connection returned - see below. This Linked Service uses the same values as the Postman entry which granted an access token. I then created a Pipeline with a Web activity object within it. The General and User Properties don't have any configuration, only the Settings tab does which shown below. Again, the URL, Client ID and Client Secret configured here are the same as those used in Postman (and the Linked Service). I execute the Web object and it returns with a failure - see below. The error states the endpoint refused the request (for an access token). Is this accurate as I was able to receive an access token via Postman when using the same credentials? I don't understand why via Postman I can received an access token but via ADF it errors. I'm wondering if I've completed the ADF parts incorrectly, or if there is more needed just to received an access token, or if it's something else? Are you able to advise what's taking place here? Thanks.165Views0likes0CommentsADF unable to ingest partitioned Delta data from Azure Synapse Link (Dataverse/FnO)
We are ingesting Dynamics 365 Finance & Operations (FnO) data into ADLS Gen2 using Azure Synapse Link for Dataverse, and then attempting to load that data into Azure SQL Database using Azure Data Factory (ADF). This is part of a migration effort as Export to Data Lake is being deprecated. Source Details Source: ADLS Gen2 Data generated by: Azure Synapse Link for Dataverse (FnO) Format on lake: Delta / Parquet Partitioned folder structure (e.g. PartitionId=xxxx) Destination: Azure SQL Database Issue Observed in ADF When configuring ADF pipelines: Using ADLS Gen2 dataset with: Delta / Parquet Recursive folder traversal Wildcard paths We encounter: No data returned in Data Preview Or runtime error such as: “No partitions information found in metadata file” Despite this: The data is present in ADLS The same data can be successfully queried using Synapse serverless SQL Key Question for ADF / Synapse Engineers What is the recommended and supported ADF ingestion pattern for: Partitioned Delta/Parquet data produced by Azure Synapse Link for Dataverse Specifically: Should ADF: Read Delta tables directly, or Use Synapse serverless SQL external tables/views as an intermediate layer? Is there a reference architecture for: Synapse Link → ADLS → ADF → Azure SQL Are there ADF limitations when consuming Synapse Link–generated Delta tables? Many customers are now forced to migrate due to Export to Data Lake deprecation, but current ADF documentation does not clearly explain how to replace existing ingestion pipelines when using Synapse Link for FnO. Any guidance, patterns, or official documentation would be greatly appreciated.98Views0likes0Comments[Design Pattern] Handling race conditions and state in serverless data pipelines
Hello community, I recently faced a tricky data engineering challenge involving a lot of Parquet files (about 2 million records) that needed to be ingested, transformed, and split into different entities. The hard part wasn't the volume, but the logic. We needed to generate globally unique, sequential IDs for specific columns while keeping the execution time under two hours. We were restricted to using only Azure Functions, ADF, and Storage. This created a conflict: we needed parallel processing to meet the time limit, but parallel processing usually breaks sequential ID generation due to race conditions on the counters. I documented the three architecture patterns we tested to solve this: Sequential processing with ADF (Safe, but failed the 2-hour time limit). 2. Parallel processing with external locking/e-tags on Table Storage (Too complex and we still hit issues with inserts). 3. A "Fan-Out/Fan-In" pattern using Azure Durable Functions and Durable Entities. We ended up going with Durable Entities. Since they act as stateful actors, they allowed us to handle the ID counter state sequentially in memory while the heavy lifting (transformation) ran in parallel. It solved the race condition issue without killing performance. I wrote a detailed breakdown of the logic and trade-offs here if anyone is interested in the implementation details: https://medium.com/@yahiachames/data-ingestion-pipeline-a-data-engineers-dilemma-and-azure-solutions-7c4b36f11351 I am curious if others have used Durable Entities for this kind of ETL work, or if you usually rely on an external database sequence to handle ID generation in serverless setups? Thanks, Chameseddine239Views0likes1CommentCopy Data Activity Failed with Unreasonable Cause
It is a simple set up but it has baffled me a lot. I'd like to copy data to a data lake via API. Here are the steps I've taken: Created a HTTP linked service as below: Created a dataset with a HTTP Binary data format as below: Created a pipeline with a Copy Data activity only as shown below: Made sure linked service and dataset all working fine as below: Created a Sink dataset with 3 parameters as shown below: Passed parameters from pipeline to Sink dataset as below: That's all. Simple, right? But the pipeline failed with a clear message "usually this is caused by invalid credentials." as below: Summary: No need to worry about the Sink side of parameters etc. which I have used same thing for years on other pipelines and all succeeded. This time the API failed to reach a data lake from source side as said "invalid credentials". In Step 4 above one could see the linked service and dataset connections were succeeded, ie. credentials have been checked and passed already. How come it failed in data copy activity complaining an invalid credentials? Pretty weird. Any advice and suggestions will be welcomed.109Views0likes0CommentsUser Properties of Activities in ADF: How to add dynamic content in it?
On ADF, I am using a for each loop in which I am using an Execute Pipeline Activity which is getting executed for different iterations as per the values of the items provided to the For-Each Loop. I am stuck on a scenario which requires me to add the Dynamic Content Expression in the User Properties of individual activities of ADF. Specific to my case, I want to add the Dynamic Content Expression in the User Properties of Execute Pipeline Activity so that I get to individual runs of these activities on Azure Monitor with a specific label attached to it through its User Properties. The necessity to add the Dynamic Content Expression in the User Properties is due to the reason that each execution in respective iterations of these activities corresponds to a particular Step from a set of Steps configured for the Data Load Job as a whole, which has been orchestrated through ADF. To identify the association with the respective Job-Step, I require to add Dynamic Content Expression in its User Properties. Any sort of response regarding this is highly appreciated. Thank You!221Views1like0CommentsBYOPI - Design your own custom private AI Search indexer with no code ADF (SQLServer on private VM)
Executive Summary Building a fully private search indexing solution using Azure Data Factory (ADF) to sync SQL Server data from private VM to Azure AI Search is achievable but comes with notable complexities and limitations. This blog shares my journey, discoveries, and honest assessment of the BYOPI (Build Your Own Private Indexer) architecture. Architectural flow: Table of Contents Overall Setup How ADF works in this approach with Azure AI Search Challenges - discovered Pros and Cons: An Honest Assessment Conclusion and Recommendations 1. Overall Setup: Phase 1: Resource Group & Network Setup : create resource group and vNET (virtual network) in any region of your choice Phase 2: Deploy SQL Server VM: Phase 3: Create Azure Services - ADF (Azure Data Factory), Azure AI Search and AKV (Azure Key Vault) service from portal or from your choice of deployment. Phase 4: Create Private Endpoints for all the services in their dedicated subnets: Phase 5: Configure SQL Server on VM : connect to VM via bastion and setup database, tables & SP: Sample metadata used as below: CREATE DATABASE BYOPI_DB; GO USE BYOPI_DB; GO CREATE TABLE Products ( ProductId INT IDENTITY(1,1) PRIMARY KEY, ProductName NVARCHAR(200) NOT NULL, Description NVARCHAR(MAX), Category NVARCHAR(100), Price DECIMAL(10,2), InStock BIT DEFAULT 1, Tags NVARCHAR(500), IsDeleted BIT DEFAULT 0, CreatedDate DATETIME DEFAULT GETDATE(), ModifiedDate DATETIME DEFAULT GETDATE() ); CREATE TABLE WatermarkTable ( TableName NVARCHAR(100) PRIMARY KEY, WatermarkValue DATETIME ); INSERT INTO WatermarkTable VALUES ('Products', '2024-01-01'); CREATE PROCEDURE sp_update_watermark @TableName NVARCHAR(100), @NewWatermark DATETIME AS BEGIN UPDATE WatermarkTable SET WatermarkValue = @NewWatermark WHERE TableName = @TableName; END; INSERT INTO Products (ProductName, Description, Category, Price, Tags) VALUES ('Laptop Pro', 'High-end laptop', 'Electronics', 1299.99, 'laptop,computer'), ('Office Desk', 'Adjustable desk', 'Furniture', 599.99, 'desk,office'), ('Wireless Mouse', 'Bluetooth mouse', 'Electronics', 29.99, 'mouse,wireless'); Phase 6: Install Self-Hosted Integration Runtime Create SHIR in ADF: Go to ADF resource in Azure Portal Click "Open Azure Data Factory Studio" Note: You need to access from a VM in the same VNet or via VPN since ADF is private In ADF Studio, click Manage (toolbox icon) Select Integration runtimes → "+ New" Select "Azure, Self-Hosted" → "Self-Hosted" Name: SHIR-BYOPI or of your choice Click "Create" Copy Key1 (save it) Install SHIR on VM In the VM (via Bastion): Open browser, go to: https://www.microsoft.com/download/details.aspx?id=39717 Download and install Integration Runtime During setup: Launch Configuration Manager Paste the Key1 from Step 14 Click "Register" Wait for "Connected" status Phase 7: Create Search Index through below powershell script and saving it as search_index.ps1 $searchService = "search-byopi" $apiKey = "YOUR-ADMIN-KEY" $headers = @{ 'api-key' = $apiKey 'Content-Type' = 'application/json' } $index = @{ name = "products-index" fields = @( @{name="id"; type="Edm.String"; key=$true} @{name="productName"; type="Edm.String"; searchable=$true} @{name="description"; type="Edm.String"; searchable=$true} @{name="category"; type="Edm.String"; filterable=$true; facetable=$true} @{name="price"; type="Edm.Double"; filterable=$true} @{name="inStock"; type="Edm.Boolean"; filterable=$true} @{name="tags"; type="Collection(Edm.String)"; searchable=$true} ) } | ConvertTo-Json -Depth 10 Invoke-RestMethod ` -Uri "https://$searchService.search.windows.net/indexes/products-index?api-version=2020-06-30" ` -Method PUT ` -Headers $headers ` -Body $index Phase 8: Configure AKV & ADF Components - Link AKV and ADF for secrets Create Key Vault Secrets Navigate to kv-byopi (created AKV resource) in Portal Go to "Access policies" Click "+ Create" Select permissions: Get, List for secrets Select principal: adf-byopi-private Create Go to "Secrets" → "+ Generate/Import": Name: sql-password, Value: <> Name: search-api-key, Value: Your search key Create Linked Services in ADF Access ADF Studio from the VM (since it's private): Key Vault Linked Service: Manage → Linked services → "+ New" Search "Azure Key Vault" Configure: Name: LS_KeyVault Azure Key Vault: kv-byopi Integration runtime: AutoResolveIntegrationRuntime Test connection → Create SQL Server Linked Service: "+ New" → "SQL Server" Configure: Name: LS_SqlServer Connect via: SHIR-BYOPI Server name: localhost Database: BYOPI_DB Authentication: SQL Authentication User: sqladmin Password: Select from Key Vault → LS_KeyVault → sql-password Test → Create Azure Search Linked Service: "+ New" → "Azure Search" Configure: Name: LS_AzureSearch URL: https://search-byopi.search.windows.net Connect via: SHIR-BYOPI - Important - use SHIR API Key: From Key Vault → LS_KeyVault → search-api-key Test → Create Phase 9: Create ADF Datasets and PipelineCreate Datasets SQL Products Dataset: Author → Datasets → "+" → "New dataset" Select "SQL Server" → Continue Select "Table" → Continue Properties: Name: DS_SQL_Products Linked service: LS_SqlServer Table: Select Products click OK Watermark Dataset: Repeat with: Name: DS_SQL_Watermark Table: WatermarkTable Search Dataset: "+" → "Azure Search" Properties: Name: DS_Search_Index Linked service: LS_AzureSearch Index name: products-index Create Pipeline Author → Pipelines → "+" → "Pipeline" Name: PL_BYOPI_Private From Activities → General, drag "Lookup" activity Configure Lookup 1: Name: LookupOldWatermark Settings: Source dataset: DS_SQL_Watermark Query: below sql SELECT WatermarkValue FROM WatermarkTable WHERE TableName='Products' - **First row only**: ✓ Add another Lookup: Name: LookupNewWatermark Query: below sql SELECT MAX(ModifiedDate) as NewWatermark FROM Products Add Copy Data activity: Name: CopyToSearchIndex Source: Dataset: DS_SQL_Products Query: sql SELECT CAST(ProductId AS NVARCHAR(50)) as id, ProductName as productName, Description as description, Category as category, Price as price, InStock as inStock, Tags as tags, CASE WHEN IsDeleted = 1 THEN 'delete' ELSE 'upload' END as [@search.action] FROM Products WHERE ModifiedDate > '@{activity('LookupOldWatermark').output.firstRow.WatermarkValue}' AND ModifiedDate <= '@{activity('LookupNewWatermark').output.firstRow.NewWatermark}' Sink: Dataset: DS_Search_Index Write behavior: Merge Batch size: 1000 Add Stored Procedure activity: Name: UpdateWatermark SQL Account: LS_SqlServer Stored procedure: sp_update_watermark Parameters: TableName: Products NewWatermark: @{activity('LookupNewWatermark').output.firstRow.NewWatermark} Connect activities with success conditions Phase 10: Test and Schedule Test Pipeline Click "Debug" in pipeline Monitor in Output panel Check for green checkmarks Create Trigger In pipeline, click "Add trigger" → "New/Edit" Click "+ New" Configure: Name: TR_Hourly Type: Schedule Recurrence: Every 1 Hour OK → Publish All Monitor Go to Monitor tab View Pipeline runs Check Trigger runs Your pipeline should look like this: Phase 11: Validation & Testing Verify Private Connectivity From the VM, run PowerShell: # Test DNS resolution (should return private IPs) nslookup adf-byopi-private.datafactory.azure.net # Should show private IP like : 10.0.2.x nslookup search-byopi.search.windows.net # Should show private IP like : 10.0.2.x nslookup kv-byopi.vault.azure.net # Should show private IP like : 10.0.2.x # Test Search $headers = @{ 'api-key' = 'YOUR-KEY' } Invoke-RestMethod -Uri "https://search-byopi.search.windows.net/indexes/products-index/docs?`$count=true&api-version=2020-06-30" -Headers $headers Test Data Sync (adding few records) and verify in search index: -- Add test record INSERT INTO Products (ProductName, Description, Category, Price, Tags) VALUES ('Test Product Private', 'Testing private pipeline', 'Test', 199.99, 'test,private'); -- Trigger pipeline manually or wait for schedule -- Then verify in Search index 2. How ADF works in this approach with Azure AI search: Azure AI Search uses a REST API for indexing or called as uploading. When ADF sink uploads data to AI Search, it's actually making HTTP POST requests: for example - POST https://search-byopi.search.windows.net/indexes/products-index/docs/index?api-version=2020-06-30 Content-Type: application/json api-key: YOUR-ADMIN-KEY { "value": [ { "@search.action": "upload", "id": "1", "productName": "Laptop", "price": 999.99 }, { "@search.action": "delete", "id": "2" } ] } Delete action used here is soft delete and not hard delete. pipeline query: SELECT CAST(ProductId AS NVARCHAR(50)) as id, -- Renamed to match index field ProductName as productName, -- Renamed to match index field Description as description, Category as category, Price as price, InStock as inStock, Tags as tags, CASE WHEN IsDeleted = 1 THEN 'delete' ELSE 'upload' END as [@search.action] -- Special field with @ prefix FROM Products WHERE ModifiedDate > '2024-01-01' ``` Returns this resultset: ``` id | productName | description | category | price | inStock | tags | @search.action ----|----------------|------------------|-------------|--------|---------|----------------|--------------- 1 | Laptop Pro | High-end laptop | Electronics | 1299 | 1 | laptop,computer| upload 2 | Office Chair | Ergonomic chair | Furniture | 399 | 1 | chair,office | upload 3 | Deleted Item | Old product | Archive | 0 | 0 | old | delete The @search.action Field - The Magic Control This special field tells Azure AI Search what to do with each document: @search.action What It Does When to Use What Happens If Document... upload Insert OR Update Most common - upsert operation Exists: Updates it<br>Doesn't exist: Creates it merge Update only When you know it exists Exists: Updates specified fields<br>Doesn't exist: ERROR mergeOrUpload Update OR Insert Safe update Exists: Updates fields<br>Doesn't exist: Creates it delete Remove from index To remove documents Exists: Deletes it<br>Doesn't exist: Ignores (no error) ADF automatically converts SQL results to JSON format required by Azure Search: { "value": [ { "@search.action": "upload", "id": "1", "productName": "Laptop Pro", "description": "High-end laptop", "category": "Electronics", "price": 1299.00, "inStock": true, "tags": "laptop,computer" }, { "@search.action": "upload", "id": "2", "productName": "Office Chair", "description": "Ergonomic chair", "category": "Furniture", "price": 399.00, "inStock": true, "tags": "chair,office" }, { "@search.action": "delete", "id": "3" // For delete, only ID is needed } ] } ADF doesn't send all records at once. It batches them based on writeBatchSize and each batch is a separate HTTP POST to Azure Search How ADF will detect new changes and run batches: Watermark will be updated after each successful ADF run to detect new changes as below: Handling different scenarios: Scenario 1: No Changes Between Runs: Run at 10:00 AM: - Old Watermark: 09:45:00 - New Watermark: 10:00:00 - Query: WHERE ModifiedDate > '09:45' AND <= '10:00' - Result: 0 rows - Action: Still update watermark to 10:00 - Why: Prevents reprocessing if changes come later Scenario 2: Bulk Insert Happens: Someone inserts 5000 records at 10:05 AM Run at 10:15 AM: - Old Watermark: 10:00:00 - New Watermark: 10:15:00 - Query: WHERE ModifiedDate > '10:00' AND <= '10:15' - Result: 5000 rows - Action: Process all 5000, update watermark to 10:15 Scenario 3: Pipeline Fails Run at 10:30 AM: - Old Watermark: 10:15:00 (unchanged from last success) - Pipeline fails during Copy activity - Watermark NOT updated (still 10:15:00) Next Run at 10:45 AM: - Old Watermark: 10:15:00 (still the last successful) - New Watermark: 10:45:00 - Query: WHERE ModifiedDate > '10:15' AND <= '10:45' - Result: Gets ALL changes from 10:15 to 10:45 (30 minutes of data) - No data loss! Note: There is still room for improvement by refining this logic to handle more advanced scenarios. However, I have not examined the logic in depth, as the goal here is to review how the overall setup functions, identify its limitations, and compare it with the indexing solutions available in AI Search. 3. Challenges - disovered: When I tried to set out to build a private search indexer for SQL Server data residing on an Azure VM with no public IP, the solution seemed straightforward: use Azure Data Factory to orchestrate the data movement to Azure AI Search. The materials made it sound simple. The reality? It's possible, but the devil is in the details. What We Needed: ✅ SQL Server on private VM (no public IP) ✅ Azure AI Search with private endpoint ✅ No data over public internet ✅ Support for full CRUD operations ✅ Near real-time synchronization ✅ No-code/low-code solution Reality Check: ⚠️ DELETE operations not natively supported in ADF sink ⚠️ Complex networking requirements ⚠️ Higher costs than expected ⚠️ Significant setup complexity ✅ But it IS possible with workarounds Components Required Azure VM: ~$150/month (D4s_v3) Self-Hosted Integration Runtime: Free (runs on VM) Private Endpoints: ~$30/month (approx 3 endpoints) Azure Data Factory: ~$15-60/month (depends on frequency) Azure AI Search: ~$75/month (Basic tier) Total: ~$270-315/month** The DELETE Challenge: Despite Azure AI Search REST API fully supporting delete operations via @search.action, ADF's native Azure Search sink does NOT support delete operations. -- This SQL query with delete action SELECT ProductId as id, CASE WHEN IsDeleted = 1 THEN 'delete' ELSE 'upload' END as [@search.action] FROM Products -- Will NOT delete documents in Azure Search when using Copy activity -- The @search.action = 'delete' is ignored by ADF sink! Nevertheless, there is a workaround using the Web Activity approach or by calling the REST API from the ADF side to perform the delete operation. { "name": "DeleteViaREST", "type": "Web", "typeProperties": { "url": "https://search.windows.net/indexes/index/docs/index", "method": "POST", "body": { "value": [ {"@search.action": "delete", "id": "123"} ] } } } Development Challenges No Direct Portal Access: With ADF private, you need: Jump box in the same VNet VPN connection Bastion for access Testing Complexity: Can't use Postman from local machine Need to test from within VNet Debugging requires multiple tools 4. Pros and Cons: An Honest Assessment: Pros: Security: Complete network isolation Compliance: Meets strict requirements No-Code: Mostly configuration-based Scalability: Can handle large datasets Monitoring: Built-in ADF monitoring Managed Service: Microsoft handles updates Cons: DELETE Complexity: Not natively supported Cost: Higher than expected Setup Complexity: Many moving parts Debugging: Difficult with private endpoints Hidden Gotchas: - SHIR requires Windows VM (Linux in preview) - Private endpoint DNS propagation delays - ADF Studio timeout with private endpoints - SHIR auto-update can break pipelines 5. Conclusion and Recommendations: When to Use BYOPI: ✅ Good Fit: Strict security requirements Needs indexing from an un-supported scenarios for example SQL server residing on private VM Budget > $500/month Team familiar with Azure networking Read-heavy workloads ❌ Poor Fit: Simple search requirements Budget conscious Need real-time updates Heavy DELETE operations Small team without Azure expertise BYOPI works, but it's more complex and expensive than initially expected. The lack of native DELETE support in ADF sink is a significant limitation that requires workarounds. Key Takeaways It works but requires significant effort DELETE (hard) operations need workarounds Costs will be higher than expected Complexity is substantial for a "no-code" solution Alternative solutions might be better for many scenarios Disclaimer: The sample scripts provided in this article are provided AS IS without warranty of any kind. The author is not responsible for any issues, damages, or problems that may arise from using these scripts. Users should thoroughly test any implementation in their environment before deploying to production. Azure services and APIs may change over time, which could affect the functionality of the provided scripts. Always refer to the latest Azure documentation for the most up-to-date information. Thanks for reading this blog! I hope you've found this approach of creating own private indexing solution for Azure AI Search (BYOPI) useful 😀252Views1like0CommentsADF connection issue with Cassandra
Hi, I am trying to connect a cassandra DB hosted in azure cosmos db. I created the linked service but getting below error on test connection. Already checked the cassandra DB and its public network access is set to all networks. Google suggested enabling SSL but there is no such option in linked service. Please help. Failed to connect to the connector. Error code: 'Unknown', message: 'Failed to connect to Cassandra server due to, ErrorCode: InternalError' Failed to connect to the connector. Error code: 'InternalError', message: 'Failed to connect to Cassandra server due to, ErrorCode: InternalError' Failed to connect to Cassandra server due to, ErrorCode: InternalError All hosts tried for query failed (tried 51.107.58.67:10350: SocketException 'A request to send or receive data was disallowed because the socket is not connected and (when sending on a datagram socket using a sendto call) no address was supplied')203Views1like1CommentGetting an Oauth2 API access token using client_id and client_secret - help
Hi, I'm attempting to integrate external data into our SQL Server. The third-party data is from a solution called iLevel. They use token based Oauth2 APIs for access. The integration tool is ADF Pipelines. I'm not a data engineer but it has fallen upon me to complete this exercise. What I've attempted so far is failing and I don't know why. I would like your help on this. I'll explain what I've configured so far in the order I configured it. 1) To generate a client_id and client_secret, I logged on to the iLevel solution itself and generated the same for my account (call me 'Joe' account) and the Team account (call it 'Data team' account). I've recorded the client_id and client_secret for both users/accounts in notepad for reference. 2) I logged in Azure Data Factory using my 'Joe Admin' admin account (this is the account I need to log in with for any ADF development). 3) I created a Linked Service with the following configuration. Note how the Test connection was successful. I guess this means our ADF instance can connect to iLevel's Base URL. 4) I then created a dataset for iLevel. I configured this based on an online example I was following which I can't get working, so this configuration may be incorrect. 5) I then created a Pipeline which contains a 'Web' activity and a 'Set variable' activity. The Pipeline has a variable as shown below. The 'Web' activity has the following configuration: URL = is iLevel's token URL (it is different from the Base URL used in the Linked Service). Body = I've blocked out the client_id and client_secret (I'm using the client_id and client_secret generated for the 'Data team' account - remember I'm logged into ADF using the 'Joe Admin' account - not sure if this makes a difference) but have placed red brackets around where the start and end of each values is. I'm wrapping the values in any single or double quotes - not sure if I'm meant to. I'm not sure if I have configured the Body correctly. The ilevel documentation states to use an Authorization header, Content-Type header and Body - it states to the following is needed to obtain an access token, but it doesn't state exactly how to submit the information (i.e. how to format it). Notice how, in my configuration, I haven't used an Authorization header - this partly because an online example I've followed doesn't use one. If iLevel state to use one then I think I should but I don't know how to format it - any ideas? The 'Set variable' activity has the following activity. The idea is the access token is retrieved from the 'Web' activity and placed in the 'Set variable' "iLevel access token" variable. At this point I validate all and it comes back with no errors found. I then Debug it to see if it does indeed work but it returns an error stating the request contains an invalid client_id or client_secret. The client_id and client_secret values used are the exact same I generated from within the iLevel solution just a few hours ago. Is anyone able to point out to me why this isn't working? Have I populated all that I need to (as mentioned, iLevel say to use an Authorization header which I haven't but I don't know how to format it if I were to use one)? What can I do to get this working? I'm just trying to get the access token at the moment. I've not even attempted to extract the iLevel data and can't until I get a working token. iLevel's token have a 1 hour time-to-live so the Pipeline needs to generate a new token each time it's executed. You help will be most appreciated. Thanks.343Views1like0CommentsDynamics AX connector stops getting records after amount of time
Hello everyone, I am using the Dynamics AX connector to get data out of Finance. After a certain amount of time it suddenly doesnt get any new records anymore and it keeps running until it reaches the general timeout. It gets 290,000 records in like 01:30:00 and then keeps running and doesn't get new records anymore. Sometimes it gets stuck earlier or later. Sometimes it also gives me this error: Failure happened on 'Source' side. ErrorCode=ODataRequestTimeout,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Fail to get response from odata service in a expected time.,Source=Microsoft.DataTransfer.Runtime.ODataConnector,''Type=System.Threading.Tasks.TaskCanceledException,Message=A task was canceled.,Source=mscorlib,' This is my pipeline JSON: { "name": "HICT - Init Sync SalesOrders", "properties": { "activities": [ { "name": "Get FO SalesOrders", "type": "Copy", "dependsOn": [], "policy": { "timeout": "0.23:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "source": { "type": "DynamicsAXSource", "query": "$filter=FM_InterCompanyOrder eq Microsoft.Dynamics.DataEntities.NoYes'No' and dataAreaId eq 'prev'&$select=SalesOrderNumber,SalesOrderName,IsDeliveryAddressPrivate,FormattedInvoiceAddress,FormattedDeliveryAddress,ArePricesIncludingSalesTax,RequestedReceiptDate,QuotationNumber,PriceCustomerGroupCode,PBS_PreferredInvoiceDate,PaymentTermsBaseDate,OrderTotalTaxAmount,OrderTotalChargesAmount,OrderTotalAmount,TotalDiscountAmount,IsInvoiceAddressPrivate,InvoiceBuildingCompliment,InvoiceAddressZipCode,LanguageId,IsDeliveryAddressOrderSpecific,IsOneTimeCustomer,InvoiceAddressStreetNumber,InvoiceAddressStreet,InvoiceAddressStateId,InvoiceAddressPostBox,InvoiceAddressLongitude,InvoiceAddressLatitude,InvoiceAddressDistrictName,InvoiceAddressCountyId,InvoiceAddressCountryRegionISOCode,InvoiceAddressCity,FM_Deadline,Email,DeliveryTermsCode,DeliveryModeCode,DeliveryBuildingCompliment,DeliveryAddressCountryRegionISOCode,DeliveryAddressZipCode,DeliveryAddressStreetNumber,SalesOrderStatus,DeliveryAddressStreet,DeliveryAddressStateId,SalesOrderPromisingMethod,DeliveryAddressPostBox,DeliveryAddressName,DeliveryAddressLongitude,DeliveryAddressLocationId,DeliveryAddressLatitude,DeliveryAddressDunsNumber,DeliveryAddressDistrictName,DeliveryAddressDescription,DeliveryAddressCountyId,DeliveryAddressCity,CustomersOrderReference,IsSalesProcessingStopped,CustomerRequisitionNumber,SalesOrderProcessingStatus,CurrencyCode,ConfirmedShippingDate,ConfirmedReceiptDate,SalesOrderOriginCode,URL,OrderingCustomerAccountNumber,InvoiceCustomerAccountNumber,ContactPersonId,FM_WorkerSalesTaker,FM_SalesResponsible,PaymentTermsName,DefaultShippingSiteId,DefaultShippingWarehouseId,DeliveryModeCode,dataAreaId,FM_InterCompanyOrder&cross-company=true", "httpRequestTimeout": "00:15:00", "additionalHeaders": { "Prefer": "odata.maxpagesize=1000" }, "retrieveEnumValuesAsString": true }, "sink": { "type": "JsonSink", "storeSettings": { "type": "AzureBlobStorageWriteSettings", "copyBehavior": "FlattenHierarchy" }, "formatSettings": { "type": "JsonWriteSettings" } }, "enableStaging": false, "enableSkipIncompatibleRow": true, "logSettings": { "enableCopyActivityLog": true, "copyActivityLogSettings": { "logLevel": "Warning", "enableReliableLogging": false }, "logLocationSettings": { "linkedServiceName": { "referenceName": "AzureBlobStorage", "type": "LinkedServiceReference" }, "path": "ceexports" } } }, "inputs": [ { "referenceName": "AX_SalesOrders_Dynamics_365_FO_ACC", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "Orders_FO_D365_Data_JSON", "type": "DatasetReference" } ] }, { "name": "Get_All_CE_Table_Data", "type": "ForEach", "dependsOn": [ { "activity": "Get FO SalesOrders", "dependencyConditions": [ "Completed" ] } ], "userProperties": [], "typeProperties": { "items": { "value": "@pipeline().parameters.CE_Tables", "type": "Expression" }, "activities": [ { "name": "Copy_CE_TableData", "type": "Copy", "dependsOn": [], "policy": { "timeout": "0.12:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "source": { "type": "CommonDataServiceForAppsSource" }, "sink": { "type": "DelimitedTextSink", "storeSettings": { "type": "AzureBlobStorageWriteSettings", "copyBehavior": "FlattenHierarchy" }, "formatSettings": { "type": "DelimitedTextWriteSettings", "quoteAllText": true, "fileExtension": ".txt" } }, "enableStaging": false }, "inputs": [ { "referenceName": "CE_Look_Up_Tables", "type": "DatasetReference", "parameters": { "entiryName": "@item().sourceDataset" } } ], "outputs": [ { "referenceName": "CE_GenericBlobSink", "type": "DatasetReference", "parameters": { "sinkPath": { "value": "@item().sinkPath", "type": "Expression" } } } ] } ] } }, { "name": "Transform_Create_CE_JSON", "type": "ExecuteDataFlow", "dependsOn": [ { "activity": "Get_All_CE_Table_Data", "dependencyConditions": [ "Succeeded" ] } ], "policy": { "timeout": "0.12:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "dataflow": { "referenceName": "FO_Transform_CE_Select", "type": "DataFlowReference" }, "compute": { "coreCount": 16, "computeType": "General" }, "traceLevel": "Fine" } } ], "parameters": { "CE_Tables": { "type": "array", "defaultValue": [ { "name": "D365_CE_ACC_AccountRelations", "sourceDataset": "crmp_accountrelation", "sinkPath": "ce-exports/D365_CE_ACC_AccountRelations.json" }, { "name": "D365_CE_ACC_ContactRelations", "sourceDataset": "crmp_contactrelation", "sinkPath": "ce-exports/D365_CE_ACC_ContactRelations.json" }, { "name": "D365_CE_ACC_PriceCustomerGroup", "sourceDataset": "msdyn_pricecustomergroup", "sinkPath": "ce-exports/D365_CE_ACC_PriceCustomerGroup.json" }, { "name": "D365_CE_ACC_SalesOrderOrigin", "sourceDataset": "odin_salesorderorigin", "sinkPath": "ce-exports/D365_CE_ACC_SalesOrderOrigin.json" }, { "name": "D365_CE_ACC_ShipVia", "sourceDataset": "msdyn_shipvia", "sinkPath": "ce-exports/D365_CE_ACC_ShipVia.json" }, { "name": "D365_CE_ACC_SystemUser", "sourceDataset": "systemuser", "sinkPath": "ce-exports/D365_CE_ACC_SystemUser.json" }, { "name": "D365_CE_ACC_TermsOfDelivery", "sourceDataset": "msdyn_termsofdelivery", "sinkPath": "ce-exports/D365_CE_ACC_TermsOfDelivery.json" }, { "name": "D365_CE_ACC_Worker", "sourceDataset": "cdm_worker", "sinkPath": "ce-exports/D365_CE_ACC_Worker.json" }, { "name": "D365_CE_ACC_TransactionCurrency", "sourceDataset": "transactioncurrency", "sinkPath": "ce-exports/D365_CE_ACC_TransactionCurrency.json" }, { "name": "D365_CE_ACC_Warehouse", "sourceDataset": "msdyn_warehouse", "sinkPath": "ce-exports/D365_CE_ACC_Warehouse.json" }, { "name": "D365_CE_ACC_OperationalSite", "sourceDataset": "msdyn_operationalsite", "sinkPath": "ce-exports/D365_CE_ACC_OperationalSite.json" }, { "name": "D365_CE_ACC_PaymentTerms", "sourceDataset": "odin_paymentterms", "sinkPath": "ce-exports/D365_CE_ACC_PaymentTerms.json" } ] } }, "annotations": [], "lastPublishTime": "2025-07-30T12:55:32Z" }, "type": "Microsoft.DataFactory/factories/pipelines" }125Views0likes0CommentsAzure Data Factory ForEach Loop Fails Despite Inner Activity Error Handling - Seeking Best Practices
Hello Azure Data Factory Community, I'm encountering a persistent issue with my ADF pipeline where a ForEach loop is failing, even though I've implemented error handling for the inner activities. I'm looking for insights and best practices on how to prevent internal activity failures from propagating up and causing the entire ForEach loop (and subsequently the pipeline) to fail, while still logging all outcomes. My Setup: My pipeline processes records using a ForEach loop. Inside the loop, I have a Web activity (Sample_put_record) that calls an external API. This API call can either succeed or fail for individual records. My current error handling within the ForEach iteration is structured as follows: 1.Sample_put_record (Web Activity): Makes the API call. 2.Conditional Logic: I've tried two main approaches: •Approach A (Direct Success/Failure Paths): The Sample_put_record activity has a green arrow (on success) leading to a Log Success Items (Script activity) and a red arrow (on failure) leading to a Log Failed Items (Script activity). Both logging activities are followed by Wait activities (Dummy Wait For Success/Failure). •Approach B (If Condition Wrapper): I've wrapped the Sample_put_record activity and its success/failure logging within an If Condition activity. The If Condition's expression is @equals(activity('Sample_put_record').status, 'Succeeded'). The True branch contains the success logging, and the False branch contains the failure logging. The intention here was for the If Condition to always report success, regardless of the Sample_put_record outcome, to prevent the ForEach from failing. The Problem: Despite these error handling attempts, the ForEach loop (and thus the overall pipeline) still fails when an Sample_put_record activity fails. The error message I typically see for the ForEach activity is "Activity failed because an inner activity failed." When using the If Condition wrapper, the If Condition itself sometimes fails with the same error, indicating that an activity within its True or False branch is still causing a hard failure. For example, a common failure for Sample_put_record is: "valid":false,"message":"WARNING: There was no xxxxxxxxxxxxxxxxxxxxxxxxx scheduled..." (a user configuration/data issue). Even when my Log Failed Items script attempts to capture this, the ForEach still breaks. What I've Ensured/Considered: •Wait Activity Configuration: Wait activities are configured with reasonable durations and do not appear to be the direct cause of failure. •No Unhandled Exceptions: I'm trying to ensure no unhandled exceptions are propagating from my error handling activities. •Pipeline Status Goal: My ultimate goal is for the overall pipeline status to be Succeeded as long as the pipeline completes its execution, even if some Sample_put_record calls fail and are logged. I need to rely on the logs to identify actual failures, not the pipeline status. My Questions to the Community: 1.What is the definitive best practice in Azure Data Factory to ensure a ForEach loop never fails due to an inner activity failure, assuming the inner activity's failure is properly logged and handled within that iteration? 2.Are there specific nuances or common pitfalls with If Condition activities or Script activities within ForEach loops that could still cause failure propagation, even with try-catch and success exits? 3.How do you typically structure your ADF pipelines to achieve this level of resilience where internal failures are logged but don't impact the overall pipeline success status? 4.Are there any specific configurations on the ForEach activity itself (e.g., Continue on error setting, if it exists for ForEach?) or other activities that I might be overlooking? Any detailed examples, architectural patterns, or debugging tips would be greatly appreciated. Thank you in advance for your help!258Views0likes0Comments