in handlers/soft-opt-in-consent-setter/src/main/scala/com/gu/soft_opt_in_consent_setter/HandlerIAP.scala [56:147]
/** Lambda entry point for a batch of SQS records.
  *
  * Runs three phases in order:
  *   1. decode the body of every record into a `MessageBody`;
  *   2. build the Salesforce, Dynamo, Identity and MPAPI connectors plus the consents calculator;
  *   3. process each decoded message according to its event type and, on success, record it in
  *      the Dynamo logging table.
  *
  * Decode, setup and processing failures are all passed to `handleError`. A failed Dynamo write
  * is only logged and counted; it is not passed to `handleError`.
  *
  * NOTE(review): `handleError` is defined outside this block. It is used in positions where a
  * value is required, so it presumably never returns normally (i.e. it throws) - confirm.
  *
  * @param input   the SQS event whose records carry JSON-encoded `MessageBody` payloads
  * @param context the Lambda invocation context (not used by this method)
  */
override def handleRequest(input: SQSEvent, context: Context): Unit = {
  logger.info("Handling request")

  // Phase 1: every record is decoded before any connector is built or any message is processed.
  val decodedMessages = input.getRecords.asScala.toList.map { record =>
    val rawBody = record.getBody
    circeDecode[MessageBody](rawBody) match {
      case Right(body) =>
        logger.info(s"Decoded message body: $body")
        body
      case Left(parsingFailure: ParsingFailure) =>
        // The body was not parseable as JSON at all.
        handleError(
          SoftOptInError(
            s"Error '${parsingFailure.message}' when decoding JSON to MessageBody with cause :${parsingFailure.getCause} with body: $rawBody",
            parsingFailure,
          ),
        )
      case Left(otherFailure) =>
        // Any other decoding error reported by circe.
        handleError(
          SoftOptInError(s"Unknown error when decoding JSON to MessageBody with body: $rawBody", otherFailure),
        )
    }
  }

  // Phase 2: build collaborators; the first Left short-circuits the comprehension.
  val collaborators = for {
    config <- SoftOptInConfig()
    salesforce <- SalesforceConnector(config.sfConfig, config.sfApiVersion)
    dynamo <- DynamoConnector()
  } yield (
    salesforce,
    new IdentityConnector(config.identityConfig),
    new ConsentsCalculator(ConsentsMapping.consentsMapping),
    new MpapiConnector(config.mpapiConfig),
    dynamo
  )

  val (sfConnector, identityConnector, consentsCalculator, mpapiConnector, dynamoConnector) =
    collaborators match {
      case Right(ready) =>
        logger.info("Setup successful")
        ready
      case Left(setupError) => handleError(setupError)
    }

  // Writes the audit row for a successfully processed message. A failed write is logged and
  // counted via a metric; it is not passed to handleError.
  def recordInLoggingTable(processed: MessageBody): Unit =
    dynamoConnector.updateLoggingTable(processed.subscriptionId, processed.identityId, processed.eventType) match {
      case Failure(exception) =>
        logger.error(s"Dynamo write failed for identityId: ${processed.identityId}")
        logger.error(s"Exception: $exception")
        Metrics.put("failed_dynamo_update")
      case Success(_) =>
        logger.info("Logged soft opt-in setting to Dynamo")
    }

  // Phase 3: handle each message in order; the per-event-type metric is emitted before the work starts.
  for (current <- decodedMessages) {
    logger.info(s"Processing message: $current")

    val outcome = current.eventType match {
      case Acquisition =>
        Metrics.put(event = "acquisitions_to_process", 1)
        processAcquiredSub(current, identityConnector.sendConsentsReq, consentsCalculator)
      case Cancellation =>
        Metrics.put(event = "cancellations_to_process", 1)
        processCancelledSub(
          current,
          identityConnector.sendConsentsReq,
          mpapiConnector.getMobileSubscriptions,
          consentsCalculator,
          sfConnector,
        )
      case Switch =>
        Metrics.put(event = "product_switches_to_process", 1)
        processProductSwitchSub(
          current,
          identityConnector.sendConsentsReq,
          mpapiConnector.getMobileSubscriptions,
          consentsCalculator,
          sfConnector,
        )
    }

    // The identity metric is emitted for both outcomes, before the outcome is inspected.
    emitIdentityMetric(outcome)

    outcome match {
      case Right(_) => recordInLoggingTable(current)
      case Left(processingError) => handleError(processingError)
    }
  }

  logger.info("Finished processing messages")
  Metrics.put(event = "successful_run")
}