in notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/DeliveryService.scala [32:86]
/** Sends `notification` to `token` through `client`, retrying failed attempts.
  *
  * The payload is obtained from `client.payloadBuilder`. When no payload is produced the
  * returned stream fails with `InvalidPayload`: that error is raised before the `attempt`
  * step below, so it is NOT emitted as a `Left`.
  *
  * Otherwise delivery is tried with `maxAttempts = 3`. The first delay is drawn at random
  * from [1000, 3000) ms and every following delay is twice the previous one. The request is
  * a dry run when either `notification.dryRun` contains `true` or `client.dryRun` is set.
  *
  * @param notification the notification to deliver
  * @param token        the token the notification is addressed to
  * @return a stream emitting the delivery outcome: `Right` with the client's success value,
  *         or `Left` with the `DeliveryException` describing the failure
  */
def send(
  notification: Notification,
  token: String
): Stream[F, Either[DeliveryException, C#Success]] = {

  // Retry policy: FailedDelivery is retried silently, InvalidToken never is, any other
  // non-fatal Exception is logged and retried, and everything else is not retried.
  // NOTE(review): the predicate is presumably also evaluated for the last permitted
  // attempt, in which case "will retry" is logged although no retry follows — confirm
  // against fs2's Stream.retry.
  val shouldRetry: Throwable => Boolean = {
    case NonFatal(_: FailedDelivery) => true
    case NonFatal(_: InvalidToken) => false
    case NonFatal(other: Exception) =>
      logger.error("Encountered an error, will retry", other)
      true
    case _ => false
  }

  // Normalises whatever survived the retries into the DeliveryException hierarchy.
  val toDeliveryException: Throwable => DeliveryException = {
    case known: DeliveryException => known
    case NonFatal(unexpected) => GenericFailure(notification.id, token, unexpected)
  }

  // A single delivery attempt: adapts the client's callback-based API into F.
  def attemptDelivery(payload: client.Payload): F[C#Success] =
    Async[F].async { (callback: Either[Throwable, C#Success] => Unit) =>
      val isDryRun = notification.dryRun.contains(true) || client.dryRun
      client.sendNotification(notification.id, token, payload, isDryRun)(callback)
    }

  // Delivery with exponential back-off; the terminal error (if any) becomes a Left.
  def deliverWithRetry(payload: client.Payload): Stream[F, Either[DeliveryException, C#Success]] = {
    // Random initial delay in [1000, 3000) ms.
    val initialDelayMs = 1000 + Random.nextInt(2000)
    Stream
      .retry(
        attemptDelivery(payload),
        delay = FiniteDuration(initialDelayMs, TimeUnit.MILLISECONDS),
        nextDelay = _.mul(2),
        maxAttempts = 3,
        retriable = shouldRetry
      )
      .attempt
      .map(_.leftMap(toDeliveryException))
  }

  // Raises InvalidPayload when the client cannot build a payload for this notification.
  val builtPayload: F[client.Payload] =
    client
      .payloadBuilder(notification)
      .map(built => F.delay(built))
      .getOrElse(F.raiseError(InvalidPayload(notification.id)))

  Stream.eval(builtPayload).flatMap(payload => deliverWithRetry(payload))
}