public Iterable add()

in src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java [60:139]


    /**
     * Adds the given package to the queues configured in {@code deliveryMappings}.
     * <p>
     * For every reference queue name (a key of {@code deliveryMappings}) the current number of
     * items in that queue is checked:
     * <ul>
     * <li>if it holds more than 100 items, the package itself is added to the mapped delivery
     * queue (requested as {@link DistributionQueueType#PARALLEL}) and a {@link ReferencePackage}
     * wrapping it is added to the reference queue;</li>
     * <li>otherwise the package is added directly to the reference queue.</li>
     * </ul>
     * Each package is acquired for a queue before being added to it and released again when the
     * queue does not accept the item (i.e. {@code add} returns {@code null}).
     *
     * @param distributionPackage the package to enqueue; must be a {@link SharedDistributionPackage}
     *                            when more than one mapping is configured
     * @param queueProvider       the provider used to resolve queues by name
     * @return the statuses collected while enqueueing, in iteration order of the mappings
     * @throws DistributionException if a non-shared package would have to be added to multiple queues
     */
    public Iterable<DistributionQueueItemStatus> add(@NotNull DistributionPackage distributionPackage,
                                                     @NotNull DistributionQueueProvider queueProvider) throws DistributionException {

        if (!(distributionPackage instanceof SharedDistributionPackage) && deliveryMappings.size() > 1) {
            throw new DistributionException("distribution package must be a shared package to be added in multiple queues");
        }

        // above this many queued items, actual packages and references are sent separately
        final int maxQueueItemsThreshold = 100;

        List<DistributionQueueItemStatus> result = new LinkedList<DistributionQueueItemStatus>();

        for (String referenceQueueName : deliveryMappings.keySet()) {
            DistributionQueue referenceQueue = queueProvider.getQueue(referenceQueueName);

            if (referenceQueue.getStatus().getItemsCount() > maxQueueItemsThreshold) {
                enqueueWithReference(distributionPackage, queueProvider, referenceQueueName, referenceQueue, result);
            } else {
                result.add(enqueueDirectly(distributionPackage, referenceQueueName, referenceQueue));
            }
        }

        return result;

    }

    /**
     * Adds the actual package to the delivery queue mapped to {@code referenceQueueName} and a
     * reference package to the reference queue, appending the resulting statuses to {@code result}.
     */
    private void enqueueWithReference(DistributionPackage distributionPackage,
                                      DistributionQueueProvider queueProvider,
                                      String referenceQueueName,
                                      DistributionQueue referenceQueue,
                                      List<DistributionQueueItemStatus> result) throws DistributionException {

        // set before getItem(...) is called; presumably the item is derived from the package info - TODO confirm
        distributionPackage.getInfo().put("reference-required", true);
        DistributionQueueItem item = getItem(distributionPackage);

        // create and acquire reference package
        ReferencePackage referencePackage = new ReferencePackage(distributionPackage);
        DistributionPackageUtils.acquire(referencePackage, referenceQueueName);

        // acquire actual package
        String deliveryQueueName = deliveryMappings.get(referenceQueueName);
        DistributionPackageUtils.acquire(distributionPackage, deliveryQueueName);

        // add the actual package to the delivery queue
        DistributionQueue deliveryQueue = queueProvider.getQueue(deliveryQueueName, DistributionQueueType.PARALLEL);
        DistributionQueueEntry deliveryQueueEntry = deliveryQueue.add(item);
        if (deliveryQueueEntry != null) {
            DistributionQueueItemStatus status = deliveryQueueEntry.getStatus();
            log.debug("item {} added to delivery queue: {}", item, status);
            result.add(status);
        } else {
            // the queue did not take the item: give back the reference acquired above
            DistributionPackageUtils.release(distributionPackage, deliveryQueueName);
            log.error("cannot add package {} to delivery queue {}", distributionPackage.getId(), deliveryQueueName);
            result.add(new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, deliveryQueue.getName()));
        }

        // add the reference package to the reference queue
        DistributionQueueItem referenceQueueItem = getItem(referencePackage);
        DistributionQueueEntry referenceQueueEntry = referenceQueue.add(referenceQueueItem);
        if (referenceQueueEntry != null) {
            // NOTE(review): unlike the delivery queue, a successful add to the reference queue is only
            // logged and not reported in the result, while a failure is - confirm this is intentional
            log.debug("item {} added to reference queue: {}", referenceQueueItem, referenceQueueEntry.getStatus());
        } else {
            DistributionPackageUtils.release(referencePackage, referenceQueueName);
            log.error("cannot add package {} to reference queue {}", distributionPackage.getId(), referenceQueueName);
            result.add(new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, referenceQueue.getName()));
        }
    }

    /**
     * Adds the package directly to the given queue and returns the status of the created entry,
     * or an {@code ERROR} status carrying the queue name when the queue did not accept the item.
     */
    private DistributionQueueItemStatus enqueueDirectly(DistributionPackage distributionPackage,
                                                        String queueName,
                                                        DistributionQueue queue) throws DistributionException {

        DistributionQueueItem item = getItem(distributionPackage);

        // pessimistic default, replaced below once the queue accepts the item
        DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queue.getName());
        DistributionPackageUtils.acquire(distributionPackage, queueName);

        DistributionQueueEntry queueEntry = queue.add(item);

        if (queueEntry != null) {
            status = queueEntry.getStatus();
            log.debug("item {} added to queue: {}", item, status);
        } else {
            DistributionPackageUtils.release(distributionPackage, queueName);
            log.error("cannot add package {} to queue {}", distributionPackage.getId(), queueName);
        }

        return status;
    }