in lambda/src/main/scala/pricemigrationengine/handlers/CohortTableDatalakeExportHandler.scala [65:128]
/** Writes a single `cohortItem` as one record through `printer`.
  *
  * The values are emitted in the same order as the header columns declared in `buildPrinter`. The first column is
  * the cohort name taken from `cohortSpec`; every field read with `getOrElse("")` is written as an empty string
  * when it is absent.
  *
  * @param cohortSpec
  *   supplies the cohort name written in the first column
  * @param printer
  *   CSV printer the record is written to
  * @param cohortItem
  *   item whose fields make up the remaining columns
  * @return
  *   an effect that fails with `CohortTableDatalakeExportFailure` if `printRecord` throws
  */
private def printItem(cohortSpec: CohortSpec, printer: CSVPrinter)(cohortItem: CohortItem) = {
  // Kept as a local method so the write only happens when ZIO evaluates the by-name argument of `attempt`.
  def writeRecord(): Unit =
    printer.printRecord(
      cohortSpec.cohortName,
      cohortItem.subscriptionName,
      cohortItem.processingStage.value,
      cohortItem.startDate.getOrElse(""),
      cohortItem.currency.getOrElse(""),
      cohortItem.oldPrice.getOrElse(""),
      cohortItem.estimatedNewPrice.getOrElse(""),
      cohortItem.billingPeriod.getOrElse(""),
      cohortItem.whenEstimationDone.getOrElse(""),
      cohortItem.salesforcePriceRiseId.getOrElse(""),
      cohortItem.whenSfShowEstimate.getOrElse(""),
      cohortItem.newPrice.getOrElse(""),
      cohortItem.newSubscriptionId.getOrElse(""),
      cohortItem.whenAmendmentDone.getOrElse(""),
      cohortItem.whenNotificationSent.getOrElse(""),
      cohortItem.whenNotificationSentWrittenToSalesforce.getOrElse(""),
      cohortItem.whenAmendmentWrittenToSalesforce.getOrElse(""),
      cohortItem.cancellationReason.getOrElse("")
    )

  ZIO
    .attempt(writeRecord())
    .mapError { cause =>
      CohortTableDatalakeExportFailure(s"Failed to write CohortItem as CSV to s3: ${cause.getMessage}")
    }
}
/** Creates the CSV printer used for the export by delegating to `managedCSVPrinter` with the fixed header row.
  *
  * The column names below are listed in the same order in which `printItem` writes its values, so the two must be
  * changed together.
  *
  * @param outputStream
  *   stream the CSV output is written to
  */
private def buildPrinter(outputStream: OutputStream) = {
  val columnNames = List(
    "cohort_name",
    "subscription_name",
    "processing_stage",
    "start_date",
    "currency",
    "old_price",
    "estimated_new_price",
    "billing_period",
    "when_estimation_done",
    "salesforce_price_rise_id",
    "when_sf_show_estimate",
    "new_price",
    "new_subscription_id",
    "when_amendment_done",
    "when_notification_sent",
    "when_notification_sent_written_to_salesforce",
    "when_amendment_written_to_salesforce",
    "cancellation_reason"
  )

  managedCSVPrinter(outputStream, columnNames)
}
/** Builds a `CSVPrinter` over `outputStream` as an acquire/release resource.
  *
  * The printer encodes its output as UTF-8 and uses the file-level `csvFormat` extended with `headers` as the header
  * row. When the resource is released the printer is closed with `close(true)`.
  *
  * @param outputStream
  *   stream the CSV output is written to
  * @param headers
  *   column names for the header row, in output order
  * @return
  *   an effect yielding the printer; a failure while constructing it is reported as
  *   `CohortTableDatalakeExportFailure`
  */
private def managedCSVPrinter(outputStream: OutputStream, headers: List[String]) =
  ZIO
    .acquireRelease(
      ZIO.attempt(
        new CSVPrinter(
          // Pass the Charset itself instead of its name: same encoding, without a lookup by name.
          new OutputStreamWriter(outputStream, StandardCharsets.UTF_8),
          CSVFormat.Builder.create(csvFormat).setHeader(headers: _*).get()
        )
      )
    )(
      // The release action has no typed error channel, so an exception thrown by `close` is not mapped by the
      // `mapError` below.
      printer => ZIO.succeed(printer.close(true))
    )
    .mapError { ex =>
      CohortTableDatalakeExportFailure(s"Failed to write CohortItems as CSV to s3: ${ex.getMessage}")
    }