documentation links of pySpark api

By Niraj Zade | 31 july   | Tags: pyspark spark mpp big data


Quick and useful pySpark reference for working professionals.

Because the official api reference isn't fast enough!

Collection <- Current phase
Re-organization
Editing
Polishing

Checklist

Workflows

Register a df as a table, and run sql on it

df.createOrReplaceTempView("people")
result = spark.sql("select * from people")

I/O - Read and write data from file formats

Note: the spark api has a lot of baggage. There is always more than one way to do the same thing.

However, there are two distinct styles of getting something done:

  1. Directly using a function and setting its arguments
  2. Chaining functions, and progressively setting options

Example:

## READING A FILE
## direct
df = spark.read.csv(path="/data/data.csv", nullValue="undefined")
df = spark.read.load(path="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)
## progressive
df = spark.read.option("nullValue", "undefined").format('csv').load("/data/data.csv")


## WRITING A FILE
## direct
df.write.csv("/data/data.csv", mode="overwrite")
## progressive
df.write.mode("overwrite").format("csv").save("/data/data.csv")

I prefer to use the direct way of defining the operation. As we are simply setting arguments to a function, autocomplete works very well with it. And it is pythonic.

In the progressive style, we are setting strings most of the time (especially with the option() and options() methods, where the option names in the chain are plain strings). This leads to mistakes, and also demands a lot of memorization.

You can notice the things that creep in when a framework of one language is operated through other languages.

Look at the spark.read.csv() vs spark.read.load(). You can see that the csv() is a sharp and dedicated function, while the load() function is designed to handle all data formats internally. The load() function has a distinct java/scala style to it (the factory style of programming)

Also, the progressive style of chaining functions has a distinctive functional programming feel to it. We don't usually write python in this style.

CSV

Read csv into df

A particular CSV, or all CSVs in a directory

df = spark.read.csv("/dir/path_with_many_csv/single_file.csv") # single
df = spark.read.csv("/dir/path_with_many_csv/") # all

arguments:

  • delimiter=","
  • inferSchema='True' (default false)
  • header='True' (read first line as col name)
  • nullValue='unknown' (consider a string as null)

## Read with user defined schema
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType().add("name", StringType(), True).add("age", IntegerType(), True)

df = spark.read.csv("path_to_csv", schema=schema)

StructType

Write df into csv

df.write.csv("/dir/path_with_many_csv/", mode="<mode>")

Where mode=

  • overwrite
  • append
  • ignore
  • error

JDBC

Read Table into dataframe

df = spark.read.jdbc(url="", table="", properties={})

## properties are jdbc connection arguments
## example:
properties = {"user": "SYSTEM", "password": "mypassword"}

How to parallelize? From the documentation: Partitions of the table will be retrieved in parallel if either column or predicates is specified. lowerBound, upperBound and numPartitions is needed when column is specified.
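
As a rough sketch of the parallel read described above: this assumes `spark` is an existing SparkSession and that the table has a numeric column to split on. The table name, column name, bounds and partition count are hypothetical placeholders.

```python
def read_table_in_parallel(spark, url, properties):
    """Read a JDBC table in 4 parallel partitions, split on a numeric column."""
    return spark.read.jdbc(
        url=url,
        table="people",        # hypothetical table name
        column="id",           # hypothetical numeric column to split on
        lowerBound=1,          # the bounds only decide the partition stride;
        upperBound=1_000_000,  # rows outside them are still read
        numPartitions=4,
        properties=properties,
    )
```

Usage would look like `df = read_table_in_parallel(spark, url, properties)`, with `properties` being the same connection dict as above.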

Write dataframe into table

<Insert example when I have the time>

params:

  • table (str)
  • mode
  • properties (dict)
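
A minimal sketch of the write side, assuming `df` is an existing dataframe and the JDBC driver is on the classpath. Besides the params listed above, `df.write.jdbc()` also takes the connection `url`. The table name is a hypothetical placeholder.

```python
def write_table(df, url, properties):
    """Append the rows of df to a JDBC table."""
    df.write.jdbc(
        url=url,
        table="people",   # hypothetical target table
        mode="append",    # or "overwrite", "ignore", "error"
        properties=properties,
    )
```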

JSON

Similar to csv, it depends on the path you specify:

  • Entire directory? Give the dir path
  • Single file? Give the file path
  • Multiple files? Give the file paths as a list

df = spark.read.json(path="/dir/path_with_many_json/single_file.json") # single
df = spark.read.json(path="/dir/path_with_many_json/") # all

JSON Lines will work by default. This data is never multiline (only one json record per line)

For JSON with one record per file, where the record spans multiple lines, set the argument multiLine=True
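
A small sketch of the two less obvious cases, assuming `spark` is an existing SparkSession. The file paths are hypothetical.

```python
def read_json_variants(spark):
    """Read several JSON Lines files at once, and one multi-line JSON file."""
    # Several JSON Lines files: pass the paths as a list.
    many = spark.read.json(path=["/data/a.json", "/data/b.json"])
    # One record per file, spread over several lines: turn on multiLine.
    single = spark.read.json(path="/data/record.json", multiLine=True)
    return many, single
```

Note that the keyword is spelled `multiLine` (camel case) when passed as a Python argument.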

JSON will need user specified schema, to maintain your sanity.

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
])
df = spark.read.json(path="<your path>", schema=schema)

Write json file:

Writes in JSON lines format

df.write.json(path="/path/to/output/zipcodes.json", mode="overwrite")

Modes:

  • append
  • overwrite
  • ignore
  • error or errorifexists

ORC

Read

df = spark.read.orc(path="<path to dir, file, files>")

Notable parameters:

  • mergeSchema (boolean)
  • recursiveFileLookup
  • modifiedBefore
  • modifiedAfter
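
A sketch of how these parameters might be combined, assuming `spark` is an existing SparkSession on a reasonably recent Spark 3.x. The cut-off timestamp is a made-up example value.

```python
def read_recent_orc(spark, path):
    """Read ORC files under path, including nested dirs, modified after a cut-off."""
    return spark.read.orc(
        path,
        mergeSchema=True,                     # reconcile differing file schemas
        recursiveFileLookup=True,             # descend into sub-directories
        modifiedAfter="2024-01-01T00:00:00",  # hypothetical cut-off timestamp
    )
```

Keep in mind that recursiveFileLookup turns off partition discovery, so partition columns encoded in directory names will not show up in the dataframe.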

Write

df.write.orc(path='<path>')

Notable parameters:

  • partitionBy (str, list of str) - partition by column name(s)

Parquet

The parquet api's options are surprisingly limited

Read

spark.read.parquet(path='/data/data.parquet')

Notable parameters:

  • mergeSchema (bool)

Write

df.write.parquet(path='/data/data.parquet')

Notable params:

  • mode (str, optional) {append, overwrite, ignore, error/errorifexists}
  • partitionBy (str or list) - names of cols to partition by
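
A minimal sketch using both params, assuming `df` is an existing dataframe. The column names `country` and `year` are hypothetical.

```python
def write_partitioned_parquet(df, path):
    """Overwrite path with parquet files, one sub-directory per partition value."""
    df.write.parquet(
        path,
        mode="overwrite",
        partitionBy=["country", "year"],  # hypothetical column names
    )
```

The output ends up in nested directories of the form `country=<value>/year=<value>/` under the given path.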

Text

Read

Reads every line in the text file as a new row in the dataframe.

Suppose file.txt looks like this:

a
b
c

## text files have no header; the single column is named "value" unless a schema renames it
spark.read.schema("alphabets STRING").text("/data/file.txt").sort("alphabets").show()

Write

The dataframe must have only one col of string type. Every row becomes a new line in the output file.

df.write.mode("overwrite").text(path="/data/data.txt")

Other params:

  • lineSep (str)
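
A sketch of getting a wider dataframe into the single-string-column shape that text() needs, assuming `df` has a column called `name` (a hypothetical name):

```python
def write_names_as_text(df, path):
    """Write one column of df as a text file with Windows-style line endings."""
    # text() accepts exactly one string column, so select and cast it first.
    one_col = df.select(df["name"].cast("string"))
    one_col.write.mode("overwrite").text(path, lineSep="\r\n")
```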

Option/Options

These are weird. Used to set arguments (options) of both read and write functions.

## you can either directly set the nullvalue argument of the function:
spark.read.csv(path="/data/", nullValue="undefined")
## or set arguments via options, through the chained functions style of coding
spark.read.option("nullValue", "undefined").format('csv').load("/data/")

These are just two different styles of programming.

I don't like using options, and prefer to stick to directly setting arguments in functions. (Since we give strings in options, autocomplete doesn't work for them, which leads to mistakes)

More on writing

Save()

You might have noticed it at the end of chained write functions. We specify the path to save into (file or directory)

df.write.mode("overwrite").format("json").save(path="/data/data.json")

In the above example, the mode and format are set using functions in the chain.

You can also set them directly within save()

df.write.save(path="/data/data.json", format="json", mode="overwrite")

Other arguments:

  • partitionBy (list)
  • **options (dict) - other string options
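
A sketch of save() with everything set directly, assuming `df` is an existing dataframe with a `name` column (hypothetical). Any extra keyword arguments are handed over as options of the chosen format, here the csv options `header` and `sep`.

```python
def save_partitioned_csv(df, path):
    """Save df as csv, partitioned by a column, with format options as kwargs."""
    df.write.save(
        path=path,
        format="csv",
        mode="overwrite",
        partitionBy=["name"],  # hypothetical column name
        header=True,           # extra kwargs become writer options
        sep=";",
    )
```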

Having more than one way of doing the same thing makes spark a pain to deal with.

Partition on a dataframe col while writing

uses partitionBy() function

df.write.partitionBy("name").mode("overwrite").format("parquet").save("/data/data.parquet")

Column names are passed positionally, either as separate strings or as a single list of strings.

Misc

Mode

Acts the same way as options. Either set mode as an argument to a function, or set it via the mode() function.

## you can either directly set the mode argument of the function:
df.write.save(d, format="parquet", mode="overwrite")
## or set mode through the chained functions style of coding
df.write.mode("overwrite").format("parquet").save(d)

Load

Load is a generic function to read various file formats into a dataframe.

df = spark.read.load(path="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)

Prefer directly using the dedicated function. Eg spark.read.csv() instead of spark.read.load(format="csv")

Table

Read a table into a dataframe

spark.read.table(tableName='people').show()

The table used in this function is one loaded via a spark sql function like df.createOrReplaceTempView('people')

Write bucketBy()

df.write.bucketBy(2, "name").mode("overwrite").saveAsTable("bucketed_table")

Params:

  • numBuckets (int)
  • col (str, list, tuple)
  • cols (str) - keep this empty if a list is already provided in col

bucketBy().sortBy()

Use sortBy() to sort rows in a bucket

df.write.bucketBy(2, "name").sortBy(col="age").saveAsTable("bucketed_table")

Parameters:

  • col (str, list of str)
  • cols (str) - additional column names; keep this empty if a list is already provided in col

Insert dataframe into a table

Works only when (schema of dataframe) = (schema of table)

df.selectExpr("age AS col1", "name AS col2").write.insertInto(tableName="tblPeople")

Params:

  • overwrite (bool) - default is false. Overwrites existing data when true.
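
A small sketch of an overwriting insert, assuming the table `tblPeople` already exists with its columns in the order (age, name). That column order is an assumption made for this example.

```python
def replace_table_contents(df):
    """Replace the rows of tblPeople with the rows of df."""
    # insertInto matches columns by position and ignores their names,
    # so put the columns in the table's order first.
    df.select("age", "name").write.insertInto("tblPeople", overwrite=True)
```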

Save into a table

df.write.saveAsTable(name="tblA")

Other params:

  • format (str)
  • mode (str)
  • partitionBy (str/list)
  • **options (dict)

Note: In append mode, if the table already exists, saveAsTable() doesn't need the dataframe's column order to match the table's. It finds the correct column positions through the column names, and inserts the data into the table.
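
A sketch of the create-then-append flow, assuming `first_df` and `more_df` are existing dataframes with the same columns, including a `name` column (hypothetical):

```python
def create_then_append(first_df, more_df):
    """Create tblA from one dataframe, then append a later batch to it."""
    # Create (or replace) the table, partitioned by a hypothetical column.
    first_df.write.saveAsTable(
        name="tblA", format="parquet", mode="overwrite", partitionBy="name"
    )
    # Append a later batch; columns are matched by name, not by position.
    more_df.write.saveAsTable(name="tblA", mode="append")
```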

Table stuff

The documentation of table stuff SUCKS. Maybe because it is legacy stuff

create

Create a new table from the contents of the dataframe. The new table’s schema, partition layout, properties, and other configuration will be based on the configuration set on this writer.

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.create.html

replace

Replace an existing table with the contents of the data frame.

The existing table’s schema, partition layout, properties, and other configuration will be replaced with the contents of the data frame and the configuration set on this writer.

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.replace.html

createOrReplace

Create a new table or replace an existing table with the contents of the data frame.

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.createOrReplace.html

append

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.append.html

overwrite

Overwrite rows matching the given filter condition with the contents of the data frame in the output table.

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.overwrite.html

overwritePartitions

Overwrite all partitions for which the data frame contains at least one row with the contents of the data frame in the output table.

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.overwritePartitions.html

tableProperty

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.tableProperty.html

partitionedBy

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.partitionedBy.html
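
All of the methods linked above live on DataFrameWriterV2, which you get from df.writeTo(table). A rough sketch of how they chain together follows. It assumes the configured catalog supports these v2 writes; the table name and the property key/value are hypothetical.

```python
def create_and_append(df, more_df, table="catalog.db.people"):
    """Create or replace a table from df, then append more_df to it."""
    (
        df.writeTo(table)                # returns a DataFrameWriterV2
        .using("parquet")                # table provider / format
        .tableProperty("owner", "etl")   # hypothetical property key and value
        .createOrReplace()
    )
    more_df.writeTo(table).append()
```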

Sessions

from pyspark.sql import SparkSession

## set app name (shown in spark ui)
SparkSession.builder.appName("My App Name")
## set master url
SparkSession.builder.master("local")
## or
SparkSession.builder.master("spark://master:7077")
## enable Hive support (persistent metastore, Hive SerDes, Hive UDFs)
SparkSession.builder.enableHiveSupport()

## Set spark remote url
SparkSession.builder.remote("sc://localhost")
## set some config options
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
## will be applied to sparksessions created or fetched via this builder

GetOrCreate

Checks if a valid global SparkSession exists.

  • Yes? Returns it. Also applies config options specified in the builder to the returned SparkSession.
  • No? Creates a new SparkSession and sets it as the global default.

session_1 = SparkSession.builder.getOrCreate()
## can set its config
session_1.conf.set("key_1", "value_1")
## can get its config
session_1.conf.get("key_1")

Get currently active session

active_session = SparkSession.getActiveSession()

Create a new session

The new session has its own SQLConf, registered temp views, and UDFs. However, the SparkContext and table cache are shared.

spark.newSession()
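
A sketch that makes the temp view isolation visible, assuming `spark` is an existing SparkSession and a Spark version that has `catalog.tableExists()` (3.3 or newer). The view name is hypothetical.

```python
def temp_view_is_session_local(spark):
    """Register a temp view in one session and look for it in a new one."""
    other = spark.newSession()
    spark.range(3).createOrReplaceTempView("nums")  # hypothetical view name
    visible_here = spark.catalog.tableExists("nums")
    visible_there = other.catalog.tableExists("nums")
    return visible_here, visible_there
```

Since temp views are registered per session, the view should be visible in `spark` but not in the session returned by newSession().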

Set session config at runtime

Set all spark/hadoop configs relevant to Spark SQL. When fetching a config key's value, defaults to the value set in underlying SparkContext (if it is already set).

spark.conf.set("key", "value")

Catalog

Catalog object, through which we can do operations on the underlying databases, tables, functions etc. (create, drop, alter, query etc.)

spark.catalog

## eg: list tables
spark.catalog.listTables()

Create dataframe

DF is created via a sparksession

spark.createDataFrame([('Alice', 1)]).collect()

Can create with explicit schema

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True)])
df3 = spark.createDataFrame([('Alice', 1)], schema)

And can create from various sources

## DF from tuple
spark.createDataFrame([('Alice', 1)]).collect()


## df from list of dicts
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()

## from RDD
rdd = spark.sparkContext.parallelize([('Alice', 1)])
df = spark.createDataFrame(rdd, ['name', 'age'])

## from row instances
from pyspark.sql import Row
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)

## from pandas dataframe
spark.createDataFrame(df.toPandas())

## from RDD with schema in DDL formatted string
spark.createDataFrame(rdd, "a: string, b: int").collect()
rdd = rdd.map(lambda row: row[1])
spark.createDataFrame(rdd, "int").collect()

range

Creates a DataFrame with a single pyspark.sql.types.LongType column named id, containing elements in a range from start to end (exclusive) with step value step.

spark.range(start=1, end=7, step=2)
## +---+
## | id|
## +---+
## |  1|
## |  3|
## |  5|
## +---+

read

Returns a DataFrameReader, that can be used to read data as a DataFrame

spark.read
## <...DataFrameReader object ...>

This is the interface, through which we read in stuff. Eg: spark.read.format('json').load(d).show()

readstream

Read data as a streaming DataFrame

import time

df = spark.readStream.format("rate").load()
## write stream to console, and stop streaming query after 3 seconds
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()

sql

Execute sql query

spark.sql("SELECT * FROM range(10) where id > 7")

stop sparkcontext

Stop underlying sparkcontext

spark.stop()

spark.streams

returns a StreamingQueryManager, that manages all StreamingQuery instances active on this context

sq = spark.readStream.format("rate").load().writeStream.format('memory').queryName('this_query').start()
sqm = spark.streams

[q.name for q in sqm.active]
## ['this_query']
sq.stop()

table

returns table as a DataFrame

df = spark.table("table1")

udf

strlen = spark.udf.register("strlen", lambda x: len(x))
spark.sql("SELECT strlen('test')").show()
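
One thing worth knowing: when a plain Python function is registered without a return type, the UDF returns strings. A small sketch with an explicit return type, assuming `spark` is an existing SparkSession (the alias `n` is just for illustration):

```python
def register_strlen(spark):
    """Register strlen as a SQL function that returns an integer."""
    # The third argument is the return type, given here as a DDL type string.
    spark.udf.register("strlen", lambda s: len(s), "int")
    return spark.sql("SELECT strlen('test') AS n")
```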

version

Gives the running application's spark version

_ = spark.version

© pySparkGuide.com 2024 | Website was autogenerated on 2024-04-24
