PySpark Reference
0. Session & Basics
0.1 Session Init
from pyspark.sql import SparkSession
# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("PySparkReference") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
TERMINAL OUTPUT
(No output)
0.2 Context/Config
# Access the underlying SparkContext
sc = spark.sparkContext
# Get and set runtime configuration
spark.conf.set("spark.sql.shuffle.partitions", "10")
current_val = spark.conf.get("spark.sql.shuffle.partitions")
print(f"Partitions: {current_val}")
TERMINAL OUTPUT
Partitions: 10
0.3 Show/Display
df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "val"])
# Display the first 20 rows (default)
df.show()
# Display with vertical formatting and no truncation
df.show(n=5, truncate=False, vertical=True)
TERMINAL OUTPUT
+---+---+
| id|val|
+---+---+
|  1|foo|
|  2|bar|
+---+---+
-RECORD 0----
id | 1
val | foo
-RECORD 1----
id | 2
val | bar
1. DataFrames & I/O
1.1 Read (CSV/Parquet)
# Read CSV with header and schema inference
df_csv = spark.read.csv("data.csv", header=True, inferSchema=True)
# Read Parquet (columnar format)
df_parquet = spark.read.parquet("data.parquet")
TERMINAL OUTPUT
(No output)
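Schema inference requires an extra pass over the data; an explicit schema can be supplied instead. A sketch, reusing the data.csv path as a placeholder:
from pyspark.sql import types as T
csv_schema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("val", T.StringType(), True)
])
df_csv_typed = spark.read.schema(csv_schema).csv("data.csv", header=True)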
1.2 Write
# Write to Parquet with overwrite mode
df.write.mode("overwrite").parquet("output.parquet")
# Write to CSV with header and custom delimiter
df.write.option("header", "true").option("sep", "|").csv("output.csv")
TERMINAL OUTPUT
(No output)
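Output can also be partitioned by column values, producing one directory per value. A sketch, assuming the DataFrame has a name column:
# Write one directory per distinct value of "name"
df.write.mode("overwrite").partitionBy("name").parquet("output_by_name.parquet")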
1.3 Create from Local
# Create DataFrame from a list of tuples or dictionaries
data = [("Alice", 34), ("Bob", 45)]
df = spark.createDataFrame(data, ["name", "age"])
TERMINAL OUTPUT
(No output)
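A Spark DataFrame can also be created from a local pandas DataFrame (requires pandas; pyarrow speeds up the conversion):
import pandas as pd
# Convert a local pandas DataFrame into a Spark DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob"], "age": [34, 45]})
df_from_pandas = spark.createDataFrame(pdf)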
2. Schema & Types
2.1 StructType/Field
from pyspark.sql import types as T
# Define explicit schema using StructType
schema = T.StructType([
T.StructField("name", T.StringType(), True),
T.StructField("age", T.IntegerType(), True),
T.StructField("metadata", T.MapType(T.StringType(), T.StringType()), True)
])
df_with_schema = spark.createDataFrame([], schema)
TERMINAL OUTPUT
(No output)
2.2 Casting
from pyspark.sql import functions as F
from pyspark.sql import types as T
# Cast an existing column to a different type
df_cast = df.withColumn("age", F.col("age").cast(T.DoubleType()))
# Casting via selectExpr (SQL style)
df_cast_sql = df.selectExpr("cast(age as string) as age_str")
TERMINAL OUTPUT
(No output)
2.3 Inspecting Schema
# Print schema in a hierarchical tree format
df.printSchema()
# Get column names and their data types as a list of tuples
print(df.dtypes)
TERMINAL OUTPUT
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
[('name', 'string'), ('age', 'bigint')]
3. Selection & Filtering
3.1 Select/Alias
from pyspark.sql import functions as F
# Select columns and rename using alias
df_sel = df.select(
F.col("name").alias("user_name"),
F.col("age") + 1
)
TERMINAL OUTPUT
(No output)
3.2 Filter/Where
from pyspark.sql import functions as F
# Filter rows based on conditions (filter and where are synonyms)
df_filtered = df.filter(F.col("age") > 30)
df_where = df.where("age > 30 AND name LIKE 'A%'")
TERMINAL OUTPUT
(No output)
3.3 Drop/Drop Duplicates
# Remove columns from DataFrame
df_less = df.drop("age")
# Remove duplicate rows based on all or specific columns
df_unique = df.dropDuplicates(["name"])
TERMINAL OUTPUT
(No output)
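Called without arguments, dropDuplicates compares all columns; distinct() is equivalent:
# Remove rows that are identical across every column
df_unique_all = df.dropDuplicates()
df_distinct = df.distinct()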
4. Column Operations
4.1 withColumn
from pyspark.sql import functions as F
# Add a new column or replace an existing one
df_new = df.withColumn("is_adult", F.col("age") >= 18)
TERMINAL OUTPUT
(No output)
4.2 when/otherwise
from pyspark.sql import functions as F
# Conditional logic similar to IF-THEN-ELSE
df_logic = df.withColumn("category",
F.when(F.col("age") < 18, "Minor")
.when(F.col("age") < 65, "Adult")
.otherwise("Senior")
)
TERMINAL OUTPUT
(No output)
4.3 String/Date Math
from pyspark.sql import functions as F
# String and Date manipulations
df_ops = df.withColumn("upper_name", F.upper(F.col("name"))) \
.withColumn("plus_7_days", F.date_add(F.current_date(), 7)) \
.withColumn("diff", F.datediff(F.current_date(), F.to_date(F.lit("2020-01-01"))))
TERMINAL OUTPUT
(No output)
5. Aggregations & Grouping
5.1 groupBy
# Group DataFrame by one or more columns
grouped_df = df.groupBy("name")
TERMINAL OUTPUT
(No output)
5.2 Aggregate Functions
from pyspark.sql import functions as F
# Perform aggregations on groups
df_agg = df.groupBy("name").agg(
F.count("*").alias("total"),
F.avg("age").alias("avg_age"),
F.collect_list("age").alias("all_ages")
)
TERMINAL OUTPUT
+-----+-----+-------+--------+
| name|total|avg_age|all_ages|
+-----+-----+-------+--------+
|Alice|    1|   34.0|    [34]|
|  Bob|    1|   45.0|    [45]|
+-----+-----+-------+--------+
5.3 Pivot
# Pivot column values into separate columns
df_pivot = df.groupBy("name").pivot("age").count()
TERMINAL OUTPUT
+-----+----+----+
| name|  34|  45|
+-----+----+----+
|Alice|   1|null|
|  Bob|null|   1|
+-----+----+----+
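Passing the pivot values explicitly skips the extra job that computes them and fixes the output columns; the values here match the sample data:
df_pivot_fixed = df.groupBy("name").pivot("age", [34, 45]).count()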
6. Joins & Set Operations
6.1 Inner/Outer/Left Joins
df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "val1"])
df2 = spark.createDataFrame([(1, "X"), (3, "Y")], ["id", "val2"])
# Join types: inner, left, right, outer, left_semi, left_anti
df_inner = df1.join(df2, on="id", how="inner")
df_left = df1.join(df2, on="id", how="left")
TERMINAL OUTPUT
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   A|   X|
+---+----+----+
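left_semi and left_anti joins keep only the left side's columns; a sketch using the same df1/df2:
# Rows of df1 that have a match in df2 (id 1)
df_semi = df1.join(df2, on="id", how="left_semi")
# Rows of df1 that have no match in df2 (id 2)
df_anti = df1.join(df2, on="id", how="left_anti")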
6.2 Broadcast Join
from pyspark.sql import functions as F
# Force broadcasting a small DataFrame for performance
df_joined = df1.join(F.broadcast(df2), "id")
TERMINAL OUTPUT
(No output)
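Spark also broadcasts automatically when a table is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); setting it to -1 disables automatic broadcast joins:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)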
6.3 Union/Intersect
from pyspark.sql import functions as F
# Stack DataFrames vertically (columns are resolved by position; same number of columns required)
df_union = df1.union(df2.select("id", F.col("val2").alias("val1")))
# Find common rows
df_intersect = df1.intersect(df2.select("id", F.col("val2").alias("val1")))
TERMINAL OUTPUT
+---+----+
| id|val1|
+---+----+
|  1|   A|
|  2|   B|
|  1|   X|
|  3|   Y|
+---+----+
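union resolves columns by position; unionByName matches them by name and, since Spark 3.1, can fill missing columns with nulls via allowMissingColumns=True. An illustrative sketch:
df_a = spark.createDataFrame([(1, "A")], ["id", "val1"])
df_b = spark.createDataFrame([("B", 2)], ["val1", "id"])
# Columns are aligned by name, so the differing column order is handled correctly
df_by_name = df_a.unionByName(df_b)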
7. Window Functions
7.1 WindowSpec
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Define a window specification
window_spec = Window.partitionBy("category").orderBy(F.col("salary").desc())
TERMINAL OUTPUT
(No output)
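The ranking and lead/lag examples below apply this window to a DataFrame with category and salary columns; a minimal sample (values are illustrative):
df = spark.createDataFrame(
    [("eng", "Alice", 100), ("eng", "Bob", 90), ("hr", "Eve", 80)],
    ["category", "name", "salary"]
)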
7.2 Ranking
from pyspark.sql import functions as F
# Apply ranking functions over the window
df_ranked = df.withColumn("rank", F.rank().over(window_spec)) \
.withColumn("row_num", F.row_number().over(window_spec))
TERMINAL OUTPUT
(No output)
7.3 Lead/Lag
from pyspark.sql import functions as F
# Access values from preceding or following rows
df_trends = df.withColumn("prev_val", F.lag("salary", 1).over(window_spec)) \
.withColumn("next_val", F.lead("salary", 1).over(window_spec))
TERMINAL OUTPUT
(No output)
8. UDFs & Advanced
8.1 Standard UDFs
from pyspark.sql import functions as F
from pyspark.sql import types as T
# Define and register a Python function as a UDF
@F.udf(returnType=T.StringType())
def complex_logic(val):
return f"Processed: {val}"
df_udf = df.withColumn("result", complex_logic(F.col("name")))
TERMINAL OUTPUT
(No output)
8.2 Vectorized UDFs
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import types as T
# Pandas UDF for vectorized execution (requires pyarrow)
@F.pandas_udf(T.DoubleType())
def vectorized_add_one(s: pd.Series) -> pd.Series:
    return s + 1
df_vec = df.withColumn("age_plus_one", vectorized_add_one(F.col("age")))
TERMINAL OUTPUT
(No output)
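Since Spark 3.0, grouped vectorized logic can also be expressed with applyInPandas, which runs a pandas function once per group; the centering function and output schema below are illustrative:
import pandas as pd

def center_age(pdf: pd.DataFrame) -> pd.DataFrame:
    # Subtract the group mean from each age within the group
    pdf["age"] = pdf["age"] - pdf["age"].mean()
    return pdf

df_centered = df.groupBy("name").applyInPandas(center_age, schema="name string, age double")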
8.3 RDD Interop
# Convert DataFrame to RDD
rdd = df.rdd
# Perform RDD operations
rdd_mapped = rdd.map(lambda row: (row.name, row.age * 2))
# Convert RDD back to DataFrame
df_back = rdd_mapped.toDF(["name", "double_age"])
TERMINAL OUTPUT
(No output)
9. Performance & Tuning
9.1 Cache/Persist
from pyspark import StorageLevel
# Cache the DataFrame (shorthand for persist(StorageLevel.MEMORY_AND_DISK))
df.cache()
# Persist with explicit storage level
df.persist(StorageLevel.DISK_ONLY)
# Remove from cache
df.unpersist()
TERMINAL OUTPUT
(No output)
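Whether a DataFrame is currently marked as cached can be checked via the is_cached attribute:
df.cache()
print(df.is_cached)  # True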
9.2 Repartition/Coalesce
# Redistribute into 100 partitions hashed on "name" (full shuffle)
df_repartitioned = df.repartition(100, "name")
# Decrease partitions without full shuffle
df_coalesced = df.coalesce(1)
TERMINAL OUTPUT
(No output)
9.3 Explain Plan
# Display physical execution plan
df.explain()
# Display detailed plans (Parsed, Analyzed, Optimized, Physical)
df.explain(extended=True)
TERMINAL OUTPUT
== Physical Plan ==
*(1) Scan ExistingRDD[name#0,age#1L]
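Since Spark 3.0, explain also accepts a mode argument ("simple", "extended", "codegen", "cost", "formatted"):
# "formatted" splits the plan into an overview plus per-node details
df.explain(mode="formatted")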