def buildGraphModel()

in ProxyStatsGathering/src/main/scala/MainContent.scala [79:156]


  def buildGraphModel(forCollection:Option[String]=None) = {
    val converter = new SearchHitToArchiveEntryFlow()
    val mimeTypeBranch = new MimeTypeBranch()
    val mimeTypeWantProxyBranch = new MimeTypeWantProxyBranch()
    val fileTypeWantProxyBranch = new FileTypeWantProxyBranch()

    val counterSink = new GroupedResultCounter

    GraphDSL.create(counterSink){ implicit builder:GraphDSL.Builder[Future[ProblemItemCount]] =>counterSink =>
      import GraphDSL.Implicits._

      val src = builder.add(getStreamSource(getIndexName, forCollection))
      val conv = builder.add(converter)
      val mtb = builder.add(mimeTypeBranch)
      val mtwpb = builder.add(mimeTypeWantProxyBranch)
      val ftwpb = builder.add(fileTypeWantProxyBranch)

      val isDotFileBranch = builder.add(new IsDotFileBranch)
      val isGlacierBranch = builder.add(injector.getInstance(classOf[IsGlacierBranch]))
      val preVideoMerge = builder.add(new Merge[ProxyVerifyResult](2, false))
      val videoProxyRequest = builder.add(new VerifyProxy(ProxyType.VIDEO, injector))
      val preAudioMerge = builder.add(new Merge[ProxyVerifyResult](2, false))
      val audioProxyRequest = builder.add(new VerifyProxy(ProxyType.AUDIO, injector))
      val preThumbMerge = builder.add(new Merge[ProxyVerifyResult](2, false))
      val thumbProxyRequest = builder.add(new VerifyProxy(ProxyType.THUMBNAIL, injector))

      val convertToProblemItemFilter = builder.add(new ConvertToProblemItemFilter)
      val postVerifyMerge = builder.add(new Merge[ProxyVerifyResult](3, false))
      val proxyResultGroup = builder.add(new ProxyResultGroup)
      val groupCounter = builder.add(new GroupCounter)

      val postGroupBroadcast = builder.add(new Broadcast[Seq[ProxyVerifyResult]](2,true))

      val preCounterMerge = builder.add(new Merge[GroupedResult](5, false))

      val recordProblemSink = builder.add(getProblemElementsSink(problemsIndexName))

      src ~> conv ~> isDotFileBranch
      isDotFileBranch.out(0).map(entry=>GroupedResult(entry.id, entry.proxied,ProxyHealth.DotFile)) ~> preCounterMerge
      isDotFileBranch.out(1) ~> isGlacierBranch

      isGlacierBranch.out(0).map(entry=>GroupedResult(entry.id, entry.proxied, ProxyHealth.GlacierClass)) ~> preCounterMerge
      isGlacierBranch.out(1) ~> mtb

      //"want proxy" branch
      mtb.out(0) ~> mtwpb.in
      mtb.out(1) ~> ftwpb.in

      mtwpb.out(0) ~> preVideoMerge
      ftwpb.out(0) ~> preVideoMerge

      mtwpb.out(1) ~> preAudioMerge
      ftwpb.out(1) ~> preAudioMerge

      mtwpb.out(2) ~> preThumbMerge
      ftwpb.out(2) ~> preThumbMerge


      preVideoMerge ~> videoProxyRequest ~> postVerifyMerge.in(0)
      preAudioMerge ~> audioProxyRequest ~> postVerifyMerge.in(1)
      preThumbMerge ~> thumbProxyRequest ~> postVerifyMerge.in(2)

      postVerifyMerge ~> proxyResultGroup ~> postGroupBroadcast

      //once we have proxy results grouped, broadcast the results between a counter branch and a log-to-elastic branch
      postGroupBroadcast.out(0) ~> groupCounter ~> preCounterMerge
      postGroupBroadcast.out(1) ~> convertToProblemItemFilter ~> recordProblemSink

      //"don't want proxy" branch
      mtwpb.out(3).map(verifyResult=>GroupedResult(verifyResult.fileId, verifyResult.esRecordSays, ProxyHealth.NotNeeded)) ~> preCounterMerge
      ftwpb.out(3).map(verifyResult=>GroupedResult(verifyResult.fileId, verifyResult.esRecordSays, ProxyHealth.NotNeeded)) ~> preCounterMerge

      //completion
      preCounterMerge ~> counterSink

      ClosedShape
    }
  }