protected abstract RBrokerWorker createBrokerWorker()

in src/main/java/com/revo/deployr/client/broker/engine/RBrokerEngine.java [529:660]


    /**
     * Creates the worker that will execute a single task on behalf of
     * this engine. Concrete engines supply the implementation.
     *
     * <p>The engine's worker manager calls this once per dequeued task
     * and submits the returned worker to the task worker executor.
     *
     * @param task           the task taken from one of the pending queues
     * @param taskIndex      value of the engine's task counter at the time
     *                       the task was dispatched
     * @param isPriorityTask {@code true} when the task was taken from the
     *                       high-priority pending queue
     * @param resourceToken  token taken from the engine's resource token
     *                       pool for this task; its concrete type depends
     *                       on the implementation (NOTE(review): an existing
     *                       comment suggests an RProject instance - confirm)
     * @param brokerEngine   the engine instance dispatching the task
     * @return the worker to be submitted for execution
     */
    protected abstract RBrokerWorker createBrokerWorker(RTask task,
                                                        long taskIndex,
                                                        boolean isPriorityTask,
                                                        Object resourceToken,
                                                        RBrokerEngine brokerEngine);

    /*
     * RBrokerEngine: private implementation.
     */
    /**
     * Dispatch loop executed on the broker engine's manager thread.
     *
     * <p>While the broker is active it repeatedly takes the next pending
     * task (high-priority queue first, then low-priority), waits for a
     * resource token, creates a worker for the task and submits it to
     * the task worker executor. The resulting future is handed to the
     * task token listener registered for that task, if any.
     */
    private class RBrokerWorkerManager implements Runnable {

        /*
         * Delay, in milliseconds, between polls of the pending
         * queues while both are empty.
         */
        private static final long IDLE_POLL_MILLIS = 50L;

        /**
         * Runs the dispatch loop until the broker is deactivated, the
         * thread is interrupted, or an unexpected exception escapes.
         */
        @Override
        public void run() {

            try {

                /*
                 * Signal to constructor that the brokerEngineExecutor
                 * thread is fully initialized and active, allowing
                 * the constructor to return safely to caller.
                 */
                engineInitialized.release();

                while (taskBrokerIsActive.get()) {

                    /*
                     * Retrieve and remove the next task, preferring
                     * the high-priority queue. A non-blocking poll is
                     * used rather than peek-then-take so that a task
                     * removed by another thread in between cannot
                     * leave this thread blocked on an empty queue.
                     */
                    RTask nextTaskInQueue = pendingHighPriorityQueue.poll();
                    boolean priorityTaskAvailable = (nextTaskInQueue != null);
                    if (!priorityTaskAvailable) {
                        nextTaskInQueue = pendingLowPriorityQueue.poll();
                    }

                    if (nextTaskInQueue == null) {
                        try {
                            /*
                             * Avoid busy-wait, sleep.
                             */
                            Thread.sleep(IDLE_POLL_MILLIS);
                        } catch (InterruptedException iex) {
                            /*
                             * Interruption is treated as a request to
                             * stop: keep the interrupt status visible
                             * to the owning executor and end the loop.
                             */
                            Thread.currentThread().interrupt();
                            return;
                        }
                        continue;
                    }

                    /*
                     * Task found on queue: process it only if the
                     * taskBroker is still active.
                     */
                    if (taskBrokerIsActive.get()) {
                        dispatchTask(nextTaskInQueue, priorityTaskAvailable);
                    }

                } // while taskBrokerIsActive

            } catch (Exception mex) {
                if (mex instanceof InterruptedException) {
                    // Preserve interrupt status for the owning executor.
                    Thread.currentThread().interrupt();
                }
                System.out.println("RBrokerEngine: " +
                        "brokerEngineExecutor.run ex=" + mex);
            }
        }

        /*
         * Waits for a resource token, builds the worker for the given
         * task and submits it for execution. If the worker could not
         * be submitted, the token is returned to the pool and any
         * task-to-token mapping recorded here is removed.
         */
        private void dispatchTask(RTask task, boolean isPriorityTask)
                throws InterruptedException {

            /*
             * Await next available resource token in pool.
             */
            Object resourceToken = resourceTokenPool.take();

            boolean tokenMapped = false;
            boolean workerSubmitted = false;

            try {

                RBrokerWorker worker =
                        createBrokerWorker(task,
                                executorTaskCounter.getAndIncrement(),
                                isPriorityTask,
                                resourceToken,
                                RBrokerEngine.this);

                /*
                 * Record the mapping before submitting so it is
                 * already in place when the worker starts running.
                 */
                taskResourceTokenMap.put(task, resourceToken);
                tokenMapped = true;

                // Raw type kept: the generic signature expected by
                // RTaskTokenListener.onTask is not visible here.
                Future future = taskWorkerExecutor.submit(worker);
                workerSubmitted = true;

                RTaskTokenListener taskTokenListener =
                        taskTokenListenerMap.remove(task);

                if (taskTokenListener != null) {
                    taskTokenListener.onTask(task, future);
                } else {
                    System.out.println("RBrokerEngine: " +
                            "taskTokenListener callback not found for " +
                            task + ", unexpected error.");
                }

            } catch (Exception ex) {

                if (!workerSubmitted) {
                    /*
                     * No worker owns the token: undo the mapping and
                     * return the unused token to the tail of the
                     * pool so it is not leaked.
                     */
                    if (tokenMapped) {
                        taskResourceTokenMap.remove(task);
                    }
                    if (resourceToken != null) {
                        resourceTokenPool.add(resourceToken);
                    }
                }

                System.out.println("RBrokerEngine: " +
                        " processing task " + task +
                        ", ex=" + ex);
            }
        }

    }