JSON - read, set schema and write with PySpark

By Niraj Zade | 2023-12-18 | Tags: guide file-io


If you're using a Jupyter notebook setup, here is the code I used to initialize things:

# for jupyter notebook
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity='all'

# import relevant python modules
from pyspark.sql import types as T
from pyspark.sql import functions as F

# Create spark session (not required on Databricks)
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Required format of JSON objects in files

There are 2 types of JSON files:

  1. Single line JSON (NDJSON)
  2. Multiline JSON

Single line JSON - NDJSON

Every line has to be a valid, independent JSON object. This is the Newline Delimited JSON (NDJSON) format, also called the JSON Lines format.

Don't wrap the JSON objects in an array.

Correct single line JSON

  • One valid JSON object per line
  • JSON objects are not wrapped in an array
  • A JSON object can end with a trailing comma; the Spark reader won't care
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}

Wrong single line JSON

Objects are wrapped in an array using square brackets. Spark parses each line independently, so the lines that contain only the [ and ] brackets don't parse as JSON objects, and Spark treats them as corrupt records.

Example input:

[
    {"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1},
    {"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2},
    {"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1},
    {"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}
]

The loaded dataframe looks like this:

+---------------+------+----------+----+-----+----+
|_corrupt_record|amount|customerid| day|month|year|
+---------------+------+----------+----+-----+----+
|              [|  NULL|      NULL|NULL| NULL|NULL|
|           NULL|  1000|         1|   1|    1|2022|
|           NULL|  1000|         1|   2|    1|2022|
|           NULL|  2000|         2|   1|    1|2022|
|           NULL|  2000|         2|   2|    1|2022|
|              ]|  NULL|      NULL|NULL| NULL|NULL|
+---------------+------+----------+----+-----+----+
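
If you just want the clean rows out of such a file, one option is the DROPMALFORMED read mode, which silently drops records that don't parse. A minimal sketch, assuming the wrapped file above is saved as json_wrapped.json (a hypothetical filename):

df = (
    spark.read
    .format("json")
    .option("mode", "DROPMALFORMED")  # drop unparseable lines (here: the lone [ and ] lines)
    .load("json_wrapped.json")        # hypothetical filename
)
df.show()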

Multiline JSON

The entire file, when parsed, has to read like a single valid JSON object.

You'll have to wrap all the JSON objects within the file into an array [ ...json objects... ]. If you don't, Spark will only read the first JSON object and skip the rest.

Correct multiline JSON

  • Objects are wrapped in an array using square brackets
[
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 1000
  },
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 1000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 2000
  }
]

Wrong multiline JSON

Objects are not wrapped in an array using square brackets, so Spark will only read the first object and skip the rest.

{
  "customerid": 1,
  "year": 2022,
  "month": 1,
  "day": 1,
  "amount": 1000
}
{
  "customerid": 1,
  "year": 2022,
  "month": 1,
  "day": 2,
  "amount": 1000
}
{
  "customerid": 2,
  "year": 2022,
  "month": 1,
  "day": 1,
  "amount": 2000
}

The loaded dataframe looks like this:

+------+----------+---+-----+----+
|amount|customerid|day|month|year|
+------+----------+---+-----+----+
|  1000|         1|  1|    1|2022|
+------+----------+---+-----+----+

Read JSON

Read single line JSON

Sample input file:

{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}

Code:

df = (
    spark.read
    .format("json")
    .load("json_multi.json")
)

Read multiline JSON

Sample input file:

[
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 1000
  },
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 1000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 2000
  }
]

Code:

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_multi.json")
)

You can find all the read options in the JSON data source options documentation.
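
A few of these options in action; a sketch of commonly used knobs, not an exhaustive list:

df = (
    spark.read
    .format("json")
    .option("multiline", "true")
    .option("mode", "PERMISSIVE")         # default: route bad records to _corrupt_record
    .option("allowSingleQuotes", "true")  # tolerate 'single-quoted' strings
    .option("allowComments", "true")      # tolerate // and /* */ comments in the file
    .load("json_multi.json")
)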

Set schema

When a JSON file is read, Spark parses the objects and automatically infers the schema. You can inspect it using df.printSchema().
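
Besides printSchema(), the inferred schema is available programmatically through df.schema; a quick sketch:

df = (
    spark.read
    .format("json")
    .option("multiline", "true")
    .load("json_multi.json")
)
print(df.schema.simpleString())  # compact one-line form
print(df.schema.json())          # full JSON representation of the schema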

Set schema of simple JSON

Sample input:

[
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 1000
  },
  {
    "customerid": 1,
    "year": 2022,
    "month": 1,
    "day": 2,
    "amount": 1000
  },
  {
    "customerid": 2,
    "year": 2022,
    "month": 1,
    "day": 1,
    "amount": 2000
  }
]

Code:

import pyspark.sql.types as T

schema = (
    T.StructType()
    .add("customerid", T.IntegerType(), True)
    .add("amount", T.IntegerType(), True)
    .add("year", T.IntegerType(), True)
    .add("month", T.IntegerType(), True)
    .add("day", T.IntegerType(), True)
)
df = (
    spark.read
    .format("json")
    .schema(schema)
    .option("multiline","true")
    .load("json_multi.json")
)
df.show()
df.printSchema()

Output:

+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  2000|2022|    1|  1|
+----------+------+----+-----+---+

root
 |-- customerid: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
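
Instead of building a StructType by hand, .schema() also accepts a DDL string. A sketch equivalent to the schema above:

df = (
    spark.read
    .format("json")
    .schema("customerid INT, amount INT, year INT, month INT, day INT")
    .option("multiline", "true")
    .load("json_multi.json")
)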

Set schema of complex JSON

Objects within objects

Suppose that instead of directly giving you year, month, and day columns, the file wraps them inside a date object. Let's set the schema for this JSON.

Sample input:

[
  {
    "customerid": 1,
    "amount": 1000,
    "date": {
      "year": 2022,
      "month": 1,
      "day": 1
    }
  },
  {
    "customerid": 1,
    "amount": 1000,
    "date": {
      "year": 2022,
      "month": 1,
      "day": 2
    }
  },
  {
    "customerid": 2,
    "amount": 2000,
    "date": {
      "year": 2022,
      "month": 1,
      "day": 1
    }
  }
]

Basic read without schema

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_nested_object.json")
)
# print dataframe
df.show()
# print schema that spark inferred
df.printSchema()

Output:

+------+----------+------------+
|amount|customerid|        date|
+------+----------+------------+
|  1000|         1|{1, 1, 2022}|
|  1000|         1|{2, 1, 2022}|
|  2000|         2|{1, 1, 2022}|
+------+----------+------------+

root
 |-- amount: long (nullable = true)
 |-- customerid: long (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- day: long (nullable = true)
 |    |-- month: long (nullable = true)
 |    |-- year: long (nullable = true)

Set schema while reading

Set the schema (the datatypes of all fields in the objects).

Spark inferred all the fields as type long. Suppose we want them all to be integers.

import pyspark.sql.types as T

schema = (
    T.StructType()
    .add("customerid", T.IntegerType(), True)
    .add("amount", T.IntegerType(), True)
    .add("date", (
            T.StructType()
                .add("year", T.IntegerType(), True)
                .add("month", T.IntegerType(), True)
                .add("day", T.IntegerType(), True)
        )
         , True)
)
df = (
    spark.read
    .format("json")
    .schema(schema)
    .option("multiline","true")
    .load("json_nested_object.json")
)
df.show()
df.printSchema()

Output:

As you can see, all the fields are now treated as integers, just as we defined in the schema.

+----------+------+------------+
|customerid|amount|        date|
+----------+------+------------+
|         1|  1000|{2022, 1, 1}|
|         1|  1000|{2022, 1, 2}|
|         2|  2000|{2022, 1, 1}|
+----------+------+------------+

root
 |-- customerid: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- year: integer (nullable = true)
 |    |-- month: integer (nullable = true)
 |    |-- day: integer (nullable = true)
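
The same nested schema can also be expressed as a DDL string, if you prefer that over nested StructType builders; a sketch:

schema_ddl = "customerid INT, amount INT, date STRUCT<year: INT, month: INT, day: INT>"
df = (
    spark.read
    .format("json")
    .schema(schema_ddl)
    .option("multiline", "true")
    .load("json_nested_object.json")
)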

Bonus: Flatten the dataframe

Suppose that instead of the date column with a struct inside it, you want separate columns for year, month, and day. Just select the nested fields from the struct column.

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_nested_object.json")
)
df_flat = df.select(
    F.col("customerid"),
    F.col("amount"),
    F.col("date.year").alias("year"),
    F.col("date.month").alias("month"),
    F.col("date.day").alias("day"),
)

df:

+------+----------+------------+
|amount|customerid|        date|
+------+----------+------------+
|  1000|         1|{1, 1, 2022}|
|  1000|         1|{2, 1, 2022}|
|  2000|         2|{1, 1, 2022}|
+------+----------+------------+

df_flat:

+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  2000|2022|    1|  1|
+----------+------+----+-----+---+
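
A shorthand worth knowing: selecting date.* expands all fields of the struct into top-level columns in one go, so an equivalent flatten is:

df_flat = df.select("customerid", "amount", "date.*")
df_flat.show()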

Objects within an array

Sample input:

[
  {
    "customerid": 1,
    "orders": [
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 1
        }
      },
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 2
        }
      }
    ]
  },
  {
    "customerid": 2,
    "orders": [
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 1
        }
      },
      {
        "amount": 1000,
        "date": {
          "year": 2022,
          "month": 1,
          "day": 2
        }
      }
    ]
  }
]

Basic read without schema

df = (
    spark.read
    .format("json")
    .option("multiline","true")
    .load("json_nested_array.json")
)
df.show()
df.printSchema()

Output:

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|[{1000, {1, 1, 20...|
|         2|[{1000, {1, 1, 20...|
+----------+--------------------+

root
 |-- customerid: long (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: long (nullable = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- day: long (nullable = true)
 |    |    |    |-- month: long (nullable = true)
 |    |    |    |-- year: long (nullable = true)

Set schema while reading

Set all the fields to be of type integer:

import pyspark.sql.types as T

schema = (
    T.StructType()
    .add("customerid", T.IntegerType(), True)
    .add(
        "orders",
        T.ArrayType(
            T.StructType()
            .add("amount", T.IntegerType(), True)
            .add(
                "date",
                (
                    T.StructType()
                    .add("year", T.IntegerType(), True)
                    .add("month", T.IntegerType(), True)
                    .add("day", T.IntegerType(), True)
                ),
                True,
            )
        ),
        True,
    )
)
df = (
    spark.read.format("json")
    .schema(schema)
    .option("multiline", "true")
    .load("json_nested_array.json")
)
df.show()
df.printSchema()

Output:

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|[{1000, {2022, 1,...|
|         2|[{1000, {2022, 1,...|
+----------+--------------------+

root
 |-- customerid: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: integer (nullable = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- year: integer (nullable = true)
 |    |    |    |-- month: integer (nullable = true)
 |    |    |    |-- day: integer (nullable = true)
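
As with the nested object, this schema can also be written as a DDL string; a sketch:

schema_ddl = (
    "customerid INT, "
    "orders ARRAY<STRUCT<amount: INT, date: STRUCT<year: INT, month: INT, day: INT>>>"
)
df = (
    spark.read
    .format("json")
    .schema(schema_ddl)
    .option("multiline", "true")
    .load("json_nested_array.json")
)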

Bonus: Flatten the dataframe

The input is the same json_nested_array.json sample shown in the previous section.

Code:

from pyspark.sql import functions as F
import pyspark.sql.types as T

schema = (
    T.StructType()
    .add("customerid", T.IntegerType(), True)
    .add(
        "orders",
        T.ArrayType(
            T.StructType()
            .add("amount", T.IntegerType(), True)
            .add(
                "date",
                (
                    T.StructType()
                    .add("year", T.IntegerType(), True)
                    .add("month", T.IntegerType(), True)
                    .add("day", T.IntegerType(), True)
                ),
                True,
            )
        ),
        True,
    )
)

# read dataframe
df = (
    spark.read.format("json")
    .schema(schema)
    .option("multiline", "true")
    .load("json_nested_array.json")
)
df.show()

# explode the dataframe on orders, to create one row for every array object
df_exploded = df.withColumn("orders", F.explode(F.col("orders")))
df_exploded.show()

# flatten the dataframe
df_flat = (
    df_exploded
    .select(
        F.col("customerid"),
        F.col("orders.amount"),
        F.col("orders.date.year"),
        F.col("orders.date.month"),
        F.col("orders.date.day"),
    )
)
df_flat.show()

Outputs:

df

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|[{1000, {2022, 1,...|
|         2|[{1000, {2022, 1,...|
+----------+--------------------+

df_exploded

+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|{1000, {2022, 1, 1}}|
|         1|{1000, {2022, 1, 2}}|
|         2|{1000, {2022, 1, 1}}|
|         2|{1000, {2022, 1, 2}}|
+----------+--------------------+

df_flat

+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  1000|2022|    1|  1|
|         2|  1000|2022|    1|  2|
+----------+------+----+-----+---+
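
If you also need each order's position within its array, posexplode() generates an index column alongside the exploded value; a sketch:

# posexplode returns (position, value) pairs for every array element
df_pos = df.select(
    F.col("customerid"),
    F.posexplode(F.col("orders")).alias("order_idx", "order"),
)
df_pos.show()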

Write JSON

The barebones write method template is:

(
    df
    .write
    .format("json")
    .mode("overwrite")
    .save("data/orders")
)
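
Spark writes one file per partition of the dataframe. For small outputs where a single JSON file is more convenient, you can coalesce first; a sketch (fine for small data, but it removes write parallelism):

(
    df
    .coalesce(1)  # collapse to one partition -> one output file
    .write
    .format("json")
    .mode("overwrite")
    .save("data/orders")
)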

Partition written data on specific columns

Partitioning the written data lets downstream reads skip irrelevant partitions (partition pruning) and increases read parallelism.

E.g. partition by year, month, day:

(
    df
    .write
    .format("json")
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .save("data/sales")
)
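
This produces a data/sales/year=.../month=.../day=... directory layout. When reading a single partition directory back, the basePath option keeps the partition columns in the dataframe; a sketch:

df_jan = (
    spark.read
    .format("json")
    .option("basePath", "data/sales")  # keeps year/month as columns
    .load("data/sales/year=2022/month=1")
)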

Bucket written data on specific columns

Note: save() doesn't support saving after using bucketBy(). Only saveAsTable() supports it.

If you use save(), it will throw an exception: AnalysisException: 'save' does not support bucketBy right now.

(
    sales_df
    .write
    .format("json")
    .mode("overwrite")
    .option("path", "data/sales")
    .bucketBy(10, "year", "month", "day")
    .saveAsTable("sales")
)
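
To check the bucketing spec of the saved table afterwards, describe it; a quick sketch:

# the bucket columns and bucket count show up in the detailed table description
spark.sql("DESCRIBE FORMATTED sales").show(truncate=False)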

More details in the bucketBy() documentation.