in modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java [165:388]
private void scanFileOrDirectory(final PollTableEntry entry, String fileURI) {
FileObject fileObject = null;
if (log.isDebugEnabled()) {
log.debug("Scanning directory or file : " + VFSUtils.maskURLPassword(fileURI));
}
boolean wasError = true;
int retryCount = 0;
int maxRetryCount = entry.getMaxRetryCount();
long reconnectionTimeout = entry.getReconnectTimeout();
while (wasError) {
try {
retryCount++;
fileObject = fsManager.resolveFile(fileURI);
if (fileObject == null) {
log.error("fileObject is null");
throw new FileSystemException("fileObject is null");
}
wasError = false;
} catch (FileSystemException e) {
if (retryCount >= maxRetryCount) {
processFailure("Repeatedly failed to resolve the file URI: " +
VFSUtils.maskURLPassword(fileURI), e, entry);
return;
} else {
log.warn("Failed to resolve the file URI: " +
VFSUtils.maskURLPassword(fileURI) + ", in attempt " + retryCount +
", " + e.getMessage() + " Retrying in " + reconnectionTimeout +
" milliseconds.");
}
}
if (wasError) {
try {
Thread.sleep(reconnectionTimeout);
} catch (InterruptedException e2) {
log.error("Thread was interrupted while waiting to reconnect.", e2);
}
}
}
try {
if (fileObject.exists() && fileObject.isReadable()) {
entry.setLastPollState(PollTableEntry.NONE);
FileObject[] children = null;
try {
children = fileObject.getChildren();
} catch (FileSystemException ignore) {}
// if this is a file that would translate to a single message
if (children == null || children.length == 0) {
boolean isFailedRecord = false;
if (entry.getMoveAfterMoveFailure() != null) {
isFailedRecord = isFailedRecord(fileObject, entry);
}
if (fileObject.getType() == FileType.FILE && !isFailedRecord) {
if (!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled() &&
VFSUtils.acquireLock(fsManager, fileObject))) {
try {
processFile(entry, fileObject);
entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
metrics.incrementMessagesReceived();
} catch (AxisFault e) {
logException("Error processing File URI : " +
VFSUtils.maskURLPassword(fileObject.getName().getURI()), e);
entry.setLastPollState(PollTableEntry.FAILED);
metrics.incrementFaultsReceiving();
}
try {
moveOrDeleteAfterProcessing(entry, fileObject);
} catch (AxisFault axisFault) {
logException("File object '" + fileObject.getURL().toString() + "' " +
"cloud not be moved", axisFault);
entry.setLastPollState(PollTableEntry.FAILED);
String timeStamp =
VFSUtils.getSystemTime(entry.getFailedRecordTimestampFormat());
addFailedRecord(entry, fileObject, timeStamp);
}
if (entry.isFileLockingEnabled()) {
VFSUtils.releaseLock(fsManager, fileObject);
if (log.isDebugEnabled()) {
log.debug("Removed the lock file '" + fileObject.toString() +
".lock' of the file '" + fileObject.toString());
}
}
} else if (log.isDebugEnabled()) {
log.debug("Couldn't get the lock for processing the file : " +
VFSUtils.maskURLPassword(fileObject.getName().getURI()));
} else if (isFailedRecord) {
if (entry.isFileLockingEnabled()) {
VFSUtils.releaseLock(fsManager, fileObject);
}
// schedule a cleanup task if the file is there
if (fsManager.resolveFile(fileObject.getURL().toString()) != null &&
removeTaskState == STATE_STOPPED && entry.getMoveAfterMoveFailure() != null) {
workerPool.execute(new FileRemoveTask(entry, fileObject));
}
if (log.isDebugEnabled()) {
log.debug("File '" + fileObject.getURL() + "' has been marked as a failed" +
" record, it will not process");
}
}
}
} else {
int failCount = 0;
int successCount = 0;
if (log.isDebugEnabled()) {
log.debug("File name pattern : " + entry.getFileNamePattern());
}
for (FileObject child : children) {
boolean isFailedRecord = false;
if (entry.getMoveAfterMoveFailure() != null) {
isFailedRecord = isFailedRecord(child, entry);
}
if (entry.getFileNamePattern() != null &&
child.getName().getBaseName().matches(entry.getFileNamePattern())){
//child's file name matches the file name pattern
//now we try to get the lock and process
if (log.isDebugEnabled()) {
log.debug("Matching file : " + child.getName().getBaseName());
}
if ((!entry.isFileLockingEnabled()
|| (entry.isFileLockingEnabled() && VFSUtils.acquireLock(fsManager, child)))
&& !isFailedRecord){
//process the file
try {
if (log.isDebugEnabled()) {
log.debug("Processing file :" + child);
}
processFile(entry, child);
successCount++;
// tell moveOrDeleteAfterProcessing() file was success
entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
metrics.incrementMessagesReceived();
} catch (Exception e) {
logException("Error processing File URI : " +
VFSUtils.maskURLPassword(child.getName().getURI()), e);
failCount++;
// tell moveOrDeleteAfterProcessing() file failed
entry.setLastPollState(PollTableEntry.FAILED);
metrics.incrementFaultsReceiving();
}
try {
moveOrDeleteAfterProcessing(entry, child);
} catch (AxisFault axisFault) {
logException("File object '" + child.getURL().toString() +
"'cloud not be moved", axisFault);
failCount++;
entry.setLastPollState(PollTableEntry.FAILED);
String timeStamp =
VFSUtils.getSystemTime(entry.getFailedRecordTimestampFormat());
addFailedRecord(entry, child, timeStamp);
} finally {
// if there is a failure or not we'll try to release the lock
if (entry.isFileLockingEnabled()) {
VFSUtils.releaseLock(fsManager, child);
}
}
}
} else if (entry.getFileNamePattern()!= null &&
!child.getName().getBaseName().matches(entry.getFileNamePattern())){
//child's file name does not match the file name pattern
if (log.isDebugEnabled()) {
log.debug("Non-Matching file : " + child.getName().getBaseName());
}
} else if(isFailedRecord){
//it is a failed record
if (entry.isFileLockingEnabled()) {
VFSUtils.releaseLock(fsManager, child);
VFSUtils.releaseLock(fsManager, fileObject);
}
if (fsManager.resolveFile(child.getURL().toString()) != null &&
removeTaskState == STATE_STOPPED && entry.getMoveAfterMoveFailure() != null) {
workerPool.execute(new FileRemoveTask(entry, child));
}
if (log.isDebugEnabled()) {
log.debug("File '" + fileObject.getURL() +
"' has been marked as a failed record, it will not " +
"process");
}
}
}
if (failCount == 0 && successCount > 0) {
entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
} else if (successCount == 0 && failCount > 0) {
entry.setLastPollState(PollTableEntry.FAILED);
} else {
entry.setLastPollState(PollTableEntry.WITH_ERRORS);
}
}
// processing of this poll table entry is complete
long now = System.currentTimeMillis();
entry.setLastPollTime(now);
entry.setNextPollTime(now + entry.getPollInterval());
} else if (log.isDebugEnabled()) {
log.debug("Unable to access or read file or directory : " + VFSUtils.maskURLPassword(fileURI)+ "." +
" Reason: " + (fileObject.exists()? (fileObject.isReadable()? "Unknown reason":"The file can not be read!"):
"The file does not exists!"));
}
onPollCompletion(entry);
} catch (FileSystemException e) {
processFailure("Error checking for existence and readability : " + VFSUtils.maskURLPassword(fileURI), e, entry);
}
}