Spark With Cosmos DB Connector

Real Time Analytics Using Spark With Cosmos DB Connector

How can you integrate Spark & Cosmos DB?

This blog explains how Spark and Cosmos DB can be integrated, allowing Spark to take full advantage of Cosmos DB and run real-time analytics directly on petabytes of operational data.

High-Level Architecture

With the Spark Connector for Azure Cosmos DB, data is read and written in parallel between the Spark worker nodes and the Azure Cosmos DB data partitions. Whether your data is stored in tables, graphs, or documents, you get the performance, scalability, throughput, and consistency that Azure Cosmos DB provides.

Accelerated connectivity between Apache Spark and Azure Cosmos DB helps you tackle fast-moving data science challenges, because data can be quickly persisted to and retrieved from Azure Cosmos DB.

We can use the Cosmos DB Spark connector to run Spark jobs against data stored in Azure Cosmos DB.

Three kinds of Spark connectors are available for Cosmos DB:

  • SQL API Spark connector
  • MongoDB Spark connector
  • Cassandra Spark connector

Let’s create a Cosmos DB account that uses the SQL API and query its data from our existing Azure Databricks Spark cluster, using a Scala notebook and the Azure Cosmos DB Spark Connector.

Prerequisites:

Before proceeding further, ensure you have the following resources:

  • An active Azure account
  • An Azure Cosmos DB account
  • A pre-configured Azure Databricks cluster

Step 1: Cosmos DB creation

First, we create an Azure Cosmos DB account. An account can hold multiple databases, each database holds one or more containers (also called collections), and each container stores items as JSON documents. By analogy with an RDBMS, a Cosmos DB database corresponds to a database, a container to a table, and an item to a row.

Create a database named Items, and within it create a container named Container1. We will use these names in our script.
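
To make the account, database, container, and item hierarchy concrete, here is a minimal sketch in plain Scala. The case classes, the account name, and the sample field values are all hypothetical and exist only for illustration; they are not part of any Azure SDK. Only the names Items and Container1 and the two column names come from this walkthrough.

```scala
// Illustrative model only: these types are hypothetical, not an Azure API.
// They mirror the hierarchy account -> database -> container -> item.
final case class Item(id: String, fields: Map[String, Any])
final case class Container(name: String, items: Seq[Item])
final case class Database(name: String, containers: Seq[Container])
final case class CosmosAccount(name: String, databases: Seq[Database])

object CosmosModelExample {
  // One taxi-trip document; the id and coordinates are made up for illustration.
  val trip: Item = Item(
    id = "trip-1",
    fields = Map("PICKUP_LATITUDE" -> 40.75, "PICKUP_LONGITUDE" -> -73.99)
  )

  val account: CosmosAccount = CosmosAccount(
    name = "my-cosmos-account", // hypothetical account name
    databases = Seq(Database("Items", Seq(Container("Container1", Seq(trip)))))
  )

  // RDBMS analogy: pick the database, then the "table", then the "row".
  def findItem(db: String, container: String, id: String): Option[Item] =
    for {
      d <- account.databases.find(_.name == db)
      c <- d.containers.find(_.name == container)
      i <- c.items.find(_.id == id)
    } yield i
}
```

Calling CosmosModelExample.findItem("Items", "Container1", "trip-1") walks the same path that a query against the container would: database first, then container, then item.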

Step 2: Data Preparation

Data set: Please download the Yellow Taxi Trip Data from https://data.cityofnewyork.us/api/views/biws-g3hs/rows.csv?accessType=DOWNLOAD.

We will use a data migration tool to migrate data to Azure Cosmos DB. Download a pre-compiled binary from https://aka.ms/csdmtool, then run either of the following.

  • Dtui.exe: Graphical interface version of the tool
  • Dt.exe: Command-line version of the tool

The graphical version is a UI-based tool in which we specify the source and destination details. Once the import completes, our data will be available in Cosmos DB.

Step 3: Azure Databricks Cluster Setup

To set up an Azure Databricks workspace and cluster, follow the steps in the Azure Databricks getting-started guide: https://docs.azuredatabricks.net/getting-started/try-databricks.html

Step 4: Import the Cosmos DB connector library

Step 5: Using Databricks notebooks

We have created a notebook in Scala. Azure Databricks also supports Python, R, and SQL.

1. Import the required libraries to our notebook using the following command:

import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._   // brings the cosmosDB(...) reader method into scope
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._

2. Create the Cosmos DB configuration in our notebook:

// Replace each <...> placeholder with the value for your account.
val configMap = Map(
  "Endpoint" -> "<URI of the Azure Cosmos DB account>",
  "Masterkey" -> "<Key used to access the account>",
  "Database" -> "<Database name>",
  "Collection" -> "<Collection name>")

val config = Config(configMap)
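
Since the master key grants full access to the account, you may prefer not to hard-code it in a notebook. The sketch below shows one possible way to assemble the same four settings from environment variables and to report which ones are missing. The object name CosmosSettings and the variable names (COSMOS_ENDPOINT and so on) are assumptions made for this example, not something the connector defines.

```scala
object CosmosSettings {
  // Connector setting name -> hypothetical environment variable name.
  private val required: Seq[(String, String)] = Seq(
    "Endpoint"   -> "COSMOS_ENDPOINT",
    "Masterkey"  -> "COSMOS_MASTER_KEY",
    "Database"   -> "COSMOS_DATABASE",
    "Collection" -> "COSMOS_COLLECTION"
  )

  // Returns Right(settings) when every variable is set and non-empty,
  // otherwise Left with the names of the variables that are missing.
  def fromEnv(env: Map[String, String] = sys.env): Either[Seq[String], Map[String, String]] = {
    val missing = required.collect {
      case (_, variable) if !env.get(variable).exists(_.nonEmpty) => variable
    }
    if (missing.nonEmpty) Left(missing)
    else Right(required.map { case (setting, variable) => setting -> env(variable) }.toMap)
  }
}
```

When fromEnv() returns a Right, the resulting map has the same keys as configMap above and can be handed to Config in the same way. A Left lists exactly which variables still need to be set.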

  • DataFrame creation: Let’s define a DataFrame variable df that uses this configuration to read the data from Cosmos DB.

val df = spark.sqlContext.read.cosmosDB(config)

3. Let’s create a view from the DataFrame, which we can then use as a table in Spark SQL:

df.createOrReplaceTempView("container1")

4. Once the view is registered, the DataFrame df is available to Spark SQL as the table container1.

For example, the command below prints the schema of our DataFrame.

df.printSchema()

5. Run a Spark SQL query against your Azure Cosmos DB table:

val sql1 = spark.sql("""
  SELECT (3959 * Acos(Cos(Radians(42.290763)) *
                      Cos(Radians(PICKUP_LATITUDE)) *
                      Cos(Radians(PICKUP_LONGITUDE) - Radians(-71.35368)) +
                      Sin(Radians(42.290763)) *
                      Sin(Radians(PICKUP_LATITUDE)))) AS Distance
  FROM CONTAINER1
""")

sql1.show()
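
The expression in the query above is the spherical law of cosines: it returns the great-circle distance, in miles (3959 is roughly the Earth's radius in miles), between each pickup point and the fixed reference point (42.290763, -71.35368). As a rough sketch of the same arithmetic in plain Scala, the helper below can be used to sanity-check a few rows by hand. The object and method names are hypothetical, and the clamp before acos is an extra guard against floating-point rounding that the SQL version does not have.

```scala
object GreatCircle {
  // Approximate Earth radius in miles, the same constant used in the query.
  val EarthRadiusMiles: Double = 3959.0

  // Spherical law of cosines between two (latitude, longitude) points in degrees.
  def distanceMiles(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val phi1 = math.toRadians(lat1)
    val phi2 = math.toRadians(lat2)
    val deltaLambda = math.toRadians(lon2 - lon1)
    val cosAngle =
      math.cos(phi1) * math.cos(phi2) * math.cos(deltaLambda) +
        math.sin(phi1) * math.sin(phi2)
    // Rounding can push cosAngle slightly outside [-1, 1], where acos is NaN.
    val clamped = math.max(-1.0, math.min(1.0, cosAngle))
    EarthRadiusMiles * math.acos(clamped)
  }

  // Distance from the reference point hard-coded in the query.
  def fromReference(pickupLat: Double, pickupLon: Double): Double =
    distanceMiles(42.290763, -71.35368, pickupLat, pickupLon)
}
```

For instance, GreatCircle.fromReference(lat, lon) with a row's PICKUP_LATITUDE and PICKUP_LONGITUDE should agree with the Distance column for that row, up to floating-point rounding.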

And voila! You’re all set. You have enabled Spark to make the most of Cosmos DB and run real-time analytics using operational data.

Reach out to us at Nitor Infotech to learn more about how easy it is to integrate Spark with Cosmos DB and how it can be extremely advantageous for you to do so.

About Pallavi Mind

Senior Lead Engineer

  • BigData
  • Business Intelligence
  • Cloud
Pallavi has been working as a Senior Lead Engineer at Nitor Infotech and has 8 years of experience in the IT domain. She loves to play with data to capture different perspectives using the latest technologies. She has exposure to various big data components along with multiple BI tools and technologies.