Create Azure Data Factory And Pipeline Using .NET SDK

In this article, we will create an Azure Data Factory and pipeline using the .NET SDK. We will create two linked services and two datasets: one for the source dataset and another for the destination (sink) dataset. Here, we will use Azure Blob Storage as the input data source and Cosmos DB as the output (sink) data source. We will copy the data from a CSV file (which is in Azure Blob Storage) to the Cosmos DB database.

I have already explained Azure Data Factory and how to create a copy activity using the Azure portal in my previous article. Here, we will see how to achieve the same result with the .NET SDK.
 
Step 1 - Create Azure Blob Storage
 
Create a resource -> Storage -> Storage account.
 
 
Click the “Review + Create” button.
 
 
Now, click the “Create” button.
 
 
After some time, our storage account will be created successfully. We can go to the resource now.
 
We can upload a sample CSV file to Blob storage. This CSV file contains employee data with three columns (name, age, and department) and currently has three rows. Open Storage Explorer and create a new container.
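For reference, the employee.csv used here could look like the following (the values are hypothetical; only the three-column layout with a header row matters):

```
name,age,department
John,30,IT
Mary,28,HR
Sam,35,Finance
```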
 
 
We can right-click the BLOB CONTAINER and click “Create Blob Container”.
 
 
Give the container a valid name. The name is case sensitive.
 
 
We can upload the CSV file from our local hard drive. Please choose “SAS” as the authentication type.
 
 

We have successfully created a Blob Storage and uploaded the CSV file to the blob container.
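If you prefer the command line, the same storage setup can be sketched with the Azure CLI. This is only a sketch: the resource names are the examples from this article, and the commands assume you are already logged in with `az login`.

```shell
# Create the resource group and storage account (names are examples)
az group create --name sarath-rg --location eastus
az storage account create --name sarathstorage --resource-group sarath-rg --location eastus --sku Standard_LRS

# Create the blob container and upload the CSV file
az storage container create --name sarathcontainer --account-name sarathstorage
az storage blob upload --account-name sarathstorage --container-name sarathcontainer --name employee.csv --file employee.csv
```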

Step 2 - Create Azure Cosmos DB account

Create a new resource -> Databases -> Azure Cosmos DB.
 
 
Please give a valid name to the Cosmos DB account and choose a resource group. Then, click the “Review + Create” button.
 

After successful validation, click the “Create” button.

We can go to the Cosmos DB account and open the “Data Explorer” tab.

 
 
We need to create a new database.
 
We can create a new collection inside the database. Please give the “name” column as the partition key. (A partition key is similar to a primary key in an RDBMS.)
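Once the copy runs, each CSV row becomes a JSON document in this collection. With “name” as the partition key path (/name), a document might look like this (illustrative values; system properties such as `id` are generated automatically and abbreviated here):

```json
{
  "name": "John",
  "age": 30,
  "department": "IT",
  "id": "a1b2c3d4-..."
}
```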
 
 
We have successfully created a Cosmos DB account, a new database, and a collection.
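The portal steps above can also be sketched with the Azure CLI. Again, a sketch only: the names are the examples from this article, the `az cosmosdb sql` subcommands require a recent CLI version, and an `az login` session is assumed.

```shell
# Create the Cosmos DB account, database, and collection (names are examples)
az cosmosdb create --name sarathcosmosdb --resource-group sarath-rg
az cosmosdb sql database create --account-name sarathcosmosdb --resource-group sarath-rg --name sarathlal
az cosmosdb sql container create --account-name sarathcosmosdb --resource-group sarath-rg --database-name sarathlal --name employee --partition-key-path "/name"
```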

Step 3 - Create Azure Data Factory and Pipeline using .NET SDK

To create any Azure resource from .NET, we must install the .NET SDK first. We need the following information from the Azure portal to create a resource using the .NET SDK.

  1. Tenant ID
  2. Subscription ID
  3. Application ID
  4. Authentication Key
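These four values can also be collected from the command line with the Azure CLI (a sketch; the service-principal name below is a placeholder, and exact flags vary between CLI versions):

```shell
# Tenant ID and Subscription ID of the current login
az account show --query tenantId --output tsv
az account show --query id --output tsv

# Create an app registration (service principal); the output contains
# appId (the Application ID) and password (the Authentication Key).
# Newer CLI versions may also require --scopes for the role assignment.
az ad sp create-for-rbac --name sarath-adf-app --role Contributor
```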

We can get the Tenant ID from the Azure portal. Click “Azure Active Directory” -> “Properties” and choose the Directory ID.

The Directory ID is the Tenant ID.
 
 
Click the “Subscriptions” tab and choose the Subscription ID.
 

We also need an application ID and an authentication key. We can create a new app registration and get the ID and key from it.

Click “Azure Active Directory” -> “App registrations” and click the “New application registration” button.

 
We must give a valid name to our app registration and choose Web app / API as the application type. You can give a dummy URL as the Sign-on URL. Click the “Create” button.
 
 
Your app registration will be created shortly. Copy the application ID and save it in a safe place.
 
 
You can click the “Settings” button to create an authentication key. Click the “Keys” button and give a description and an expiry period. Please note that the key will be generated automatically when you save the entry. Click the “Save” button.
 
 
Copy the key value and save it in a safe place. We will be using this key value later in our .NET application. Please note that you can’t retrieve this key after you leave this blade.
 
 
We have successfully collected the Tenant ID, Subscription ID, Application ID, and Authentication Key from the Azure portal.
 
We must grant access roles to our app registration. (This is mandatory for creating resources from the .NET SDK.)
 
Click the “Subscriptions” button and then the “Access control (IAM)” tab. Click the “Add” button to grant a new access role.
 
 
Choose our previously created app and assign the “Contributor” role.
 
 
Create a new .NET Console application.
 
 
Save the project.
 
 
We can install the “Microsoft.Azure.Management.DataFactory” NuGet Package now.
 
Open the Package Manager Console and execute the command below.

    Install-Package Microsoft.Azure.Management.DataFactory

Install two more packages.

    Install-Package Microsoft.Azure.Management.ResourceManager -Prerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory

We can modify the static “Main” method inside the “Program” class to create the Azure Data Factory and pipeline.

This method is executed automatically when the application starts.

Set variables
    // Set variables
    string tenantID = "<fill the value>";
    string subscriptionId = "<fill the value>";
    string applicationId = "<fill the value>";
    string authenticationKey = "<fill the value>";
    string resourceGroup = "sarath-rg";
    string region = "East US";
    string dataFactoryName = "sarathadf1"; // must be globally unique

Specify the source Azure Blob information

    // Specify the source Azure Blob information
    string storageAccount = "sarathstorage";
    string storageKey = "<fill the value>";
    string inputBlobPath = "sarathcontainer/";
    string inputBlobName = "employee.csv";

Specify the Azure Cosmos DB information

    // Specify the Azure Cosmos DB information
    string azureCosmosDBConnString = "AccountEndpoint=https://sarathcosmosdb.documents.azure.com:443/;AccountKey=<account key>;Database=sarathlal";
    string azureCosmosDBCollection = "employee";

Specify the Linked Service Names and Dataset Names

    string blobStorageLinkedServiceName = "AzureBlobStorageLinkedService";
    string cosmosDbLinkedServiceName = "AzureCosmosDbLinkedService";
    string blobDatasetName = "BlobDataset";
    string cosmosDbDatasetName = "CosmosDbDataset";
    string pipelineName = "SarathADFBlobToCosmosDbCopy";

We can authenticate and create a data factory management client

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Create the data factory and wait until provisioning is complete.

    // Create data factory
    Console.WriteLine("Creating data factory " + dataFactoryName + "...");
    Factory dataFactory = new Factory
    {
        Location = region,
        Identity = new FactoryIdentity()
    };
    client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
    Console.WriteLine(SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

    while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
    {
        System.Threading.Thread.Sleep(1000);
    }

Create an Azure Blob Storage linked service

    // Create an Azure Blob Storage linked service
    Console.WriteLine("Creating linked service " + blobStorageLinkedServiceName + "...");

    LinkedServiceResource storageLinkedService = new LinkedServiceResource(
        new AzureStorageLinkedService
        {
            ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
        }
    );
    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, blobStorageLinkedServiceName, storageLinkedService);
    Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));

Create an Azure Cosmos DB linked service

    // Create an Azure Cosmos DB linked service
    Console.WriteLine("Creating linked service " + cosmosDbLinkedServiceName + "...");

    LinkedServiceResource cosmosDbLinkedService = new LinkedServiceResource(
        new CosmosDbLinkedService
        {
            ConnectionString = new SecureString(azureCosmosDBConnString)
        }
    );
    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, cosmosDbLinkedServiceName, cosmosDbLinkedService);
    Console.WriteLine(SafeJsonConvert.SerializeObject(cosmosDbLinkedService, client.SerializationSettings));

Create an Azure Blob dataset

    // Create an Azure Blob dataset
    Console.WriteLine("Creating dataset " + blobDatasetName + "...");
    DatasetResource blobDataset = new DatasetResource(
        new AzureBlobDataset
        {
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = blobStorageLinkedServiceName
            },
            FolderPath = inputBlobPath,
            FileName = inputBlobName,
            Format = new TextFormat { ColumnDelimiter = ",", TreatEmptyAsNull = true, FirstRowAsHeader = true },
            Structure = new List<DatasetDataElement>
            {
                new DatasetDataElement { Name = "name", Type = "String" },
                new DatasetDataElement { Name = "age", Type = "Int32" },
                new DatasetDataElement { Name = "department", Type = "String" }
            }
        }
    );
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));

Create a Cosmos DB Database dataset

    // Create a Cosmos DB Database dataset
    Console.WriteLine("Creating dataset " + cosmosDbDatasetName + "...");
    DatasetResource cosmosDbDataset = new DatasetResource(
        new DocumentDbCollectionDataset
        {
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = cosmosDbLinkedServiceName
            },
            CollectionName = azureCosmosDBCollection
        }
    );
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, cosmosDbDatasetName, cosmosDbDataset);
    Console.WriteLine(SafeJsonConvert.SerializeObject(cosmosDbDataset, client.SerializationSettings));

Create a Pipeline with Copy Activity (very important)

    // Create a Pipeline with Copy Activity
    Console.WriteLine("Creating pipeline " + pipelineName + "...");
    PipelineResource pipeline = new PipelineResource
    {
        Activities = new List<Activity>
        {
            new CopyActivity
            {
                Name = "CopyFromBlobToCosmosDB",
                Inputs = new List<DatasetReference>
                {
                    new DatasetReference
                    {
                        ReferenceName = blobDatasetName
                    }
                },
                Outputs = new List<DatasetReference>
                {
                    new DatasetReference
                    {
                        ReferenceName = cosmosDbDatasetName
                    }
                },
                Source = new BlobSource { },
                Sink = new DocumentDbCollectionSink { }
            }
        }
    };
    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
    Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));

Create a Pipeline Run

    // Create a Pipeline Run
    Console.WriteLine("Creating Pipeline run...");
    CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName).Result.Body;
    Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Monitor the Pipeline Run

    // Monitor the Pipeline Run
    Console.WriteLine("Checking Pipeline Run Status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }

Check the Copy Activity Run Details

    // Check the Copy Activity Run Details
    Console.WriteLine("Checking copy activity run details...");
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine("Copy Activity Succeeded!");
    }
    else
    {
        Console.WriteLine("Copy Activity Failed!");
    }
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();

We have completed all the coding for creating the Azure Data Factory, the pipeline, the linked services for both input and output, and the datasets. Now, we can run the application. It will take a few moments to create all these items, and all the logs are written to the console. Our application executed successfully without any errors.

 
We can go to the Azure Cosmos DB account and open a new query to check the current documents (records) available in the employee collection.
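In Data Explorer, a simple query such as the following lists the copied documents (standard Cosmos DB SQL syntax; `c` is the conventional alias for the collection):

```sql
SELECT * FROM c
```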
 
 
Please note that our .NET application successfully copied the data from Blob Storage to Cosmos DB. If you check all the available resources in the Azure portal, you can find the new Azure Data Factory there.
 
You can click the Azure Data Factory to open it. Open the resource and click the “Author & Monitor” button. You can see that there are two datasets and one pipeline available in the data factory.
 
 
Each dataset contains the linked service and connection information. If you click the pipeline, you can get all the details about it.
 

You can see that many tabs are available in the pipeline. The Source and Sink tabs contain the information about the datasets and linked services.

In this article, we created a Blob Storage account and uploaded a CSV file to the blob container. We created a Cosmos DB account along with a database and a collection. We then created an Azure Data Factory and a pipeline, along with linked services and datasets for the source and sink. Finally, we executed the console application and found that all the resources were created successfully and the data was copied from Blob Storage to Cosmos DB.
 
I will discuss more features of Azure Data Factory in my upcoming articles.