PySpark Practice

Introduction to Apache Spark | PySpark

This is intended to run on Google Colab

Install everything necessary to make Spark work.

# Java runtime required by Spark.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# Download the Spark distribution that is unpacked on the next line.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
# findspark makes the unpacked Spark distribution importable from Python.
!pip install -q findspark

Set the paths to the installs

import os

# Tell the Spark tooling where Java and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop2.7",
    }
)

Find the spark installation

import findspark

# Add the Spark distribution referenced by SPARK_HOME to sys.path so that
# the pyspark imports below can be resolved.
findspark.init()

Start doing fancy pyspark stuff

import json

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import (
    ArrayType,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

Request Orders.json from Google Drive via the command line.

!wget -q --no-check-certificate '' -O 'Orders.json'

Or, do it the Python way.

# from google.colab import drive
# drive.mount('/content/drive')
# INPUT_FILE = '/content/drive/MyDrive/Colab Notebooks/Starbucks/1_basic_exercise/resources/Order.json'

Set input/output variables

# Path of the order JSON file that is handed to read_json() in the main block.
INPUT_FILE = "/content/Orders.json"  # TODO: Change this based on actual location for your environment setup
# Intended destination of the CSV export (relative to the working directory).
OUTPUT_CSV_FILE = "./output/files/output.csv"
# Intended storage location of the Delta table (relative to the working directory).
OUTPUT_DELTA_PATH = "./output/delta/"

JSON data summary:

Create spark session

# Build (or reuse) the Spark session used by the functions below.
# NOTE(review): the values for "spark.jars.packages" and
# "spark.sql.extensions" are blank here. The Delta steps presumably need the
# Delta Lake package coordinates and SQL extension class -- confirm and fill
# them in before running create_delta_table()/write_df_as_delta().
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "")
    .config("spark.sql.extensions", "")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
from traitlets.traitlets import default

def read_json(file_path: str, schema: dict) -> DataFrame:
    """Parse the input JSON file into a DataFrame using a predefined schema.

    Only the data found under the ``orderPaid`` attribute of the first
    top-level record is loaded.

    :param file_path: Path to the order JSON file (Order.json will be provided)
    :param schema: Mapping of schema names to struct types; the entry stored
        under ``"order_paid_type"`` is applied to the data
    :return: Dataframe containing the ``orderPaid`` record from the file
    """
    # Only interested in data starting at orderPaid
    with open(file_path, encoding="utf-8") as f:
        order_paid = json.load(f)[0]["data"]["message"]["orderPaid"]

    # The JSON reader expects an RDD of JSON *strings*. Serialize the dict
    # explicitly: its Python repr is not valid JSON once the payload contains
    # None/True/False.
    order_paid_json = json.dumps(order_paid)

    # Create Dataframe
    #   - Use spark JSON method for reading object
    #   - Use parallelize on array object, which contains json structured data
    #   - Use custom schema so data type/structure is defined instead of inferred
    df = spark.read.json(
        spark.sparkContext.parallelize([order_paid_json]),
        schema=schema["order_paid_type"],
    )
    return df

The schema outlined below represents a “one-to-many” relationship and is defined in a bottom-up fashion.

def get_struct_type() -> dict:
    """Build the Spark schemas that mirror the structure of Order.json.

    The types are declared bottom-up: each nested type is defined before the
    type that contains it.

    :return: Dict mapping a schema name (``"body_type"``, ``"data_type"``,
        ``"message_type"``, ``"order_paid_type"``, ``"item_type"``,
        ``"child_item_type"``, ``"discount_type"``) to its StructType
    """
    discount_type = StructType(
        [
            StructField("amount", IntegerType(), True),
            StructField("description", StringType(), True),
        ]
    )

    child_item_type = StructType(
        [
            StructField("lineItemNumber", StringType(), True),
            StructField("itemLabel", StringType(), True),
            StructField("quantity", DoubleType(), True),
            StructField("price", IntegerType(), True),
            # Each element of "discounts" follows discount_type.
            StructField("discounts", ArrayType(discount_type), True),
        ]
    )

    item_type = StructType(
        [
            StructField("lineItemNumber", StringType(), True),
            StructField("itemLabel", StringType(), True),
            StructField("quantity", DoubleType(), True),
            StructField("price", IntegerType(), True),
            # Each element of "discounts" follows discount_type.
            StructField("discounts", ArrayType(discount_type), True),
            # Each element of "childItems" follows child_item_type.
            StructField("childItems", ArrayType(child_item_type), True),
        ]
    )

    order_paid_type = StructType(
        [
            StructField("orderToken", StringType(), True),
            StructField("preparation", StringType(), True),
            # Each element of "items" follows item_type.
            StructField("items", ArrayType(item_type), True),
        ]
    )

    # "orderPaid" follows order_paid_type.
    message_type = StructType([StructField("orderPaid", order_paid_type, True)])

    # "message" follows message_type.
    data_type = StructType([StructField("message", message_type, True)])

    body_type = StructType(
        [
            StructField("id", StringType(), True),
            StructField("subject", StringType(), True),
            # "data" follows data_type.
            StructField("data", data_type, True),
            StructField("eventTime", StringType(), True),
        ]
    )

    return {
        "body_type": body_type,
        "data_type": data_type,
        "message_type": message_type,
        "order_paid_type": order_paid_type,
        "item_type": item_type,
        "child_item_type": child_item_type,
        "discount_type": discount_type,
    }
