private boolean processQueueItem()

in src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java [105:181]


    private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) throws DistributionException {
        boolean removeItemFromQueue = false;
        ResourceResolver agentResourceResolver = null;
        DistributionPackage distributionPackage = null;
        DistributionQueueItem queueItem = queueEntry.getItem();
        DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();
        try {

            int processingAttempt = queueItemStatus.getAttempts();
            if (processingAttempt > 0) {
                // since there is a retry, it is possible that the same error is observed again
                // we should add a linear backoff using random delay before re-attempting to distribute the same item.
                addRandomDelay(queueItemStatus.getAttempts());
            }

            String callingUser = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, String.class);
            String requestId = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, String.class);
            Long globalStartTime = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME, Long.class);

            agentResourceResolver = DistributionUtils.getResourceResolver(callingUser, authenticationInfo.getAgentService(),
                    authenticationInfo.getSlingRepository(), authenticationInfo.getSubServiceName(),
                    authenticationInfo.getResourceResolverFactory());

            final long startTime = System.currentTimeMillis();

            distributionPackage = distributionPackageExporter.getPackage(agentResourceResolver, queueItem.getPackageId());

            if (distributionPackage != null) {
                final long packageSize = distributionPackage.getSize();
                DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), queueEntry);

                final DistributionRequestType requestType = distributionPackage.getInfo().getRequestType();
                final String[] paths = distributionPackage.getInfo().getPaths();

                try {
                    // import package
                    distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage);

                    // generated event
                    distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
                            DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo());

                    removeItemFromQueue = true;
                    final long endTime = System.currentTimeMillis();

                    distributionLog.info("[{}] PACKAGE-DELIVERED {}: {} item={}, paths={}, importTime={}ms, execTime={}ms, size={}B", queueName, requestId,
                            requestType, queueEntry.getItem().getPackageId(), paths,
                            endTime - startTime, endTime - globalStartTime,
                            packageSize);
                } catch (RecoverableDistributionException e) {
                    distributionLog.warn("[{}] PACKAGE-FAIL {}: could not deliver {}, {}", queueName, requestId, distributionPackage.getId(), e.getMessage());
                    distributionLog.debug("could not deliver package {}", distributionPackage.getId(), e);
                } catch (Throwable e) {
                    distributionLog.error("[{}] PACKAGE-FAIL {}: could not deliver package {} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e);
                    if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
                        removeItemFromQueue = reEnqueuePackage(distributionPackage);
                        distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DROPPED,
                                DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo());
                        distributionLog.info("[{}] PACKAGE-QUEUED {}: distribution package {} was enqueued to an error queue", queueName, requestId, distributionPackage.getId());
                    }
                }
            } else {
                removeItemFromQueue = true; // return success if package does not exist in order to clear the queue.
                distributionLog.error("distribution package with id {} does not exist. the package will be skipped.", queueItem.getPackageId());
            }
        } finally {
            if (removeItemFromQueue) {
                DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
            } else {
                DistributionPackageUtils.closeSafely(distributionPackage);
            }
            DistributionUtils.ungetResourceResolver(agentResourceResolver);
        }

        // return true if item should be removed from queue
        return removeItemFromQueue;
    }