def exportObject()

in src/main/scala/com/gu/zuora/fullexport/Program.scala [28:62]


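  /** Exports every record of one Zuora object into a single aggregate CSV, one month-sized chunk
    * at a time, bookmarking progress per object so an interrupted export can resume where it left off.
    */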
  def exportObject(obj: ZuoraObject, beginningOfTime: String): String = {
    val ZuoraObject(objectName, fields) = obj
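    // Skip this object entirely if a previous run already exported and verified it (marked by a .success file)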
    if (file"$scratchDir/$objectName.success".exists) {
      s"$objectName full export has already successfully competed. Skipping!"
    } else {
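      // Resume from the per-object bookmark; beginningOfTime presumably serves as the starting point on a first run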
      val bookmark = readBookmark(objectName, beginningOfTime) tap { bookmark => logger.info(s"Resume $objectName from $bookmark") }
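      // Split the period from the bookmark until now into month-sized chunks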
      val chunkRange = fromBookmarkUntilNowByMonth(bookmark)
      val totalChunks = chunkRange.length
      chunkRange foreach { step =>
        val start = bookmark.plusMonths(step)
        val end = start.plusMonths(1)
        val chunk = s"(${step + 1}/$totalChunks)"
        val zoqlQuery = buildZoqlQuery(objectName, fields, start, end)
        val jobId = startAquaJob(zoqlQuery, objectName, start) tap { jobId => logger.info(s"Exporting $objectName $start to $end chunk $chunk by job $jobId") }
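        // Retrieve the job result and download the CSV for its single expected batch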
        val jobResult = getJobResult(jobId)
        val batch = jobResult.batches.head
        val filePath = downloadCsvFile(batch, objectName, start)
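        // Two independent line iterators over the downloaded file: one is consumed to count records, the other streams rows into the aggregate file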
        val iteratorForLength = filePath.lineIterator
        val lines = filePath.lineIterator
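        // Sanity check: row count excluding the CSV header must match the record count reported in the job metadata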
        val recordCountWithoutHeader = iteratorForLength.length - 1
        Assert(s"Downloaded record count should match $jobId metadata record count $recordCountWithoutHeader =/= ${batch.recordCount}", recordCountWithoutHeader == batch.recordCount)
        writeHeaderOnceAndAdvanceIterator(objectName, lines) tap (_ => logger.info(s"Completed $objectName-$start.csv header processing"))
        logger.info(s"Writing downloaded $objectName records to .csv file")
        val aggregateFile = file"$outputDir/$objectName.csv"
        val linesWithIsDeletedColumn = lines.map(row => s"false,$row")
        aggregateFile.printLines(linesWithIsDeletedColumn)
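        // Persist this chunk's job metadata and move the bookmark forward so a restart resumes from the next month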
        file"$scratchDir/$objectName-$start.metadata".write(write(jobResult))
        file"$scratchDir/$objectName.bookmark".write(end.toString)
        logger.info(s"Done $objectName $start to $end chunk $chunk with record count $recordCountWithoutHeader exported by job $jobId")
      }
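      // Cross-check the aggregate file against the per-chunk metadata, then drop a marker file so future runs skip this object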
      verifyAggregateFileAgainstChunkMetadata(objectName)
      file"$scratchDir/$objectName.success".touch()
      s"All $objectName chunks successfully exported and verified!"
    }
  }
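
A minimal sketch of how this method might be driven, assuming a hypothetical exportAll helper and an objectsToExport list (neither name is taken from the source):

  // Hypothetical driver, not part of the original file: call exportObject once per
  // Zuora object and log the summary message it returns.
  def exportAll(objectsToExport: List[ZuoraObject], beginningOfTime: String): Unit =
    objectsToExport foreach { obj =>
      logger.info(exportObject(obj, beginningOfTime))
    }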