in webindex/modules/data/src/main/java/webindex/data/fluo/WebindexObservers.java [39:70]
public void provide(Registry obsRegistry, Context ctx) {
  // Wires the application together: builds the export queue and the two combine queues from
  // the application configuration, then registers each observer with the given registry.
  final SimpleConfiguration config = ctx.getAppConfiguration();
  final MetricsReporter metrics = ctx.getMetricsReporter();

  // Export queue through which all updates to the query table flow.
  final ExportQueue<String, IndexUpdate> indexExports =
      ExportQueue.getInstance(FluoApp.EXPORT_QUEUE_ID, config);

  // Combine queue tracking the number of pages that link to a URI.
  final CombineQueue<String, UriInfo> uriCounts =
      CombineQueue.getInstance(UriCombineQ.URI_COMBINE_Q_ID, config);

  // Combine queue tracking the number of unique URIs observed per domain.
  final CombineQueue<String, Long> domainCounts =
      CombineQueue.getInstance(DomainCombineQ.DOMAIN_COMBINE_Q_ID, config);

  // Observer that handles changes to page content; it feeds both the URI queue and the
  // export queue.
  obsRegistry
      .forColumn(Constants.PAGE_NEW_COL, NotificationType.STRONG)
      .withId("PageObserver")
      .useObserver(new PageObserver(uriCounts, indexExports, metrics));

  // Observer that processes queued export data.
  indexExports.registerObserver(
      obsRegistry,
      new AccumuloExporter<>(
          FluoApp.EXPORT_QUEUE_ID, config, new IndexUpdateTranslator(metrics)));

  // Observer that processes updates to the URI map; values are merged with UriInfo::reduce.
  uriCounts.registerObserver(
      obsRegistry,
      UriInfo::reduce,
      new UriCombineQ.UriUpdateObserver(indexExports, domainCounts, metrics));

  // Observer that processes updates to the domain map; values are merged with a
  // SummingCombiner.
  domainCounts.registerObserver(
      obsRegistry,
      new SummingCombiner<>(),
      new DomainCombineQ.DomainUpdateObserver(indexExports, metrics));
}