def processProduct[Request <: CreditRequest, Result <: ZuoraCreditAddResult]()

in lib/credit-processor/src/main/scala/com/gu/creditprocessor/Processor.scala [79:189]


  /** Fetches outstanding credit requests for a product type from Salesforce, applies each one to its
    * Zuora subscription, and writes the outcome back to Salesforce.
    *
    * Flow:
    *   1. Work out the issue dates to process via `getDatesToProcess`, using `processOverrideDate`
    *      and today's date.
    *   1. If there are no dates, skip the Salesforce read and carry on with an empty request list;
    *      otherwise read the credit requests for those dates.
    *   1. If the date lookup or the Salesforce read fails, return a single [[ProcessResult]] that
    *      carries only an `OverallFailure` built from the error's reason.
    *   1. Otherwise de-duplicate the requests, group them by subscription name and process the
    *      groups in parallel on a dedicated pool of 20 threads. Requests within one group are
    *      processed one after another.
    *   1. For each request, call `addCreditToSubscription`. On success the result is written to
    *      Salesforce unless its charge code already appears on one of the fetched requests; on
    *      failure a [[ProcessResult]] carrying the Zuora failure reason is produced instead.
    *
    * @tparam Request the concrete credit request type read from Salesforce
    * @tparam Result  the concrete result type produced when a credit is added in Zuora
    * @param creditProduct                   passed through to `addCreditToSubscription` and `updateToApply`
    * @param getCreditRequestsFromSalesforce reads the credit requests for a product type and list of dates
    * @param fulfilmentDatesFetcher          passed to `getDatesToProcess`
    * @param processOverrideDate             passed to `getDatesToProcess`
    * @param productType                     product type whose credits are processed
    * @param getSubscription                 passed through to `addCreditToSubscription`
    * @param getAccount                      passed through to `addCreditToSubscription`
    * @param updateToApply                   passed through to `addCreditToSubscription`
    * @param updateSubscription              passed through to `addCreditToSubscription`
    * @param resultOfZuoraCreditAdd          passed through to `addCreditToSubscription`
    * @param writeCreditResultsToSalesforce  writes the Zuora results back to Salesforce
    * @param getNextInvoiceDate              passed through to `addCreditToSubscription` unchanged; defaults
    *                                        to `null` (see the FIXME on the parameter)
    * @return one [[ProcessResult]] per processed credit request, or a single failure result when the
    *         requests could not be read
    */
  def processProduct[Request <: CreditRequest, Result <: ZuoraCreditAddResult](
      creditProduct: CreditProductForSubscription,
      getCreditRequestsFromSalesforce: (ZuoraProductType, List[LocalDate]) => SalesforceApiResponse[List[Request]],
      fulfilmentDatesFetcher: FulfilmentDatesFetcher,
      processOverrideDate: Option[LocalDate],
      productType: ZuoraProductType,
      getSubscription: SubscriptionName => ZuoraApiResponse[Subscription],
      getAccount: String => ZuoraApiResponse[ZuoraAccount],
      updateToApply: (
          CreditProductForSubscription,
          Subscription,
          ZuoraAccount,
          Request,
      ) => ZuoraApiResponse[SubscriptionUpdate],
      updateSubscription: (Subscription, SubscriptionUpdate) => ZuoraApiResponse[Unit],
      resultOfZuoraCreditAdd: (Request, RatePlanCharge) => Result,
      writeCreditResultsToSalesforce: List[Result] => SalesforceApiResponse[_],
      getNextInvoiceDate: String => ZuoraApiResponse[LocalDate] = null, // FIXME,
  ): List[ProcessResult[Result]] = {

    val creditRequestsFromSalesforce = for {
      datesToProcess <- getDatesToProcess(fulfilmentDatesFetcher, productType, processOverrideDate, LocalDate.now())
      _ = logger.info(s"Processing credits for ${productType.name} for issue dates ${datesToProcess.mkString(", ")}")
      salesforceCreditRequests <-
        // No dates means nothing to fetch: skip the Salesforce call entirely.
        if (datesToProcess.isEmpty) Nil.asRight else getCreditRequestsFromSalesforce(productType, datesToProcess)
    } yield salesforceCreditRequests

    creditRequestsFromSalesforce match {
      case Left(sfReadError) =>
        List(ProcessResult(Nil, Nil, Nil, Some(OverallFailure(sfReadError.reason))))

      case Right(fetchedCreditRequests) =>
        val creditRequests = fetchedCreditRequests.distinct
        // Charge codes already present on the fetched requests; results carrying one of these
        // are not written back to Salesforce again.
        val alreadyActionedCredits = fetchedCreditRequests.flatMap(_.chargeCode).distinct

        // Writes a single successful Zuora result to Salesforce (unless it was already actioned)
        // and wraps the outcome of both calls in a ProcessResult.
        def writeResultToSalesforce(
            batchRequests: List[Request],
            zuoraApiResponse: ZuoraApiResponse[Result],
            creditAddResult: Result,
        ) = {
          val notAlreadyActionedCredits =
            List(creditAddResult).filterNot(result => alreadyActionedCredits.contains(result.chargeCode))
          // NOTE(review): the write is issued even when the list is empty - confirm this is intended.
          val salesForceResponse = writeCreditResultsToSalesforce(
            notAlreadyActionedCredits,
          )

          ProcessResult(
            batchRequests,
            List(zuoraApiResponse),
            notAlreadyActionedCredits,
            // presumably yields a failure only when the Salesforce write failed - TODO confirm
            OverallFailure(List.empty, salesForceResponse),
          )
        }

        // Processes every request of one subscription sequentially: Zuora first, then Salesforce.
        // NOTE(review): each ProcessResult carries the whole batch of requests for the subscription,
        // not only the request it was produced for - confirm against consumers of ProcessResult.
        def updateInZuoraAndSf(batchRequests: List[Request]) = {
          batchRequests.map { creditRequest =>
            val zuoraApiResponse = addCreditToSubscription(
              creditProduct,
              getSubscription,
              getAccount,
              updateToApply,
              updateSubscription,
              resultOfZuoraCreditAdd,
              getNextInvoiceDate,
            )(creditRequest)

            zuoraApiResponse
              .map(creditAddResult => writeResultToSalesforce(batchRequests, zuoraApiResponse, creditAddResult))
              .leftMap(failure =>
                ProcessResult(
                  batchRequests,
                  List(zuoraApiResponse),
                  List.empty,
                  Some(OverallFailure(failure.reason)),
                ),
              )
          }
        }

        logger.info(s"Processing ${creditRequests.length} credits in Zuora ...")

        // we group the creditRequests by subscription to make the requests to zuora in parallel
        // & avoid lock contention on the resource
        val creditRequestBatches =
          creditRequests
            .groupBy(_.subscriptionName)
            .values
            .toList
            .par

        val requestConcurrency = 20
        /*https://developer.zuora.com/docs/guides/rate-limits/#concurrent-request-limits
        Zuora supports up to 40 concurrent requests until migration to orders API which supports 200
         */
        val forkJoinPool = new java.util.concurrent.ForkJoinPool(requestConcurrency)

        // The pool is dedicated to this invocation, so it must be shut down even when one of the
        // supplied callbacks throws; otherwise its worker threads would be leaked.
        try {
          creditRequestBatches.tasksupport = new ForkJoinTaskSupport(forkJoinPool)

          creditRequestBatches
            .map(requests => updateInZuoraAndSf(requests))
            .toList
            .flatten
            .map(_.merge)
        } finally {
          forkJoinPool.shutdown()
        }
    }
  }