def get_rows_from_array(df: DataFrame) -> DataFrame:
    """Convert every top-level array column of the dataframe into rows.

    Each array column is exploded once: a new row is produced for every
    element of the array, and the values of all other columns are repeated
    on each new row. A dataframe without array columns is returned unchanged.

    :param df: Dataframe that may contain columns of type array.
    :return: Dataframe in which each array column has been replaced by its
        elements, one per row
    """
    # explode will create a new row for each element in an array
    from pyspark.sql.functions import explode

    # Collect the names first; the schema changes as columns are exploded.
    array_cols = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, ArrayType)
    ]

    # Use "withColumn()" to transform the dataframe.
    #   First argument - column to transform (overwritten because it exists)
    #   Second argument - expression producing the new values for the column
    for array_col in array_cols:
        df = df.withColumn(array_col, explode(array_col))
    return df
def get_unwrapped_nested_structure(df: DataFrame) -> DataFrame:
    """Convert columns that contain multiple attributes to columns of their own.

    Struct columns are expanded into one column per attribute, array columns
    are turned into rows, and the ``discounts`` / ``childItems`` structures
    are unpacked into explicitly named columns.

    :param df: Contains columns that have multiple attributes
    :return: Dataframe should not contain any nested structures
    """

    def parse_struct(df):
        """Create an array of columns to be selected.
        If column type = StructType, select all attributes
        in that StructType as additional columns.
        Returns list of columns."""
        cols = []
        for field in df.schema.fields:
            if isinstance(field.dataType, StructType):
                # "parent.child" selects the nested attribute as its own column.
                cols.extend(
                    f"{field.name}.{child.name}" for child in field.dataType.fields
                )
            else:
                cols.append(field.name)
        return cols

    df = df.select(parse_struct(df))

    # Check for columns of type Array.
    # If type Array, transform elements to rows (one call per array column).
    array_cols = [c.name for c in df.schema.fields if isinstance(c.dataType, ArrayType)]
    for _ in array_cols:
        df = get_rows_from_array(df)

    # Could have multiple instances of key names
    # Will have to add columns manually
    # If unique names, could probably reuse "parse_struct()"
    df = df.withColumn("discountAmount", df.discounts.amount).withColumn(
        "discountDescription", df.discounts.description
    )
    df = df.drop("discounts")

    df = (
        df.withColumn("childItemLineNumber", df.childItems.lineItemNumber)
        .withColumn("childItemLabel", df.childItems.itemLabel)
        .withColumn("childItemQuantity", df.childItems.quantity)
        .withColumn("childItemPrice", df.childItems.price)
        .withColumn("childItemDiscounts", df.childItems.discounts)
    )
    df = df.drop("childItems")

    # childItemDiscounts is itself an array, so it needs its own pass.
    df = get_rows_from_array(df)
    df = df.withColumn(
        "childItemDiscountAmount", df.childItemDiscounts.amount
    ).withColumn("childItemDiscountDescription", df.childItemDiscounts.description)
    df = df.drop("childItemDiscounts")

    return df
