Getting started with ADF - Loading data in SQL Tables from multiple parquet files dynamically

{tocify} $title={Table of Contents}

Introduction


In last post - Getting started with ADF - Creating and Loading data in parquet file from SQL Tables dynamically we saw how to create parquet files from SQL Tables dynamically.

In this post we will see the reverse scenario i.e. Loading data in SQL Tables from multiple parquet files dynamically 

Also we will make use of Auto-Create option to create table automatically if it doesn't exist in the destination SQL database.


Steps in Loading data in SQL Table from parquet file 


If you are beginner then would ask you to go through - Getting Started with Azure Data Factory

Let's create a pipeline which will load data from single parquet file to particular table
 

Create source linked service


To create linked service means creating a connection,  Click on Manage tab from the left pane.

On the Linked services page, select +New to create a new linked service. 

Search for storage and select Azure data lake storage Gen2 for the Source here i.e., place where parquet files will be kept

linked service for ADLS gen2 with parquet format

Give meaningful name, 

select Integration runtime as AutoResolveIntegrationRuntime

Four options are available as authentication method , we select Account Key                             

inputs to new Linked Service
You can choose From Azure Subscription and select from the list of available storage accounts, here we are manually giving the url of the storage account and the respective storage account key.

Note :  Alternatively and recommended  way is to make use of Key Vault 


Create Linked Service for destination 


Now, create another linked service, here the destination is SQL database tables, so create a Connection string to this particular database.

Linked Service for SQL Database



For that you provide the Server address, Database Name and the credential. And finally click on Test Connection to confirm all ok.

Using this linked services, ADF will connect to these services at runtime. 


Next is  to tell ADF, what form of data to expect.

Two datasets is to be created one for defining structure of data coming from the parquet files (input) and another for  SQL table which will be created and data would be loaded into it (output) 

Create Dataset - Parquet Files


Select Author tab from the left pane --> select the + (plus) button  and then select Dataset.

Search for storage and select ADLS gen2, Parquet as format,  give the Name and select the linked service

create parquet dataset

Provide the file path and save it.


Create Dataset - SQL 


Search for SQL and select SQL Server, provide the Name and select the linked service, the one created for connecting to SQL


Create SQL dataset



Provide the Table name and save it.

Create Pipeline to load data in SQL Table from Parquet File 


Select the + (plus) button, and then select Pipeline.

Drag and drop Copy Data activity from Activities-->Move & Transform section

In the General panel under Properties, specify Copy data from Parquet to SQL for Name. 

Provide the details of source (i.e. ADLS gen 2 folder location) and sink (i.e. which Table to insert data into).

copy data parquet file to SQL Table



That's it , basic pipeline is ready.

So this pipeline will read data from inputfiles/invoice/Invoice.parquet file and insert into dbo.Invoice_Parquet table.

This is great for single file, what if there are multiple files from which data is to be loaded in SQL table?

You can say, we can use same pipeline - by just replacing the folder path and file names in source settings and table name in sink settings, yes that will work but there will be manual intervention required.

And what if there are hundred's and thousand's of different object's parquet file? 


How data can be loaded dynamically in SQL table from parquet files using Azure data factory pipeline?


In order to  load data in those  dynamically, we will take help of configuration table where we will store the required details.

This configurations can be referred at runtime by Pipeline with the help of LookUp activity, the output of this activity can be used to dynamically provide the settings value (through the parameters - which we will add in the datasets) , thus no hardcoding required in the pipeline.

So same pipeline can be used for all the requirement where data is to be loaded in SQL table, just an entry in the configuration table is required.

Let's see below, how to do it...

Steps in creating pipeline - Load data in SQL Table dynamically from multiple different parquet files 


We will use same Linked services and Datasets created above with or without some modification

Modify Source and Destination Linked Service


No changes required


Modify Source Dataset


We will remove hardcoded folder name and filePath (no static data).

And add parameters to dataset

                    container
                    folder
                    filepath

this will help us in achieving the dynamic selections of the parquet files.


modified parquet dataset




In connection tab add following against File Path

@dataset().container/@dataset().folder/@dataset().filepath


Modify Destination Dataset


We will Remove hardcoded Table name (no static data).

And add a parameter called TableName and in connection  tab provide following against Table -
                             
                                @dataset().TableName



Adding parameter in SQL Dataset


We will make use of parameter, this will help us in achieving the dynamic selection of Table. 



Configuration Table 


Using this table we will have some basic config information like the file path of parquet file, the table name, flag to decide whether  it is to be processed or not etc. (more columns can be added as per the need).

Configuration table for ADF pipeline


This table will be referred at runtime and based on results from it, further processing will be done.

In future

1. if more objects are to be added - we make entry in config table
2. if some objects are not to be processed - we set TobeProcessed flag to false

Thus we can setup pipeline once and provide the config details dynamically to it, without making any further change in pipeline.

Designing the Pipeline


The purpose of pipeline is to read the parquet files and load the data in the respective SQL Table 

i.   Lookup Activity

config lookup activity

The fist step where we get the details of the location of the parquet files and which all tables are to be loaded .

  • Place a lookup activity , provide a name in General tab.
  • Under Settings tab - select the dataset as DS_SQLServer , provide the table name as config (the one created in above step), select user entry as Query and provide below against the Query
                    select * from config where TobeProcessed = 1

Here basically we are fetching details of only those objects which we are interested(the ones having TobeProcessed flag set to true)

So based on number of objects returned, we need to perform those number(for each) of copy activity, so in next step add ForEach 


ii.  For each activity

ForEach activity

ForEach works on array, it's input. Here it is termed as Items , provide following against it

                 @activity('ConfigLookup').output.value

Basically we are giving it the output of  ConfigLookup activity, so it will iterate through it . And in each iteration we need to perform the act of creating parquet file and for that we add CopyData activity in ForEach.

iii. CopyData Activity

  • Select Copy data activity , give a meaningful name.
  • On Source tab


 i. Select dataset as DS_Parquet, as soon as you do it - you are asked to provide value against parameters


Here we are using Wildcard path, so that all the .parquet files which is there in the provided path should be processed

                copy activity parquet to sql            
  • On Sink tab
          i. Select dataset as DS_SQLServer, as soon as you do it - you are asked to provide value against parameter TableName, provide below       
                       @concat(item().TableName,'_Parquet') 

         ii. In Table option select Auto create table  -  this tells adf engine to check whether the table exists already in destination database.
If no table found, it automatically creates Table and then loads data in it.



Where did item came into picture?

Here item is current object/record of the ForEach items array. 

In previous step, we  had assigned output of lookup activity to ForEach's Items thus if lookup activity returned 3 records then Foreach items will have 3 records and it is referred as item in it.

Thus you provide the value which is in the current iteration of ForEach loop which ultimately is coming from config table  


That's it, pipeline is ready to use.


Conclusion


We walkthrough the steps of creating a pipeline to load data in SQL from a parquet file using Azure data factory pipeline . 

And in a scenario where there is need to insert data into many tables from multiple parquet files, same pipeline can be utilized with the help of configuration table .

Thus the pipeline remains untouched and whatever details are to be added or removed, is done in configuration table.

It is a design pattern which is very commonly used to make the pipeline more dynamic and to avoid hard coding and reducing tight coupling.


If you have some better idea or any suggestion/question, do post in comment !!



Post a Comment

If you have any suggestions or questions or want to share something then please drop a comment

Previous Post Next Post