Sunday, June 16, 2019

Getting Started with Azure Data Factory - Insert Pipeline details in Custom Monitoring Table

Monitoring the health and performance of any system or application plays a very important role in an enterprise, as it gives you insight into what is happening with your applications and helps prevent major losses by enabling better, corrective decisions.

Microsoft provides an inbuilt Monitoring feature within Azure Data Factory that gives you details about each pipeline run as well as the activities inside the pipeline.


Pipeline run

As can be seen, a lot of details are made available at both levels: RunID, start time, status, name, duration etc.
activity run

At the activity level you can even see the input and output of that particular activity.

So the question might arise: why the need for a custom monitoring table?

1. Currently, pipeline run data is retained for only 45 days, after which it is no longer available.
2. Support for user-defined properties -- all the details we see are system properties, but what about the rest, i.e. the non-system, user-defined properties we want to monitor? Yes, there is support to include user properties in Monitor - you can promote any pipeline activity property as a user property (but only five).
3. Not all properties are shown in the Monitor tab. Columns can be added or removed, but the options are limited and are only available for pipeline runs, not for activity runs.



Scenario:


For the demo I am using a simple scenario: moving data from Azure SQL to Azure Data Lake Store, continued from the last post (Getting Started with Azure Data Factory - CopyData from CosmosDB to SQL).
Azure SQL to Azure Data Lake Store


But here, upon failure, details are logged into the custom monitoring table.

Steps in creating the solution


1. Create Azure Data Lake Store  -- Destination


ADLS gen2

A small mistake here led to a failure in the pipeline run.

2. Create Azure SQL DB -- Source
SQL DB Table


A small mistake here led to a failure in the pipeline run.

3. Create Error Logging Table and Stored Procedure to Insert values in it

The number of columns can be defined based on which details you need to capture. For the demo, twelve entities are identified and those are added as columns in ErrorLogTable (a sketch of the table is shown below).
ErrorLog Table and Stored Procedure
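The table definition itself only appears as a screenshot, so here is a minimal sketch of ErrorLogTable, assuming the twelve columns simply mirror the stored procedure parameters further below (the exact names and data types are assumptions):

-- Minimal sketch of the custom monitoring table (column names/types assumed,
-- mirroring the stored procedure parameters)
CREATE TABLE [dbo].[ErrorLogTable]
(
    [PipelineName]      VARCHAR(250),
    [PipelineID]        VARCHAR(250),
    [Source]            VARCHAR(300),
    [ActivityName]      VARCHAR(250),
    [ActivityID]        VARCHAR(250),
    [ErrorCode]         VARCHAR(10),
    [ErrorDescription]  VARCHAR(5000),
    [FailureType]       VARCHAR(50),
    [ActivityStartTime] DATETIME,
    [PipelineStartTime] DATETIME,
    [BatchID]           VARCHAR(100),
    [ErrorLoggedTime]   DATETIME
);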

And a simple stored procedure to insert values into it:

CREATE PROCEDURE [dbo].[usp_UpdateErrorLogTable]
@PipelineName VARCHAR(250),
@PipelineID VARCHAR(250),
@Source VARCHAR(300),
@ActivityName VARCHAR(250),
@ActivityID VARCHAR(250),
@ErrorCode VARCHAR(10),
@ErrorDescription VARCHAR(5000),
@FailureType VARCHAR(50),
@ActivityStartTime DATETIME,
@PipelineStartTime DATETIME,
@BatchID VARCHAR(100),
@ErrorLoggedTime DATETIME
AS
BEGIN
    DECLARE @CheckError INT = 0;

    BEGIN TRY
        INSERT INTO [ErrorLogTable]
        (
            [PipelineName],
            [PipelineID],
            [Source],
            [ActivityName],
            [ActivityID],
            [ErrorCode],
            [ErrorDescription],
            [FailureType],
            [ActivityStartTime],
            [PipelineStartTime],
            [BatchID],
            [ErrorLoggedTime]
        )
        VALUES
        (
            @PipelineName,
            @PipelineID,
            @Source,
            @ActivityName,
            @ActivityID,
            @ErrorCode,
            @ErrorDescription,
            @FailureType,
            @ActivityStartTime,
            @PipelineStartTime,
            @BatchID,
            @ErrorLoggedTime
        )
    END TRY
    BEGIN CATCH
        SET @CheckError = ERROR_NUMBER()
    END CATCH

    IF @CheckError = 0
    BEGIN
        RETURN 1
    END
    ELSE
    BEGIN
        RETURN -1
    END
END
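To sanity-check the stored procedure before wiring it into the pipeline, it can be executed manually with dummy values. This is just an illustrative test I am sketching here; the values are made up.

-- Quick manual test of the stored procedure with dummy values (illustrative only)
DECLARE @Now DATETIME = GETUTCDATE();
DECLARE @ReturnValue INT;

EXEC @ReturnValue = [dbo].[usp_UpdateErrorLogTable]
    @PipelineName      = 'CopySQLToADLS',
    @PipelineID        = '00000000-0000-0000-0000-000000000000',
    @Source            = 'AzureSQL',
    @ActivityName      = 'Copy DataFromSQL',
    @ActivityID        = '00000000-0000-0000-0000-000000000001',
    @ErrorCode         = '2200',
    @ErrorDescription  = 'Test error description',
    @FailureType       = 'UserError',
    @ActivityStartTime = @Now,
    @PipelineStartTime = @Now,
    @BatchID           = 'Batch001',
    @ErrorLoggedTime   = @Now;

-- 1 means the insert succeeded, -1 means the CATCH block was hit
SELECT @ReturnValue AS InsertResult;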



4. Create Azure Data Factory and Pipeline


i. To create a new instance of Data Factory, log in to the Azure portal, click on +Create a resource --> Data Factories --> click on the +Add button.

Provide a unique name for the Data Factory instance as per its purpose, then select the subscription, resource group, version and location (note that currently only limited locations are available to choose from).

After creating an instance of Data Factory, click on Author & Monitor, which leads to the ADF designer portal (dev environment) in a separate tab.

get started page

The new tab opens with options to get started. Either click the Create Pipeline wizard or the pencil icon; both lead to the canvas.

ii. If you already have a Data Factory instance, you can skip the above step.
Instance of Data factory

Search for and open the existing ADF instance and add a new pipeline in it.



iii. Create the Linked Service and Dataset which will be used in the pipeline.

SQL Linked service and dataset
SQL Linked Service

Go to the Connection tab and click on the New button. Give it a name, the database details, username, password etc.
SQl Dataset

Here no table is selected, as I am going to use a query (note: even if you choose a table here, you can still use a query in the activity).


ADLS Linked service and dataset


ADLS Linked Service

Go to the Connection tab and click on the New button. Give it a name, the ADLS URL and the account key.


ADLS Dataset

iv. Now drag and drop the Copy activity from the Activities --> Move & Transform section onto the canvas.

copy activity
Give it a name, select the dataset (the SQL dataset created in the above step) and provide the query to fetch the data, for example as shown below.
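For reference, a query along these lines can be used as the source query; the table name here is assumed from the test query shown later in the post (which deliberately misspells it as EmployeeArchive1):

-- Example source query for the Copy activity (table name assumed from the test section)
SELECT *
FROM EmployeeArchive
WHERE Jobtitle = 'Developer';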



SQL Sink

In the Sink tab select the dataset (the ADLS dataset created in the above step).

v. The next step is to add a Stored Procedure activity to the canvas and connect the Copy DataFromSQL activity to it on the failure condition.



Select the Copy DataFromSQL activity, click on the plus sign and select Failure.



Under the General tab give it a name, and under SQL Account select the Linked Service; here we are using the same Linked Service which was created for the SQL DB.
Stored Procedure Parameter

Under the Stored Procedure tab select the SP and click on the Import parameter button -- it will fetch the list of all the parameters defined in the SP. You can also add parameters manually by pressing the +New button.
SP Parameter

Here I am capturing details of the activity and pipeline from properties which are promoted by the system at runtime (ActivityID, PipelineID, ErrorCode etc.), and a few are static values (user-defined properties, e.g., Source and ActivityName).

How it works -- whenever a pipeline is triggered, a RunID gets assigned to it and to each activity within it; other info such as PipelineName, JobId, ActivityRunId, Status, StatusCode, Output, Error, ExecutionStartTime, ExecutionEndTime, ExecutionDetails, Duration and more is also captured. But not all of it is accessible; only a few properties are, the ones which are promoted (exposed) by the system. And to access them we need to use accessors (properties).
For example:
a. To get the pipeline run ID, use the RunId property of the pipeline -- pipeline().RunId
b. To get when a particular activity started, use the ExecutionStartTime property of the activity -- activity('name of activity').ExecutionStartTime
c. To get the error details of a particular activity, use the error.message property of the activity -- activity('name of activity').error.message (it is available only if that particular activity has failed).


That's it, the pipeline is ready; validate it, publish it and test.



Testing


The copy activity has to fail so that an entry is made in ErrorLogTable; for that, an incorrect table name is provided in the query:

Select * from EmployeeArchive1 where Jobtitle='Developer'


Invalid Object Name error



 After the pipeline run completed, entries were found in the ErrorLogTable
entry in error log table
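To double-check outside the Monitor tab, a simple query like the one below (added here for illustration) shows the most recently logged failures:

-- Inspect the latest rows written to the custom monitoring table
SELECT TOP (10) *
FROM [dbo].[ErrorLogTable]
ORDER BY [ErrorLoggedTime] DESC;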








Sunday, June 9, 2019

Bad Request - The specifed resource name contains invalid characters.

Encountered the below error when triggering the pipeline which moves data from SQL to Azure Data Lake Store Gen2 and writes a CSV file.

Error

Activity Copy DataFromSQL failed: ErrorCode=UserErrorFailedBlobFSOperation,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=BlobFS operation failed for: Operation returned an invalid status code 'BadRequest'. Account: 'adls2adf'. FileSystem: 'EMPARCHIVE'. ErrorCode: 'InvalidResourceName'. Message: 'The specifed resource name contains invalid characters.'. RequestId: '8a0437a7-201f-0008-4ac7-1e3e47000000'.,Source=Microsoft.DataTransfer.ClientLibrary,''Type=Microsoft.Azure.Storage.Data.Models.ErrorSchemaException,Message=Operation returned an invalid status code 'BadRequest',Source=Microsoft.DataTransfer.ClientLibrary,'



Why it happened


The error is actually pointing out that there are some invalid characters in the file path.

incorrect file path


As can be seen in the above image, the names of the container, directory and file are all in uppercase; however, the error points to the name given to the container, i.e. EMPARCHIVE. This is because uppercase letters are not supported in container names.



What to do


You need to use lowercase letters (numbers are also supported) when naming the container. There is no such restriction on the directory/folder name.

correct file path in ADLS gen2

Output file was created after correcting the container name.








Sunday, June 2, 2019

Getting Started with Azure Data Factory - CopyData from CosmosDB to SQL


Why and What is Data Factory


Day by day the amount of data collected by any business/enterprise is growing, both historical and transactional. But is there any need to keep it? Is it useful to the business in any way? Yes: the data collected so far, or to be collected going forward, can be used to derive value through the process of analyzing it and presenting actionable information that helps executives, managers and other corporate end users make informed business decisions (assisting Business Intelligence).

As it is said, information, when processed, turns into knowledge.

To make this Raw Data (information) --> Transformed Data (knowledge) journey possible, Microsoft introduced a managed data integration service, Azure Data Factory, which is capable of extract-transform-load (ETL) or extract-load-transform (ELT). For the transformation activities, any of the following can be used: HDInsight, Databricks, Azure Data Lake U-SQL, Machine Learning, stored procedures etc.

Each business/entity has its own way of storing data (data stores): some prefer to store it on premises, some in the cloud, some on a file system, some in different databases etc. ADF currently supports more than 72 data stores to which it can connect and from which it can extract data.

Just as Logic Apps can connect services which are cloud based or on premises (integrating various services), Azure Data Factory can connect to various types of data stores which are cloud based or on premises. You get a browser-based designer (also available in Visual Studio) where you can design the pipeline (data-driven workflow) by setting the trigger (the way to start the workflow) and then selecting the appropriate activity or series of activities. You can debug and validate the workflow; once done, you publish it and it gets deployed, ready to use. You are charged only when it is executed, based on the number of activities that are run, the volume of data that is moved etc.

Building Blocks of Azure Data Factory


1. TRIGGERS



There are three ways you can trigger a pipeline: manually, schedule based and event based (for example, the deletion of a file in your Azure Storage account).
A schedule-based trigger invokes the pipeline automatically at your scheduled time. It can be set to execute on a daily, weekly or monthly basis (schedule trigger), or on fixed-size time intervals (tumbling window trigger).

trigger types


2. CONNECTIONS


Connecting to a data store and allowing data movement to and from it is done via the Integration Runtime and Linked Services.

The Integration Runtime acts as a data gateway/medium (on premises to cloud, cloud to cloud), whereas Linked Services are connections to data sources and destinations. Linked Services are connection strings, much like bindings in Function Apps and connector configurations in Logic Apps. Thus a Linked Service runs in the scope of an Integration Runtime.

Currently three types of IR are supported 

Azure (Within Azure)
Self-hosted (On premises)
Azure-SSIS (dedicated for SSIS package)

3. ACTIVITIES


The steps in a pipeline are termed activities and can be data movement activities, data transformation activities (e.g. Databricks), or control activities (Lookup, If Condition, Wait, ForEach etc.). You can start with any activity based on the requirement. Below is how they are categorized:

activities in adf


4. DATASET


Datasets represent the way data is present inside the data store, i.e. the data structures within the data stores. Thus an input dataset represents the input which will be consumed by an activity, and an output dataset represents the output which will be produced by it.

5. PIPELINE


This is where all the components discussed above are used to design the data workflow with the help of an activity or group of activities: from where the data is to be collected, how it is to be collected and what is to be done with it. So a pipeline is a logical grouping of activities, or a scope of tasks.


6. MONITORING PIPELINE


Inbuilt support for monitoring pipeline runs is provided out of the box; details at the activity level can also be checked, and alerts can be set on top of it.




Below is a small demonstration to help you get started with Azure Data Factory.


Scenario:

For the demo I am using a simple scenario: say a business wants to archive the transactional data from the application database (where data is retained for only a few days and is then deleted). As the business needs the data for further processing, it wants it to be stored somewhere, i.e. to move data from the application database to an archive database periodically.

Here I have used Cosmos DB as the application database and Azure SQL Database as the archive database.

Steps in creating the solution


1. Create Azure Cosmos DB -- Source

Cosmos DB collection

To keep it simple, I have created a collection with only four records (in reality there would be a lot of data, up to millions of records).

2. Create Azure SQL DB -- Destination
SQL DB Table


To keep it simple, I have created a table with column names matching the fields in the Cosmos DB records (a rough sketch is shown below). I made a small mistake here which led to a failure in the pipeline run.
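For reference, a rough sketch of such an archive table; apart from Jobtitle (which appears in a query in the first post on this page) and the table name EmployeeArchive, the column names and types here are hypothetical:

-- Rough sketch of the archive table; columns other than Jobtitle are hypothetical
CREATE TABLE [dbo].[EmployeeArchive]
(
    [EmployeeId] VARCHAR(100),  -- hypothetical key column
    [Name]       VARCHAR(250),  -- hypothetical
    [Jobtitle]   VARCHAR(100)   -- used later in a filter query
);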

3. Create Azure Data Factory


Click on +Create a resource --> Data Factories --> click on +Add button 

create new data factory

Provide a unique name for the Data Factory instance as per its purpose, then select the subscription, resource group, version and location.

Note that currently only limited locations are available to choose from.

Instance of Data factory

After creating an instance of Data Factory, click on Author & Monitor, which leads to the ADF designer portal (dev environment) in a separate tab.

Data Factory Portal

The new tab opens with options to get started. Either of the first two can be used to create a pipeline, but as we need a pipeline to copy data, the Copy Data wizard is selected (it creates a pipeline with a Copy Data activity).
name to copy data

Give it a descriptive name and select the task schedule; as I want to run it manually once, Run once now is selected.
source data store

Now we need to provide the data store from where the data is to be extracted, and for that we need to create a new connection (Linked Service).



As the data source is Cosmos DB, which is SQL API based, create the Linked Service by selecting Azure Cosmos DB (SQL API).
Next, provide a name for the Linked Service and select the runtime. As my data store is in the cloud itself, AutoResolveIntegrationRuntime is selected. If the source data store had been on premises, the Integration Runtime agent would have had to be installed on the on-premises server and used as the gateway.
For the connection string, as it is the same subscription, From Azure Subscription is selected (it automatically fetches the keys/connection string); otherwise we can also enter it manually.
Select the Cosmos DB account and database to be connected, then click on Test connection and Finish.

Once connection is tested, create a dataset
Source data Source


Here the table data is to be fetched directly, so the EmployeeRecords table is selected from Existing tables; otherwise a query can also be used to fetch the data, as sketched below.
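If a query were used instead of picking the table directly, a Cosmos DB SQL API query along these lines would do (the Jobtitle filter is only an assumption, mirroring the column used on the SQL side):

-- Illustrative Cosmos DB SQL API query instead of selecting the whole collection
SELECT * FROM c WHERE c.Jobtitle = 'Developer'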

Next is to create the connection with the destination; select Azure SQL Database.

Provide details like the server name, database name and login details -- here too it is on Azure and in the same subscription.
Table mapping in ADF


The EmployeeArchive table is selected as the destination, as the data is to be copied into it.

Column mapping in ADF

The wizard presents the column mapping for creating the destination dataset; you can make changes (select/unselect) here or continue. For this demo, I kept it simple: a one-to-one mapping.
fault tolerant setting


Select what is to be done if any exception occurs in the pipeline -- you can abort and fail the copy activity when incompatible data is encountered (the default behavior), or you can continue to copy all of the data by adding fault tolerance and skipping incompatible rows (errors can also be logged).

On the summary page, verify the details; if any changes are needed, the Edit option can be used.


Click on Finish. That's it, the pipeline is ready; save it, publish it and test.



Testing



For testing I triggered the pipeline manually.




 After the pipeline run succeeded, entries were found in the SQL DB 
Data moved to destination using ADF


