associated-press/app/services/AssociatedPressService.scala (97 lines of code) (raw):

package services import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem, Props} import org.apache.pekko.stream.Materializer import client.HttpClient.get import config.AWS.{readFromDynamoDB, writeToDynamoDB} import config.AppConfig import model.FeedResponse import play.api.Logging import play.api.libs.ws.StandaloneWSResponse import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} class AssociatedPressService( config: AppConfig, implicit val system: ActorSystem, implicit val materializer: Materializer, implicit val executionContext: ExecutionContext ) extends Logging { val associatedPressServiceActor: ActorRef = system.actorOf( Props(new AssociatedPressServiceActor(config, system, executionContext)), name = "associatedPressServiceActor" ) def start(): Unit = { associatedPressServiceActor ! getFirstPageUrl } private def getFirstPageUrl: String = { readFromDynamoDB(config.dynamoDBNextPageTable, "key", "nextPage") match { case Success(values) => values.getOrElse(config.associatedPressAPIDefaultFeedUrl) case Failure(e) => logger.error( "Failed to retrieve first page from dynamoDB, using default url instead", e ) config.associatedPressAPIDefaultFeedUrl } } } class AssociatedPressServiceActor( config: AppConfig, implicit val system: ActorSystem, implicit val executionContext: ExecutionContext ) extends Actor with Logging { val imageUploaderServiceActor: ActorRef = system.actorOf( Props(new ImageUploaderService(config, executionContext)), name = "imageUploaderServiceActor" ) override def receive: Receive = { case page: String => get(page, Seq(("x-apikey", config.associatedPressAPIKey))) .map(handleResponse) .recover(e => { logger.error(s"Request failed $page", e) resendRequest() }) def handleResponse(response: StandaloneWSResponse): Unit = { if (response.status == 200) { FeedResponse .parse(response.body) .fold(resendRequest())(response => { logger .info(s"Received response with ${response.items.length} items") imageUploaderServiceActor ! response.items writeNextPageToDynamoDB(response.nextPage) match { case Success(_) => logger.debug( s"Successfully wrote next page to table: ${response.nextPage}" ) case Failure(_) => logger.error( s"Failed to write next page to table: ${response.nextPage}" ) } self ! response.nextPage }) } else { logger.error( s"Received ${response.status} response: ${response.body}" ) resendRequest() } } def writeNextPageToDynamoDB(nextPage: String): Try[UpdateItemResponse] = { writeToDynamoDB( table = config.dynamoDBNextPageTable, keyName = "key", keyValue = "nextPage", content = nextPage ) } def resendRequest(): Unit = { // if there is an error, we wait 5 seconds and try fetching the page again Thread.sleep(5000L) logger.error(s"Resending request $page") self ! page } } }