PySpark Practice
Introduction to Apache Spark | PySpark
This is intended to run on Google Colab.
Install everything necessary to make Spark work.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark
Set the paths to the installs.
import os
"JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7" os.environ[
Find the spark installation
import findspark
findspark.init()
Start doing fancy PySpark stuff.
import json
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StringType, StructField, StructType
Request Orders.json from Google Drive via the command line.
!wget -q --no-check-certificate 'https://drive.google.com/uc?export=download&id=1I6VuRILNtyhnWMUml61Dv58YOP2dqlvx' -O 'Orders.json'
Or, do it the Python way.
# from google.colab import drive
# drive.mount('/content/drive')
# INPUT_FILE = '/content/drive/MyDrive/Colab Notebooks/Starbucks/1_basic_exercise/resources/Order.json'
Set input/output variables
= "/content/Orders.json" # TODO: Change this based on actual location for your environment setup
INPUT_FILE = "./output/files/output.csv"
OUTPUT_CSV_FILE = "./output/delta/" OUTPUT_DELTA_PATH
JSON data summary:
- Each list element is an event
- Events contain a message about an order
- Orders contain a list of items that contain attributes about the item
- Items can also contain lists of items (childItems, discounts), as sketched below
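To make that structure concrete, here is a minimal sketch of a single event. The values are illustrative placeholders patterned after the sample output further down; only the nesting is the point.

# Illustrative sketch only; not read by the code below.
# Orders.json itself is a list of such events, each also carrying id/subject/eventTime fields.
sample_event = {
    "data": {
        "message": {
            "orderPaid": {
                "orderToken": "97331549875122744335422",
                "preparation": "Magic happens here",
                "items": [
                    {
                        "lineItemNumber": "1",
                        "itemLabel": "COFFEE",
                        "quantity": 1.0,
                        "price": 345,
                        "discounts": [{"amount": 495, "description": "Item 1, Discount 1"}],
                        "childItems": [
                            {
                                "lineItemNumber": "1",
                                "itemLabel": "CREAM",
                                "quantity": 1.0,
                                "price": 0,
                                "discounts": [{"amount": 495, "description": "Child Item 1, Discount 1"}],
                            }
                        ],
                    }
                ],
            }
        }
    }
}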
Create spark session
spark = (
    SparkSession.builder.appName("programming")
    .master("local")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
def read_json(file_path: str, schema: StructType) -> DataFrame:
    """
    The goal of this method is to parse the input json data using the schema from another method.
    We are only interested in data starting at the orderPaid attribute.
    :param file_path: Order.json will be provided
    :param schema: schema that needs to be passed to this method
    :return: Dataframe containing records from Order.json
    """
    # Only interested in data starting at orderPaid
    with open(file_path) as f:
        js = json.load(f)[0]["data"]["message"]["orderPaid"]

    # Create Dataframe
    # - Use spark JSON method for reading object
    # - Use parallelize on array object, which contains json structured data
    # - Use custom schema so data type/structure is defined instead of inferred
    df = spark.read.json(spark.sparkContext.parallelize([js]), schema=schema["order_paid_type"])

    return df
The schema outlined below represents a “one-to-many” relationship and is defined in a bottom-up fashion.
def get_struct_type() -> StructType:
    """
    Build a schema based on the file Order.json
    :return: Dict of StructTypes for each level of the equivalent JSON schema
    """
    discount_type = StructType(
        [StructField("amount", IntegerType(), True), StructField("description", StringType(), True)]
    )

    child_item_type = StructType(
        [
            StructField("lineItemNumber", StringType(), True),
            StructField("itemLabel", StringType(), True),
            StructField("quantity", DoubleType(), True),
            StructField("price", IntegerType(), True),
            # Changed "TODO" --> ArrayType(discount_type). Will inherit discount_type attributes.
            StructField("discounts", ArrayType(discount_type), True),
        ]
    )

    item_type = StructType(
        [
            StructField("lineItemNumber", StringType(), True),
            StructField("itemLabel", StringType(), True),
            StructField("quantity", DoubleType(), True),
            StructField("price", IntegerType(), True),
            # Changed "TODO" --> ArrayType(discount_type). Will inherit discount_type attributes.
            StructField("discounts", ArrayType(discount_type), True),
            # Changed "TODO" --> ArrayType(child_item_type). Will inherit child_item_type attributes.
            StructField("childItems", ArrayType(child_item_type), True),
        ]
    )

    order_paid_type = StructType(
        [
            StructField("orderToken", StringType(), True),
            StructField("preparation", StringType(), True),
            # Changed "TODO" --> ArrayType(item_type). Will inherit item_type attributes.
            StructField("items", ArrayType(item_type), True),
        ]
    )

    message_type = StructType(
        # Changed "TODO" --> order_paid_type. Will inherit order_paid_type attributes.
        [StructField("orderPaid", order_paid_type, True)]
    )

    data_type = StructType(
        # Changed "TODO" --> message_type. Will inherit message_type attributes.
        [StructField("message", message_type, True)]
    )

    body_type = StructType(
        [
            StructField("id", StringType(), True),
            StructField("subject", StringType(), True),
            # Changed "TODO" --> data_type. Will inherit data_type attributes.
            StructField("data", data_type, True),
            StructField("eventTime", StringType(), True),
        ]
    )

    return {
        "body_type": body_type,
        "data_type": data_type,
        "message_type": message_type,
        "order_paid_type": order_paid_type,
        "item_type": item_type,
        "child_item_type": child_item_type,
        "discount_type": discount_type,
    }
def get_rows_from_array(df: DataFrame) -> DataFrame:
    """
    Input data frame contains columns of type array. Identify those columns and convert them to rows.
    :param df: Contains column with data type of type array.
    :return: The dataframe should not contain any columns of type array
    """
    # explode will create a new row for each element in an array
    from pyspark.sql.functions import explode

    # Iterate over field names
    for i, f in enumerate(df.schema.fields):
        # Check datatype of field
        if isinstance(f.dataType, ArrayType):
            arrayCol = f.name

            # Overwrite dataframe object.
            # Create a new row for every element in "arrayCol".
            # Each new row will contain the same values from
            # columns that are not "arrayCol".
            # Use "withColumn()" to transform dataframe.
            # First argument - which column will be transformed (overwrites because it already exists)
            # Second argument - expression to create/modify values for the column
            df = df.withColumn(arrayCol, explode(arrayCol))

    return df
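As a side note, a throwaway example makes the explode behaviour easy to see: each element of the array becomes its own row, and the non-array columns are repeated. This toy DataFrame is not part of the exercise.

# Toy illustration of explode: one output row per array element, other columns repeated.
from pyspark.sql.functions import explode

toy_df = spark.createDataFrame([("order-1", ["COFFEE", "CREAM"])], ["orderToken", "items"])
toy_df.withColumn("items", explode("items")).show()
# Expect two rows, both with orderToken "order-1": one with items=COFFEE and one with items=CREAM.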
def get_unwrapped_nested_structure(df: DataFrame) -> DataFrame:
    """
    Convert columns that contain multiple attributes to columns of their own
    :param df: Contains columns that have multiple attributes
    :return: Dataframe should not contain any nested structures
    """

    def parse_struct(df):
        """Create an array of columns to be selected.
        If column type = StructType, select all attributes
        in that StructType as additional columns.
        Returns list of columns."""
        cols = []
        for i, d in enumerate(df.schema.fields):
            if isinstance(d.dataType, StructType):
                cols.append(f"{d.name}.*")
            else:
                cols.append(d.name)
        return cols

    df = df.select(parse_struct(df))

    # Check for columns of type Array.
    # If type Array, transform elements to rows
    arrayCols = [c.name for c in df.schema.fields if isinstance(c.dataType, ArrayType)]
    if len(arrayCols) > 0:
        for col in arrayCols:
            df = get_rows_from_array(df)

    # Could have multiple instances of key names.
    # Will have to add columns manually.
    # If unique names, could probably reuse "parse_struct()".
    df = df.withColumn("discountAmount", df.discounts.amount).withColumn(
        "discountDescription", df.discounts.description
    )
    df = df.drop("discounts")

    df = (
        df.withColumn("childItemLineNumber", df.childItems.lineItemNumber)
        .withColumn("childItemLabel", df.childItems.itemLabel)
        .withColumn("childItemQuantity", df.childItems.quantity)
        .withColumn("childItemPrice", df.childItems.price)
        .withColumn("childItemDiscounts", df.childItems.discounts)
    )
    df = df.drop("childItems")

    df = get_rows_from_array(df)
    df = df.withColumn("childItemDiscountAmount", df.childItemDiscounts.amount).withColumn(
        "childItemDiscountDescription", df.childItemDiscounts.description
    )
    df = df.drop("childItemDiscounts")

    return df
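Similarly, the ".*" expansion used in parse_struct can be seen on a throwaway nested Row: selecting "struct_col.*" promotes each attribute of the struct to a top-level column. Again, toy data only, not part of the exercise.

# Toy illustration of "col.*": a struct column is flattened into one column per attribute.
from pyspark.sql import Row

nested_df = spark.createDataFrame([Row(order=Row(orderToken="order-1", preparation="brew"))])
nested_df.select("order.*").printSchema()
# Expect two top-level columns, orderToken and preparation, with the "order" wrapper gone.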
def write_df_as_csv(df: DataFrame) -> None:
    """
    Write the data frame to a local destination of your choice with headers
    :param df: Contains flattened order data
    """
    df.write.format("csv").mode("overwrite").option("header", "true").save(OUTPUT_CSV_FILE)
    return
def create_delta_table(spark: SparkSession) -> None:
    spark.sql("CREATE DATABASE IF NOT EXISTS EXERCISE")
    spark.sql(
        f"""
        CREATE TABLE IF NOT EXISTS EXERCISE.ORDERS(
            OrderToken String,
            Preparation String,
            ItemLineNumber String,
            ItemLabel String,
            ItemQuantity Double,
            ItemPrice Integer,
            ItemDiscountAmount Integer,
            ItemDiscountDescription String,
            ChildItemLineNumber String,
            ChildItemLabel String,
            ChildItemQuantity Double,
            ChildItemPrice Integer,
            ChildItemDiscountAmount Integer,
            ChildItemDiscountDescription String
        ) USING DELTA
        LOCATION "{OUTPUT_DELTA_PATH}"
        """
    )
    return
Haven’t used Delta before. Reference material: https://docs.microsoft.com/en-us/azure/databricks/delta/quick-start
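That quick start also shows a path-based read that bypasses the metastore table entirely. A sketch of the same lookup against the files written below, assuming they land under OUTPUT_DELTA_PATH, would be:

# Sketch only: read the Delta files directly by path instead of via spark.sql on EXERCISE.ORDERS.
delta_df = spark.read.format("delta").load(OUTPUT_DELTA_PATH)
delta_df.show(truncate=False)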
def write_df_as_delta(df: DataFrame) -> None:
    """
    Write the dataframe output to the table created, overwrite mode can be used
    :param df: flattened data
    :return: Data from the orders table
    """
    # Rename columns to match delta table schema
    df = (
        df.withColumnRenamed("orderToken", "OrderToken")
        .withColumnRenamed("preparation", "Preparation")
        .withColumnRenamed("lineItemNumber", "LineItemNumber")
        .withColumnRenamed("itemLabel", "ItemLabel")
        .withColumnRenamed("quantity", "Quantity")
        .withColumnRenamed("price", "Price")
        .withColumnRenamed("discountAmount", "DiscountAmount")
        .withColumnRenamed("discountDescription", "DiscountDescription")
        .withColumnRenamed("childItemLineNumber", "ChildItemLineNumber")
        .withColumnRenamed("childItemLabel", "ChildItemLabel")
        .withColumnRenamed("childItemQuantity", "ChildItemQuantity")
        .withColumnRenamed("childItemPrice", "ChildItemPrice")
        .withColumnRenamed("childItemDiscountAmount", "ChildItemDiscountAmount")
        .withColumnRenamed("childItemDiscountDescription", "ChildItemDiscountDescription")
    )

    df.write.insertInto("EXERCISE.ORDERS", overwrite=True)
    return
def read_data_delta(spark: SparkSession) -> DataFrame:
    """
    Read data from the table created
    :param spark: active SparkSession
    :return: Dataframe containing all rows from the orders table
    """
    return spark.sql("select * from exercise.orders;")
if __name__ == "__main__":
    input_schema = get_struct_type()

    input_df = read_json(INPUT_FILE, input_schema)

    arrays_to_rows_df = get_rows_from_array(input_df)

    unwrap_struct_df = get_unwrapped_nested_structure(arrays_to_rows_df)

    write_df_as_csv(unwrap_struct_df)
    create_delta_table(spark)
    write_df_as_delta(unwrap_struct_df)

    result_df = read_data_delta(spark)
    result_df.show(truncate=False)
+-----------------------+------------------+--------------+---------+------------+---------+------------------+-----------------------+-------------------+--------------+-----------------+--------------+-----------------------+----------------------------+
|OrderToken |Preparation |ItemLineNumber|ItemLabel|ItemQuantity|ItemPrice|ItemDiscountAmount|ItemDiscountDescription|ChildItemLineNumber|ChildItemLabel|ChildItemQuantity|ChildItemPrice|ChildItemDiscountAmount|ChildItemDiscountDescription|
+-----------------------+------------------+--------------+---------+------------+---------+------------------+-----------------------+-------------------+--------------+-----------------+--------------+-----------------------+----------------------------+
|97331549875122744335422|Magic happens here|1 |COFFEE |1.0 |345 |495 |Item 1, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 1, Discount 1 |
|97331549875122744335422|Magic happens here|2 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|3 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|4 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|5 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|6 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|7 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|8 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|9 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
|97331549875122744335422|Magic happens here|10 |COFFEE |2.0 |945 |295 |Item 2, Discount 1 |1 |CREAM |1.0 |0 |495 |Child Item 2, Discount 1 |
+-----------------------+------------------+--------------+---------+------------+---------+------------------+-----------------------+-------------------+--------------+-----------------+--------------+-----------------------+----------------------------+