in myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java [59:113]
public void init(Injector injector) {
this.disruptorExecutors = Executors.newCachedThreadPool();
// todo: (kensipe) need to make ringsize configurable (overriding the defaults)
this.registeredEventDisruptor = new Disruptor<>(new RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE,
disruptorExecutors);
this.registeredEventDisruptor.handleEventsWith(injector.getInstance(RegisteredEventHandler.class));
this.registeredEventDisruptor.start();
this.reRegisteredEventDisruptor = new Disruptor<>(new ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE,
disruptorExecutors);
this.reRegisteredEventDisruptor.handleEventsWith(injector.getInstance(ReRegisteredEventHandler.class));
this.reRegisteredEventDisruptor.start();
this.resourceOffersEventDisruptor = new Disruptor<>(new ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE,
disruptorExecutors);
this.resourceOffersEventDisruptor.handleEventsWith(injector.getInstance(ResourceOffersEventHandler.class));
this.resourceOffersEventDisruptor.start();
this.offerRescindedEventDisruptor = new Disruptor<>(new OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE,
disruptorExecutors);
this.offerRescindedEventDisruptor.handleEventsWith(injector.getInstance(OfferRescindedEventHandler.class));
this.offerRescindedEventDisruptor.start();
this.statusUpdateEventDisruptor = new Disruptor<>(new StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE,
disruptorExecutors);
this.statusUpdateEventDisruptor.handleEventsWith(injector.getInstance(StatusUpdateEventHandler.class));
this.statusUpdateEventDisruptor.start();
this.frameworkMessageEventDisruptor = new Disruptor<>(new FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE,
disruptorExecutors);
this.frameworkMessageEventDisruptor.handleEventsWith(injector.getInstance(FrameworkMessageEventHandler.class));
this.frameworkMessageEventDisruptor.start();
this.disconnectedEventDisruptor = new Disruptor<>(new DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE,
disruptorExecutors);
this.disconnectedEventDisruptor.handleEventsWith(injector.getInstance(DisconnectedEventHandler.class));
this.disconnectedEventDisruptor.start();
this.slaveLostEventDisruptor = new Disruptor<>(new SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
this.slaveLostEventDisruptor.handleEventsWith(injector.getInstance(SlaveLostEventHandler.class));
this.slaveLostEventDisruptor.start();
this.executorLostEventDisruptor = new Disruptor<>(new ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE,
disruptorExecutors);
this.executorLostEventDisruptor.handleEventsWith(injector.getInstance(ExecutorLostEventHandler.class));
this.executorLostEventDisruptor.start();
this.errorEventDisruptor = new Disruptor<>(new ErrorEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
this.errorEventDisruptor.handleEventsWith(injector.getInstance(ErrorEventHandler.class));
this.errorEventDisruptor.start();
}