Storage account, Azure SQL Server, SQL databases -> Part 1
Linked Services and Datasets -> Part 2
We are now ready to set up the pipeline and its activities.
Note: Throughout this lab we will use the same naming conventions for the pipeline, the activities, and the dynamic content that gets passed between them.
1. LookUp Activity
The activity should be renamed LookupControlTable. Leave the 'General' and 'User properties' tabs as is. On the 'Settings' tab, select Static_controltable as your source dataset.
Ensure you uncheck the 'First row only' option; it is checked by default.
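With 'First row only' unchecked, the Lookup activity returns every row of the control table as an array under its output's value property. As a rough sketch (the column values below are illustrative; the column names follow the MetadataControlTable from the earlier parts):

```json
{
  "count": 2,
  "value": [
    {
      "ContainerName": "container1",
      "SourceFile": "file1.csv",
      "TargetDatabase": "db1",
      "TargetSchema": "dbo",
      "TargetTable": "table1"
    },
    {
      "ContainerName": "container2",
      "SourceFile": "file2.csv",
      "TargetDatabase": "db2",
      "TargetSchema": "dbo",
      "TargetTable": "table2"
    }
  ]
}
```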
2. ForEach Activity
The activity should be renamed ForEachRow. Point the 'Success' arrow from the LookupControlTable activity we created above to this activity. Leave the 'General' tab as is. Under the 'Settings' tab, select the 'Add dynamic content' menu, which appears when you click the 'Items' box.
For 'Items' we will pass the array of rows received as output from the previous activity. Once selected, it should look something as shown below. (The same naming conventions become very important here.)
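With the prescribed naming, the 'Items' expression simply references the Lookup activity's output array:

```
@activity('LookupControlTable').output.value
```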
You should now be able to select the 'Add' button on the ForEachRow activity and add a 'Copy data' activity.
3. Copy data Activity
The activity should be renamed CopyfromBlobToSQL. Leave the 'General' tab as is. In the steps above, the array of rows from the Lookup activity is passed into the ForEach loop. Through inheritance, the Copy data activity can also reference these rows via the item() runtime variable. Essentially, item() holds the rows of the MetadataControlTable as follows:
For the 'Source' we are going to use Dynamic_sourcedataset as the dataset. We want this dataset to accept the container name, directory name (same as the container name) and file name dynamically. The columns of the above table now become properties of the item() variable. We use this reference to pass different row values on every loop iteration, initializing the ContainerName and SourceFile parameters declared previously on Dynamic_sourcedataset.
Thus, as the loop runs, we will be extracting data from different containers/directories/files.
(Note: We also use the trim() function here because during the lab it was sometimes observed that blank spaces were being passed in the parameter values, which results in errors. The built-in trim() function avoids this.)
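Putting the source side together, the dataset parameter values end up as expressions along these lines (the parameter names ContainerName and SourceFile are assumed to match those declared on Dynamic_sourcedataset in Part 2; adjust if yours differ):

```
ContainerName : @trim(item().ContainerName)
SourceFile    : @trim(item().SourceFile)
```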
For the Sink configuration the same logic is followed. We point the sink dataset to Dynamic_sinkdataset, but this time we pass the item() collection properties TargetDatabase, TargetSchema and TargetTable, assigning them to the parameters of the same names declared on Dynamic_sinkdataset.
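As a sketch, the sink dataset parameters would then carry expressions such as (parameter names assumed to mirror the control-table columns, per the naming convention):

```
TargetDatabase : @trim(item().TargetDatabase)
TargetSchema   : @trim(item().TargetSchema)
TargetTable    : @trim(item().TargetTable)
```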
At runtime, different source files will get mapped to different target tables within their respective databases. Clearly we will also need to define the mappings for column names and data types on the fly, as the mapping differs for each row. In previous blogs we took care of this through JSON files which we inserted into the MetadataControlTable. This time we use the TargetJson property of the item() collection, where we have stored this mapping for each row.
(Note: To ensure ADF understands that the values are to be converted into JSON, we use the json() built-in function. Omitting this function may result in validation errors.)
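On the Copy data activity's 'Mapping' tab, the dynamic content therefore reduces to a single expression:

```
@json(item().TargetJson)
```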
4. Script Activity
The final step is to capture the metadata of the copy activity, whether it succeeds or fails. That way the mapping data and the run metadata live in the same MetadataControlTable for debugging purposes. Add a Script activity to the ForEachRow loop using the 'Add' button. Rename it UpdatePipelineandRunData. Point the 'Success' arrow from the Copy data activity we created above to this activity. This activity will execute on successful completion of CopyfromBlobToSQL for each iteration.
It accepts the metadata of the CopyfromBlobToSQL activity and executes an update query against the Static_controltable dataset, using WHERE conditions to update each row with its metadata. Leave the 'General' settings as is. Under 'Settings', select the linked service incrementalcontrol. For 'Script' we pass dynamic content via the 'Add dynamic content' menu. It looks something like this:
The script is available here. It captures the following metadata: Pipeline_Name, RunId, ActivityStart, ActivityEnd, Status, FilesRead, RowsRead, SourcePeakConnections, RowsWritten, SinkPeakConnections, UsedDIUs, UsedParallelCopies, CopyDuration_inseconds, CopyThroughput_inbytespersecond and Errors. (Note: These columns were declared earlier in the metadata control table as follows.)
Notice we are referencing the parent pipeline (pipeline()), the previous Copy data activity (activity('CopyfromBlobToSQL')) and the array of row values (item()) via runtime variables. We also make use of the built-in utcNow() function, which returns the current UTC time as a string. Hence the mandate to use the same naming conventions for the activities and to declare the variables exactly as they appear in the control table. This script was custom created by referencing various parameters of the copy activity output in general; the general output of a copy activity can be found here. As more features are incorporated into Azure Data Factory activities and pipelines, the script may require an update, so check the public documentation links provided below to stay current.
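As an abbreviated sketch of what such a script looks like (the full version is linked above; the exact output property names and WHERE columns here are assumptions based on the list given earlier):

```sql
-- Sketch only: see the linked script for the full column list.
UPDATE dbo.MetadataControlTable
SET Pipeline_Name = '@{pipeline().Pipeline}',
    RunId         = '@{pipeline().RunId}',
    Status        = '@{activity('CopyfromBlobToSQL').output.executionDetails[0].status}',
    RowsRead      = '@{activity('CopyfromBlobToSQL').output.rowsRead}',
    RowsWritten   = '@{activity('CopyfromBlobToSQL').output.rowsCopied}',
    ActivityEnd   = '@{utcNow()}'
WHERE ContainerName = '@{trim(item().ContainerName)}'
  AND SourceFile    = '@{trim(item().SourceFile)}'
```

The @{...} string-interpolation syntax lets ADF substitute each expression's value into the SQL text before the Script activity runs it.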
The end result may look something like this:
To understand more about these parameters, refer to this article. This concludes the lab.
Hopefully you will not require this, but should you still run into errors, refer to this link.