in src/WebJobs.Extensions/Extensions/Files/Listener/FileProcessor.cs [143:213]
/// <summary>
/// Invokes the job function for the file identified by <paramref name="eventArgs"/>,
/// retrying failed invocations until one succeeds or the process count reaches
/// <c>MaxProcessCount</c>. A status entry is appended to the file's status file
/// before and after every attempt.
/// </summary>
/// <param name="eventArgs">
/// The file system event being processed. Its <c>FullPath</c> and <c>ChangeType</c> are used
/// to acquire the status file lock, and the instance itself is passed to the job function as
/// the trigger value.
/// </param>
/// <param name="cancellationToken">
/// Token passed to the function executor and to the delay between retry attempts.
/// </param>
/// <returns>
/// <c>true</c> if an invocation of the job function succeeded; <c>false</c> if the status file
/// lock could not be acquired, if every attempt failed, or if any exception was thrown.
/// </returns>
public virtual async Task<bool> ProcessFileAsync(FileSystemEventArgs eventArgs, CancellationToken cancellationToken)
{
    try
    {
        StatusFileEntry status = null;
        string filePath = eventArgs.FullPath;
        using (StreamWriter statusWriter = AcquireStatusFileLock(filePath, eventArgs.ChangeType, out status))
        {
            if (statusWriter == null)
            {
                // No writer was returned, so there is nothing for this call to process.
                return false;
            }

            // We've acquired the lock. The current status might be either Failed
            // or Processing (if processing failed before we were able to update
            // the file status to Failed)
            int processCount = 0;
            if (status != null)
            {
                // Resume from the attempt count recorded in the existing status entry.
                processCount = status.ProcessCount;
            }

            // Declared outside the loop so that the outcome of the previous attempt is
            // still available when deciding whether to wait before the next one. It is
            // null on the first attempt made by this call, so that attempt never waits.
            FunctionResult result = null;

            while (processCount++ < MaxProcessCount)
            {
                if (result != null)
                {
                    // The previous attempt in this loop failed; wait before retrying.
                    // Cancellation during the wait throws and is handled by the
                    // catch block below, which reports the file as not processed.
                    TimeSpan delay = GetRetryInterval(result, processCount);
                    await Task.Delay(delay, cancellationToken);
                }

                // write an entry indicating the file is being processed
                status = new StatusFileEntry
                {
                    State = ProcessingState.Processing,
                    Timestamp = DateTime.Now,
                    LastWrite = File.GetLastWriteTimeUtc(filePath),
                    ChangeType = eventArgs.ChangeType,
                    InstanceId = InstanceId,
                    ProcessCount = processCount
                };
                _serializer.Serialize(statusWriter, status);
                statusWriter.WriteLine();

                // invoke the job function
                TriggeredFunctionData input = new TriggeredFunctionData
                {
                    TriggerValue = eventArgs
                };
                result = await _executor.TryExecuteAsync(input, cancellationToken);

                // write a status entry indicating the state of processing
                status.State = result.Succeeded ? ProcessingState.Processed : ProcessingState.Failed;
                status.Timestamp = DateTime.Now;
                _serializer.Serialize(statusWriter, status);
                statusWriter.WriteLine();

                if (result.Succeeded)
                {
                    return true;
                }
            }

            // Every permitted attempt failed (or none were left to make).
            return false;
        }
    }
    catch
    {
        // Best effort: any failure while locking, writing status or invoking the
        // function is reported to the caller as "not processed" rather than thrown.
        return false;
    }
}