override def shape: FlowShape[ProxyVerifyResult, ProxyVerifyResult] = FlowShape.of()

in ProxyStatsGathering/src/main/scala/StreamComponents/VerifyProxy.scala [23:82]


  override def shape: FlowShape[ProxyVerifyResult, ProxyVerifyResult] = FlowShape.of(in, out)

  val maxAttempts = 50

  val config = injector.getInstance(classOf[ArchiveHunterConfiguration])
  private val profileName = config.getOptional[String]("externalData.awsProfile")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    val proxyLocationDAO = injector.getInstance(classOf[ProxyLocationDAO])
    val dynamoClientManager = injector.getInstance(classOf[DynamoClientManager])

    implicit val dynamoClient = dynamoClientManager.getNewDynamoClient(profileName)

    /**
      * call synchronous getProxy and retry up to maxAttempts times, backing off by an extra 1/2 second each time
      * @param fileId file ID to look up
      * @param proxyType proxy type for file
      * @return None if no entry exists. Some(Left(error)) if an error occurred or Some(Right(Result)) if an entry does exist.
      */
    def getProxySyncWithBackoff(fileId:String, proxyType:ProxyType.Value, attempt:Int=0):Option[Either[DynamoReadError, ProxyLocation]] =
      proxyLocationDAO.getProxySync(fileId, proxyType) match {
        case Some(Left(err))=>
          if(attempt+1>maxAttempts){
            Some(Left(err))
          } else {
            Thread.sleep(500*attempt)
            getProxySyncWithBackoff(fileId, proxyType, attempt+1)
          }
        case Some(Right(result))=>Some(Right(result))
        case None=>None
      }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)

        //println(s"DEBUG: checking proxy for $elem")
        val result = getProxySyncWithBackoff(elem.fileId, proxyType) match {
          case Some(Left(err))=>
            println(s"Could not read Dynamodb after $maxAttempts attempts: ${err.toString}")
            throw new RuntimeException("DynamoDB error")
          case Some(Right(proxyEntry))=>
            //println(s"DEBUG: Got $proxyType proxy $proxyEntry for $elem")
            elem.copy(haveProxy = Some(true))
          case None=>
            //println(s"DEBUG: Got no $proxyType proxy for $elem")
            elem.copy(haveProxy = Some(false))
        }

        push(out, result)
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = {
        //println(s"verifyProxy $proxyType: pull from downstream")
        pull(in)
      }
    })
  }