def main()

in src/main/scala/com/aliyun/emr/example/spark/SparkRdsDemo.scala [23:82]


  /**
   * Entry point: word-counts a text input and writes the counts to a MySQL-compatible
   * database over JDBC.
   *
   * Expects eight positional arguments, in order: `dbName`, `tbName`, `dbUser`, `dbPwd`,
   * `dbUrl`, `dbPort`, `inputPath`, `numPartitions`. Prints usage and exits with status 1
   * when fewer are given or when `numPartitions` is not an integer.
   *
   * Each line of the input is split on single spaces, occurrences of every token are
   * summed, and each `(word, count)` pair is inserted into `tbName(word, count)` using one
   * JDBC connection per non-empty partition.
   *
   * @param args command-line arguments as described above
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 8) {
      System.err.println(
        """Usage: spark-submit --class SparkRdsDemo examples-1.0-SNAPSHOT-shaded.jar <dbName> <tbName> <dbUser>
          |       <dbPwd> <dbUrl> <dbPort> <inputPath> <numPartitions>
          |
          |Arguments:
          |
          |    dbName        RDS database name.
          |    tbName        RDS table name.
          |    dbUser        RDS database user name.
          |    dbPwd         RDS database password.
          |    dbUrl         RDS database URL.
          |    dbPort        RDS database port
          |    inputPath     OSS input object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/a/b.txt
          |    numPartitions
          |
        """.stripMargin)
      System.exit(1)
    }
    val dbName = args(0)
    val tbName = args(1)
    val dbUser = args(2)
    val dbPwd = args(3)
    val dbUrl = args(4)
    val dbPort = args(5)
    val inputPath = args(6)
    // Fail with a readable message instead of a raw NumberFormatException stack trace.
    val numPartitions =
      try args(7).toInt
      catch {
        case _: NumberFormatException =>
          System.err.println(s"numPartitions must be an integer, got: ${args(7)}")
          sys.exit(1)
      }

    // Built once on the driver; both strings are captured by the partition closure below.
    // The table name cannot be a JDBC bind parameter, so it is interpolated from the
    // command-line argument; the row values themselves are bound via placeholders.
    val jdbcUrl = s"jdbc:mysql://$dbUrl:$dbPort/$dbName"
    val insertSql = s"insert into $tbName(word, count) values (?, ?)"

    val input = getSparkContext.textFile(inputPath, numPartitions)
    // Echoes the whole input on the driver; collect() materialises every line locally.
    input.collect().foreach(println)

    input
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      // The partition is consumed purely for its side effect (the inserts), so
      // foreachPartition is used rather than mapPartitions + count().
      .foreachPartition { pairs =>
        // Skip opening a connection for partitions that hold no rows.
        if (pairs.hasNext) {
          try {
            val conn = DriverManager.getConnection(jdbcUrl, dbUser, dbPwd)
            try {
              val ps = conn.prepareStatement(insertSql)
              try {
                pairs.foreach { case (word, count) =>
                  ps.setString(1, word)
                  ps.setLong(2, count.toLong)
                  ps.executeUpdate()
                }
              } finally {
                ps.close()
              }
            } finally {
              // Runs even if closing the statement threw, so the connection is not leaked.
              conn.close()
            }
          } catch {
            // Best-effort write: the failure is printed where the partition is processed
            // and the remaining rows of this partition are skipped; the job itself does
            // not fail.
            // NOTE(review): confirm that swallowing write failures is intended.
            case ex: Exception => ex.printStackTrace()
          }
        }
      }
  }