in DIXFSamples/RecurringIntegrationApp/Runtime/DefaultDataFlowNetwork.cs [206:289]
/// <summary>
/// Drains the in-process queue. For each dequeued message the source data is
/// read and posted to the enqueue endpoint. On a success status the returned
/// message identifier is recorded in <c>EnqueuedJobs</c> for further status
/// processing; on a failure status the data is moved to the error location and
/// a failure statistic is recorded.
/// </summary>
/// <remarks>
/// A failure while handling one message (including obtaining or reading its
/// data source) is logged and does not stop the remaining messages from being
/// processed.
/// </remarks>
/// <returns>Always <c>true</c> once the queue has been drained.</returns>
private async Task<bool> processInprocessQueue()
{
    ClientDataMessage dataMessage;

    // TryDequeue returns false on an empty queue, so no separate IsEmpty
    // check is required before entering the loop.
    while (InprocessQueue.TryDequeue(out dataMessage))
    {
        try
        {
            // Resolving and reading the data source happens inside the try so
            // that one unreadable message cannot abort the rest of the queue.
            IDataSource<ClientDataMessage> sourceDataSource =
                DataSourceFactory.GetDataSourceForMessage(dataMessage);

            // A using statement tolerates a null resource and guarantees the
            // stream is released on every exit path.
            using (Stream sourceStream = sourceDataSource.Read(dataMessage))
            {
                if (sourceStream == null)
                {
                    // Nothing to submit for this message.
                    continue;
                }

                sourceStream.Seek(0, SeekOrigin.Begin);

                var httpClientHelper = new HttpClientHelper();
                string correlationId = dataMessage.Name;
                Uri enqueueUri = httpClientHelper.GetEnqueueUri();

                base.formInstance.logText("Enqueueing job: " + dataMessage.Name + ". File size: " + sourceStream.Length + " bytes.");

                // Post the enqueue request. The response is disposed once it has
                // been fully handled.
                // NOTE(review): assumes SendPostRequestAsync returns a disposable
                // response (presumably HttpResponseMessage) - confirm.
                using (var response = await httpClientHelper.SendPostRequestAsync(enqueueUri, sourceStream, correlationId))
                {
                    if (response.IsSuccessStatusCode)
                    {
                        // The response body carries the message identifier.
                        var messageId = await response.Content.ReadAsStringAsync();

                        // Log enqueue success
                        base.formInstance.logText("File: " + dataMessage.Name + " - enqueued successfully.");
                        base.formInstance.logText("Message identifier for: " + dataMessage.Name + " - is: " + messageId);

                        // Queue for further status processing
                        EnqueuedJobs.TryAdd(messageId, new ClientDataMessage(dataMessage.FullPath, MessageStatus.Enqueued));
                    }
                    else
                    {
                        // Enqueue failed. Move message to error location.
                        base.formInstance.logText("Enqueue failed for file: " + dataMessage.Name);
                        base.formInstance.logText("Failure response: Status: " + response.StatusCode + ", Reason: " + response.ReasonPhrase);

                        var targetDataMessage = new ClientDataMessage()
                        {
                            Name = dataMessage.Name,
                            FullPath = Path.Combine(Settings.ErrorDir, dataMessage.Name),
                            MessageStatus = MessageStatus.Failed
                        };

                        // Move data to error location
                        await this.moveDataToTargetAsync(dataMessage, targetDataMessage);

                        // Record the enqueue failure
                        await this.updateStatsAsync(null, StatType.Failure, targetDataMessage, response);
                    }
                }
            }
        }
        catch (Exception _ex)
        {
            // Best effort per message: log and carry on with the next one.
            base.formInstance.logText("Failure processing file: " + dataMessage.Name + ".Exception : " + _ex.Message);
        }
    }

    return true;
}