in ProxyStatsGathering/src/main/scala/StreamComponents/ProblemItemDeleteIfEmpty.scala [23:79]
/** The stage is a pure sink: its shape exposes only the single inlet `in`. */
override def shape: SinkShape[ProblemItem] = SinkShape(in)
/**
 * Builds the stage logic together with its materialized value.
 *
 * Every element pushed into the stage is counted. An element whose `verifyResults` contain no
 * entry that wants a proxy but does not have one is deleted through a [[ProblemItemIndexer]];
 * all other elements are left alone.
 *
 * @param inheritedAttributes attributes handed over at materialization; not used by this stage
 * @return the stage logic and a Future of `(totalCount, deletedCount)`. The Future is completed
 *         when the stage stops, and is failed if processing an element throws or the upstream fails.
 */
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[(Int, Int)]) = {
  val completionPromise = Promise[(Int, Int)]()

  val logic = new GraphStageLogic(shape) {
    var totalCount = 0
    var deletedCount = 0
    // Base back-off in milliseconds; multiplied by the attempt number between conflict retries.
    val delayTime = 500
    // Upper bound on conflict retries so that a persistent conflict cannot stall the stream forever.
    val maxRetries = 10

    private val problemItemIndexer = new ProblemItemIndexer(indexName)

    /**
     * Deletes `elem` from the index, blocking until the indexer answers (at most 10 seconds per attempt).
     * A 409 (conflict) response is retried with a linearly growing delay, up to `maxRetries` times.
     *
     * NOTE(review): this blocks the thread running the stage (Await.result / Thread.sleep).
     *
     * @param elem        the item to delete
     * @param retryNumber number of conflict retries already performed
     * @throws RuntimeException if the delete fails with a non-conflict error, or conflicts persist
     *                          after `maxRetries` retries
     */
    def doDelete(elem: ProblemItem, retryNumber: Int = 0): Unit = {
      Await.result(problemItemIndexer.deleteEntry(elem), 10.seconds) match {
        case Left(err) =>
          if (err.status == 409 && retryNumber < maxRetries) { //conflict error; we can try again
            println("ERROR: Conflict error deleting item. Trying again...")
            Thread.sleep(delayTime * (retryNumber + 1))
            doDelete(elem, retryNumber + 1)
          } else if (err.status == 409) {
            throw new RuntimeException(s"Item delete failed after $retryNumber conflict retries: $err")
          } else {
            throw new RuntimeException(s"Item delete failed: $err")
          }
        case Right(result) =>
          println(s"DEBUG: ES reports ${result.result.deleted} item(s) deleted")
      }
    }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        try {
          val elem = grab(in)
          totalCount += 1

          val resultsWithProblems = elem.verifyResults.filter(result => result.wantProxy && !result.haveProxy.getOrElse(false))
          if (resultsWithProblems.isEmpty) {
            println(s"Removing item $elem")
            doDelete(elem)
            deletedCount += 1
          } else {
            println(s"Item still has ${resultsWithProblems.length} reports, not removing")
          }
          pull(in)
        } catch {
          case scala.util.control.NonFatal(err) =>
            // Record the failure before the stage is torn down; otherwise postStop would
            // report success to whoever is waiting on the materialized Future.
            completionPromise.tryFailure(err)
            throw err
        }
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        // Propagate upstream failures to the materialized Future as well.
        completionPromise.tryFailure(ex)
        super.onUpstreamFailure(ex)
      }
    })

    override def preStart(): Unit = {
      // A sink has no downstream demand to react to, so request the first element ourselves.
      pull(in)
    }

    override def postStop(): Unit = {
      // No-op if a failure was already recorded above.
      completionPromise.trySuccess((totalCount, deletedCount))
    }
  }

  (logic, completionPromise.future)
}