Quick and useful pySpark reference for working professionals.
Because the official api reference isn't fast enough!
Collection <- Current phase
Re-organization
Editing
Polishing
Checklist
- [ ] Core Classes
- [ ] pyspark.sql.SparkSession
- [ ] pyspark.sql.Catalog
- [ ] pyspark.sql.DataFrame
- [ ] pyspark.sql.Column
- [ ] pyspark.sql.Observation
- [ ] pyspark.sql.Row
- [ ] pyspark.sql.GroupedData
- [ ] pyspark.sql.PandasCogroupedOps
- [ ] pyspark.sql.DataFrameNaFunctions
- [ ] pyspark.sql.DataFrameStatFunctions
- [ ] pyspark.sql.Window
- [ ] pyspark.sql.DataFrameReader
- [ ] pyspark.sql.DataFrameWriter
- [ ] pyspark.sql.DataFrameWriterV2
- [ ] pyspark.sql.UDFRegistration
- [ ] pyspark.sql.udf.UserDefinedFunction
- [ ] Spark Session
- [ ] pyspark.sql.SparkSession.builder.appName
- [ ] pyspark.sql.SparkSession.builder.config
- [ ] pyspark.sql.SparkSession.builder.enableHiveSupport
- [ ] pyspark.sql.SparkSession.builder.getOrCreate
- [ ] pyspark.sql.SparkSession.builder.master
- [ ] pyspark.sql.SparkSession.builder.remote
- [ ] pyspark.sql.SparkSession.catalog
- [ ] pyspark.sql.SparkSession.conf
- [ ] pyspark.sql.SparkSession.createDataFrame
- [ ] pyspark.sql.SparkSession.getActiveSession
- [ ] pyspark.sql.SparkSession.newSession
- [ ] pyspark.sql.SparkSession.range
- [ ] pyspark.sql.SparkSession.read
- [ ] pyspark.sql.SparkSession.readStream
- [ ] pyspark.sql.SparkSession.sparkContext
- [ ] pyspark.sql.SparkSession.sql
- [ ] pyspark.sql.SparkSession.stop
- [ ] pyspark.sql.SparkSession.streams
- [ ] pyspark.sql.SparkSession.table
- [ ] pyspark.sql.SparkSession.udf
- [ ] pyspark.sql.SparkSession.version
- [ ] Configuration
- [ ] pyspark.sql.conf.RuntimeConfig
- [ ] Input/Output
- [ ] pyspark.sql.DataFrameReader.csv
- [ ] pyspark.sql.DataFrameReader.format
- [ ] pyspark.sql.DataFrameReader.jdbc
- [ ] pyspark.sql.DataFrameReader.json
- [ ] pyspark.sql.DataFrameReader.load
- [ ] pyspark.sql.DataFrameReader.option
- [ ] pyspark.sql.DataFrameReader.options
- [ ] pyspark.sql.DataFrameReader.orc
- [ ] pyspark.sql.DataFrameReader.parquet
- [ ] pyspark.sql.DataFrameReader.schema
- [ ] pyspark.sql.DataFrameReader.table
- [ ] pyspark.sql.DataFrameReader.text
- [ ] pyspark.sql.DataFrameWriter.bucketBy
- [ ] pyspark.sql.DataFrameWriter.csv
- [ ] pyspark.sql.DataFrameWriter.format
- [ ] pyspark.sql.DataFrameWriter.insertInto
- [ ] pyspark.sql.DataFrameWriter.jdbc
- [ ] pyspark.sql.DataFrameWriter.json
- [ ] pyspark.sql.DataFrameWriter.mode
- [ ] pyspark.sql.DataFrameWriter.option
- [ ] pyspark.sql.DataFrameWriter.options
- [ ] pyspark.sql.DataFrameWriter.orc
- [ ] pyspark.sql.DataFrameWriter.parquet
- [ ] pyspark.sql.DataFrameWriter.partitionBy
- [ ] pyspark.sql.DataFrameWriter.save
- [ ] pyspark.sql.DataFrameWriter.saveAsTable
- [ ] pyspark.sql.DataFrameWriter.sortBy
- [ ] pyspark.sql.DataFrameWriter.text
- [ ] pyspark.sql.DataFrameWriterV2.using
- [ ] pyspark.sql.DataFrameWriterV2.option
- [ ] pyspark.sql.DataFrameWriterV2.options
- [ ] pyspark.sql.DataFrameWriterV2.tableProperty
- [ ] pyspark.sql.DataFrameWriterV2.partitionedBy
- [ ] pyspark.sql.DataFrameWriterV2.create
- [ ] pyspark.sql.DataFrameWriterV2.replace
- [ ] pyspark.sql.DataFrameWriterV2.createOrReplace
- [ ] pyspark.sql.DataFrameWriterV2.append
- [ ] pyspark.sql.DataFrameWriterV2.overwrite
- [ ] pyspark.sql.DataFrameWriterV2.overwritePartitions
- [ ] DataFrame
- [ ] pyspark.sql.DataFrame.\_\_getattr\_\_
- [ ] pyspark.sql.DataFrame.\_\_getitem\_\_
- [ ] pyspark.sql.DataFrame.agg
- [ ] pyspark.sql.DataFrame.alias
- [ ] pyspark.sql.DataFrame.approxQuantile
- [ ] pyspark.sql.DataFrame.cache
- [ ] pyspark.sql.DataFrame.checkpoint
- [ ] pyspark.sql.DataFrame.coalesce
- [ ] pyspark.sql.DataFrame.colRegex
- [ ] pyspark.sql.DataFrame.collect
- [ ] pyspark.sql.DataFrame.columns
- [ ] pyspark.sql.DataFrame.corr
- [ ] pyspark.sql.DataFrame.count
- [ ] pyspark.sql.DataFrame.cov
- [ ] pyspark.sql.DataFrame.createGlobalTempView
- [ ] pyspark.sql.DataFrame.createOrReplaceGlobalTempView
- [ ] pyspark.sql.DataFrame.createOrReplaceTempView
- [ ] pyspark.sql.DataFrame.createTempView
- [ ] pyspark.sql.DataFrame.crossJoin
- [ ] pyspark.sql.DataFrame.crosstab
- [ ] pyspark.sql.DataFrame.cube
- [ ] pyspark.sql.DataFrame.describe
- [ ] pyspark.sql.DataFrame.distinct
- [ ] pyspark.sql.DataFrame.drop
- [ ] pyspark.sql.DataFrame.dropDuplicates
- [ ] pyspark.sql.DataFrame.drop\_duplicates
- [ ] pyspark.sql.DataFrame.dropna
- [ ] pyspark.sql.DataFrame.dtypes
- [ ] pyspark.sql.DataFrame.exceptAll
- [ ] pyspark.sql.DataFrame.explain
- [ ] pyspark.sql.DataFrame.fillna
- [ ] pyspark.sql.DataFrame.filter
- [ ] pyspark.sql.DataFrame.first
- [ ] pyspark.sql.DataFrame.foreach
- [ ] pyspark.sql.DataFrame.foreachPartition
- [ ] pyspark.sql.DataFrame.freqItems
- [ ] pyspark.sql.DataFrame.groupBy
- [ ] pyspark.sql.DataFrame.head
- [ ] pyspark.sql.DataFrame.hint
- [ ] pyspark.sql.DataFrame.inputFiles
- [ ] pyspark.sql.DataFrame.intersect
- [ ] pyspark.sql.DataFrame.intersectAll
- [ ] pyspark.sql.DataFrame.isEmpty
- [ ] pyspark.sql.DataFrame.isLocal
- [ ] pyspark.sql.DataFrame.isStreaming
- [ ] pyspark.sql.DataFrame.join
- [ ] pyspark.sql.DataFrame.limit
- [ ] pyspark.sql.DataFrame.localCheckpoint
- [ ] pyspark.sql.DataFrame.mapInPandas
- [ ] pyspark.sql.DataFrame.mapInArrow
- [ ] pyspark.sql.DataFrame.melt
- [ ] pyspark.sql.DataFrame.na
- [ ] pyspark.sql.DataFrame.observe
- [ ] pyspark.sql.DataFrame.orderBy
- [ ] pyspark.sql.DataFrame.persist
- [ ] pyspark.sql.DataFrame.printSchema
- [ ] pyspark.sql.DataFrame.randomSplit
- [ ] pyspark.sql.DataFrame.rdd
- [ ] pyspark.sql.DataFrame.registerTempTable
- [ ] pyspark.sql.DataFrame.repartition
- [ ] pyspark.sql.DataFrame.repartitionByRange
- [ ] pyspark.sql.DataFrame.replace
- [ ] pyspark.sql.DataFrame.rollup
- [ ] pyspark.sql.DataFrame.sameSemantics
- [ ] pyspark.sql.DataFrame.sample
- [ ] pyspark.sql.DataFrame.sampleBy
- [ ] pyspark.sql.DataFrame.schema
- [ ] pyspark.sql.DataFrame.select
- [ ] pyspark.sql.DataFrame.selectExpr
- [ ] pyspark.sql.DataFrame.semanticHash
- [ ] pyspark.sql.DataFrame.show
- [ ] pyspark.sql.DataFrame.sort
- [ ] pyspark.sql.DataFrame.sortWithinPartitions
- [ ] pyspark.sql.DataFrame.sparkSession
- [ ] pyspark.sql.DataFrame.stat
- [ ] pyspark.sql.DataFrame.storageLevel
- [ ] pyspark.sql.DataFrame.subtract
- [ ] pyspark.sql.DataFrame.summary
- [ ] pyspark.sql.DataFrame.tail
- [ ] pyspark.sql.DataFrame.take
- [ ] pyspark.sql.DataFrame.to
- [ ] pyspark.sql.DataFrame.toDF
- [ ] pyspark.sql.DataFrame.toJSON
- [ ] pyspark.sql.DataFrame.toLocalIterator
- [ ] pyspark.sql.DataFrame.toPandas
- [ ] pyspark.sql.DataFrame.to\_pandas\_on\_spark
- [ ] pyspark.sql.DataFrame.transform
- [ ] pyspark.sql.DataFrame.union
- [ ] pyspark.sql.DataFrame.unionAll
- [ ] pyspark.sql.DataFrame.unionByName
- [ ] pyspark.sql.DataFrame.unpersist
- [ ] pyspark.sql.DataFrame.unpivot
- [ ] pyspark.sql.DataFrame.where
- [ ] pyspark.sql.DataFrame.withColumn
- [ ] pyspark.sql.DataFrame.withColumns
- [ ] pyspark.sql.DataFrame.withColumnRenamed
- [ ] pyspark.sql.DataFrame.withColumnsRenamed
- [ ] pyspark.sql.DataFrame.withMetadata
- [ ] pyspark.sql.DataFrame.withWatermark
- [ ] pyspark.sql.DataFrame.write
- [ ] pyspark.sql.DataFrame.writeStream
- [ ] pyspark.sql.DataFrame.writeTo
- [ ] pyspark.sql.DataFrame.pandas_api
- [ ] pyspark.sql.DataFrameNaFunctions.drop
- [ ] pyspark.sql.DataFrameNaFunctions.fill
- [ ] pyspark.sql.DataFrameNaFunctions.replace
- [ ] pyspark.sql.DataFrameStatFunctions.approxQuantile
- [ ] pyspark.sql.DataFrameStatFunctions.corr
- [ ] pyspark.sql.DataFrameStatFunctions.cov
- [ ] pyspark.sql.DataFrameStatFunctions.crosstab
- [ ] pyspark.sql.DataFrameStatFunctions.freqItems
- [ ] pyspark.sql.DataFrameStatFunctions.sampleBy
- [ ] Column
- [ ] pyspark.sql.Column.\_\_getattr\_\_
- [ ] pyspark.sql.Column.\_\_getitem\_\_
- [ ] pyspark.sql.Column.alias
- [ ] pyspark.sql.Column.asc
- [ ] pyspark.sql.Column.asc\_nulls\_first
- [ ] pyspark.sql.Column.asc\_nulls\_last
- [ ] pyspark.sql.Column.astype
- [ ] pyspark.sql.Column.between
- [ ] pyspark.sql.Column.bitwiseAND
- [ ] pyspark.sql.Column.bitwiseOR
- [ ] pyspark.sql.Column.bitwiseXOR
- [ ] pyspark.sql.Column.cast
- [ ] pyspark.sql.Column.contains
- [ ] pyspark.sql.Column.desc
- [ ] pyspark.sql.Column.desc\_nulls\_first
- [ ] pyspark.sql.Column.desc\_nulls\_last
- [ ] pyspark.sql.Column.dropFields
- [ ] pyspark.sql.Column.endswith
- [ ] pyspark.sql.Column.eqNullSafe
- [ ] pyspark.sql.Column.getField
- [ ] pyspark.sql.Column.getItem
- [ ] pyspark.sql.Column.ilike
- [ ] pyspark.sql.Column.isNotNull
- [ ] pyspark.sql.Column.isNull
- [ ] pyspark.sql.Column.isin
- [ ] pyspark.sql.Column.like
- [ ] pyspark.sql.Column.name
- [ ] pyspark.sql.Column.otherwise
- [ ] pyspark.sql.Column.over
- [ ] pyspark.sql.Column.rlike
- [ ] pyspark.sql.Column.startswith
- [ ] pyspark.sql.Column.substr
- [ ] pyspark.sql.Column.when
- [ ] pyspark.sql.Column.withField
- [ ] Data Types
- [ ] ArrayType
- [ ] BinaryType
- [ ] BooleanType
- [ ] ByteType
- [ ] DataType
- [ ] DateType
- [ ] DecimalType
- [ ] DoubleType
- [ ] FloatType
- [ ] IntegerType
- [ ] LongType
- [ ] MapType
- [ ] NullType
- [ ] ShortType
- [ ] StringType
- [ ] CharType
- [ ] VarcharType
- [ ] StructField
- [ ] StructType
- [ ] TimestampType
- [ ] TimestampNTZType
- [ ] DayTimeIntervalType
- [ ] Row
- [ ] Functions
- [ ] Normal Functions
- [ ] Math Functions
- [ ] Datetime Functions
- [ ] Collection Functions
- [ ] Partition Transformation Functions
- [ ] Aggregate Functions
- [ ] Window Functions
- [ ] Sort Functions
- [ ] String Functions
- [ ] UDF
- [ ] Misc Functions
- [ ] Window
- [ ] pyspark.sql.Window.currentRow
- [ ] pyspark.sql.Window.orderBy
- [ ] pyspark.sql.Window.partitionBy
- [ ] pyspark.sql.Window.rangeBetween
- [ ] pyspark.sql.Window.rowsBetween
- [ ] pyspark.sql.Window.unboundedFollowing
- [ ] pyspark.sql.Window.unboundedPreceding
- [ ] pyspark.sql.WindowSpec.orderBy
- [ ] pyspark.sql.WindowSpec.partitionBy
- [ ] pyspark.sql.WindowSpec.rangeBetween
- [ ] pyspark.sql.WindowSpec.rowsBetween
- [ ] Grouping
- [ ] pyspark.sql.GroupedData.agg
- [ ] pyspark.sql.GroupedData.apply
- [ ] pyspark.sql.GroupedData.applyInPandas
- [ ] pyspark.sql.GroupedData.applyInPandasWithState
- [ ] pyspark.sql.GroupedData.avg
- [ ] pyspark.sql.GroupedData.cogroup
- [ ] pyspark.sql.GroupedData.count
- [ ] pyspark.sql.GroupedData.max
- [ ] pyspark.sql.GroupedData.mean
- [ ] pyspark.sql.GroupedData.min
- [ ] pyspark.sql.GroupedData.pivot
- [ ] pyspark.sql.GroupedData.sum
- [ ] pyspark.sql.PandasCogroupedOps.applyInPandas
- [ ] Catalog
- [ ] pyspark.sql.Catalog.cacheTable
- [ ] pyspark.sql.Catalog.clearCache
- [ ] pyspark.sql.Catalog.createExternalTable
- [ ] pyspark.sql.Catalog.createTable
- [ ] pyspark.sql.Catalog.currentCatalog
- [ ] pyspark.sql.Catalog.currentDatabase
- [ ] pyspark.sql.Catalog.databaseExists
- [ ] pyspark.sql.Catalog.dropGlobalTempView
- [ ] pyspark.sql.Catalog.dropTempView
- [ ] pyspark.sql.Catalog.functionExists
- [ ] pyspark.sql.Catalog.getDatabase
- [ ] pyspark.sql.Catalog.getFunction
- [ ] pyspark.sql.Catalog.getTable
- [ ] pyspark.sql.Catalog.isCached
- [ ] pyspark.sql.Catalog.listCatalogs
- [ ] pyspark.sql.Catalog.listColumns
- [ ] pyspark.sql.Catalog.listDatabases
- [ ] pyspark.sql.Catalog.listFunctions
- [ ] pyspark.sql.Catalog.listTables
- [ ] pyspark.sql.Catalog.recoverPartitions
- [ ] pyspark.sql.Catalog.refreshByPath
- [ ] pyspark.sql.Catalog.refreshTable
- [ ] pyspark.sql.Catalog.registerFunction
- [ ] pyspark.sql.Catalog.setCurrentCatalog
- [ ] pyspark.sql.Catalog.setCurrentDatabase
- [ ] pyspark.sql.Catalog.tableExists
- [ ] pyspark.sql.Catalog.uncacheTable
- [ ] Avro
- [ ] Observation
- [ ] UDF
- [ ] Protobuf
Workflows
Register a df as a table, and run sql on it
df.createOrReplaceTempView("people")
result = spark.sql("select * from people")
I/O - Read and write data from file formats
Note: the spark api has a lot of baggage. There is always more than one way to do the same thing.
However, there are two distinct styles of getting something done: 1. Directly using a function and setting its arguments 2. Chaining a function, and progressively setting options Example:
## READING A FILE
## direct
df = spark.read.csv(path="/data/data.csv", nullValue="undefined")
df = spark.read.load(path="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)
## progressive
df = spark.read.option("nullValue", "undefined").format('csv').load("/data/data.csv")
## WRITING A FILE
## direct
df.write.csv("/data/data.csv", mode="overwrite")
## progressive
df.write.mode("overwrite").format("csv").save("/data/data.csv")
I prefer to use the direct way of defining the operation. As we are simply setting arguments to a function, autocomplete works very well with it. And it is pythonic.
In the progressive (chained) style, we are passing strings most of the time, especially with the option()/options() methods, where both the option names and their values are plain strings. This leads to mistakes, and also demands a lot of memorization.
You can notice the things that creep in when a framework of one language is operated through other languages.
Look at `spark.read.csv()` vs `spark.read.load()`. You can see that `csv()` is a sharp and dedicated function, while `load()` is designed to handle all data formats internally. The `load()` function has a distinct java/scala style to it (the factory style of programming).
Also, the progressive style of chaining functions has a distinctive functional programming feel to it. We don't usually write python in this style.
CSV
Read csv into df
A particular CSV, or all CSVs in a directory
df = spark.read.csv("/dir/path_with_many_csv/single_file.csv") # single
df = spark.read.csv("/dir/path_with_many_csv/") # all
arguments:
- delimiter=","
- inferSchema='True' (default false)
- header='True' (read first line as col name)
- nullValue='unknown' (consider a string as null)
## Read with user defined schema
schema = StructType().add("name", StringType(), True).add("age", IntegerType(), True)
df = spark.read.csv("path_to_csv", schema=schema)
Write df into csv
df.write.csv("/dir/path_with_many_csv/", mode="<mode>")
Where mode=
overwrite
append
ignore
error
JDBC
Read Table into dataframe
df = spark.read.jdbc(url="", table="", properties={})
## properties are jdbc connection arguments
## example:
properties = { 'user' : 'SYSTEM', 'password' : 'mypassword' }
How to parallelize?
From the documentation: Partitions of the table will be retrieved in parallel if either `column` or `predicates` is specified. `lowerBound`, `upperBound` and `numPartitions` are needed when `column` is specified.
Write dataframe into table
<Insert example when I have the time>
params:
- table (str)
- mode
- properties (dict)
JSON
Similar to csv, depends on the path you specify. * Entire directory? give dir path * Single file? give file path * Multiple files? give file paths as an array
df = spark.read.json(path="/dir/path_with_many_json/single_file.json") # single
df = spark.read.json(path="/dir/path_with_many_json/") # all
JSON Lines will work by default. This data is never multiline (only one json record per line)
For a JSON file that holds one record spanning multiple lines, set the argument multiLine=True
JSON will need user specified schema, to maintain your sanity.
schema = StructType([
StructField("Name", StringType(), True),
StructField("Age", IntegerType(), True),
])
df = spark.read.json(path="<your path>", schema=schema)
Write json file:
Writes in JSON lines format
df.write.json(path="/path/to/output/zipcodes.json", mode="overwrite")
Modes: * append * overwrite * ignore * error or errorifexists
ORC
Read
df = spark.read.orc(path="<path to dir, file, files>")
Notable parameters: * mergeSchema (boolean) * recursiveFileLookup * modifiedBefore * modifiedAfter
Write
df.write.orc(path='<path>')
Notable parameters: * partitionBy (str, list of str) - partition by column name(s)
Parquet
The parquet api's options are surprisingly limited
Read
spark.read.parquet(path='/data/data.parquet')
Notable parameters: * mergeSchema (bool)
Write
df.write.parquet(path='/data/data.parquet')
Notable params: * mode (str, optional) {append, overwrite, ignore, error/errorifexists} * partitionBy (str or list) - names of cols to partition by
Text
Read: every line in the text file becomes a new row in the dataframe.
Suppose file.txt looks like this:
alphabets
a
b
c
spark.read.schema(df.schema).text("/data/file.txt").sort("alphabets").show()
Write: the dataframe must have only one col of string type. Every row becomes a new line in the output file.
df.write.mode("overwrite").text(path="/data/data.txt")
other params: * lineSep (str)
Option/Options
These are weird. Used to set arguments (options) of both read and write functions.
## you can either directly set the nullvalue argument of the function:
spark.read.csv(path="/data/", nullValue="undefined")
## or set arguments via options, through the chained functions style of coding
spark.read.option("nullValue", "undefined").format('csv').load("/data/")
These are just two different styles of programming.
I don't like using options, and prefer to stick to directly setting arguments in functions. (Since we give strings in options, autocomplete doesn't work for them, which leads to mistakes)
More on writing
Save()
You might have noticed it at the end of chained write functions. We specify the path to save into (file or directory)
df.write.mode("overwrite").format("json").save(path="/data/data.json")
In the above example, the mode and format is set using functions in the chain.
You can also set them directly within save()
df.write.save(path="/data/data.json", format="json", mode="overwrite")
Other arguments:
* partitionBy (list)
* `**options` (dict) - other string options
Having more than one way of doing the same thing is what makes spark a pain to deal with.
Partition on a dataframe col while writing
uses partitionBy()
function
df.write.partitionBy("name").mode("overwrite").format("parquet").save("/data/data.parquet")
The argument cols
can be a string or a list of strings
Misc
Mode
Acts the same way as options. Either set mode as an argument to a function, or set it via the mode()
function.
## you can either directly set the mode argument of the function:
df.write.parquet(d, mode="overwrite")
## or set mode through the chained functions style of coding
df.write.mode("overwrite").format("parquet").save(d)
Load
Load is a generic function to read various file formats into a dataframe.
df = spark.read.load(path="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)
Prefer directly using the dedicated function. Eg spark.read.csv()
instead of spark.read.load(format="csv")
Table
Read a table into a dataframe
spark.read.table(tableName='people').show()
The table used in this function is one loaded via a spark sql function like df.createOrReplaceTempView('people')
Write bucketBy()
df.write.bucketBy(2, "name").mode("overwrite").saveAsTable("bucketed_table")
Params: * numBuckets (int) * col (str, list, tuple) * cols (str). Keep this empty if a list is already provided in col
bucketBy().sortBy()
Use sortBy() to sort rows in a bucket
df.write.bucketBy(2, "name").sortBy(col="age").saveAsTable("bucketed_table")
parameters: * col (str, list of str) * cols (str, list of str)
Insert dataframe into a table
Works only when (schema of dataframe) = (schema of table)
df.selectExpr("age AS col1", "name AS col2").write.insertInto(tableName="tblPeople")
Params: * overwrite (bool). Default is false. Overwrites existing data when true.
Save into a table
df.write.saveAsTable(name="tblA")
Other params:
* format (str)
* mode (str)
* partitionBy (str/list)
* **options
(dict)
Note: In append mode, if the table already exists, `saveAsTable()` doesn't need the dataframe's column order to be the same as the table's. Unlike `insertInto()`, it finds the correct column positions through the column names, and inserts the data into the table.
Table stuff
The documentation of table stuff SUCKS. Maybe because it is legacy stuff
create
create new table from contents of dataframe. The new table’s schema, partition layout, properties, and other configuration will be based on the configuration set on this writer.
replace
Replace an existing table with the contents of the data frame.
The existing table’s schema, partition layout, properties, and other configuration will be replaced with the contents of the data frame and the configuration set on this writer.
createOrReplace
Create a new table or replace an existing table with the contents of the data frame.
append
overwrite
Overwrite rows matching the given filter condition with the contents of the data frame in the output table.
overwritePartitions
Overwrite all partition for which the data frame contains at least one row with the contents of the data frame in the output table.
tableProperty
[https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.tableProperty.html](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.tableProperty.html)
partitionedBy
Sessions
## set app name (shown in spark ui)
SparkSession.builder.appName("My App Name")
## set master url
SparkSession.builder.master("local")
## or
SparkSession.builder.master("spark://master:7077")
SparkSession.builder.enableHiveSupport()
## Set spark remote url
SparkSession.builder.remote("sc://localhost")
## set some config options
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
## will be applied to sparksessions created or fetched via this builder
GetOrCreate
Checks if a valid global SparkSession exists. * Yes? returns it. Also applies config options specified in the builder to the returned SparkSession. * No? creates new SparkSession and sets it as the global default
session_1 = SparkSession.builder.getOrCreate()
## can set its config
session_1.conf.set("key_1", "value_1")
## can get its config
session_1.conf.get("key_1")
Get currently active session
active_session = SparkSession.getActiveSession()
Create a new session
The session has separate SQLConf, registered tmp views, UDFs. However, SparkContext and tablecache is shared.
spark.newSession()
Set session config at runtime
Set all spark/hadoop configs relevant to Spark SQL. When fetching a config key's value, defaults to the value set in underlying SparkContext (if it is already set).
spark.conf.set("key", "value")
Catalog
Catalog object, through which we can do operations on the underlying db, tables, functions etc (create, drop, alter, query etc)
spark.catalog
## eg: list tables
spark.catalog.listTables()
Create dataframe
DF is created via a sparksession
spark.createDataFrame([('Alice', 1)]).collect()
Can create with explicit schema
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
df3 = spark.createDataFrame(rdd, schema)
And can create from various sources
## DF from tuple
spark.createDataFrame([('Alice', 1)]).collect()
## df from list of dicts
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()
## from RDD
rdd = spark.sparkContext.parallelize([('Alice', 1)])
df = spark.createDataFrame(rdd, ['name', 'age'])
## from row instances
from pyspark.sql import Row
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)
## from pandas dataframe
spark.createDataFrame(df.toPandas())
## from RDD with schema in DDL formatted string
spark.createDataFrame(rdd, "a: string, b: int").collect()
rdd = rdd.map(lambda row: row[1])
spark.createDataFrame(rdd, "int").collect()
range
Creates a DataFrame with a single `pyspark.sql.types.LongType` column named `id`, containing elements in a range from `start` to `end` (exclusive) with step value `step`.
spark.range(start=1, end=7, step=2)
## +---+
## | id|
## +---+
## | 1|
## | 3|
## | 5|
## +---+
read
Returns a DataFrameReader, that can be used to read data as a DataFrame
spark.read
## <...DataFrameReader object ...>
This is the interface, through which we read in stuff. Eg: spark.read.format('json').load(d).show()
readstream
Read data as a streaming DataFrame
df = spark.readStream.format("rate").load()
## write stream to console, and stop streaming query after 3 seconds
import time
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()
sql
Execute sql query
spark.sql("SELECT * FROM range(10) where id > 7")
stop sparkcontext
Stop underlying sparkcontext
spark.stop()
spark.streams
returns a StreamingQueryManager, that manages all StreamingQuery instances active on this context
sq = spark.readStream.format(
"rate").load().writeStream.format('memory').queryName('this_query').start()
sqm = spark.streams
[q.name for q in sqm.active]
['this_query']
sq.stop()
table
returns table as a DataFrame
df = spark.table("table1")
udf
strlen = spark.udf.register("strlen", lambda x: len(x))
spark.sql("SELECT strlen('test')").show()
version
Gives the running application's spark version
_ = spark.version