This cheat sheet is a quick reference for PySpark and AWS Glue, covering the essential commands and operations you need to revise key concepts or quickly recall syntax during interviews or coding exams. Whether you're using PySpark for data processing or AWS Glue for ETL workflows, keep it handy to navigate common tasks and speed up your development.

Imports

# Importing SparkSession to create and manage Spark applications
from pyspark.sql import SparkSession
# Importing functions (F) for transformations and types (T) for schema definitions
from pyspark.sql import functions as F, types as T
# Importing StorageLevel to control persistence levels in caching
from pyspark import StorageLevel
# Importing AWS Glue libraries
from awsglue.context import GlueContext
from awsglue.job import Job
# Importing DynamicFrame for converting between DataFrames and DynamicFrames
from awsglue.dynamicframe import DynamicFrame

Initializing SparkSession & AWS Glue Context

# Creating a SparkSession, setting an application name, and applying optional configuration key-value pairs (the config shown is just an example)
spark = SparkSession.builder.appName("example").config("spark.sql.shuffle.partitions", "200").getOrCreate()
# Creating a GlueContext for AWS Glue jobs
glueContext = GlueContext(spark.sparkContext)
# Creating an AWS Glue job
job = Job(glueContext)
job.init("example_job", {})
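In a deployed Glue job, the job name is normally passed in as a job argument rather than hard-coded; a minimal sketch using getResolvedOptions (the argument handling here is illustrative):
# Resolving job arguments supplied by AWS Glue (JOB_NAME is passed automatically)
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job.init(args["JOB_NAME"], args)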

Reading Data

# Reading a JSON file into a DataFrame
df = spark.read.json("example.json")
# Loading a Parquet file into a DataFrame (load() uses the Parquet format by default)
df = spark.read.load("example.parquet")
# Reading a CSV file with headers and inferring schema
df = spark.read.csv("example.csv", header=True, inferSchema=True)
# Creating sample data as a list of tuples
data = [("John", 25), ("Doe", 30), ("Matt", 22)]
# Converting the list into an RDD
rdd = spark.sparkContext.parallelize(data)
# Converting the RDD into a DataFrame with column names
df = rdd.toDF(["name", "age"])
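Because types are imported as T above, you can also define an explicit schema instead of relying on inferSchema; a minimal sketch using the sample data:
# Defining an explicit schema with the types module (T)
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("age", T.IntegerType(), True),
])
# Creating a DataFrame directly from the sample data with the explicit schema
df = spark.createDataFrame(data, schema=schema)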

Reading from AWS Glue Catalog

# Reading a table from AWS Glue Data Catalog
df = glueContext.create_dynamic_frame.from_catalog(database="my_database", table_name="my_table").toDF()
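If the data is not catalogued yet, a DynamicFrame can also be read straight from S3; a sketch assuming a hypothetical bucket path:
# Reading JSON files directly from S3 into a DynamicFrame, then converting to a DataFrame
df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-bucket/input/"]},  # illustrative path
    format="json"
).toDF()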

Basics

# Displays the first 20 rows of the DataFrame in tabular format
df.show()
# Returns the first 5 rows as a list of Row objects
df.head(5)
# Returns the last 5 rows as a list of Row objects
df.tail(5)
# Returns a list of column names
df.columns
# Returns a list of column names with their data types
df.dtypes
# Returns the schema of the DataFrame
df.schema
# Returns the number of rows in the DataFrame
df.count()
# Selecting specific columns from the DataFrame
df = df.select('name', 'age')
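Two more inspection helpers that often come up alongside the ones above:
# Prints the schema as an indented tree (easier to read than df.schema)
df.printSchema()
# Computes basic statistics (count, mean, stddev, min, max) for numeric columns
df.describe().show()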

Filtering

# Filters rows where name is "John"
df = df.filter(df.name == "John")
# Filters rows where age is greater than 20
df = df.filter(df.age > 20)
# Filters rows where name is "John" and age is greater than 20
df = df.filter((df.name == "John") & (df.age > 20))
# Filters rows where name is either "John" or "Matt"
df = df.filter(F.col('name').isin(["John", "Matt"]))
# Filters out rows where age is NULL
df = df.filter(df.age.isNotNull())
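Filters can also be written as SQL-style expression strings, which is often quicker to type in an exam setting:
# Filters rows using a SQL expression string instead of column objects
df = df.filter("age > 20 AND name = 'John'")
# Filters rows where name starts with "Jo"
df = df.filter(F.col("name").like("Jo%"))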

Joins

# Performs an inner join on the "name" column
df = df.join(other_table, 'name')
# Performs a left join on different column names
df = df.join(other_table, df.d_id == other_table.dept_id, 'left')
# Joins on multiple columns
df = df.join(other_table, ['emp_name', 'dept_name'])
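Other join types follow the same pattern by passing the type as the third argument; for example, a left anti join keeps only the rows with no match in the other table:
# Keeps only rows from df that have no matching "name" in other_table
df = df.join(other_table, 'name', 'left_anti')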

Column Operations

# Removes leading whitespace from the "name" column
df = df.withColumn('name', F.ltrim('name'))
# Concatenates first name and last name with a space in between
df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname'))
# Renames a column
df = df.withColumnRenamed('current_column_name', 'new_column_name')
# Drops specified columns from the DataFrame
df = df.drop('column_name_1', 'column_name_2')
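Conditional columns and casts come up constantly as well; a minimal sketch with F.when:
# Adds a column that labels each row based on a condition
df = df.withColumn('age_group', F.when(df.age >= 18, 'adult').otherwise('minor'))
# Casts the "age" column to a string type
df = df.withColumn('age', df.age.cast(T.StringType()))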

Miscellaneous Transformations

# Removes duplicate rows
df = df.dropDuplicates()
# Removes rows with NULL values
df = df.na.drop()
# Replaces NULL values in the "age" column with 0
df = df.na.fill(0, ["age"])
# Groups data by "age" and counts the number of occurrences for each age
df = df.groupBy(F.col("age")).agg(F.count("name").alias("count"))
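Sorting pairs naturally with grouping, for example to rank the grouped counts:
# Sorts the grouped result by count in descending order
df = df.orderBy(F.col("count").desc())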

Repartitioning

# Repartitions the DataFrame into 10 partitions via a full shuffle (useful to increase parallelism)
df = df.repartition(10)
# Reduces the number of partitions to 5 without a full shuffle (cheaper than repartition when downsizing)
df = df.coalesce(5)
# Partitions data based on the "age" column
df = df.repartition("age")
# Retrieves the current number of partitions
num_partition = df.rdd.getNumPartitions()

Caching & Persistence

# Caches the DataFrame in memory for faster access
df = df.cache()
# Persists the DataFrame in memory, spilling to disk when it does not fit (useful for large datasets)
df.persist(StorageLevel.MEMORY_AND_DISK)
# Removes the DataFrame from cache or persisted storage
df.unpersist()

Spark SQL

# Creates a temporary view that can be queried using SQL
df.createOrReplaceTempView("customer")
# Executes an SQL query on the registered view
df = spark.sql("SELECT * FROM customer")
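The registered view supports full SQL, so the groupBy example above could equally be written as a query:
# Runs an aggregation query against the temporary view
df = spark.sql("SELECT age, COUNT(*) AS count FROM customer GROUP BY age")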

Writing Data

# Writes DataFrame to a CSV file, overwriting existing data
df.write.mode("overwrite").csv("output_folder/csv_output", header=True)
# Writes CSV as a single file (useful when reducing file count)
df.coalesce(1).write.mode("overwrite").csv("output_folder/csv_output", header=True)
# Writes DataFrame to a JSON file
df.write.mode("overwrite").json("output_folder/json_output")
# Writes DataFrame to a Parquet file
df.write.mode("overwrite").parquet("output_folder/parquet_output")

Writing with AWS Glue

# Converting the DataFrame to a DynamicFrame and writing it to S3 as Parquet
dyf = DynamicFrame.fromDF(df, glueContext, "output_dyf")
glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="s3",
    connection_options={"path": "s3://my-bucket/output/"},
    format="parquet"
)
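To write into a table that is already registered in the Glue Data Catalog, write_dynamic_frame.from_catalog can be used instead; a sketch assuming the database and table already exist (my_output_table is a placeholder name):
# Writing the same DynamicFrame into an existing Glue Catalog table
glueContext.write_dynamic_frame.from_catalog(
    frame=dyf,
    database="my_database",
    table_name="my_output_table"  # placeholder table name
)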

Stopping SparkSession & AWS Glue Job

# Stops the SparkSession and releases resources
spark.stop()
# Marks the Glue job as complete
job.commit()