in src/main/scala/com/gu/zuora/fullexport/Program.scala [28:62]
def exportObject(obj: ZuoraObject, beginningOfTime: String): String = {
val ZuoraObject(objectName, fields) = obj
if (file"$scratchDir/$objectName.success".exists) {
s"$objectName full export has already successfully competed. Skipping!"
} else {
val bookmark = readBookmark(objectName, beginningOfTime) tap { bookmark => logger.info(s"Resume $objectName from $bookmark") }
val chunkRange = fromBookmarkUntilNowByMonth(bookmark)
val totalChunks = chunkRange.length
chunkRange foreach { step =>
val start = bookmark.plusMonths(step)
val end = start.plusMonths(1)
val chunk = s"(${step + 1}/$totalChunks)"
val zoqlQuery = buildZoqlQuery(objectName, fields, start, end)
val jobId = startAquaJob(zoqlQuery, objectName, start) tap { jobId => logger.info(s"Exporting $objectName $start to $end chunk $chunk by job $jobId") }
val jobResult = getJobResult(jobId)
val batch = jobResult.batches.head
val filePath = downloadCsvFile(batch, objectName, start)
val iteratorForLength = filePath.lineIterator
val lines = filePath.lineIterator
val recordCountWithoutHeader = iteratorForLength.length - 1
Assert(s"Downloaded record count should match $jobId metadata record count $recordCountWithoutHeader =/= ${batch.recordCount}", recordCountWithoutHeader == batch.recordCount)
writeHeaderOnceAndAdvanceIterator(objectName, lines) tap (_ => logger.info(s"Completed $objectName-$start.csv header processing"))
logger.info(s"Writing downloaded $objectName records to .csv file")
val aggregateFile = file"$outputDir/$objectName.csv"
val linesWithIsDeletedColumn = lines.map(row => s"false,$row")
aggregateFile.printLines(linesWithIsDeletedColumn)
file"$scratchDir/$objectName-$start.metadata".write(write(jobResult))
file"$scratchDir/$objectName.bookmark".write(end.toString)
logger.info(s"Done $objectName $start to $end chunk $chunk with record count $recordCountWithoutHeader exported by job $jobId")
}
verifyAggregateFileAgainstChunkMetadata(objectName)
file"$scratchDir/$objectName.success".touch()
s"All $objectName chunks successfully exported and verified!"
}
}