in lib/credit-processor/src/main/scala/com/gu/creditprocessor/Processor.scala [79:189]
/** Processes the credit requests for one product type: reads them from Salesforce, applies each one via
  * `addCreditToSubscription`, and writes the outcome of each successful credit back to Salesforce.
  *
  * Requests are de-duplicated, grouped by subscription name, and the groups are processed in parallel on a
  * dedicated fork-join pool; requests within one group are processed one after another. The pool is always shut
  * down before this method returns, including when processing throws.
  *
  * Note: every [[ProcessResult]] produced for a request carries the full request list of that request's
  * subscription group, not only the single request it was produced for.
  *
  * @param creditProduct                   passed through to `addCreditToSubscription`
  * @param getCreditRequestsFromSalesforce fetches the credit requests for the product type and dates to process;
  *                                        not called when there are no dates to process
  * @param fulfilmentDatesFetcher          passed to `getDatesToProcess` to determine the dates to process
  * @param processOverrideDate             passed to `getDatesToProcess` to determine the dates to process
  * @param productType                     product type whose credits are processed
  * @param getSubscription                 passed through to `addCreditToSubscription`
  * @param getAccount                      passed through to `addCreditToSubscription`
  * @param updateToApply                   passed through to `addCreditToSubscription`
  * @param updateSubscription              passed through to `addCreditToSubscription`
  * @param resultOfZuoraCreditAdd          passed through to `addCreditToSubscription`
  * @param writeCreditResultsToSalesforce  writes the credit results that are not already recorded in Salesforce
  * @param getNextInvoiceDate              passed through to `addCreditToSubscription`; NOTE(review): the default is
  *                                        `null` (marked FIXME), so callers relying on the default hand `null` on
  *                                        - confirm that is handled downstream
  * @return one [[ProcessResult]] per processed credit request, or a single result holding an overall failure when
  *         determining the dates or reading the requests from Salesforce fails
  */
def processProduct[Request <: CreditRequest, Result <: ZuoraCreditAddResult](
    creditProduct: CreditProductForSubscription,
    getCreditRequestsFromSalesforce: (ZuoraProductType, List[LocalDate]) => SalesforceApiResponse[List[Request]],
    fulfilmentDatesFetcher: FulfilmentDatesFetcher,
    processOverrideDate: Option[LocalDate],
    productType: ZuoraProductType,
    getSubscription: SubscriptionName => ZuoraApiResponse[Subscription],
    getAccount: String => ZuoraApiResponse[ZuoraAccount],
    updateToApply: (
        CreditProductForSubscription,
        Subscription,
        ZuoraAccount,
        Request,
    ) => ZuoraApiResponse[SubscriptionUpdate],
    updateSubscription: (Subscription, SubscriptionUpdate) => ZuoraApiResponse[Unit],
    resultOfZuoraCreditAdd: (Request, RatePlanCharge) => Result,
    writeCreditResultsToSalesforce: List[Result] => SalesforceApiResponse[_],
    getNextInvoiceDate: String => ZuoraApiResponse[LocalDate] = null, // FIXME: null default kept for compatibility
): List[ProcessResult[Result]] = {
  val creditRequestsFromSalesforce = for {
    datesToProcess <- getDatesToProcess(fulfilmentDatesFetcher, productType, processOverrideDate, LocalDate.now())
    _ = logger.info(s"Processing credits for ${productType.name} for issue dates ${datesToProcess.mkString(", ")}")
    salesforceCreditRequests <-
      // Skip the Salesforce query entirely when there is nothing to process.
      if (datesToProcess.isEmpty) Nil.asRight else getCreditRequestsFromSalesforce(productType, datesToProcess)
  } yield salesforceCreditRequests

  creditRequestsFromSalesforce match {
    case Left(sfReadError) =>
      List(ProcessResult(Nil, Nil, Nil, Some(OverallFailure(sfReadError.reason))))

    case Right(fetchedRequests) =>
      val creditRequests = fetchedRequests.distinct
      // Charge codes already present on the fetched requests; results with these codes are not written back again.
      val alreadyActionedCredits = fetchedRequests.flatMap(_.chargeCode).distinct

      // Writes a successful credit result to Salesforce (unless already recorded) and builds its ProcessResult.
      def updateSalesforce(
          batch: List[Request],
          zuoraApiResponse: ZuoraApiResponse[Result],
          creditAddResult: Result,
      ) = {
        val notAlreadyActionedCredits =
          List(creditAddResult).filterNot(v => alreadyActionedCredits.contains(v.chargeCode))
        val salesForceResponse = writeCreditResultsToSalesforce(
          notAlreadyActionedCredits,
        )
        ProcessResult(
          batch,
          List(zuoraApiResponse),
          notAlreadyActionedCredits,
          OverallFailure(List.empty, salesForceResponse),
        )
      }

      // Applies every request of one subscription group, in order, and records each outcome.
      def updateInZuoraAndSf(batch: List[Request]) = {
        batch.map { creditRequest =>
          val zuoraApiResponse = addCreditToSubscription(
            creditProduct,
            getSubscription,
            getAccount,
            updateToApply,
            updateSubscription,
            resultOfZuoraCreditAdd,
            getNextInvoiceDate,
          )(creditRequest)
          zuoraApiResponse
            .map(creditAddResult => updateSalesforce(batch, zuoraApiResponse, creditAddResult))
            .leftMap(failure =>
              ProcessResult(
                batch,
                List(zuoraApiResponse),
                List.empty,
                Some(OverallFailure(failure.reason)),
              ),
            )
        }
      }

      logger.info(s"Processing ${creditRequests.length} credits in Zuora ...")

      // we group the creditRequests by subscription to make the requests to zuora in parallel
      // & avoid lock contention on the resource
      val creditRequestBatches =
        creditRequests
          .groupBy(_.subscriptionName)
          .values
          .toList
          .par

      /* https://developer.zuora.com/docs/guides/rate-limits/#concurrent-request-limits
         Zuora supports up to 40 concurrent requests until migration to orders API which supports 200
       */
      val requestConcurrency = 20
      val forkJoinPool = new java.util.concurrent.ForkJoinPool(requestConcurrency)
      try {
        creditRequestBatches.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
        creditRequestBatches
          .map(requests => updateInZuoraAndSf(requests))
          .toList
          .flatten
          .map(_.merge)
      } finally {
        // Release the pool's worker threads even if processing a batch throws.
        forkJoinPool.shutdown()
      }
  }
}