override def handleRequest()

in handlers/soft-opt-in-consent-setter/src/main/scala/com/gu/soft_opt_in_consent_setter/HandlerIAP.scala [56:147]


  /** Handles one batch of SQS records.
    *
    * The work happens in three phases, in this order:
    *   1. every record body is decoded into a `MessageBody`;
    *   2. the configuration and the connectors are built;
    *   3. each decoded message is processed according to its `eventType`, and a successful outcome is
    *      recorded through `dynamoConnector.updateLoggingTable`.
    *
    * Decoding failures, setup failures and processing failures are all handed to `handleError`.
    * NOTE(review): `handleError` is called in positions where a value is required, so it presumably
    * never returns normally (e.g. it throws) — confirm against its definition. If so, the first such
    * failure stops the whole batch.
    *
    * A failed Dynamo write is treated differently: it is logged and counted via `Metrics`, but it is
    * not passed to `handleError`.
    *
    * @param input
    *   the SQS event whose records are decoded and processed
    * @param context
    *   the invocation context; not read by this method
    */
  override def handleRequest(input: SQSEvent, context: Context): Unit = {
    logger.info("Handling request")

    // All bodies are decoded up front, before any configuration or connector is created.
    val decodedMessages = input.getRecords.asScala.toList.map { record =>
      circeDecode[MessageBody](record.getBody) match {
        case Right(decoded) =>
          logger.info(s"Decoded message body: $decoded")
          decoded
        // The more specific ParsingFailure case must stay ahead of the generic Left case.
        case Left(parsingFailure: ParsingFailure) =>
          handleError(
            SoftOptInError(
              s"Error '${parsingFailure.message}' when decoding JSON to MessageBody with cause :${parsingFailure.getCause} with body: ${record.getBody}",
              parsingFailure,
            ),
          )
        case Left(otherFailure) =>
          handleError(
            SoftOptInError(
              s"Unknown error when decoding JSON to MessageBody with body: ${record.getBody}",
              otherFailure,
            ),
          )
      }
    }

    // The first three steps can fail; the remaining collaborators are plain constructions
    // that only run once those steps have succeeded.
    val connectorsOrError = for {
      softOptInConfig <- SoftOptInConfig()
      salesforce <- SalesforceConnector(softOptInConfig.sfConfig, softOptInConfig.sfApiVersion)
      dynamo <- DynamoConnector()

      identityConn = new IdentityConnector(softOptInConfig.identityConfig)
      calculator = new ConsentsCalculator(ConsentsMapping.consentsMapping)
      mpapi = new MpapiConnector(softOptInConfig.mpapiConfig)
    } yield (salesforce, identityConn, calculator, mpapi, dynamo)

    val (sfConnector, identityConnector, consentsCalculator, mpapiConnector, dynamoConnector) =
      connectorsOrError match {
        case Right(connectors) =>
          logger.info("Setup successful")
          connectors
        case Left(setupError) => handleError(setupError)
      }

    for (msg <- decodedMessages) {
      logger.info(s"Processing message: $msg")

      // Each event type bumps its own "to process" counter before the work is attempted.
      val outcome = msg.eventType match {
        case Acquisition =>
          Metrics.put(event = "acquisitions_to_process", 1)

          processAcquiredSub(
            msg,
            identityConnector.sendConsentsReq,
            consentsCalculator,
          )
        case Cancellation =>
          Metrics.put(event = "cancellations_to_process", 1)

          processCancelledSub(
            msg,
            identityConnector.sendConsentsReq,
            mpapiConnector.getMobileSubscriptions,
            consentsCalculator,
            sfConnector,
          )
        case Switch =>
          Metrics.put(event = "product_switches_to_process", 1)

          processProductSwitchSub(
            msg,
            identityConnector.sendConsentsReq,
            mpapiConnector.getMobileSubscriptions,
            consentsCalculator,
            sfConnector,
          )
      }

      // The metric is emitted for every outcome, before a failure is handed to handleError.
      emitIdentityMetric(outcome)

      outcome match {
        case Right(_) =>
          val dynamoWrite =
            dynamoConnector.updateLoggingTable(msg.subscriptionId, msg.identityId, msg.eventType)

          // A failed write is only logged and counted; it does not go through handleError.
          dynamoWrite match {
            case Failure(cause) =>
              logger.error(s"Dynamo write failed for identityId: ${msg.identityId}")
              logger.error(s"Exception: $cause")
              Metrics.put("failed_dynamo_update")
            case Success(_) =>
              logger.info("Logged soft opt-in setting to Dynamo")
          }
        case Left(processingError) => handleError(processingError)
      }
    }

    logger.info("Finished processing messages")
    Metrics.put(event = "successful_run")
  }