def main()

in spark-process/src/main/scala/com/optimize/downstream/process/ProcessFilesFromS3AndConvertToParquet.scala [16:68]
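
The excerpt starts at the method body, so the enclosing object and its imports sit outside the quoted range. A plausible reconstruction of that header, assuming the AWS SDK for Java v1 and the implicit Java-to-Scala collection conversions that objs.getObjectSummaries.map(...) relies on (the pageLength value is an assumption; the real definition is elsewhere in the file):

import java.io.InputStream
import java.util.zip.GZIPInputStream

import scala.collection.JavaConversions._
import scala.io.Source

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}

import org.apache.spark.sql.SparkSession

object ProcessFilesFromS3AndConvertToParquet
{
  val pageLength = 1000 // S3 listing page size; assumed, not shown in the excerpt

The main method below then sits directly inside this object.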


  def main(args: Array[String]): Unit =
  {
    if(args.length != 3)
    {
      println("Requires 3 parameters")
      println("Usage: <sourceBucket> <s3InputLocation> <s3OutputLocation>")
      System.exit(-1)
    }
    val s3BucketName = args(0)
    val s3InputLocation = args(1)
    val s3OutputLocation = args(2)

    // Credentials come from the SDK's default provider chain (instance profile, env vars, ...);
    // to pass explicit keys instead: new AmazonS3Client(new BasicAWSCredentials(accessKeyId, secretAccessKey))
    // Declared as a def so a fresh client is constructed wherever it is called, including
    // inside the executor-side closures below: AmazonS3Client itself is not serializable.
    def s3Client = new AmazonS3Client()

    val spark = SparkSession
      .builder()
      .appName("AWS-Small-Blogs-Job")
      .getOrCreate()

    // Build a paged listing request for the input prefix
    val request = new ListObjectsRequest()
    request.setBucketName(s3BucketName)
    request.setPrefix(s3InputLocation) // list only the keys under this prefix
    request.setMaxKeys(pageLength)     // page size for each listing call


    // First page of results; a var because the pagination loop below reassigns it
    var objs = s3Client.listObjects(request)
    val s3ObjectKeys = objs.getObjectSummaries.map(x => x.getKey).toList
    println("Printing the keys")
    s3ObjectKeys.foreach { println }

    // Read every gzipped object in this page on the executors, one S3 GET per key,
    // flattening the decompressed contents into an RDD of JSON lines
    val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap
    { key => Source.fromInputStream(new GZIPInputStream(s3Client.getObject(s3BucketName, key).getObjectContent: InputStream)).getLines }
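    // GZIPInputStream decompresses lazily as getLines is consumed, so each object
    // is streamed rather than buffered whole in executor memory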

    // Let Spark infer the schema from the JSON lines; a var so later pages can be unioned in
    var finalDF = spark.read.json(allLinesRDD)

    // Page through the rest of the listing, appending each batch to the DataFrame
    while(objs.isTruncated())
    {
      objs = s3Client.listNextBatchOfObjects(objs)
      val s3ObjectKeys = objs.getObjectSummaries.map(x => x.getKey).toList
      s3ObjectKeys.foreach { println }
      val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap
      { key => Source.fromInputStream(new GZIPInputStream(s3Client.getObject(s3BucketName, key).getObjectContent: InputStream)).getLines }

      val allLines = spark.read.json(allLinesRDD)
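      // union matches columns by position, so each batch's inferred schema must
      // line up with the first batch's; if batches could differ, a fixed schema
      // via spark.read.schema(...) would be safer than per-batch inference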
      finalDF = finalDF.union(allLines)
    }
    finalDF.write
      .mode("append")
      .parquet(s"s3://$s3BucketName/$s3OutputLocation")

    spark.stop() // release cluster resources once the write has completed
  }
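
For reference, a submission sketch with the three positional arguments in the order main reads them, assuming the assembled jar is named spark-process.jar (bucket and prefixes are illustrative):

  spark-submit \
    --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet \
    spark-process.jar \
    my-bucket input/prefix output/prefix

The Parquet output then lands in the same bucket as the input, under s3://<sourceBucket>/<s3OutputLocation>.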