If you're using a Jupyter notebook setup, here is the code I used to initialize things:
# for jupyter notebook
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity='all'
# import relevant python modules
from pyspark.sql import types as T
from pyspark.sql import functions as F
# Create spark session (not required on databricks)
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
Required format of JSON objects in files
There are 2 types of JSON files:
- Single line JSON - NDJSON
- Multiline JSON
Single line JSON - NDJSON
Every line has to be a valid, independent JSON object. This is the Newline Delimited JSON (NDJSON) format, also called the JSON Lines format.
Don't wrap the JSON objects into an array.
Correct single line JSON
- One valid JSON object per line
- JSON objects are not wrapped in an array
- Each JSON object can end with a trailing comma; the Spark reader won't care
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}
Wrong single line JSON
The objects are wrapped in an array using square brackets. Spark treats each line as a standalone JSON object, so the lines containing the brackets fail to parse and show up as corrupt records.
Example input:
[
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1},
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2},
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1},
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 2}
]
The loaded dataframe looks like -
+---------------+------+----------+----+-----+----+
|_corrupt_record|amount|customerid| day|month|year|
+---------------+------+----------+----+-----+----+
| [| NULL| NULL|NULL| NULL|NULL|
| NULL| 1000| 1| 1| 1|2022|
| NULL| 1000| 1| 2| 1|2022|
| NULL| 2000| 2| 1| 1|2022|
| NULL| 2000| 2| 2| 1|2022|
| ]| NULL| NULL|NULL| NULL|NULL|
+---------------+------+----------+----+-----+----+
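If you do end up with a file like this, one workaround is to have Spark silently drop the unparseable lines instead of surfacing them as corrupt records. A minimal sketch, assuming the wrapped file is named json_wrapped.json (mode and its DROPMALFORMED value are documented JSON read options):
# drop lines that fail to parse on their own (here: the "[" and "]" lines)
df = (
    spark.read
    .format("json")
    .option("mode", "DROPMALFORMED")
    .load("json_wrapped.json")
)
df.show()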
Multiline JSON
The entire file, when parsed, has to read as one valid JSON value. You'll have to wrap all the JSON objects within the file into an array: [ ...json objects... ]. If you don't, Spark will only read the first JSON object and skip the rest.
Correct multiline JSON
- Objects are wrapped in an array using square brackets
[
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
},
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
}
]
Wrong multiline JSON
The objects are not wrapped in an array, so Spark will only read the first object and skip the rest.
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
}
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
}
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
}
The loaded dataframe looks like -
+------+---+-----+----+
|amount|day|month|year|
+------+---+-----+----+
| 1000| 1| 1|2022|
+------+---+-----+----+
Read JSON
Read single line JSON
Sample input file:
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 1}
{"customerid": 1, "amount": 1000, "year": 2022,"month": 1,"day": 2}
{"customerid": 2, "amount": 2000, "year": 2022,"month": 1,"day": 1}
Code:
df = (
spark.read
.format("json")
.load("json_multi.json")
)
Read multiline JSON
Sample input file:
[
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
},
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
}
]
Code:
df = (
spark.read
.format("json")
.option("multiline","true")
.load("json_multi.json")
)
Find all read options for JSON in the JSON data source options documentation.
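For example, primitivesAsString is one of those documented options; it tells Spark to infer every primitive value as a string instead of a long or double. A minimal sketch reusing the single line file from above:
df = (
    spark.read
    .format("json")
    .option("primitivesAsString", "true")
    .load("json_single.json")
)
# every field is now inferred as string
df.printSchema()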
Set schema
When a JSON file is read, Spark parses the objects and automatically infers the schema.
You can see this using df.printSchema()
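Beyond printing it, the inferred schema is available programmatically as df.schema (a StructType), which you can inspect or reuse for later reads. A small sketch:
# tree view of the inferred schema
df.printSchema()
# the same schema as a compact one-line string, e.g. struct<amount:bigint,...>
print(df.schema.simpleString())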
Set schema of simple JSON
Sample input:
[
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 1,
"amount": 1000
},
{
"customerid": 1,
"year": 2022,
"month": 1,
"day": 2,
"amount": 1000
},
{
"customerid": 2,
"year": 2022,
"month": 1,
"day": 1,
"amount": 2000
}
]
Code:
import pyspark.sql.types as T
schema = (
T.StructType()
.add("customerid", T.IntegerType(), True)
.add("amount", T.IntegerType(), True)
.add("year", T.IntegerType(), True)
.add("month", T.IntegerType(), True)
.add("day", T.IntegerType(), True)
)
df = (
spark.read
.format("json")
.schema(schema)
.option("multiline","true")
.load("json_multi.json")
)
df.show()
df.printSchema()
Output
+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
| 1| 1000|2022| 1| 1|
| 1| 1000|2022| 1| 2|
| 2| 2000|2022| 1| 1|
+----------+------+----+-----+---+
root
|-- customerid: integer (nullable = true)
|-- amount: integer (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
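As an alternative to building a StructType by hand, the schema() method also accepts a DDL-formatted string. A sketch of the same read using the DDL form:
df = (
    spark.read
    .format("json")
    .schema("customerid INT, amount INT, year INT, month INT, day INT")
    .option("multiline","true")
    .load("json_multi.json")
)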
Set schema of complex JSON
Objects within objects
Suppose instead of directly giving you year, month and day columns, the columns are wrapped inside a date object. Let's set the schema for this JSON.
Sample input:
[
{
"customerid": 1,
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
}
},
{
"customerid": 1,
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 2,
}
},
{
"customerid": 2,
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
}
}
]
Basic read without schema
df = (
spark.read
.format("json")
.option("multiline","true")
.load("json_nested_object.json")
)
# print dataframe
df.show()
# print schema that spark inferred
df.printSchema()
Output:
+------+----------+------------+
|amount|customerid|        date|
+------+----------+------------+
|  1000|         1|{1, 1, 2022}|
|  1000|         1|{2, 1, 2022}|
|  2000|         2|{1, 1, 2022}|
+------+----------+------------+
root
 |-- amount: long (nullable = true)
 |-- customerid: long (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- day: long (nullable = true)
 |    |-- month: long (nullable = true)
 |    |-- year: long (nullable = true)
Set schema while reading
Set the schema (datatypes of all fields in the objects). Spark inferred all the numeric fields as long; suppose we want them all to be integers.
import pyspark.sql.types as T
schema = (
T.StructType()
.add("customerid", T.IntegerType(), True)
.add("amount", T.IntegerType(), True)
.add("date", (
T.StructType()
.add("year", T.IntegerType(), True)
.add("month", T.IntegerType(), True)
.add("day", T.IntegerType(), True)
)
, True)
)
df = (
spark.read
.format("json")
.schema(schema)
.option("multiline","true")
.load("json_nested_object.json")
)
df.show()
df.printSchema()
Output:
As you can see, all the fields are now treated as integers, just as we defined in the schema.
+----------+------+------------+
|customerid|amount| date|
+----------+------+------------+
| 1| 1000|{2022, 1, 1}|
| 1| 1000|{2022, 1, 2}|
| 2| 2000|{2022, 1, 1}|
+----------+------+------------+
root
|-- customerid: integer (nullable = true)
|-- amount: integer (nullable = true)
|-- date: struct (nullable = true)
| |-- year: integer (nullable = true)
| |-- month: integer (nullable = true)
| |-- day: integer (nullable = true)
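The nested schema can also be written as a DDL string, using STRUCT<...> for the date object. A sketch equivalent to the StructType above:
schema_ddl = "customerid INT, amount INT, date STRUCT<year: INT, month: INT, day: INT>"
df = (
    spark.read
    .format("json")
    .schema(schema_ddl)
    .option("multiline","true")
    .load("json_nested_object.json")
)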
Bonus: Flatten the dataframe
Suppose instead of the date column with a struct inside it, you want separate columns for year, month and day. Just select the nested fields from the column.
df = (
    spark.read
    .format("json")
    .schema(schema)  # reuse the schema defined above
    .option("multiline","true")
    .load("json_nested_object.json")
)
df_flat = df.select(
F.col("customerId"),
F.col("amount"),
F.col("date.year").alias("year"),
F.col("date.month").alias("month"),
F.col("date.day").alias("day"),
)
df:
+----------+------+------------+
|customerid|amount|        date|
+----------+------+------------+
|         1|  1000|{2022, 1, 1}|
|         1|  1000|{2022, 1, 2}|
|         2|  2000|{2022, 1, 1}|
+----------+------+------------+
df_flat:
+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  2000|2022|    1|  1|
+----------+------+----+-----+---+
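A shorthand for the same flattening is a star expression on the struct column; date.* selects every field inside the struct as a top-level column:
df_flat = df.select("customerid", "amount", "date.*")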
Objects within array
Sample input:
[
{
"customerid": 1,
"orders": [
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
}
},
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 2,
}
}
]
},
{
"customerid": 2,
"orders": [
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 1,
}
},
{
"amount": 1000,
"date": {
"year": 2022,
"month": 1,
"day": 2,
}
}
]
}
]
Basic read without schema
df = (
spark.read
.format("json")
.option("multiline","true")
.load("json_nested_array.json")
)
df.show()
df.printSchema()
Output
+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|[{1000, {1, 1, 20...|
|         2|[{1000, {1, 1, 20...|
+----------+--------------------+
root
 |-- customerid: long (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: long (nullable = true)
 |    |    |-- date: struct (nullable = true)
 |    |    |    |-- day: long (nullable = true)
 |    |    |    |-- month: long (nullable = true)
 |    |    |    |-- year: long (nullable = true)
Set schema while reading
Set all fields to be of type Integer
import pyspark.sql.types as T
schema = (
T.StructType()
.add("customerid", T.IntegerType(), True)
.add(
"orders",
T.ArrayType(
T.StructType()
.add("amount", T.IntegerType(), True)
.add(
"date",
(
T.StructType()
.add("year", T.IntegerType(), True)
.add("month", T.IntegerType(), True)
.add("day", T.IntegerType(), True)
),
True,
)
),
True,
)
)
df = (
spark.read.format("json")
.schema(schema)
.option("multiline", "true")
.load("json_nested_array.json")
)
df.show()
df.printSchema()
Output:
+----------+--------------------+
|customerid| orders|
+----------+--------------------+
| 1|[{1000, {2022, 1,...|
| 2|[{1000, {2022, 1,...|
+----------+--------------------+
root
|-- customerid: integer (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- amount: integer (nullable = true)
| | |-- date: struct (nullable = true)
| | | |-- year: integer (nullable = true)
| | | |-- month: integer (nullable = true)
| | | |-- day: integer (nullable = true)
Bonus: Flatten the dataframe
We'll reuse the nested-array input sample from above.
from pyspark.sql import functions as F
import pyspark.sql.types as T
schema = (
T.StructType()
.add("customerid", T.IntegerType(), True)
.add(
"orders",
T.ArrayType(
T.StructType()
.add("amount", T.IntegerType(), True)
.add(
"date",
(
T.StructType()
.add("year", T.IntegerType(), True)
.add("month", T.IntegerType(), True)
.add("day", T.IntegerType(), True)
),
True,
)
),
True,
)
)
# read dataframe
df = (
spark.read.format("json")
.schema(schema)
.option("multiline", "true")
.load("json_nested_array.json")
)
df.show()
# explode the dataframe on orders, to create one row for every array object
df_exploded = df.withColumn("orders", F.explode(F.col("orders")))
df_exploded.show()
# flatten the dataframe
df_flat = (
df.withColumn("orders", F.explode(F.col("orders")))
.select(
F.col("customerId"),
F.col("orders.amount"),
F.col("orders.date.year"),
F.col("orders.date.month"),
F.col("orders.date.day")
)
)
df_flat.show()
Outputs:
df
+----------+--------------------+
|customerid| orders|
+----------+--------------------+
| 1|[{1000, {2022, 1,...|
| 2|[{1000, {2022, 1,...|
+----------+--------------------+
df_exploded
+----------+--------------------+
|customerid|              orders|
+----------+--------------------+
|         1|{1000, {2022, 1, 1}}|
|         1|{1000, {2022, 1, 2}}|
|         2|{1000, {2022, 1, 1}}|
|         2|{1000, {2022, 1, 2}}|
+----------+--------------------+
df_flat
+----------+------+----+-----+---+
|customerid|amount|year|month|day|
+----------+------+----+-----+---+
|         1|  1000|2022|    1|  1|
|         1|  1000|2022|    1|  2|
|         2|  1000|2022|    1|  1|
|         2|  1000|2022|    1|  2|
+----------+------+----+-----+---+
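One caveat on explode(): if a customer had a null or empty orders array, explode() would drop that customer's row entirely. If you need to keep such rows (with NULL order fields), explode_outer() is the variant to use:
# keeps customers even when their orders array is null or empty
df_exploded = df.withColumn("orders", F.explode_outer(F.col("orders")))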
Write JSON
The barebones write method template is:
(
df
.write
.format("json")
.mode("overwrite")
.save("data/orders")
)
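The output can also be compressed at write time; compression is a documented JSON write option, and gzip is one of the supported codecs. A sketch (the output path is just an example):
(
    df
    .write
    .format("json")
    .mode("overwrite")
    .option("compression", "gzip")  # writes .json.gz part files
    .save("data/orders_gz")
)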
Partition written data on specific columns
Partitioning the written data makes reading it back faster and more optimized: queries that filter on the partition columns can skip all the other directories, and the directory split increases parallelization.
Eg - Partition by year, month, day:
(
df
.write
.format("json")
.mode("overwrite")
.partitionBy("year", "month", "day")
.save("data/sales")
)
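When you read the partitioned data back, filters on the partition columns prune the scan to just the matching year=/month=/day= directories. A minimal read-back sketch:
df_jan1 = (
    spark.read
    .format("json")
    .load("data/sales")
    .filter((F.col("year") == 2022) & (F.col("month") == 1) & (F.col("day") == 1))
)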
Bucket written data on specific columns
Note: save() doesn't support saving after using bucketBy(). Only saveAsTable() supports it. If you use save(), it will throw an exception: AnalysisException: 'save' does not support bucketBy right now.
(
sales_df
.write
.format("json")
.mode("overwrite")
.option("path", "data/sales")
.bucketBy(10, "year", "date", "day")
.saveAsTable("sales")
)
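Since saveAsTable() registers the data in the metastore, you read a bucketed table back by name rather than by path. A minimal usage sketch:
sales = spark.table("sales")
sales.show()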
More details in bucketBy() documentation