def write_df_as_csv(df: DataFrame) -> None:
    """Write the data frame as CSV, with headers, to ``OUTPUT_CSV_FILE``.

    Any existing output at the destination is overwritten.

    :param df: Contains flattened order data
    """
    df.write.format("csv").mode("overwrite").option("header", "true").save(
        OUTPUT_CSV_FILE
    )
    return None
def create_delta_table(spark: SparkSession) -> None:
    """Create the EXERCISE.ORDERS table, stored at ``OUTPUT_DELTA_PATH``.

    :param spark: Spark session used to run the DDL statements
    """
    # NOTE(review): the database/table names follow the table used by
    # write_df_as_delta()/read_data_delta(); the "IF NOT EXISTS" and
    # "USING DELTA" clauses are assumed -- confirm against the intended DDL.
    spark.sql("CREATE DATABASE IF NOT EXISTS EXERCISE")

    spark.sql(
        """
    CREATE TABLE IF NOT EXISTS EXERCISE.ORDERS(
        OrderToken String,
        Preparation  String,
        ItemLineNumber String,
        ItemLabel String,
        ItemQuantity Double,
        ItemPrice Integer,
        ItemDiscountAmount Integer,
        ItemDiscountDescription String,
        ChildItemLineNumber String,
        ChildItemLabel String,
        ChildItemQuantity Double,
        ChildItemPrice Integer,
        ChildItemDiscountAmount Integer,
        ChildItemDiscountDescription String
    ) USING DELTA
    LOCATION "{0}"
    """.format(OUTPUT_DELTA_PATH)
    )

    return None

Haven’t used Delta before. Reference material:

def write_df_as_delta(df: DataFrame) -> None:
    """Write the dataframe output to the EXERCISE.ORDERS table, overwriting it.

    :param df: flattened data
    """
    # Rename columns to match delta table schema
    column_renames = {
        "orderToken": "OrderToken",
        "preparation": "Preparation",
        "lineItemNumber": "ItemLineNumber",
        "itemLabel": "ItemLabel",
        "quantity": "ItemQuantity",
        "price": "ItemPrice",
        "discountAmount": "ItemDiscountAmount",
        "discountDescription": "ItemDiscountDescription",
        "childItemLineNumber": "ChildItemLineNumber",
        "childItemLabel": "ChildItemLabel",
        "childItemQuantity": "ChildItemQuantity",
        "childItemPrice": "ChildItemPrice",
        "childItemDiscountAmount": "ChildItemDiscountAmount",
        "childItemDiscountDescription": "ChildItemDiscountDescription",
    }
    for current_name, table_name in column_renames.items():
        df = df.withColumnRenamed(current_name, table_name)

    # insertInto resolves columns by position, not by name, so the column
    # order of df must follow the table definition.
    df.write.insertInto("EXERCISE.ORDERS", overwrite=True)

    return None
def read_data_delta(spark: SparkSession) -> DataFrame:
    """Read data from the table created.

    :param spark: Spark session used to run the query
    :return: Dataframe with every row of ``exercise.orders``
    """
    return spark.sql("select * from exercise.orders;")
if __name__ == "__main__":
    # Schema definitions for the order payload.
    input_schema = get_struct_type()

    # Load the orderPaid record with the explicit schema.
    input_df = read_json(INPUT_FILE, input_schema)

    # One row per element of the top-level array column(s).
    arrays_to_rows_df = get_rows_from_array(input_df)

    # Flatten the remaining nested structures into plain columns.
    unwrap_struct_df = get_unwrapped_nested_structure(arrays_to_rows_df)

    # Persist the flattened data as CSV.
    write_df_as_csv(unwrap_struct_df)

    # The table must exist and be populated before it can be read back.
    create_delta_table(spark)
    write_df_as_delta(unwrap_struct_df)

    result_df = read_data_delta(spark)
    result_df.show(truncate=False)
|OrderToken             |Preparation       |ItemLineNumber|ItemLabel|ItemQuantity|ItemPrice|ItemDiscountAmount|ItemDiscountDescription|ChildItemLineNumber|ChildItemLabel|ChildItemQuantity|ChildItemPrice|ChildItemDiscountAmount|ChildItemDiscountDescription|
|97331549875122744335422|Magic happens here|1             |COFFEE   |1.0         |345      |495               |Item 1, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 1, Discount 1    |
|97331549875122744335422|Magic happens here|2             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|3             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|4             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|5             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|6             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|7             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|8             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|9             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|10            |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |