Copy Data From Blob Storage To Cosmos DB Using Azure Data Factory

In this article, we will see how to create an Azure Data Factory and copy data from Blob Storage to Cosmos DB using an ADF pipeline. We will create the Source and Destination (Sink) datasets in the pipeline and link them to the storage and Cosmos DB accounts in our Azure subscription. We will then publish the pipeline and trigger it manually.

Prerequisites

  • Azure Blob Storage Account
  • Cosmos DB Account

Azure Data Factory is a cloud-based data integration service that allows you to create data-driven workflows in the cloud for orchestrating and automating data movement and data transformation. Using Azure Data Factory, you can create and schedule data-driven workflows (called pipelines) that can ingest data from disparate data stores.
 
Step 1 - Create Azure Data Factory
 
Log into the Azure Portal.
 
Select "Create a resource" and choose Analytics -> Data Factory.
 
 
Give a valid name to the Azure Data Factory and choose a resource group. If you don’t have an existing resource group, please create a new one.
 

Azure Data Factory will be created shortly.

Step 2 - Store Data in Blob Storage

We can upload a sample CSV file to the Blob Storage now.

Go to the Storage Account and click “Storage Explorer” (currently in preview).

 
Right-click "Blob Containers" to open the context menu and click "Create Blob Container".
 
Please choose a valid container name (container names must be lowercase) and choose a Public access level. I chose Container here; note that Azure Data Factory will connect using the storage account connection string, so public access is not strictly required.
 
Open the container and upload a sample CSV file. I will upload an employee data CSV file that contains only 3 records. Please click the “Upload” button to proceed.
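For reference, a minimal employee CSV with three records could look like the sample below. The article does not show the actual file contents, so the column names and values here are assumed purely for illustration.

    id,name,department,salary
    1,John,IT,50000
    2,Priya,HR,45000
    3,Arun,Finance,55000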
 

Step 3 - Create a new Database and Collection in Azure Cosmos DB Account

Open the Cosmos DB account and click “Data Explorer”.

 
Click the “New Database” button, give the database a name, and optionally choose a throughput value (it is not mandatory; you can simply ignore it).
 
You can add a new collection to this database by right-clicking the database and choosing “New Collection”.
 
 
Give the collection a name and specify a partition key as well. The partition key determines how Cosmos DB distributes documents across logical partitions; it plays a role comparable to a key column in a SQL Server table, although it governs data distribution rather than uniqueness.
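For example, if the partition key path were /department (a hypothetical choice; the article does not show which key was used), documents with different department values would be placed in different logical partitions:

    { "id": "1", "name": "John", "department": "IT" }
    { "id": "2", "name": "Priya", "department": "HR" }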
 

Step 4 - Create a Pipeline in Azure Data Factory

We have already created the Azure Data Factory, so we can create a “Copy Data” pipeline now. Please open the Data Factory and click the “Author & Monitor” button.

 
It will open the ADF dashboard. Choose the “Create Pipeline” option.
 
We can create a new activity now. In the filter box, type “Copy” and it will show the “Copy Data” activity under the “Move & Transform” tab. Drag this activity onto the work area as I did.
 
 
We can rename the activity in the “General” tab. I have also given it a short description.
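For reference, the copy activity as it appears in the pipeline JSON (excerpted, slightly simplified, from the ARM template at the end of this article) carries this name and description, together with the Blob source and Cosmos DB sink that we configure in the next steps:

    {
        "name": "BlobCopyToCosmosDB",
        "description": "Copy data from Blob Storage to Cosmos DB Account.",
        "type": "Copy",
        "typeProperties": {
            "source": { "type": "BlobSource", "recursive": true },
            "sink": {
                "type": "DocumentDbCollectionSink",
                "writeBatchSize": 10000,
                "writeBehavior": "insert"
            }
        },
        "inputs": [ { "referenceName": "AzureBlob1", "type": "DatasetReference" } ],
        "outputs": [ { "referenceName": "DocumentDbCollection1", "type": "DatasetReference" } ]
    }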
 

In the Source tab, you can select the source dataset. Please click the “New” button. It will list all the data sources available in ADF; currently, Microsoft supports more than 70 data sources.

As our source dataset is Blob storage, please choose it and click the “Finish” button.
 
 
Choose the “Connection” tab and click the “New” button to create a new linked service for the source dataset.
 
 
Choose your Azure subscription, select the storage account we created earlier, and click the “Finish” button.
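Behind the scenes, this step creates a linked service definition similar to the excerpt below, taken (slightly simplified) from the ARM template shown at the end of this article; the connection string itself is passed in as a secure parameter rather than stored in the template:

    {
        "name": "AzureBlobStorage1",
        "type": "Microsoft.DataFactory/factories/linkedServices",
        "properties": {
            "type": "AzureBlobStorage",
            "typeProperties": {
                "connectionString": "[parameters('AzureBlobStorage1_connectionString')]"
            }
        }
    }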
 
 
In the Connection tab, browse the blob storage and choose the container we created earlier as the file path.
 

You can leave the file name empty; the copy activity will automatically pick up the files from the container.

In our CSV, the first row contains the column names, so select the “Column names in the first row” option.
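These settings end up in the source dataset definition. The excerpt below is a slightly simplified version of the AzureBlob1 dataset from the ARM template at the end of this article; note the firstRowAsHeader flag and the folder path pointing at the blob container:

    {
        "name": "AzureBlob1",
        "type": "Microsoft.DataFactory/factories/datasets",
        "properties": {
            "linkedServiceName": {
                "referenceName": "AzureBlobStorage1",
                "type": "LinkedServiceReference"
            },
            "type": "AzureBlob",
            "typeProperties": {
                "format": {
                    "type": "TextFormat",
                    "columnDelimiter": ",",
                    "firstRowAsHeader": true
                },
                "folderPath": "sarathcontainer"
            }
        }
    }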


Click the copy activity and choose the Sink tab; this is where the destination dataset is configured.

Please click the “New” button to create the destination dataset, select Azure Cosmos DB as the data store, and click the “Finish” button.

Choose the Connection tab and click the “New” button to create a new linked service for Cosmos DB.
 
Choose your Azure subscription from the dropdown list, select the Cosmos DB account name and database name, and click the “Finish” button.
 
You can choose the collection name from the list (we have already created the collection in the Cosmos DB account).
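The sink side works the same way; the dataset points at the employee collection through the Cosmos DB linked service. The excerpt below is a slightly simplified version of the DocumentDbCollection1 dataset from the ARM template at the end of this article:

    {
        "name": "DocumentDbCollection1",
        "type": "Microsoft.DataFactory/factories/datasets",
        "properties": {
            "linkedServiceName": {
                "referenceName": "CosmosDb1",
                "type": "LinkedServiceReference"
            },
            "type": "DocumentDbCollection",
            "typeProperties": {
                "collectionName": "employee"
            }
        }
    }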
We have created the Source and Sink datasets in our pipeline. We can validate the pipeline and datasets before publishing them.
 
 
If there are any validation errors, they will be listed here.
 
Our validation was successful, so we can now publish the changes to the Data Factory.

It will take some time to publish all the changes.

After a successful publish, we can trigger the pipeline.


Click the “Trigger” button and choose “Trigger Now”. A window will open; click the “Finish” button to start the run.

We will be notified with a message that the pipeline succeeded.
 
 
Our data integration is complete. We can open Cosmos DB to check the data copied from Blob Storage. You can see that there are three records (documents) available in the collection; as I mentioned earlier, my CSV file contains 3 records.
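Each copied CSV row becomes one JSON document in the collection. A typical result could look like the sketch below; the column names follow the assumed sample CSV from earlier, Cosmos DB adds system properties such as _rid, _etag, and _ts automatically, and the copy activity writes CSV values as strings unless an explicit type conversion is configured:

    {
        "id": "1",
        "name": "John",
        "department": "IT",
        "salary": "50000",
        "_rid": "...",
        "_etag": "...",
        "_ts": 1554712345
    }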
 

You can download the ARM (Azure Resource Manager) template for this ADF for future use. The ARM template contains the pipeline, dataset, and linked service details.

Normally, there are two ARM template files for each ADF: arm_template.json and arm_template_parameters.json.

arm_template.json
{
    "$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "type": "string",
            "metadata": "Data Factory Name",
            "defaultValue": "sarathadf1"
        },
        "AzureBlobStorage1_connectionString": {
            "type": "secureString",
            "metadata": "Secure string for 'connectionString' of 'AzureBlobStorage1'"
        },
        "CosmosDb1_connectionString": {
            "type": "secureString",
            "metadata": "Secure string for 'connectionString' of 'CosmosDb1'"
        },
        "AzureBlob1_properties_typeProperties_folderPath": {
            "type": "string",
            "defaultValue": "sarathcontainer"
        }
    },
    "variables": {
        "factoryId": "[concat('Microsoft.DataFactory/factories/', parameters('factoryName'))]"
    },
    "resources": [
        {
            "name": "[concat(parameters('factoryName'), '/AzureBlobStorage1')]",
            "type": "Microsoft.DataFactory/factories/linkedServices",
            "apiVersion": "2018-06-01",
            "properties": {
                "annotations": [],
                "type": "AzureBlobStorage",
                "typeProperties": {
                    "connectionString": "[parameters('AzureBlobStorage1_connectionString')]"
                }
            },
            "dependsOn": []
        },
        {
            "name": "[concat(parameters('factoryName'), '/pipeline1')]",
            "type": "Microsoft.DataFactory/factories/pipelines",
            "apiVersion": "2018-06-01",
            "properties": {
                "activities": [
                    {
                        "name": "BlobCopyToCosmosDB",
                        "description": "Copy data from Blob Storage to Cosmos DB Account.",
                        "type": "Copy",
                        "dependsOn": [],
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "userProperties": [],
                        "typeProperties": {
                            "source": {
                                "type": "BlobSource",
                                "recursive": true
                            },
                            "sink": {
                                "type": "DocumentDbCollectionSink",
                                "nestingSeparator": ".",
                                "writeBatchSize": 10000,
                                "writeBehavior": "insert"
                            },
                            "enableStaging": false,
                            "dataIntegrationUnits": 0
                        },
                        "inputs": [
                            {
                                "referenceName": "AzureBlob1",
                                "type": "DatasetReference",
                                "parameters": {}
                            }
                        ],
                        "outputs": [
                            {
                                "referenceName": "DocumentDbCollection1",
                                "type": "DatasetReference",
                                "parameters": {}
                            }
                        ]
                    }
                ],
                "annotations": []
            },
            "dependsOn": [
                "[concat(variables('factoryId'), '/datasets/AzureBlob1')]",
                "[concat(variables('factoryId'), '/datasets/DocumentDbCollection1')]"
            ]
        },
        {
            "name": "[concat(parameters('factoryName'), '/AzureBlob1')]",
            "type": "Microsoft.DataFactory/factories/datasets",
            "apiVersion": "2018-06-01",
            "properties": {
                "linkedServiceName": {
                    "referenceName": "AzureBlobStorage1",
                    "type": "LinkedServiceReference"
                },
                "annotations": [],
                "type": "AzureBlob",
                "typeProperties": {
                    "format": {
                        "type": "TextFormat",
                        "columnDelimiter": ",",
                        "nullValue": "\\N",
                        "treatEmptyAsNull": true,
                        "skipLineCount": 0,
                        "firstRowAsHeader": true
                    },
                    "folderPath": "[parameters('AzureBlob1_properties_typeProperties_folderPath')]"
                }
            },
            "dependsOn": [
                "[concat(variables('factoryId'), '/linkedServices/AzureBlobStorage1')]"
            ]
        },
        {
            "name": "[concat(parameters('factoryName'), '/DocumentDbCollection1')]",
            "type": "Microsoft.DataFactory/factories/datasets",
            "apiVersion": "2018-06-01",
            "properties": {
                "linkedServiceName": {
                    "referenceName": "CosmosDb1",
                    "type": "LinkedServiceReference"
                },
                "annotations": [],
                "type": "DocumentDbCollection",
                "typeProperties": {
                    "collectionName": "employee"
                }
            },
            "dependsOn": [
                "[concat(variables('factoryId'), '/linkedServices/CosmosDb1')]"
            ]
        },
        {
            "name": "[concat(parameters('factoryName'), '/CosmosDb1')]",
            "type": "Microsoft.DataFactory/factories/linkedServices",
            "apiVersion": "2018-06-01",
            "properties": {
                "annotations": [],
                "type": "CosmosDb",
                "typeProperties": {
                    "connectionString": "[parameters('CosmosDb1_connectionString')]"
                }
            },
            "dependsOn": []
        }
    ]
}

arm_template_parameters.json

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "sarathadf1"
        },
        "AzureBlobStorage1_connectionString": {
            "value": ""
        },
        "CosmosDb1_connectionString": {
            "value": ""
        },
        "AzureBlob1_properties_typeProperties_folderPath": {
            "value": "sarathcontainer"
        }
    }
}

The first JSON file contains all the pipeline, dataset, and linked service definitions, and the second one contains the parameter values; you can supply the storage account and Cosmos DB connection details in this file. We can import these ARM templates in the future to save time.

In this article, we created an Azure Data Factory, uploaded a simple CSV file to Blob Storage, and created a database with an empty collection in Cosmos DB. We then built a pipeline with two datasets in Data Factory, one for the Azure Blob storage account and the other for the Cosmos DB account, and linked both datasets to their accounts through linked services. We published the pipeline and datasets, triggered the pipeline on demand, and verified that all the data was successfully copied from Blob Storage to Cosmos DB.

