mongo-spark

Example application on how to use mongo-hadoop connector with Apache Spark.

Prerequisites

  • MongoDB installed and running on a local or remote machine
  • Scala 2.10 and sbt installed
  • sbt-assembly plugin installed (see the plugins.sbt sketch below)
  • Apache Spark 1.4 installed (should also work with 1.0+ versions; just adjust build.sbt)
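
The sbt-assembly plugin is typically wired in through project/plugins.sbt; a minimal sketch follows, where the plugin version shown is an assumption:

// project/plugins.sbt -- sketch; pick a plugin version compatible with your sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")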

How to

First of all, thanks to the original repository and blog post.
This example provides instructions on how to run your application on a standalone Spark cluster running on EC2.

  1. Because by default the EC2 script installs a standalone Spark cluster with Hadoop 2.4, you should build the mongo-hadoop connector against this version of Hadoop (a prebuilt version is located in lib/). Please check the mongo-hadoop connector repository to learn how to build the connector against your version of Hadoop.
  2. The straightforward way to run your application is to build a "fat" jar using the sbt-assembly plugin:
    sbt assembly
    and pass it to the submit script. Please review the build.sbt file and check the mergeStrategy and libraryDependencies settings (a sketch of such a build.sbt follows this list).
  3. Copy your "fat" jar ( mongo-spark-assembly-1.0.jar ) to the Spark master:
    scp mongo-spark-assembly-1.0.jar root@hostSparkMaster:~/jobs/
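
For reference, a minimal sketch of what such a build.sbt could look like; the versions and merge rules here are assumptions, and the prebuilt mongo-hadoop jar in lib/ is picked up automatically by sbt as an unmanaged dependency:

// build.sbt -- sketch only; adjust versions to your setup
name := "mongo-spark"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // spark-core is "provided" because the cluster already ships it
  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
  // MongoDB Java driver required by the mongo-hadoop connector
  "org.mongodb" % "mongo-java-driver" % "2.13.0"
)

// newer sbt-assembly releases call this assemblyMergeStrategy;
// older ones used "mergeStrategy in assembly"
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}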

Read and write from MongoDB / BSON / serialized files

Based on my experience, most of the time you have to read or write a whole collection, preferably from a BSON dump or a serialized RDD[BSONObject]. For these tasks I wrote two simple helpers: readBsonRDD and saveBsonRDD.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject

// create spark context
val sparkConf = new SparkConf()
      .set("spark.executor.memory", "12g")
      .set("spark.storage.memoryFraction", "0.1")
      .set("spark.akka.frameSize", "16")
val sc = new SparkContext(sparkConf)
val config = new Configuration()
// read from mongodb located at localhost, database DB and collection COLLECTION
// the result is an RDD[BSONObject]
val mongoRDD = readBsonRDD("mongodb://127.0.0.1:27017/DB.COLLECTION", sc, "mongodb")
// the same helper reads a bson dump or a previously serialized RDD:
// val mongoRDD = readBsonRDD("s3n://scorr-spark/normalized_profiles_20150324", sc, "bson")
// val mongoRDD = readBsonRDD("s3n://scorr-spark/normalized_profiles_spark_20150325", sc, "serialized_file")
// write to different sinks
// for a serialized file save plain BSONObject values
saveBsonRDD[BSONObject](mongoRDD, path = "s3n://scorr-spark/normalized_profiles_serialized", format = "serialized_file")
// for bson or mongodb we HAVE TO SAVE A PAIR: (Object, org.bson.BSONObject)
saveBsonRDD[(Object, org.bson.BSONObject)](mongoRDD.map(x => (null, x)), path = "s3n://scorr-spark/normalized_profiles_bson", format = "bson")
saveBsonRDD[(Object, org.bson.BSONObject)](mongoRDD.map(x => (null, x)), path = "mongodb://127.0.0.1:27017/DB.COLLECTION", format = "mongodb")

// stop spark context
sc.stop()
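
The helpers above are thin wrappers around mongo-hadoop's input formats and Spark's object files. Below is a rough, illustrative sketch of how readBsonRDD could be implemented; the repository's actual code may differ, and extra configuration and error handling are omitted:

import com.mongodb.hadoop.{BSONFileInputFormat, MongoInputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bson.BSONObject

// Sketch: return an RDD[BSONObject] from a live collection, a .bson dump,
// or a previously serialized RDD, depending on `format`.
def readBsonRDD(path: String, sc: SparkContext, format: String): RDD[BSONObject] = format match {
  case "mongodb" =>
    // read the collection through the mongo-hadoop connector
    val conf = new Configuration()
    conf.set("mongo.input.uri", path)
    sc.newAPIHadoopRDD(conf, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])
      .map(_._2)
  case "bson" =>
    // read a .bson dump (e.g. produced by mongodump)
    sc.newAPIHadoopFile(
        path,
        classOf[BSONFileInputFormat].asSubclass(classOf[FileInputFormat[Object, BSONObject]]),
        classOf[Object],
        classOf[BSONObject])
      .map(_._2)
  case "serialized_file" =>
    // read an RDD[BSONObject] previously written with saveAsObjectFile
    sc.objectFile[BSONObject](path)
  case other =>
    sys.error(s"unknown format: $other")
}

saveBsonRDD would dispatch the same way, writing with MongoOutputFormat, BSONFileOutputFormat, or saveAsObjectFile respectively, which is why the mongodb and bson formats expect (Object, BSONObject) pairs while serialized_file takes plain BSONObject values.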

Running

Generate sample data (or use existing data), run ScalaWordCount, and print the results. For example, you can add some data using the mongo shell:

for (var i = 1; i <= 25; i++) db.INPUT_COLLECTION.insert( { TEXT_FIELD_NAME_TO_COUNT_WORDS : "bla blaa blaaa" } )

  1. Run your job on the Spark master:
    ~/spark/bin/spark-submit --class ScalaWordCount --master spark://hostSparkMaster:7077 ~/jobs/mongo-spark-assembly-1.0.jar <mongo-host:port> <DB_NAME.INPUT_COLLECTION> <DB_NAME.OUTPUT_COLLECTION> <TEXT_FIELD_NAME_TO_COUNT_WORDS>

  2. Check the result in DB_NAME.OUTPUT_COLLECTION.
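
For orientation, here is a rough sketch of what a job like ScalaWordCount does: read the input collection through mongo-hadoop, count the words in the given text field, and write the counts to the output collection. The class and field names below are illustrative assumptions, not the repository's exact code:

import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.{BSONObject, BasicBSONObject}

object ScalaWordCountSketch {
  def main(args: Array[String]): Unit = {
    // args: <mongo-host:port> <DB.INPUT_COLLECTION> <DB.OUTPUT_COLLECTION> <TEXT_FIELD>
    val Array(mongoHost, inputNs, outputNs, textField) = args

    val sc = new SparkContext(new SparkConf().setAppName("ScalaWordCount"))

    val inputConf = new Configuration()
    inputConf.set("mongo.input.uri", s"mongodb://$mongoHost/$inputNs")
    val outputConf = new Configuration()
    outputConf.set("mongo.output.uri", s"mongodb://$mongoHost/$outputNs")

    // each record is an (ObjectId, BSONObject) pair
    val docs = sc.newAPIHadoopRDD(inputConf, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])

    // split the chosen text field into words and count them
    val counts = docs
      .flatMap { case (_, doc) => Option(doc.get(textField)).map(_.toString).getOrElse("").split("\\s+") }
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // wrap each (word, count) into a BSONObject and write it back to MongoDB;
    // the path argument is ignored by MongoOutputFormat
    counts
      .map { case (word, count) =>
        val doc = new BasicBSONObject()
        doc.put("word", word)
        doc.put("count", count)
        (null, doc)
      }
      .saveAsNewAPIHadoopFile(
        "file:///unused",
        classOf[Object],
        classOf[BSONObject],
        classOf[MongoOutputFormat[Object, BSONObject]],
        outputConf)

    sc.stop()
  }
}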
