6  Python has libraries to tell the data processing engine (Spark, Trino, DuckDB, Polars, etc.) what to do

Almost every data processing system has a Python library that allows for interaction with it.

Some examples: the PySpark API, the Snowflake Python Connector, the BigQuery Python Connector, the Trino Python Connector, and the Polars Python API.

The main types of data processing libraries are:

  1. Python standard libraries: Python has a strong set of standard libraries for processing data. When using standard libraries, you’ll usually be working with Python’s native data structures (lists, dicts, sets, etc.). Some popular modules are csv, json, and gzip.
  2. Dataframe libraries: Libraries like pandas, Polars, and Spark enable you to work with tabular data. These libraries use a DataFrame (Python’s equivalent of a SQL table) and let you do everything you can do with SQL, and more. Note that some of these libraries (e.g., pandas) are meant for data that fits in memory.
  3. Processing data with SQL via Python: You can use database drivers and write SQL queries to process the data. The benefit of this approach is that the processing is offloaded to the database, so you don’t need to bring the data into Python memory (see the sketch after this list).
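
As a minimal sketch of the third approach, here is a query run through Python’s built-in sqlite3 driver; the database file and the customer_purchases table are hypothetical stand-ins for whatever database you actually use:

import sqlite3

# Connect to a local SQLite database (hypothetical example file)
conn = sqlite3.connect("./example.db")
cursor = conn.cursor()

# The grouping and summing happen inside the database engine;
# only the small result set comes back into Python memory
cursor.execute(
    """
    SELECT gender, SUM(purchase_amount) AS total_purchase
    FROM customer_purchases
    GROUP BY gender
    """
)
for gender, total_purchase in cursor.fetchall():
    print(gender, total_purchase)
conn.close()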

Note: When we use systems like Spark, Dask, Snowflake, or BigQuery, we interact with them via Python, but the data itself is processed by those external systems.
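
For example, with DuckDB the Python call below only hands a SQL string to the engine; DuckDB (not Python) scans the file and computes the aggregate. This is a minimal sketch that assumes the sample_data.csv file used later in this chapter:

import duckdb

# Python just sends the SQL string; DuckDB's engine reads the CSV and does the aggregation
result = duckdb.sql(
    "SELECT Gender, COUNT(*) AS num_customers FROM './sample_data.csv' GROUP BY Gender"
)
result.show()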

Let’s work through some code examples that aim to clean a sample dataset. We will first perform the cleaning using Python’s standard libraries and then repeat the process using PySpark dataframes.

6.1 Data processing with Python standard library

print(
    "################################################################################"
)
print("Use standard python libraries to do the transformations")
print(
    "################################################################################"
)
import csv

# Read the CSV file into a list of dictionaries, one per row
data = []
with open("./sample_data.csv", "r", newline="") as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        data.append(row)
print(data[:2])
# Remove duplicate rows based on Customer_ID
data_unique = []
customer_ids_seen = set()
for row in data:
    if row["Customer_ID"] not in customer_ids_seen:
        data_unique.append(row)
        customer_ids_seen.add(row["Customer_ID"])
    else:
        print(f'duplicate customer id {row["Customer_ID"]}')
# Replace missing Age and Purchase_Amount values with 0
for row in data_unique:
    if not row["Age"]:
        print(f'Customer {row["Customer_Name"]} does not have Age value')
        row["Age"] = 0
    if not row["Purchase_Amount"]:
        row["Purchase_Amount"] = 0.0

# Remove outliers: drop rows with age > 100 or purchase amount > 1000
data_cleaned = [
    row
    for row in data_unique
    if int(row["Age"]) <= 100 and float(row["Purchase_Amount"]) <= 1000
]

# Convert the Gender column to binary: 0 for Female, 1 for Male
for row in data_cleaned:
    if row["Gender"] == "Female":
        row["Gender"] = 0
    elif row["Gender"] == "Male":
        row["Gender"] = 1

# Split Customer_Name into First_Name and Last_Name
for row in data_cleaned:
    first_name, last_name = row["Customer_Name"].split(" ", 1)
    row["First_Name"] = first_name
    row["Last_Name"] = last_name

print(data_cleaned[:3])
from collections import defaultdict

# Total purchase amount by (binary) Gender
total_purchase_by_gender = defaultdict(float)
for row in data_cleaned:
    total_purchase_by_gender[row["Gender"]] += float(row["Purchase_Amount"])

# Bucket purchase amounts into age groups
age_groups = {"18-30": [], "31-40": [], "41-50": [], "51-60": [], "61-70": []}
for row in data_cleaned:
    age = int(row["Age"])
    if age <= 30:
        age_groups["18-30"].append(float(row["Purchase_Amount"]))
    elif age <= 40:
        age_groups["31-40"].append(float(row["Purchase_Amount"]))
    elif age <= 50:
        age_groups["41-50"].append(float(row["Purchase_Amount"]))
    elif age <= 60:
        age_groups["51-60"].append(float(row["Purchase_Amount"]))
    else:
        age_groups["61-70"].append(float(row["Purchase_Amount"]))

# Average purchase amount per age group (guard against empty groups)
average_purchase_by_age_group = {
    group: sum(amounts) / len(amounts) if amounts else 0.0
    for group, amounts in age_groups.items()
}
print("Total purchase amount by Gender:", total_purchase_by_gender)
print("Average purchase amount by Age group:", average_purchase_by_age_group)

6.2 Data processing with PySpark

print(
    "################################################################################"
)
print("Use PySpark DataFrame API to do the transformations")
print(
    "################################################################################"
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit, when, split, sum as spark_sum, avg
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType

# Create (or reuse) the SparkSession that will run the transformations below
spark = SparkSession.builder.appName("sample_data_cleaning").getOrCreate()

schema = StructType([
    StructField("Customer_ID", IntegerType(), True),
    StructField("Customer_Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Purchase_Amount", FloatType(), True),
    StructField("Purchase_Date", DateType(), True)
])

# Read data from CSV file into DataFrame
data = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .csv("./sample_data.csv")

# Question: How do you remove duplicate rows based on customer ID in PySpark?
data_unique = data.dropDuplicates(["Customer_ID"])

# Question: How do you handle missing values by replacing them with 0 in PySpark?
data_cleaned_missing = data_unique.select(
    col("Customer_ID"),
    col("Customer_Name"),
    coalesce(col("Age"), lit(0)).alias("Age"),
    col("Gender"),
    coalesce(col("Purchase_Amount"), lit(0.0)).alias("Purchase_Amount"),
    col("Purchase_Date")
)

# Question: How do you remove outliers (e.g., age > 100 or purchase amount > 1000) in PySpark?
data_cleaned_outliers = data_cleaned_missing.filter(
    (col("Age") <= 100) & (col("Purchase_Amount") <= 1000)
)

# Question: How do you convert the Gender column to a binary format (0 for Female, 1 for Male) in PySpark?
data_cleaned_gender = data_cleaned_outliers.withColumn(
    "Gender_Binary",
    when(col("Gender") == "Female", 0).otherwise(1)
)

# Question: How do you split the Customer_Name column into separate First_Name and Last_Name columns in PySpark?
# limit=2 keeps multi-word last names intact, matching split(" ", 1) in the standard library version
data_cleaned = data_cleaned_gender.select(
    col("Customer_ID"),
    split(col("Customer_Name"), " ", 2).getItem(0).alias("First_Name"),
    split(col("Customer_Name"), " ", 2).getItem(1).alias("Last_Name"),
    col("Age"),
    col("Gender_Binary"),
    col("Purchase_Amount"),
    col("Purchase_Date")
)
# Question: How do you calculate the total purchase amount by Gender in PySpark?
total_purchase_by_gender = data_cleaned.groupBy("Gender_Binary") \
    .agg(spark_sum("Purchase_Amount").alias("Total_Purchase_Amount")) \
    .collect()
# Question: How do you calculate the average purchase amount by Age group in PySpark?
average_purchase_by_age_group = data_cleaned.withColumn(
    "Age_Group",
    when((col("Age") >= 18) & (col("Age") <= 30), "18-30")
    .when((col("Age") >= 31) & (col("Age") <= 40), "31-40")
    .when((col("Age") >= 41) & (col("Age") <= 50), "41-50")
    .when((col("Age") >= 51) & (col("Age") <= 60), "51-60")
    .otherwise("61-70")
).groupBy("Age_Group") \
    .agg(avg("Purchase_Amount").alias("Average_Purchase_Amount")) \
    .collect()
# Question: How do you print the results for total purchase amount by Gender and average purchase amount by Age group in PySpark?
print("====================== Results ======================")
print("Total purchase amount by Gender:")
for row in total_purchase_by_gender:
    print(f"Gender_Binary: {row['Gender_Binary']}, Total_Purchase_Amount: {row['Total_Purchase_Amount']}")

print("Average purchase amount by Age group:")
for row in average_purchase_by_age_group:
    print(f"Age_Group: {row['Age_Group']}, Average_Purchase_Amount: {row['Average_Purchase_Amount']}")
# Optional: Show DataFrame contents for verification
print("\n====================== Data Preview ======================")
print("Final cleaned data:")
data_cleaned.show(10)

6.3 Exercises

  1. What is the total purchase amount for each gender in the dataset? Use the data_cleaned_gender dataframe, group the data by gender, and calculate the sum of all purchase amounts.

  2. What is the average purchase amount for different age groups? Use the data_cleaned dataframe, create age group categories (18-30, 31-40, 41-50, 51-60, 61-70), and calculate the mean purchase amount for each group.