mongo-spark

Example application on how to use mongo-hadoop connector with Apache Spark.

Prerequisites

  • MongoDB installed and running on a local or remote machine
  • Scala 2.10 and SBT installed
  • sbt-assembly plugin installed
  • Apache Spark 1.4 installed (it should also work with 1.0+; just adjust build.sbt)

How to

First of all, thanks to the original repository and blog post.
This example provides instructions on how to run your application on a standalone Spark cluster running on EC2.

  1. Because by default the EC2 script installs a standalone Spark cluster with Hadoop version 2.4, you should build your mongo-hadoop connector against this version of Hadoop (a prebuilt version is located at lib/). Please check the mongo-hadoop connector repository to learn how to build the mongo-hadoop connector against your version of Hadoop.
  2. The straightforward way to run your application is to make a "fat" jar using the sbt-assembly plugin:
    sbt assembly
    and pass it to the submit script. Please review the build.sbt file and check mergeStrategy and libraryDependencies (a sketch of such a build.sbt follows this list).
  3. Copy your "fat" jar ( mongo-spark-assembly-1.0.jar ) to the Spark master:
    scp mongo-spark-assembly-1.0.jar root@hostSparkMaster:~/jobs/
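
What such a build.sbt might look like is sketched below; the versions, the MongoDB Java driver dependency, and the exact merge-strategy key are assumptions and may need adjusting to your cluster and sbt-assembly version:

// build.sbt (sketch; versions are assumptions)
name := "mongo-spark"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Spark is provided by the standalone cluster, so keep it out of the fat jar
  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
  // the mongo-hadoop connector jar built against Hadoop 2.4 lives in lib/ as an unmanaged dependency,
  // so only the MongoDB Java driver is declared here
  "org.mongodb" % "mongo-java-driver" % "2.13.2"
)

// discard duplicate META-INF entries, keep the first copy of everything else
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}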

Read and write from MongoDB / BSON dumps / serialized files

In my experience, most of the time you need to read or write a whole collection, preferably from a BSON dump or a serialized RDD[BSONObject]. For these tasks I wrote two simple helpers: readBsonRDD and saveBsonRDD.

// imports needed for the snippet below
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject

// create spark context
val sparkConf = new SparkConf()
      .set("spark.executor.memory", "12g")
      .set("spark.storage.memoryFraction", "0.1")
      .set("spark.akka.frameSize", "16")
val sc = new SparkContext(sparkConf)
val config = new Configuration()

// read from mongodb located at localhost, from database DB and collection COLLECTION;
// the result is an RDD[BSONObject]
val mongoRDD = readBsonRDD("mongodb://127.0.0.1:27017/DB.COLLECTION", sc, "mongodb")
// alternative sources: a bson dump or a previously serialized RDD[BSONObject]
val bsonRDD = readBsonRDD("s3n://scorr-spark/normalized_profiles_20150324", sc, "bson")
val serializedRDD = readBsonRDD("s3n://scorr-spark/normalized_profiles_spark_20150325", sc, "serialized_file")

// write to different targets
// for a serialized file save plain BSONObject values
saveBsonRDD[BSONObject](mongoRDD, path = "s3n://scorr-spark/normalized_profiles_serialized", format = "serialized_file")
// for bson or mongodb we HAVE TO SAVE A PAIR: (Object, org.bson.BSONObject)
saveBsonRDD[(Object, org.bson.BSONObject)](mongoRDD.map(x => (null, x)), path = "s3n://scorr-spark/normalized_profiles_bson", format = "bson")
saveBsonRDD[(Object, org.bson.BSONObject)](mongoRDD.map(x => (null, x)), path = "mongodb://127.0.0.1:27017/DB.COLLECTION", format = "mongodb")

// stop spark context
sc.stop()
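
The helpers themselves are thin wrappers around the mongo-hadoop input formats and Spark's object-file support. Below is a minimal sketch of how readBsonRDD could look, assuming the documented mongo-hadoop Spark usage; the actual signatures in this repository may differ:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.{BSONFileInputFormat, MongoInputFormat}

// sketch: dispatch on the source format and return the documents as RDD[BSONObject]
def readBsonRDD(path: String, sc: SparkContext, format: String): RDD[BSONObject] = format match {
  case "mongodb" =>
    // read a live collection through the mongo-hadoop connector
    val conf = new Configuration()
    conf.set("mongo.input.uri", path)
    sc.newAPIHadoopRDD(conf, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]).values
  case "bson" =>
    // read a .bson dump (e.g. produced by mongodump)
    sc.newAPIHadoopFile(
      path,
      classOf[BSONFileInputFormat].asSubclass(classOf[FileInputFormat[Object, BSONObject]]),
      classOf[Object],
      classOf[BSONObject]).values
  case "serialized_file" =>
    // read an RDD previously written with saveAsObjectFile
    sc.objectFile[BSONObject](path)
  case other =>
    sys.error("unknown format: " + other)
}

saveBsonRDD can be sketched the same way: saveAsObjectFile for the serialized case, and saveAsNewAPIHadoopFile with MongoOutputFormat or BSONFileOutputFormat for the (Object, BSONObject) pair cases.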

Running

Generate sample data (or use existing data), run ScalaWordCount and print the results. For example, you can add some data using the mongo shell:

for (var i = 1; i <= 25; i++) db.INPUT_COLLECTION.insert( { TEXT_FIELD_NAME_TO_COUNT_WORDS : "bla blaa blaaa" } )

  1. Run your job on spark master:
    ~/spark/bin/spark-submit --class ScalaWordCount --master spark://hostSparkMaster:7077 ~/jobs/mongo-spark-assembly-1.0.jar <mongo-host:port> <DB_NAME.INPUT_COLLECTION> <DB_NAME.OUTPUT_COLLECTION> <TEXT_FIELD_NAME_TO_COUNT_WORDS>

  2. Check the result in DB_NAME.OUTPUT_COLLECTION.
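
For orientation, a word-count job of this shape can be assembled from the same mongo-hadoop primitives. The sketch below follows the argument order of the spark-submit command above, but it is an assumption about the structure of ScalaWordCount, not a copy of the repository's class:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.{BasicBSONObject, BSONObject}
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // argument order matches the spark-submit invocation above
    val Array(mongoHost, inputCollection, outputCollection, textField) = args

    val sc = new SparkContext(new SparkConf().setAppName("ScalaWordCount"))

    val inputConf = new Configuration()
    inputConf.set("mongo.input.uri", "mongodb://" + mongoHost + "/" + inputCollection)
    val outputConf = new Configuration()
    outputConf.set("mongo.output.uri", "mongodb://" + mongoHost + "/" + outputCollection)

    // read the input collection as (Object, BSONObject) pairs and count words in the text field
    val counts = sc
      .newAPIHadoopRDD(inputConf, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])
      .flatMap { case (_, doc) => Option(doc.get(textField)).map(_.toString).getOrElse("").split("\\s+") }
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // write (word, count) documents to the output collection; the path argument is unused by MongoOutputFormat
    counts
      .map { case (word, count) =>
        val doc = new BasicBSONObject()
        doc.put("word", word)
        doc.put("count", count)
        (null: Object, doc: BSONObject)
      }
      .saveAsNewAPIHadoopFile("file:///unused", classOf[Object], classOf[BSONObject],
        classOf[MongoOutputFormat[Object, BSONObject]], outputConf)

    sc.stop()
  }
}

The repository's actual ScalaWordCount may parse arguments or shape the output documents differently; the data flow (MongoInputFormat in, MongoOutputFormat out) is the part that matters here.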
