private void scanFileOrDirectory()

in modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java [165:388]


    /**
     * Polls the given file URI once. The URI is resolved (retrying on failure up to the
     * entry's max retry count); if the resolved object exists and is readable it is handled
     * either as a single file or, when it has children, as a directory whose children are
     * filtered by the entry's file name pattern. Afterwards the entry's poll state and
     * last/next poll times are updated and {@code onPollCompletion} is invoked.
     *
     * <p>Any {@code FileSystemException} raised while scanning is reported through
     * {@code processFailure}.
     *
     * @param entry   the poll table entry describing what to poll and how to post-process
     * @param fileURI the URI of the file or directory to scan
     */
    private void scanFileOrDirectory(final PollTableEntry entry, String fileURI) {

        if (log.isDebugEnabled()) {
            log.debug("Scanning directory or file : " + VFSUtils.maskURLPassword(fileURI));
        }

        FileObject fileObject = resolveFileObjectWithRetry(entry, fileURI);
        if (fileObject == null) {
            // resolution gave up; the failure has already been reported via processFailure()
            return;
        }

        try {
            if (fileObject.exists() && fileObject.isReadable()) {

                entry.setLastPollState(PollTableEntry.NONE);
                FileObject[] children = null;
                try {
                    children = fileObject.getChildren();
                } catch (FileSystemException ignored) {
                    // Deliberate best-effort: if the children cannot be listed (presumably
                    // because this is a plain file), fall through and treat it as a single file.
                }

                if (children == null || children.length == 0) {
                    // a file that would translate to a single message
                    scanSingleFile(entry, fileObject);
                } else {
                    scanChildren(entry, children);
                }

                // processing of this poll table entry is complete
                long now = System.currentTimeMillis();
                entry.setLastPollTime(now);
                entry.setNextPollTime(now + entry.getPollInterval());

            } else if (log.isDebugEnabled()) {
                log.debug("Unable to access or read file or directory : " + VFSUtils.maskURLPassword(fileURI)+ "." +
                          " Reason: " + (fileObject.exists()? (fileObject.isReadable()? "Unknown reason":"The file can not be read!"):
                          "The file does not exists!"));
            }
            onPollCompletion(entry);
        } catch (FileSystemException e) {
            processFailure("Error checking for existence and readability : " + VFSUtils.maskURLPassword(fileURI), e, entry);
        }
    }

    /**
     * Resolves the file URI, sleeping for the entry's reconnect timeout between failed
     * attempts. Gives up (after calling {@code processFailure}) once the number of attempts
     * reaches the entry's max retry count, or when the waiting thread is interrupted.
     *
     * @return the resolved file object, or {@code null} if resolution was abandoned
     */
    private FileObject resolveFileObjectWithRetry(PollTableEntry entry, String fileURI) {
        int maxRetryCount = entry.getMaxRetryCount();
        long reconnectionTimeout = entry.getReconnectTimeout();

        for (int attempt = 1; ; attempt++) {
            FileSystemException failure;
            try {
                FileObject resolved = fsManager.resolveFile(fileURI);
                if (resolved != null) {
                    return resolved;
                }
                log.error("fileObject is null");
                failure = new FileSystemException("fileObject is null");
            } catch (FileSystemException e) {
                failure = e;
            }

            if (attempt >= maxRetryCount) {
                processFailure("Repeatedly failed to resolve the file URI: " +
                        VFSUtils.maskURLPassword(fileURI), failure, entry);
                return null;
            }
            log.warn("Failed to resolve the file URI: " +
                    VFSUtils.maskURLPassword(fileURI) + ", in attempt " + attempt +
                    ", " + failure.getMessage() + " Retrying in " + reconnectionTimeout +
                    " milliseconds.");

            try {
                Thread.sleep(reconnectionTimeout);
            } catch (InterruptedException e2) {
                // Restore the interrupt status so callers can observe it, and stop retrying:
                // continuing would make every subsequent sleep fail immediately.
                Thread.currentThread().interrupt();
                log.error("Thread was interrupted while waiting to reconnect.", e2);
                processFailure("Interrupted while waiting to retry resolving the file URI: " +
                        VFSUtils.maskURLPassword(fileURI), failure, entry);
                return null;
            }
        }
    }

    /**
     * Handles a resolved object that has no children: processes it as one message unless it
     * is not a regular file, is a known failed record, or its lock cannot be acquired.
     */
    private void scanSingleFile(PollTableEntry entry, FileObject fileObject)
            throws FileSystemException {

        boolean failedRecord = entry.getMoveAfterMoveFailure() != null
                && isFailedRecord(fileObject, entry);

        if (fileObject.getType() != FileType.FILE) {
            return;
        }
        if (failedRecord) {
            // checked before any lock is taken, so a failed record never leaves a lock behind
            handleFailedRecord(entry, fileObject);
            return;
        }

        boolean lockingEnabled = entry.isFileLockingEnabled();
        if (lockingEnabled && !VFSUtils.acquireLock(fsManager, fileObject)) {
            if (log.isDebugEnabled()) {
                log.debug("Couldn't get the lock for processing the file : " +
                          VFSUtils.maskURLPassword(fileObject.getName().getURI()));
            }
            return;
        }

        try {
            try {
                processFile(entry, fileObject);
                entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
                metrics.incrementMessagesReceived();
            } catch (AxisFault e) {
                logException("Error processing File URI : " +
                             VFSUtils.maskURLPassword(fileObject.getName().getURI()), e);
                entry.setLastPollState(PollTableEntry.FAILED);
                metrics.incrementFaultsReceiving();
            }
            moveOrRecordFailure(entry, fileObject);
        } finally {
            // whether or not there was a failure, the lock taken above must be released
            if (lockingEnabled) {
                VFSUtils.releaseLock(fsManager, fileObject);
                if (log.isDebugEnabled()) {
                    String maskedName = VFSUtils.maskURLPassword(fileObject.toString());
                    log.debug("Removed the lock file '" + maskedName +
                            ".lock' of the file '" + maskedName + "'");
                }
            }
        }
    }

    /**
     * Handles the children of a directory: each child whose base name matches the entry's
     * file name pattern is processed, and the entry's poll state is set to the aggregate
     * outcome. When no pattern is configured no child is processed.
     */
    private void scanChildren(PollTableEntry entry, FileObject[] children)
            throws FileSystemException {

        int failCount = 0;
        int successCount = 0;
        String fileNamePattern = entry.getFileNamePattern();
        boolean lockingEnabled = entry.isFileLockingEnabled();

        if (log.isDebugEnabled()) {
            log.debug("File name pattern : " + fileNamePattern);
        }

        for (FileObject child : children) {
            boolean failedRecord = entry.getMoveAfterMoveFailure() != null
                    && isFailedRecord(child, entry);

            String baseName = child.getName().getBaseName();
            boolean matchesPattern = fileNamePattern != null && baseName.matches(fileNamePattern);

            if (fileNamePattern != null && !matchesPattern) {
                //child's file name does not match the file name pattern
                if (log.isDebugEnabled()) {
                    log.debug("Non-Matching file : " + baseName);
                }
                continue;
            }
            if (matchesPattern && log.isDebugEnabled()) {
                log.debug("Matching file : " + baseName);
            }

            if (failedRecord) {
                // checked before any lock is taken, so a failed record never leaves a lock behind
                handleFailedRecord(entry, child);
                continue;
            }
            if (!matchesPattern) {
                // no file name pattern configured: nothing is processed
                continue;
            }
            if (lockingEnabled && !VFSUtils.acquireLock(fsManager, child)) {
                continue;
            }

            try {
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Processing file :" + VFSUtils.maskURLPassword(child.toString()));
                    }
                    processFile(entry, child);
                    successCount++;
                    // tell moveOrDeleteAfterProcessing() file was success
                    entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
                    metrics.incrementMessagesReceived();
                } catch (Exception e) {
                    logException("Error processing File URI : " +
                                 VFSUtils.maskURLPassword(child.getName().getURI()), e);
                    failCount++;
                    // tell moveOrDeleteAfterProcessing() file failed
                    entry.setLastPollState(PollTableEntry.FAILED);
                    metrics.incrementFaultsReceiving();
                }

                if (!moveOrRecordFailure(entry, child)) {
                    failCount++;
                }
            } finally {
                // if there is a failure or not we'll try to release the lock
                if (lockingEnabled) {
                    VFSUtils.releaseLock(fsManager, child);
                }
            }
        }

        if (failCount == 0 && successCount > 0) {
            entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
        } else if (successCount == 0 && failCount > 0) {
            entry.setLastPollState(PollTableEntry.FAILED);
        } else {
            entry.setLastPollState(PollTableEntry.WITH_ERRORS);
        }
    }

    /**
     * Runs the configured post-processing action (move or delete) for a file. If that fails,
     * the entry is marked as failed and the file is added to the failed records.
     *
     * @return {@code true} if post-processing succeeded, {@code false} if it failed
     */
    private boolean moveOrRecordFailure(PollTableEntry entry, FileObject file)
            throws FileSystemException {
        try {
            moveOrDeleteAfterProcessing(entry, file);
            return true;
        } catch (AxisFault axisFault) {
            logException("File object '" + VFSUtils.maskURLPassword(file.getURL().toString()) +
                    "' could not be moved", axisFault);
            entry.setLastPollState(PollTableEntry.FAILED);
            String timeStamp =
                    VFSUtils.getSystemTime(entry.getFailedRecordTimestampFormat());
            addFailedRecord(entry, file, timeStamp);
            return false;
        }
    }

    /**
     * Skips a file that is marked as a failed record and schedules a cleanup task for it when
     * the file can still be resolved and no remove task is currently running.
     */
    private void handleFailedRecord(PollTableEntry entry, FileObject file)
            throws FileSystemException {
        if (entry.isFileLockingEnabled()) {
            // NOTE(review): kept from the original branch; presumably clears a lock file left
            // over from the earlier attempt whose post-processing failed - confirm.
            VFSUtils.releaseLock(fsManager, file);
        }
        // schedule a cleanup task if the file is there
        if (fsManager.resolveFile(file.getURL().toString()) != null &&
                removeTaskState == STATE_STOPPED && entry.getMoveAfterMoveFailure() != null) {
            workerPool.execute(new FileRemoveTask(entry, file));
        }
        if (log.isDebugEnabled()) {
            log.debug("File '" + VFSUtils.maskURLPassword(file.getURL().toString()) +
                    "' has been marked as a failed record, it will not process");
        }
    }