Processing CSV and GZ format files into table through PDI

When loading an aggregate or summary table in Pentaho Data Integration, day-to-day data comes from a wide variety of sources, such as database tables, structured files, raw files, and pipe-separated files, so we need to integrate data from all of these into a common staging area.

Among these sources, the most common formats in which we receive data from clients and third-party vendors are CSV and GZ.

Since it is best to organize data in a structured fashion, separated either by commas or by the pipe character '|', we often need to develop PDI jobs and transformations to load CSV files. In an ideal world, every data source would provide a single format for ingestion: all files would look the same, and a single PDI transformation could load them all into a database table.
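To illustrate the idea outside PDI, a small Python sketch can read either a plain or a gzip-compressed pipe-separated file through the same code path; the helper names here are hypothetical, not part of any PDI API:

```python
import csv
import gzip
import io


def open_text(path):
    """Open a data file as text, transparently decompressing .gz files."""
    if path.endswith(".gz"):
        return io.TextIOWrapper(gzip.open(path, "rb"), encoding="utf-8")
    return open(path, encoding="utf-8")


def read_rows(path, delimiter="|"):
    """Yield each record of a pipe-separated file as a list of fields."""
    with open_text(path) as handle:
        yield from csv.reader(handle, delimiter=delimiter)
```

With this shape, the downstream loading logic does not need to care whether the client delivered a raw CSV or a GZ archive.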

Below is the job flow that takes data from the CSV input file and loads it into the fact table.


Fig.fact_mbs_category_job


Below is the workflow of the above job for loading CSV or GZip files.

START - Start point of the job.


Truncate lt_mbs_category_stg

This step will truncate the stage table 'lt_mbs_category_stg' to load the new data after the update_date.


delete stage files 

This step will delete the stage file named 'lt_mbs_category_stage'.


create_mbs_category_stg_load_file

This step executes 'create_mbs_category_stg_load_file.ktr' transformation.

It will load data from table hrst_ic.mbs_category into file 'lt_mbs_category_stage'.


get_mbs_category_stg_filenames 

This step executes the 'get_stage_filenames.ktr' transformation, a common transformation used to get all stage file names.


for_each_mbs_category_stg_file 

This step executes the 'for_each_mbs_category_stg_file.kjb' job once for every input row. The job receives each file name and then bulk-loads the corresponding file into the lt_mbs_category_stg table.
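The per-file bulk-load loop can be sketched in Python. Here sqlite3 stands in for the MySQL Bulk Load step, and the function name is illustrative, not part of the actual job:

```python
import csv
import sqlite3


def bulk_load_stage_files(conn, filenames, table):
    """Bulk-insert the pipe-separated rows of each stage file into a table."""
    cur = conn.cursor()
    for path in filenames:
        with open(path, encoding="utf-8") as handle:
            rows = [tuple(r) for r in csv.reader(handle, delimiter="|")]
        if rows:
            # One placeholder per column, derived from the first row.
            placeholders = ", ".join("?" * len(rows[0]))
            cur.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
    conn.commit()
```

The real step performs the same loop, but delegates the insert to the database engine's native bulk-load facility for speed.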


delete fact files 

This step will delete the fact and hold file named 'lt_mbs_category_stage'.


create_mbs_category_fact_file 

This step will execute 'create_mbs_category_fact_and_hold_load_file.ktr' transformation.

It will load data from the stage table 'lt_mbs_category_stg' into the fact file 'lt_mbs_category_fact'.


get_mbs_category_fact_filenames 

This step finds the previously created fact load files in the output folder path '${mdb.output.path}/${directory_path}' using the RegExp '${database}_${file_name}_fact.*\.txt'.
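The file-matching logic can be illustrated in Python; the parameter values below are hypothetical stand-ins for the ${database} and ${file_name} job parameters:

```python
import re

# Hypothetical values for the ${database} and ${file_name} job parameters.
database = "mdb"
file_name = "mbs_category"

# Same pattern the step uses: ${database}_${file_name}_fact.*\.txt
pattern = re.compile(rf"{database}_{file_name}_fact.*\.txt")

listing = [
    "mdb_mbs_category_fact_0001.txt",
    "mdb_mbs_category_stage_0001.txt",
    "mdb_mbs_category_fact_0002.txt",
]

# Only the fact load files survive the filter; stage files are skipped.
fact_files = [name for name in listing if pattern.fullmatch(name)]
```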

The parameters 'directory_path' and 'file_name' are declared in the main job.


for_each_mbs_category_fact_file 

This step executes the 'for_each_authorization_fact_file.kjb' job once for every input row. The job receives all file names and then bulk-loads each created file into the wt_mbs_category_f table.


Success 

End point of the job on success.


1. truncate lt_mbs_category_stg

This step will truncate the stage table 'lt_mbs_category_stg' to load the new data after the update_date.



2. delete stage files 

This step will delete the stage file named 'lt_mbs_category_stage'.




3. create_mbs_category_stg_load_file

This step executes 'create_mbs_category_stg_load_file.ktr' transformation.

It will load data from table hrst_ic.mbs_category into file 'lt_mbs_category_stage'.


Specify the directory path and file name for the source file, along with the CSV extension.




Specify the file type as CSV, with '|' as the separator character.


Map the column names from the source CSV file.

Rename the original field names to meaningful column names with appropriate data types.

Specify the path where the stage files should be created.


4. Text file output

This step creates stage files from the 'hrst_ic.mbs_category' table.

Specify the number of rows each stage file will have, along with the separator character.

Map the fields from the stream to the text file output with column names and data types.
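The row-splitting behaviour of the Text file output step can be sketched in Python; the helper below is a hypothetical illustration of splitting a result set into fixed-size stage files, not PDI code:

```python
import csv


def write_stage_files(rows, base_path, rows_per_file, delimiter="|"):
    """Split rows into numbered stage files of at most rows_per_file rows each."""
    paths = []
    for start in range(0, len(rows), rows_per_file):
        path = f"{base_path}_{len(paths):04d}.txt"
        with open(path, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle, delimiter=delimiter, lineterminator="\n")
            writer.writerows(rows[start:start + rows_per_file])
        paths.append(path)
    return paths
```

Keeping each stage file small lets the subsequent bulk load proceed in manageable chunks rather than one huge file.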


5. get_mbs_category_stg_filenames

This step executes the 'get_stage_filenames.ktr' transformation. This is a common transformation used to get all stage file names.



Construct the new file path along with the file name and extension.


Move the already processed files to the processed folder.
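The move-to-processed housekeeping can be sketched in Python; this is an illustrative helper, not the PDI step itself, and the folder name is an assumption:

```python
import os
import shutil


def archive_processed(paths, processed_dir):
    """Move already-loaded stage files into the processed folder."""
    os.makedirs(processed_dir, exist_ok=True)
    moved = []
    for path in paths:
        target = os.path.join(processed_dir, os.path.basename(path))
        shutil.move(path, target)
        moved.append(target)
    return moved
```

Archiving processed files keeps the output folder clean, so the filename-matching step never picks up a file twice.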




This step will read the names of all the stage files and pass them to the bulk-load job for bulk loading into the fact table.

6. for_each_mbs_category_stg_file

This step executes the 'for_each_mbs_category_stg_file.kjb' job once for every input row.

This job will receive all file names and then bulk-load each created file into the lt_mbs_category_stg table.



The MySQL Bulk Load step will load each stage file into the lt_mbs_category_stg table. Specify the column names.