override def shape: UniformFanOutShape[ArchiveEntry, ArchiveEntry] = UniformFanOutShape()

in ProxyStatsGathering/src/main/scala/StreamComponents/IsGlacierBranch.scala [17:79]


  override def shape: UniformFanOutShape[ArchiveEntry, ArchiveEntry] = UniformFanOutShape(in, outYes, outNo)

  private val profileName = config.getOptional[String]("externalData.awsProfile")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    val s3Client = s3ClientMgr.getClient(profileName)

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)

        try {
          val result = s3Client.getObjectMetadata(elem.bucket, elem.path)

          (result.getStorageClass: @switch) match {
            case "STANDARD" =>
              //println(s"${elem.bucket}/${elem.path} is STANDARD")
              push(outNo, elem)
            case null =>
              //println(s"${elem.bucket}/${elem.path} is STANDARD with no reply")
              push(outNo, elem)
            case "STANDARD_IA" =>
              //println(s"${elem.bucket}/${elem.path} is IA")
              push(outNo, elem)
            case "REDUCED_REDUNDANCY" =>
              //println(s"${elem.bucket}/${elem.path} is RR")
              push(outNo, elem)
            case "OneZoneInfrequentAccess" =>
              //println(s"${elem.bucket}/${elem.path} is one-zone IA")
              push(outNo, elem)
            case "INTELLIGENT_TIERING" =>
              //println(s"${elem.bucket}/${elem.path} is in intelligent tiering")
              push(outNo, elem)
            case "GLACIER" =>
              //println(s"${elem.bucket}/${elem.path} is in Glacier")
              push(outYes, elem)
            case "DEEP_ARCHIVE" =>
              //println(s"${elem.bucket}/${elem.path} is in Glacier DEEP")
              push(outYes, elem)
            case _ =>
              //println(s"ERROR: Did not recognise storage class ${result.getStorageClass} for ${elem.bucket}/${elem.path}")
              throw new RuntimeException("Unrecognised storage class")
          }
        } catch {
          case ex:AmazonS3Exception=>
            //println(s"WARNING: Could not process ${elem.bucket}/${elem.path}: $ex")
            pull(in)
        }
      }
    })

    setHandler(outYes, new AbstractOutHandler {
      override def onPull(): Unit = {
        if(!hasBeenPulled(in)) pull(in)
      }
    })

    setHandler(outNo, new AbstractOutHandler {
      override def onPull(): Unit = {
        if(!hasBeenPulled(in)) pull(in)
      }
    })
  }