A Tool To Generate PySpark Schema From JSON

 Hi Folks,

I am not sure how many data engineers will need this, but it solves a problem many of us face: dealing with complex JSON data coming from a source like Kafka or files, which then has to be denormalized.

If you find yourself hand-writing PySpark schemas for JSON data, I have a solution for you.

I built a small tool that takes care of this for data engineers working with JSON data and PySpark schemas.

As we know, JSON data is semi-structured; we usually ingest it and denormalize it into smaller tables for further processing. This tool helps speed up that part of your ETL development process a little.

I faced this problem in one of my projects, so I built this tool and thought I should share it with everyone.

A tool to Generate PySpark Schema from JSON

In my case, I had to generate a PySpark schema from JSON to ingest the data, and the JSON structure changed often.

The JSON I was dealing with was very complex. Let me give you an example of the tool and the problem it solves.

For example, say we have a JSON payload coming from Kafka like the one below:

{
    "name": "PREETish ranjan",
    "dob": "2022-03-04T18:30:00.000Z",
    "status": "active",
    "isActive": true,
    "id": 102,
    "address": {
        "city": "Hyderabad",
        "PIN": 500016
    },
    "mobiles": ["8989898989", "5656565656"],
    "id_cards": [1, 2, 3, 4, 5]
}

The output we need looks like this:

StructType([
    StructField('name', StringType(), True),
    StructField('dob', StringType(), True),
    StructField('status', StringType(), True),
    StructField('isActive', BooleanType(), True),
    StructField('id', IntegerType(), True),
    StructField('address', StructType([StructField('city', StringType(), True), StructField('PIN', IntegerType(), True)]), True),
    StructField('mobiles', ArrayType(StringType()), True),
    StructField('id_cards', ArrayType(IntegerType()), True),
])

If the JSON gets bigger and more complex, writing this schema by hand becomes quite difficult. My tool is a simple JavaScript tool that generates it for you. It has a few bugs and limitations, but it works.

For a few complex array-like structures it sometimes generates the wrong schema (it basically puts null in place of the element type); otherwise it works fine.
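
For instance, an array of objects like "addresses": [{"city": "Hyderabad", "PIN": 500016}] (a hypothetical field, not part of the sample above) might come out with null as the element type. In that case you can always write the inner schema by hand:

StructField('addresses', ArrayType(StructType([
    StructField('city', StringType(), True),
    StructField('PIN', IntegerType(), True),
])), True)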

How does it work?

JavaScript is very good at dealing with JSON, so I built this tool in JavaScript. It takes a valid JSON document and a function runs over it recursively, generating the strings that build up the PySpark schema: if it finds a string it adds StringType(), if it finds a number it adds IntegerType(), and if it finds an object it repeats the process for that object.
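
To illustrate the idea, here is a rough Python sketch of the same recursion. The actual tool is written in JavaScript, so this is not its source code, just the general approach applied to the sample JSON above:

import json
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    BooleanType, DoubleType, ArrayType, NullType,
)

def infer_type(value):
    # Map a parsed JSON value to a PySpark data type.
    if isinstance(value, bool):            # bool must be checked before int
        return BooleanType()
    if isinstance(value, int):
        return IntegerType()
    if isinstance(value, float):
        return DoubleType()
    if isinstance(value, str):
        return StringType()
    if isinstance(value, dict):            # nested object: recurse into every field
        return StructType([StructField(k, infer_type(v), True) for k, v in value.items()])
    if isinstance(value, list) and value:  # array: infer the element type from the first element
        return ArrayType(infer_type(value[0]))
    return NullType()                      # nulls and empty arrays fall back to NullType

sample = json.loads('''{
    "name": "PREETish ranjan",
    "isActive": true,
    "id": 102,
    "address": {"city": "Hyderabad", "PIN": 500016},
    "mobiles": ["8989898989", "5656565656"]
}''')
schema = StructType([StructField(k, infer_type(v), True) for k, v in sample.items()])
print(schema)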

You can see the simple source code in the GitHub repository.

Example

Here is a small example of how to use the schema. 

I have used this in Azure Databricks and it works completely fine.

The column "values" in the table source_schema_dev.dummy_data is the string column that contains the JSON string:

import os
from pyspark.sql.types import *
from pyspark.sql.functions import *

# pick the table for the current environment
env = os.environ["ENVIRONMENT"]
table_name = f"source_schema_{env}.dummy_data"
schema = StructType([
    StructField('name', StringType(), True),
    StructField('dob', StringType(), True),
    StructField('status', StringType(), True),
    StructField('isActive', BooleanType(), True),
    StructField('id', IntegerType(), True),
    StructField('address', StructType([StructField('city', StringType(), True), StructField('PIN', IntegerType(), True)]), True),
    StructField('mobiles', ArrayType(StringType()), True),
    StructField('id_cards', ArrayType(IntegerType()), True),
])
df = spark.table(table_name)
df1 = df.select("values", "partition", "timestamp")
df2 = df1.withColumn("values", from_json("values", schema))
df2.createOrReplaceTempView("latest_data")
display(df2)
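
Once the JSON column is parsed with from_json, the nested fields can be pulled out into flat columns, which is the denormalization step mentioned earlier. This is just a small follow-up sketch using the same sample schema (the column aliases are mine):

flat = df2.select(
    col("values.id").alias("id"),
    col("values.name").alias("name"),
    col("values.address.city").alias("city"),
    explode("values.mobiles").alias("mobile"),  # one output row per mobile number
)
display(flat)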

Conclusion

This is a solution to a very niche problem. It will be useful if you are dealing with JSON data and need to define PySpark schemas.

It has a few known bugs, like sometimes generating the wrong schema for complex array types, but you can always regenerate or hand-write the inner schema if that causes any issues.

I would love your feedback on this.

Thanks for reading!!!