
Data Processing with Apache Spark

Spark has emerged as a favorite for analytics workloads because it can handle massive volumes of data while delivering high performance compared to conventional database engines. Spark SQL allows users to express complex business requirements in Spark using the familiar language of SQL.

So, in this blog, we will see how you can process data with Apache Spark. And what better way to establish the capabilities of Spark than to put it through its paces and use the Hadoop-DS benchmark to compare performance, throughput, and SQL compatibility against SQL Server?

Before we begin, ensure that the following test environment is available:

SQL Server: 32 GB RAM, Windows Server 2012 R2

Hadoop Cluster: 2 machines with 8 GB RAM each, running Ubuntu

Sample Data:

For the purpose of this demo, we will use AdventureWorks2016DW data.

The following tables are used in the query, along with their record counts:

Table Name              No. of Records
FactInternetSales       60,458,398
DimProduct              606
DimProductSubcategory   37
DimProductCategory      4
DimCustomer             18,484

 

We will compare the performance of three data processing setups: SQL Server, Spark with CSV files as data files, and Spark with Parquet files as data files.

 

Query:

We will use the following query to process data:

select pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName, sum(SalesAmount)
from FactInternetSales f
inner join dimProduct p on f.productkey = p.productkey
inner join DimProductSubcategory ps on p.ProductSubcategoryKey = ps.ProductSubcategoryKey
inner join DimProductCategory pc on pc.ProductCategoryKey = ps.ProductCategoryKey
inner join dimcustomer c on c.customerkey = f.customerkey
group by pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName

Let’s measure the performance of each processing engine:

  • SQL Server:

Running the query in SQL Server on the 32 GB RAM Windows Server 2012 R2 machine takes around 2.33 minutes to execute and return the data.

Here is the corresponding screenshot:

  • Spark with CSV data files:

Now let’s export the same dataset to CSV and move it to HDFS.
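The exported CSV files are typically pushed into HDFS with the hdfs dfs -put command; equivalently, once the Spark shell is up, the same copy can be done with the Hadoop FileSystem API. A minimal sketch, where the local path is hypothetical and the HDFS target matches the path used later in this post:

import org.apache.hadoop.fs.{FileSystem, Path}

// Reuse the Hadoop configuration already carried by the Spark context
val fs = FileSystem.get(sc.hadoopConfiguration)

// Copy the locally exported fact table CSV into HDFS (local path is hypothetical)
fs.copyFromLocalFile(new Path("file:///tmp/FactSalesNew/part-m-00000"), new Path("/data/FactSalesNew/"))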

Following is the screenshot of HDFS with the CSV file as an input source.

Now that the files for the input tables have been moved to HDFS as CSV files, we can start the Spark shell and create DataFrames for each source file.

Run the following commands to create a SQL context:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
val sqlContext = new SQLContext(sc)
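A side note: on Spark 2.x and later, SparkSession is the preferred entry point, and SQLContext is kept mainly for backward compatibility. In the Spark 2.x shell a session named spark is already provided; in a standalone application it would be built roughly like this (the app name is just an illustrative choice):

import org.apache.spark.sql.SparkSession

// Build (or get) a SparkSession; the shell does this for you as `spark`
val spark = SparkSession.builder()
  .appName("adventureworks-benchmark")
  .getOrCreate()

// spark.read can then replace sqlContext.read in the commands below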

Run the following command to create the fact schema:

val factSchema = StructType(Array(
  StructField("ProductKey", IntegerType, true),
  StructField("OrderDateKey", IntegerType, true),
  StructField("DueDateKey", IntegerType, true),
  StructField("ShipDateKey", IntegerType, true),
  StructField("CustomerKey", IntegerType, true),
  StructField("PromotionKey", IntegerType, true),
  StructField("CurrencyKey", IntegerType, true),
  StructField("SalesTerritoryKey", IntegerType, true),
  StructField("SalesOrderNumber", StringType, true),
  StructField("SalesOrderLineNumber", IntegerType, true),
  StructField("RevisionNumber", IntegerType, true),
  StructField("OrderQuantity", IntegerType, true),
  StructField("UnitPrice", DoubleType, true),
  StructField("ExtendedAmount", DoubleType, true),
  StructField("UnitPriceDiscountPct", DoubleType, true),
  StructField("DiscountAmount", DoubleType, true),
  StructField("ProductStandardCost", DoubleType, true),
  StructField("TotalProductCost", DoubleType, true),
  StructField("SalesAmount", DoubleType, true),
  StructField("TaxAmt", DoubleType, true),
  StructField("Freight", DoubleType, true),
  StructField("CarrierTrackingNumber", StringType, true),
  StructField("CustomerPONumber", StringType, true),
  StructField("OrderDate", TimestampType, true),
  StructField("DueDate", TimestampType, true),
  StructField("ShipDate", TimestampType, true)
))
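Declaring the schema explicitly keeps column names and types under our control and avoids an extra pass over the data. For quick exploration, the CSV reader can instead infer types with the inferSchema option; a small sketch of that alternative (not used in the rest of this walkthrough):

// Schema inference scans the file once to guess column types;
// column names default to _c0, _c1, ... when there is no header row
val salesInferred = sqlContext.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("/data/FactSalesNew/part-m-00000")
salesInferred.printSchema()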

Run the following command to create a DataFrame for sales with the fact schema:

val salesCSV = sqlContext.read.format("csv")
  .option("header", "false")
  .schema(factSchema)
  .load("/data/FactSalesNew/part-m-00000")
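Before going further, it is worth sanity-checking the load by printing the schema and counting the rows; the count triggers a full scan of the file, so expect it to take a little while on a fact table of this size:

salesCSV.printSchema()
// Should be close to the 60,458,398 rows exported from FactInternetSales
salesCSV.count()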

Run the following command to create the customer schema:

val customerSchema = StructType(Array(
  StructField("CustomerKey", IntegerType, true),
  StructField("GeographyKey", IntegerType, true),
  StructField("CustomerAlternateKey", StringType, true),
  StructField("Title", StringType, true),
  StructField("FirstName", StringType, true),
  StructField("MiddleName", StringType, true),
  StructField("LastName", StringType, true),
  StructField("NameStyle", BooleanType, true),
  StructField("BirthDate", TimestampType, true),
  StructField("MaritalStatus", StringType, true),
  StructField("Suffix", StringType, true),
  StructField("Gender", StringType, true),
  StructField("EmailAddress", StringType, true),
  StructField("YearlyIncome", DoubleType, true),
  StructField("TotalChildren", IntegerType, true),
  StructField("NumberChildrenAtHome", IntegerType, true),
  StructField("EnglishEducation", StringType, true),
  StructField("SpanishEducation", StringType, true),
  StructField("FrenchEducation", StringType, true),
  StructField("EnglishOccupation", StringType, true),
  StructField("SpanishOccupation", StringType, true),
  StructField("FrenchOccupation", StringType, true),
  StructField("HouseOwnerFlag", StringType, true),
  StructField("NumberCarsOwned", IntegerType, true),
  StructField("AddressLine1", StringType, true),
  StructField("AddressLine2", StringType, true),
  StructField("Phone", StringType, true),
  StructField("DateFirstPurchase", TimestampType, true),
  StructField("CommuteDistance", StringType, true)
))

Run the following command to create a customer DataFrame with the customer schema:

 

val customer = sqlContext.read.format("csv")
  .option("header", "false")
  .schema(customerSchema)
  .load("/data/dimCustomer/part-m-00000")

Now create the product schema with the following command:

val productSchema = StructType(Array(
  StructField("ProductKey", IntegerType, true),
  StructField("ProductAlternateKey", StringType, true),
  StructField("ProductSubcategoryKey", IntegerType, true),
  StructField("WeightUnitMeasureCode", StringType, true),
  StructField("SizeUnitMeasureCode", StringType, true),
  StructField("EnglishProductName", StringType, true),
  StructField("SpanishProductName", StringType, true),
  StructField("FrenchProductName", StringType, true),
  StructField("StandardCost", DoubleType, true),
  StructField("FinishedGoodsFlag", BooleanType, true),
  StructField("Color", StringType, true),
  StructField("SafetyStockLevel", IntegerType, true),
  StructField("ReorderPoint", IntegerType, true),
  StructField("ListPrice", DoubleType, true),
  StructField("Size", StringType, true),
  StructField("SizeRange", StringType, true),
  StructField("Weight", DoubleType, true),
  StructField("DaysToManufacture", IntegerType, true),
  StructField("ProductLine", StringType, true),
  StructField("DealerPrice", DoubleType, true),
  StructField("Class", StringType, true),
  StructField("Style", StringType, true),
  StructField("ModelName", StringType, true),
  StructField("LargePhoto", StringType, true),
  StructField("EnglishDescription", StringType, true),
  StructField("FrenchDescription", StringType, true),
  StructField("ChineseDescription", StringType, true),
  StructField("ArabicDescription", StringType, true),
  StructField("HebrewDescription", StringType, true),
  StructField("ThaiDescription", StringType, true),
  StructField("GermanDescription", StringType, true),
  StructField("JapaneseDescription", StringType, true),
  StructField("TurkishDescription", StringType, true),
  StructField("StartDate", TimestampType, true),
  StructField("EndDate", TimestampType, true),
  StructField("Status", StringType, true)
))

Create the product DataFrame with the product schema:

val product = sqlContext.read.format("csv")
  .option("header", "false")
  .schema(productSchema)
  .load("/data/dimProduct/part-m-00000")

Now create the product category schema using the following command:

val productCategorySchema = StructType(Array(
  StructField("ProductCategoryKey", IntegerType, true),
  StructField("ProductCategoryAlternateKey", IntegerType, true),
  StructField("EnglishProductCategoryName", StringType, true),
  StructField("SpanishProductCategoryName", StringType, true),
  StructField("FrenchProductCategoryName", StringType, true)
))

Now create the product category DataFrame with the product category schema:

val productCategory = sqlContext.read.format("csv")
  .option("header", "false")
  .schema(productCategorySchema)
  .load("/data/dimProductCategory/part-m-00000")

Now create the product subcategory schema using the following command:

val productSubCategorySchema = StructType(Array(
  StructField("ProductSubcategoryKey", IntegerType, true),
  StructField("ProductSubcategoryAlternateKey", IntegerType, true),
  StructField("EnglishProductSubcategoryName", StringType, true),
  StructField("SpanishProductSubcategoryName", StringType, true),
  StructField("FrenchProductSubcategoryName", StringType, true),
  StructField("ProductCategoryKey", IntegerType, true)
))

And create the product subcategory DataFrame using the command below:

val productSubCategory = sqlContext.read.format("csv")
  .option("header", "false")
  .schema(productSubCategorySchema)
  .load("/data/dimProductSubCategory/part-m-00000")

Now create temporary views for each of the DataFrames we have created so far (note that the CSV-based sales DataFrame is named salesCSV):

salesCSV.createOrReplaceTempView("salesV")
customer.createOrReplaceTempView("customerV")
product.createOrReplaceTempView("productV")
productCategory.createOrReplaceTempView("productCategoryV")
productSubCategory.createOrReplaceTempView("productSubCategoryV")
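A quick way to confirm that all five views are registered is to list them from the session catalog (using the same spark session that the query below relies on):

// Temporary views show up with isTemporary = true
spark.catalog.listTables().show()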

 

And run the same query that we ran in SQL Server:

val df_1 = spark.sql("""select pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName, sum(SalesAmount)
from salesV f
inner join productV p on f.productkey = p.productkey
inner join productSubCategoryV ps on p.ProductSubcategoryKey = ps.ProductSubcategoryKey
inner join productCategoryV pc on pc.ProductCategoryKey = ps.ProductCategoryKey
inner join customerV c on c.customerkey = f.customerkey
group by pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName""")

df_1.show()

It took around 3 minutes to execute and return the result set.
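If you prefer to time the query from inside the shell rather than with a stopwatch, spark.time (available on SparkSession since Spark 2.1) wraps any expression and prints the elapsed time once it finishes:

// Prints "Time taken: ... ms" after the action completes
spark.time(df_1.show())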

  • Spark with Parquet file for Fact Table:

Now, let's convert the FactInternetSales CSV data (the salesCSV DataFrame) to Parquet and save it to HDFS using the following command:

salesCSV.write.format("parquet").save("sales_parquet")
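Note that rerunning this save will fail once the output directory already exists. Specifying a save mode (and, optionally, a partitioning column) avoids that; partitioning by OrderDateKey below is only an illustrative choice, not something this benchmark requires:

// Overwrite any previous output and lay the Parquet files out by order date
salesCSV.write
  .mode("overwrite")
  .partitionBy("OrderDateKey")
  .format("parquet")
  .save("sales_parquet")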

Create a DataFrame on top of the Parquet output using the command below (the path should point to wherever the Parquet files were saved):

val sales = sqlContext.read.parquet("/user/nituser/sales.parquet")

And create a temp view using the sales DataFrame:

sales.createOrReplaceTempView("salesV")

 

Now, we will run the same query that we used in the previous step:

val df_1 = spark.sql("""select pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName, sum(SalesAmount)
from salesV f
inner join productV p on f.productkey = p.productkey
inner join productSubCategoryV ps on p.ProductSubcategoryKey = ps.ProductSubcategoryKey
inner join productCategoryV pc on pc.ProductCategoryKey = ps.ProductCategoryKey
inner join customerV c on c.customerkey = f.customerkey
group by pc.EnglishProductCategoryName, ps.EnglishProductSubcategoryName""")

 

It will return the same result set in less than 20 secs.

We can conclude that Spark on commodity hardware performs very similarly to SQL Server running on a high-end server. Moreover, Spark outshines the other setups when it works with an efficient, compressed, column-oriented storage format such as Parquet.
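One reason Parquet wins here is column pruning: the query touches only a handful of the fact table's 26 columns, so the Parquet reader can skip the rest entirely, and the physical plan makes this visible. A quick check on the Parquet-backed DataFrame:

// The ReadSchema entry in the scan node lists only the columns actually read from Parquet
sales.select("ProductKey", "CustomerKey", "SalesAmount")
  .groupBy("ProductKey")
  .sum("SalesAmount")
  .explain()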

So, the specifications of the processing engine and the storage format should be chosen based on business requirements, along with an understanding of how to harness such a highly efficient processing engine to get the required performance.

Reach out to us at Nitor Infotech to know more about Apache Spark and how you can utilize it to accelerate your business and make your advanced analytics more innovative.

About Rushik Shah

Associate Architect

  • Big Data
  • Analytics
  • Cloud
Rushik is a Data Architect at Nitor Infotech with strong experience in the data engineering space. He has very good exposure to the Big Data Hadoop tech stack and to various RDBMS and ETL tools. He has also worked with cloud technologies, with hands-on experience in AWS Redshift, AWS Glue, AWS EMR, Azure HDInsight, Azure Data Factory, and more. He likes to learn new technologies and is eager to solve complex business problems.