override def shape: SinkShape[ProblemItem] = SinkShape.of()

in ProxyStatsGathering/src/main/scala/StreamComponents/ProblemItemDeleteIfEmpty.scala [23:79]


  override def shape: SinkShape[ProblemItem] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[(Int, Int)]) = {
    val completionPromise = Promise[(Int, Int)]


    val logic = new GraphStageLogic(shape) {
      var totalCount=0
      var deletedCount=0
      val delayTime=500

      private val problemItemIndexer = new ProblemItemIndexer(indexName)

      def doDelete(elem:ProblemItem, retryNumber:Int=0):Unit = {
        Await.result(problemItemIndexer.deleteEntry(elem), 10 seconds) match {
          case Left(err)=>
            if(err.status==409){  //conflict error; we can try again
              println("ERROR: Conflict error deleting item. Trying again...")
              Thread.sleep(500*(retryNumber+1))
              doDelete(elem, retryNumber+1)
            } else {
              throw new RuntimeException(s"Item delete failed: $err")
            }
          case Right(result)=>
            println(s"DEBUG: ES reports ${result.result.deleted} item(s) deleted")
        }
      }

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val elem = grab(in)

          totalCount+=1
          val resultsWithProblems = elem.verifyResults.filter(result=>result.wantProxy && !result.haveProxy.getOrElse(false))

          if(resultsWithProblems.isEmpty){
            println(s"Removing item $elem")
            doDelete(elem)
            deletedCount+=1
          } else {
            println(s"Item still has ${resultsWithProblems.length} reports, not removing")
          }
          pull(in)
        }
      })

      override def preStart(): Unit = {
        pull(in)
      }

      override def postStop(): Unit = {
        completionPromise.complete(Success((totalCount, deletedCount)))
      }
    }

    (logic, completionPromise.future)
  }