def main()

in src/main/scala/com/aliyun/emr/example/spark/MongoDBWordCount.scala [29:83]


  /**
   * Entry point: reads a text file from OSS, computes word counts, and writes the
   * resulting (word, count) rows to a MongoDB collection via the Stratio datasource.
   *
   * Expects 12 positional arguments (see the usage message below). Exits with
   * status 1 when fewer are supplied. Note that `toFloat`/`toInt` on the numeric
   * arguments will throw NumberFormatException on malformed input.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 12) {
      System.err.println(
        """Usage: bin/spark-submit --class MongoDBWordCount examples-1.0-SNAPSHOT-shaded.jar <dbName> <dbUrl> <dbPort>
          |         <userName> <pwd> <collectionName> <sampleRatio> <writeConcern> <splitSize> <splitKey> <inputPath>
          |         <numPartitions>
          |
          |Arguments:
          |
          |    dbName          MongoDB database name.
          |    dbUrl           MongoDB database URL.
          |    dbPort          MongoDB database port.
          |    userName        MongoDB database user name.
          |    pwd             mongoDB database password.
          |    collectionName  MongoDB collection name.
          |    sampleRatio     MongoDB sample ratio.
          |    writeConcern    MongoDB write concern.
          |    splitSize       MongoDB split size.
          |    splitKey        MongoDB split key.
          |    inputPath       OSS input object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/a/b.txt
          |    numPartitions   RDD partition number.
          |
        """.stripMargin)
      System.exit(1)
    }

    val dbName = args(0)
    val dbUrl = args(1)
    val dbPort = args(2)
    val userName = args(3)
    val pwd = args(4)
    val collectionName = args(5)
    val sampleRatio = args(6).toFloat
    val writeConcern = args(7)
    val splitSize = args(8).toInt
    val splitKey = args(9)
    val inputPath = args(10)
    val numPartitions = args(11).toInt

    // Obtain the SparkContext once so the SQLContext and the input RDD are
    // guaranteed to share the same context (the original called the accessor twice).
    val sc = getSparkContext
    val sqlContext = new SQLContext(sc)

    // Classic word count: tokenize on single spaces, then reduce to (word, count) rows.
    val input = sc.textFile(inputPath, numPartitions)
    val counts = input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).map(e => Row.apply(e._1, e._2))
    lazy val schema = StructType(
        StructField("word", StringType) ::
        StructField("count", IntegerType) :: Nil)

    // dbUrl may be a comma-separated list of hosts; every host gets the same port.
    val hosts = dbUrl.split(",").map(e => s"$e:$dbPort").toList
    val df = sqlContext.createDataFrame(counts, schema)
    val saveConfig = MongodbConfigBuilder(Map(Host -> hosts, Database -> dbName,
        Collection -> collectionName, SamplingRatio -> sampleRatio, WriteConcern -> writeConcern,
        SplitSize -> splitSize, SplitKey -> splitKey,
        Credentials -> List(com.stratio.datasource.mongodb.config.MongodbCredentials(userName, dbName, pwd.toCharArray))))
    df.saveToMongodb(saveConfig.build())
  }