in ProxyStatsGathering/src/main/scala/MainContent.scala [79:156]
def buildGraphModel(forCollection:Option[String]=None) = {
val converter = new SearchHitToArchiveEntryFlow()
val mimeTypeBranch = new MimeTypeBranch()
val mimeTypeWantProxyBranch = new MimeTypeWantProxyBranch()
val fileTypeWantProxyBranch = new FileTypeWantProxyBranch()
val counterSink = new GroupedResultCounter
GraphDSL.create(counterSink){ implicit builder:GraphDSL.Builder[Future[ProblemItemCount]] =>counterSink =>
import GraphDSL.Implicits._
val src = builder.add(getStreamSource(getIndexName, forCollection))
val conv = builder.add(converter)
val mtb = builder.add(mimeTypeBranch)
val mtwpb = builder.add(mimeTypeWantProxyBranch)
val ftwpb = builder.add(fileTypeWantProxyBranch)
val isDotFileBranch = builder.add(new IsDotFileBranch)
val isGlacierBranch = builder.add(injector.getInstance(classOf[IsGlacierBranch]))
val preVideoMerge = builder.add(new Merge[ProxyVerifyResult](2, false))
val videoProxyRequest = builder.add(new VerifyProxy(ProxyType.VIDEO, injector))
val preAudioMerge = builder.add(new Merge[ProxyVerifyResult](2, false))
val audioProxyRequest = builder.add(new VerifyProxy(ProxyType.AUDIO, injector))
val preThumbMerge = builder.add(new Merge[ProxyVerifyResult](2, false))
val thumbProxyRequest = builder.add(new VerifyProxy(ProxyType.THUMBNAIL, injector))
val convertToProblemItemFilter = builder.add(new ConvertToProblemItemFilter)
val postVerifyMerge = builder.add(new Merge[ProxyVerifyResult](3, false))
val proxyResultGroup = builder.add(new ProxyResultGroup)
val groupCounter = builder.add(new GroupCounter)
val postGroupBroadcast = builder.add(new Broadcast[Seq[ProxyVerifyResult]](2,true))
val preCounterMerge = builder.add(new Merge[GroupedResult](5, false))
val recordProblemSink = builder.add(getProblemElementsSink(problemsIndexName))
src ~> conv ~> isDotFileBranch
isDotFileBranch.out(0).map(entry=>GroupedResult(entry.id, entry.proxied,ProxyHealth.DotFile)) ~> preCounterMerge
isDotFileBranch.out(1) ~> isGlacierBranch
isGlacierBranch.out(0).map(entry=>GroupedResult(entry.id, entry.proxied, ProxyHealth.GlacierClass)) ~> preCounterMerge
isGlacierBranch.out(1) ~> mtb
//"want proxy" branch
mtb.out(0) ~> mtwpb.in
mtb.out(1) ~> ftwpb.in
mtwpb.out(0) ~> preVideoMerge
ftwpb.out(0) ~> preVideoMerge
mtwpb.out(1) ~> preAudioMerge
ftwpb.out(1) ~> preAudioMerge
mtwpb.out(2) ~> preThumbMerge
ftwpb.out(2) ~> preThumbMerge
preVideoMerge ~> videoProxyRequest ~> postVerifyMerge.in(0)
preAudioMerge ~> audioProxyRequest ~> postVerifyMerge.in(1)
preThumbMerge ~> thumbProxyRequest ~> postVerifyMerge.in(2)
postVerifyMerge ~> proxyResultGroup ~> postGroupBroadcast
//once we have proxy results grouped, broadcast the results between a counter branch and a log-to-elastic branch
postGroupBroadcast.out(0) ~> groupCounter ~> preCounterMerge
postGroupBroadcast.out(1) ~> convertToProblemItemFilter ~> recordProblemSink
//"don't want proxy" branch
mtwpb.out(3).map(verifyResult=>GroupedResult(verifyResult.fileId, verifyResult.esRecordSays, ProxyHealth.NotNeeded)) ~> preCounterMerge
ftwpb.out(3).map(verifyResult=>GroupedResult(verifyResult.fileId, verifyResult.esRecordSays, ProxyHealth.NotNeeded)) ~> preCounterMerge
//completion
preCounterMerge ~> counterSink
ClosedShape
}
}