PySpark Reference

0. Session & Basics

0.1 Session Init

from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("PySparkReference") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
TERMINAL OUTPUT
(No output)

0.2 Context/Config

# Access the underlying SparkContext
sc = spark.sparkContext

# Get and set runtime configuration
spark.conf.set("spark.sql.shuffle.partitions", "10")
current_val = spark.conf.get("spark.sql.shuffle.partitions")
print(f"Partitions: {current_val}")
TERMINAL OUTPUT
Partitions: 10

0.3 Show/Display

df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "val"])

# Display the first 20 rows (default)
df.show()

# Display with vertical formatting and no truncation
df.show(n=5, truncate=False, vertical=True)
TERMINAL OUTPUT
+---+---+
| id|val|
+---+---+
|  1|foo|
|  2|bar|
+---+---+

-RECORD 0----
 id  | 1     
 val | foo   
-RECORD 1----
 id  | 2     
 val | bar   

1. DataFrames & I/O

1.1 Read (CSV/Parquet)

# Read CSV with header and schema inference
df_csv = spark.read.csv("data.csv", header=True, inferSchema=True)

# Read Parquet (columnar format)
df_parquet = spark.read.parquet("data.parquet")
TERMINAL OUTPUT
(No output)
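Schema inference requires an extra pass over the file; the reader also accepts an explicit schema. A minimal sketch (column names here are illustrative):

from pyspark.sql import types as T

# Hypothetical explicit schema; skips the inference pass
csv_schema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("val", T.StringType(), True)
])
df_csv_typed = spark.read.schema(csv_schema).csv("data.csv", header=True)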

1.2 Write

# Write to Parquet with overwrite mode
df.write.mode("overwrite").parquet("output.parquet")

# Write to CSV with header and custom delimiter
df.write.option("header", "true").option("sep", "|").csv("output.csv")
TERMINAL OUTPUT
(No output)
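Writers can also split output by column values with partitionBy; a short sketch (path and column are illustrative):

# Write Parquet partitioned by a column; one sub-directory per distinct value
df.write.mode("overwrite").partitionBy("name").parquet("output_by_name.parquet")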

1.3 Create from Local

# Create DataFrame from a list of tuples or dictionaries
data = [("Alice", 34), ("Bob", 45)]
df = spark.createDataFrame(data, ["name", "age"])
TERMINAL OUTPUT
(No output)
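The comment above also mentions dictionaries; a sketch of that variant, assuming a recent Spark version where the schema is inferred from the dict keys:

# Create DataFrame from a list of dictionaries (column names taken from keys)
data_dicts = [{"name": "Alice", "age": 34}, {"name": "Bob", "age": 45}]
df_from_dicts = spark.createDataFrame(data_dicts)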

2. Schema & Types

2.1 StructType/Field

from pyspark.sql import types as T

# Define explicit schema using StructType
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("age", T.IntegerType(), True),
    T.StructField("metadata", T.MapType(T.StringType(), T.StringType()), True)
])

df_with_schema = spark.createDataFrame([], schema)
TERMINAL OUTPUT
(No output)
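The same schema can usually be written more compactly as a DDL-formatted string, which is accepted wherever a schema is expected (createDataFrame, spark.read.schema). A sketch:

# DDL string equivalent of the StructType above
ddl_schema = "name STRING, age INT, metadata MAP<STRING, STRING>"
df_ddl = spark.createDataFrame([], ddl_schema)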

2.2 Casting

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Cast an existing column to a different type
df_cast = df.withColumn("age", F.col("age").cast(T.DoubleType()))

# Casting via selectExpr (SQL style)
df_cast_sql = df.selectExpr("cast(age as string) as age_str")
TERMINAL OUTPUT
(No output)

2.3 Inspecting Schema

# Print schema in a hierarchical tree format
df.printSchema()

# Get column names and their data types as a list of tuples
print(df.dtypes)
TERMINAL OUTPUT
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

[('name', 'string'), ('age', 'bigint')]

3. Selection & Filtering

3.1 Select/Alias

from pyspark.sql import functions as F

# Select columns and rename using alias
df_sel = df.select(
    F.col("name").alias("user_name"),
    F.col("age") + 1
)
TERMINAL OUTPUT
(No output)

3.2 Filter/Where

from pyspark.sql import functions as F

# Filter rows based on conditions (filter and where are synonyms)
df_filtered = df.filter(F.col("age") > 30)
df_where = df.where("age > 30 AND name LIKE 'A%'")
TERMINAL OUTPUT
(No output)

3.3 Drop/Drop Duplicates

# Remove columns from DataFrame
df_less = df.drop("age")

# Remove duplicate rows based on all or specific columns
df_unique = df.dropDuplicates(["name"])
TERMINAL OUTPUT
(No output)

4. Column Operations

4.1 withColumn

from pyspark.sql import functions as F

# Add a new column or replace an existing one
df_new = df.withColumn("is_adult", F.col("age") >= 18)
TERMINAL OUTPUT
(No output)

4.2 when/otherwise

from pyspark.sql import functions as F

# Conditional logic similar to IF-THEN-ELSE
df_logic = df.withColumn("category", 
    F.when(F.col("age") < 18, "Minor")
     .when(F.col("age") < 65, "Adult")
     .otherwise("Senior")
)
TERMINAL OUTPUT
(No output)

4.3 String/Date Math

from pyspark.sql import functions as F

# String and Date manipulations
df_ops = df.withColumn("upper_name", F.upper(F.col("name"))) \
           .withColumn("plus_7_days", F.date_add(F.current_date(), 7)) \
           .withColumn("diff", F.datediff(F.current_date(), F.to_date(F.lit("2020-01-01"))))
TERMINAL OUTPUT
(No output)
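Parsing and formatting round out the date helpers above; a short sketch using to_timestamp and date_format (the format patterns are illustrative):

from pyspark.sql import functions as F

# Parse a string into a timestamp, then format a date back into a string
df_dates = df.withColumn("ts", F.to_timestamp(F.lit("2020-01-01 12:30:00"), "yyyy-MM-dd HH:mm:ss")) \
             .withColumn("year_month", F.date_format(F.current_date(), "yyyy-MM"))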

5. Aggregations & Grouping

5.1 groupBy

# Group DataFrame by one or more columns
grouped_df = df.groupBy("name")
TERMINAL OUTPUT
(No output)

5.2 Aggregate Functions

from pyspark.sql import functions as F

# Perform aggregations on groups
df_agg = df.groupBy("name").agg(
    F.count("*").alias("total"),
    F.avg("age").alias("avg_age"),
    F.collect_list("age").alias("all_ages")
)
TERMINAL OUTPUT
+-----+-----+-------+--------+
| name|total|avg_age|all_ages|
+-----+-----+-------+--------+
|Alice|    1|   34.0|    [34]|
|  Bob|    1|   45.0|    [45]|
+-----+-----+-------+--------+

5.3 Pivot

# Pivot column values into separate columns
df_pivot = df.groupBy("name").pivot("age").count()
TERMINAL OUTPUT
+-----+----+----+
| name|  34|  45|
+-----+----+----+
|Alice|   1|null|
|  Bob|null|   1|
+-----+----+----+
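When the distinct pivot values are known in advance, passing them explicitly avoids the extra job that collects them; a minimal sketch:

# Explicit value list skips the pass that computes distinct pivot values
df_pivot_fast = df.groupBy("name").pivot("age", [34, 45]).count()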

6. Joins & Set Operations

6.1 Inner/Outer/Left Joins

df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "val1"])
df2 = spark.createDataFrame([(1, "X"), (3, "Y")], ["id", "val2"])

# Join types: inner, left, right, outer, left_semi, left_anti
df_inner = df1.join(df2, on="id", how="inner")
df_left = df1.join(df2, on="id", how="left")
TERMINAL OUTPUT
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   A|   X|
+---+----+----+
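The comment above also lists left_semi and left_anti; a minimal sketch of an anti join, which keeps only df1 rows with no match in df2:

# Rows of df1 whose id does not appear in df2 (here: id = 2)
df_anti = df1.join(df2, on="id", how="left_anti")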

6.2 Broadcast Join

from pyspark.sql import functions as F

# Force broadcasting a small DataFrame for performance
df_joined = df1.join(F.broadcast(df2), "id")
TERMINAL OUTPUT
(No output)
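Spark also broadcasts automatically when a table is below a size threshold; a sketch of adjusting it (value in bytes, -1 disables auto-broadcast):

# Raise the auto-broadcast threshold to ~50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)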

6.3 Union/Intersect

from pyspark.sql import functions as F

# Combine DataFrames vertically (union matches columns by position, so align names/types first)
df_union = df1.union(df2.select("id", F.col("val2").alias("val1")))

# Find common rows
df_intersect = df1.intersect(df2.select("id", F.col("val2").alias("val1")))
TERMINAL OUTPUT
+---+----+
| id|val1|
+---+----+
|  1|   A|
|  2|   B|
|  1|   X|
|  3|   Y|
+---+----+
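Because union resolves columns by position, unionByName is often safer when both DataFrames share column names in a different order; a sketch (the rename is illustrative):

# Resolve columns by name instead of position
df2_renamed = df2.withColumnRenamed("val2", "val1")
df_union_by_name = df1.unionByName(df2_renamed)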

7. Window Functions

7.1 WindowSpec

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define a window specification
window_spec = Window.partitionBy("category").orderBy(F.col("salary").desc())
TERMINAL OUTPUT
(No output)
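The snippets in 7.1 through 7.3 assume a DataFrame with category and salary columns, which the running name/age example does not have; a minimal hypothetical DataFrame for trying them out:

# Hypothetical data with the category and salary columns used by window_spec
df_sal = spark.createDataFrame(
    [("eng", "Alice", 100), ("eng", "Bob", 90), ("hr", "Cara", 80)],
    ["category", "name", "salary"]
)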

7.2 Ranking

from pyspark.sql import functions as F

# Apply ranking functions over the window
df_ranked = df.withColumn("rank", F.rank().over(window_spec)) \
              .withColumn("row_num", F.row_number().over(window_spec))
TERMINAL OUTPUT
(No output)

7.3 Lead/Lag

from pyspark.sql import functions as F

# Access values from preceding or following rows
df_trends = df.withColumn("prev_val", F.lag("salary", 1).over(window_spec)) \
              .withColumn("next_val", F.lead("salary", 1).over(window_spec))
TERMINAL OUTPUT
(No output)
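Windows can also define cumulative or sliding frames with rowsBetween; a sketch of a running total per partition:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Frame from the first row of the partition up to the current row
running = Window.partitionBy("category").orderBy("salary") \
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_running = df.withColumn("running_total", F.sum("salary").over(running))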

8. UDFs & Advanced

8.1 Standard UDFs

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Define and register a Python function as a UDF
@F.udf(returnType=T.StringType())
def complex_logic(val):
    return f"Processed: {val}"

df_udf = df.withColumn("result", complex_logic(F.col("name")))
TERMINAL OUTPUT
(No output)
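The decorated UDF above is only callable from the DataFrame API; to use it in SQL it can also be registered by name. A sketch (the view and registered names are illustrative):

# Register the UDF for use in SQL queries
spark.udf.register("complex_logic_sql", complex_logic)

df.createOrReplaceTempView("people")
df_sql_udf = spark.sql("SELECT name, complex_logic_sql(name) AS result FROM people")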

8.2 Vectorized UDFs

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Pandas UDF for vectorized execution (requires pyarrow)
@F.pandas_udf(T.DoubleType())
def vectorized_add_one(s: pd.Series) -> pd.Series:
    return s + 1

df_vec = df.withColumn("age_plus_one", vectorized_add_one(F.col("age")))
TERMINAL OUTPUT
(No output)

8.3 RDD Interop

# Convert DataFrame to RDD
rdd = df.rdd

# Perform RDD operations
rdd_mapped = rdd.map(lambda row: (row.name, row.age * 2))

# Convert RDD back to DataFrame
df_back = rdd_mapped.toDF(["name", "double_age"])
TERMINAL OUTPUT
(No output)

9. Performance & Tuning

9.1 Cache/Persist

from pyspark import StorageLevel

# Cache the DataFrame (for DataFrames, a shortcut for persist(StorageLevel.MEMORY_AND_DISK))
df.cache()

# Persist with explicit storage level
df.persist(StorageLevel.DISK_ONLY)

# Remove from cache
df.unpersist()
TERMINAL OUTPUT
(No output)
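Caching is lazy: nothing is stored until an action materializes the DataFrame. A minimal sketch:

# The count() action triggers computation and populates the cache
df.cache()
df.count()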

9.2 Repartition/Coalesce

# Increase or shuffle partitions
df_repartitioned = df.repartition(100, "name")

# Decrease partitions without full shuffle
df_coalesced = df.coalesce(1)
TERMINAL OUTPUT
(No output)

9.3 Explain Plan

# Display physical execution plan
df.explain()

# Display detailed plans (Parsed, Analyzed, Optimized, Physical)
df.explain(extended=True)
TERMINAL OUTPUT
== Physical Plan ==
*(1) Scan ExistingRDD[name#0,age#1L]
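Assuming Spark 3.0+, explain also accepts a mode argument; formatted output is often the easiest to read:

# Modes include "simple", "extended", "codegen", "cost", "formatted"
df.explain(mode="formatted")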