PySpark Practice

Introduction to Apache Spark | PySpark

This is intended to run on Google Colab

Install everything necessary to make spark work.

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

Set the paths to the installs

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

Find the spark installation

import findspark

findspark.init()

Start doing fancy pyspark stuff

import json

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StringType, StructField, StructType

Request Orders.json from google drive via command line.

!wget -q --no-check-certificate 'https://drive.google.com/uc?export=download&id=1I6VuRILNtyhnWMUml61Dv58YOP2dqlvx' -O 'Orders.json'

Or, do it the Python way.

# from google.colab import drive
# drive.mount('/content/drive')
# INPUT_FILE = '/content/drive/MyDrive/Colab Notebooks/Starbucks/1_basic_exercise/resources/Order.json'

Set input/output variables

INPUT_FILE = "/content/Orders.json"  # TODO: Change this based on actual location for your environment setup
OUTPUT_CSV_FILE = "./output/files/output.csv"
OUTPUT_DELTA_PATH = "./output/delta/"

JSON data summary:

Create spark session

spark = (
    SparkSession.builder.appName("programming")
    .master("local")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
def read_json(file_path: str, schema: StructType) -> DataFrame:
    """
    The goal of this method is to parse the input json data using the schema from another method.

    We are only interested in data starting at orderPaid attribute.

    :param file_path: Order.json will be provided
    :param schema: schema that needs to be passed to this method
    :return: Dataframe containing records from Order.json
    """
    # Only interested in data starting at orderPaid
    with open(file_path) as f:
        js = json.load(f)[0]["data"]["message"]["orderPaid"]

    # Create Dataframe
    #   - Use spark JSON method for reading object
    #   - Use parallelize on array object, which contains json structured data
    #   - Use custom schema so data type/structure is defined instead of inferred
    df = spark.read.json(spark.sparkContext.parallelize([js]), schema=schema["order_paid_type"])
    return df

The schema outlined below represents a “one-to-many” relationship and is defined in a bottom-up fashion.

def get_struct_type() -> StructType:
    """
    Build a schema based on the the file Order.json

    :return: Structype of equivalent JSON schema
    """
    discount_type = StructType(
        [StructField("amount", IntegerType(), True), StructField("description", StringType(), True)]
    )

    child_item_type = StructType(
        [
            StructField("lineItemNumber", StringType(), True),
            StructField("itemLabel", StringType(), True),
            StructField("quantity", DoubleType(), True),
            StructField("price", IntegerType(), True),
            StructField(
                "discounts", ArrayType(discount_type), True
            ),  # Changed "TODO --> ArrayType(discount_type). Will inherit discout_type attributes.
        ]
    )

    item_type = StructType(
        [
            StructField("lineItemNumber", StringType(), True),
            StructField("itemLabel", StringType(), True),
            StructField("quantity", DoubleType(), True),
            StructField("price", IntegerType(), True),
            StructField(
                "discounts", ArrayType(discount_type), True
            ),  # Changed "TODO" --> ArrayType(discount_type). Will inherit discount_type attributes.
            StructField(
                "childItems", ArrayType(child_item_type), True
            ),  # Changed "TODO" --> ArrayType(child_item_type). Will inherit chile_item_type attributes.
        ]
    )

    order_paid_type = StructType(
        [
            StructField("orderToken", StringType(), True),
            StructField("preparation", StringType(), True),
            StructField(
                "items", ArrayType(item_type), True
            ),  # Changed "TODO" --> ArrayType(item_type). Will inherit item_type attributes.
        ]
    )

    message_type = StructType(
        [StructField("orderPaid", order_paid_type, True)]
    )  # Changed "TODO" --> order_paid_type. Will inherit order_paid_type attributes.

    data_type = StructType(
        [StructField("message", message_type, True)]
    )  # Changed "TODO" --> message_type. Will inherit message_type attributes.

    body_type = StructType(
        [
            StructField("id", StringType(), True),
            StructField("subject", StringType(), True),
            StructField("data", data_type, True),  # Changed "TODO" --> data_type. Will inherit data_type attributes.
            StructField("eventTime", StringType(), True),
        ]
    )
    return {
        "body_type": body_type,
        "data_type": data_type,
        "message_type": message_type,
        "order_paid_type": order_paid_type,
        "item_type": item_type,
        "child_item_type": child_item_type,
        "discount_type": discount_type,
    }
def get_rows_from_array(df: DataFrame) -> DataFrame:
    """
    Input data frame contains columns of type array. Identify those columns and convert them to rows.

    :param df: Contains column with data type of type array.
    :return: The dataframe should not contain any columns of type array
    """
    # explode will create a new row for each element in an array
    from pyspark.sql.functions import explode

    # Iterate over field names
    for i, f in enumerate(df.schema.fields):
        # Check datatype of field
        if isinstance(f.dataType, ArrayType):
            arrayCol = f.name

    # Overwrite dataframe object
    # Create a new row for every element in "arrayCol".
    # Each new row will contain the same values from
    #   columns that are not "arrayCol".
    # Use "withColumn()" to transform dataframe.
    #   First argument - What column will be transformed (will overwrite because already exists)
    #   Second argument - Expression to create/modify values for the column
    df = df.withColumn(arrayCol, explode(arrayCol))
    return df
def get_unwrapped_nested_structure(df: DataFrame) -> DataFrame:
    """
    Convert columns that contain multiple attributes to columns of their own

    :param df: Contains columns that have multiple attributes
    :return: Dataframe should not contain any nested structures
    """

    def parse_struct(df):
        """Create an array of columns to be selected.
        If column type = StructType, select all attributes
        in that StructType as additional columns.
        Returns list of columns."""
        cols = []
        for i, d in enumerate(df.schema.fields):
            if isinstance(d.dataType, StructType):
                cols.append(f"{d.name}.*")
            else:
                cols.append(d.name)
        return cols

    df = df.select(parse_struct(df))

    # Check for columns of type Array.
    # If type Array, transform elements to rows
    arrayCols = [c.name for c in df.schema.fields if isinstance(c.dataType, ArrayType)]
    if len(arrayCols) > 0:
        for col in arrayCols:
            df = get_rows_from_array(df)

    # Could have multiple instances of key names
    # Will have to add columns manually
    # If unique names, could probably reuse "parse_struct()"
    df = df.withColumn("discountAmount", df.discounts.amount).withColumn(
        "discountDescription", df.discounts.description
    )
    df = df.drop("discounts")

    df = (
        df.withColumn("childItemLineNumber", df.childItems.lineItemNumber)
        .withColumn("childItemLabel", df.childItems.itemLabel)
        .withColumn("childItemQuantity", df.childItems.quantity)
        .withColumn("childItemPrice", df.childItems.price)
        .withColumn("childItemDiscounts", df.childItems.discounts)
    )
    df = df.drop("childItems")

    df = get_rows_from_array(df)
    df = df.withColumn("childItemDiscountAmount", df.childItemDiscounts.amount).withColumn(
        "childItemDiscountDescription", df.childItemDiscounts.description
    )
    df = df.drop("childItemDiscounts")

    return df
def write_df_as_csv(df: DataFrame) -> None:
    """
    Write the data frame to a local destination of your choice with headers

    :param df: Contains flattened order data
    """
    df.write.format("csv").mode("overwrite").option("header", "true").save(OUTPUT_CSV_FILE)
    return
def create_delta_table(spark: SparkSession) -> None:
    spark.sql("CREATE DATABASE IF NOT EXISTS EXERCISE")

    spark.sql(
        f"""
    CREATE TABLE IF NOT EXISTS EXERCISE.ORDERS(
        OrderToken String,
        Preparation  String,
        ItemLineNumber String,
        ItemLabel String,
        ItemQuantity Double,
        ItemPrice Integer,
        ItemDiscountAmount Integer,
        ItemDiscountDescription String,
        ChildItemLineNumber String, 
        ChildItemLabel String,
        ChildItemQuantity Double,
        ChildItemPrice Integer,
        ChildItemDiscountAmount Integer,
        ChildItemDiscountDescription String
    ) USING DELTA
    LOCATION "{OUTPUT_DELTA_PATH}"
    """
    )

    return

Haven’t used Delta before. Reference material: https://docs.microsoft.com/en-us/azure/databricks/delta/quick-start

def write_df_as_delta(df: DataFrame) -> None:
    """
    Write the dataframe output to the table created, overwrite mode can be used

    :param df: flattened data
    :return: Data from the orders table
    """
    # Rename columns to match delta table schema
    df = (
        df.withColumnRenamed("orderToken", "OrderToken")
        .withColumnRenamed("preparation", "Rreparation")
        .withColumnRenamed("lineItemNumber", "LineItemNumber")
        .withColumnRenamed("itemLabel", "ItemLabel")
        .withColumnRenamed("quantity", "Quantity")
        .withColumnRenamed("price", "Price")
        .withColumnRenamed("discountAmount", "DiscountAmount")
        .withColumnRenamed("discountDescription", "DiscountDescription")
        .withColumnRenamed("childItemLineNumber", "ChildItemLineNumber")
        .withColumnRenamed("childItemLabel", "ChildItemLabel")
        .withColumnRenamed("childItemQuantity", "ChildItemQuantity")
        .withColumnRenamed("childItemPrice", "ChildItemPrice")
        .withColumnRenamed("childItemDiscountAmount", "ChildItemDiscountAmount")
        .withColumnRenamed("childItemDiscountDescription", "ChildItemDiscountDescription")
    )

    df.write.insertInto("EXERCISE.ORDERS", overwrite=True)

    return
def read_data_delta(spark: SparkSession) -> DataFrame:
    """
    Read data from the table created

    :param spark:
    :return:
    """
    return spark.sql("select * from exercise.orders;")
if __name__ == "__main__":
    input_schema = get_struct_type()

    input_df = read_json(INPUT_FILE, input_schema)

    arrays_to_rows_df = get_rows_from_array(input_df)

    unwrap_struct_df = get_unwrapped_nested_structure(arrays_to_rows_df)

    write_df_as_csv(unwrap_struct_df)

    create_delta_table(spark)
    write_df_as_delta(unwrap_struct_df)

    result_df = read_data_delta(spark)
    result_df.show(truncate=False)
+-----------------------+------------------+--------------+---------+------------+---------+------------------+-----------------------+-------------------+--------------+-----------------+--------------+-----------------------+----------------------------+
|OrderToken             |Preparation       |ItemLineNumber|ItemLabel|ItemQuantity|ItemPrice|ItemDiscountAmount|ItemDiscountDescription|ChildItemLineNumber|ChildItemLabel|ChildItemQuantity|ChildItemPrice|ChildItemDiscountAmount|ChildItemDiscountDescription|
+-----------------------+------------------+--------------+---------+------------+---------+------------------+-----------------------+-------------------+--------------+-----------------+--------------+-----------------------+----------------------------+
|97331549875122744335422|Magic happens here|1             |COFFEE   |1.0         |345      |495               |Item 1, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 1, Discount 1    |
|97331549875122744335422|Magic happens here|2             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|3             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|4             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|5             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|6             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|7             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|8             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|9             |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
|97331549875122744335422|Magic happens here|10            |COFFEE   |2.0         |945      |295               |Item 2, Discount 1     |1                  |CREAM         |1.0              |0             |495                    |Child Item 2, Discount 1    |
+-----------------------+------------------+--------------+---------+------------+---------+------------------+-----------------------+-------------------+--------------+-----------------+--------------+-----------------------+----------------------------+