private async Task OnTick()

in src/Serilog.Sinks.AzureDataExplorer/Durable/LogShipper.cs [148:320]
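Each tick drains the durable buffer: the shipper reads the bookmark, loads up to m_batchPostingLimit events from the current buffer file, queues them for ingestion into Azure Data Explorer, polls the ingestion status table until the batch succeeds, fails, or the timeout elapses, and only advances the bookmark on success. Transient failures are retried up to m_options.IngestionRetries times; permanent or authentication failures either throw (when FailOnError is set) or drop the batch. The timer is re-armed in the finally block, so ticks never overlap.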


        private async Task OnTick()
        {
            try
            {
                int count;
                do
                {
                    count = 0;
                    using (var bookmarkFile = m_fileSet.OpenBookmarkFile())
                    {
                        var position = bookmarkFile.TryReadBookmark();
                        var files = m_fileSet.GetBufferFiles();

                        if (position.File == null || !IOFile.Exists(position.File))
                        {
                            position = new FileSetPosition(0, files.FirstOrDefault());
                        }

                        TPayload payload;
                        if (position.File == null)
                        {
                            payload = m_payloadReader.GetNoPayload();
                            count = 0;
                        }
                        else
                        {
                            payload = m_payloadReader.ReadPayload(m_batchPostingLimit, m_eventBodyLimitBytes, ref position, ref count, position.File);
                        }

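                        // The stopwatch bounds how long the ingestion status is polled below, and the
                        // source id lets this batch's status record be looked up after ingestion.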
                        var stopWatch = Stopwatch.StartNew();
                        var fileIdentifier = Guid.NewGuid();

                        if (count > 0 || (m_controlledSwitch.IsActive && m_nextRequiredLevelCheckUtc < DateTime.UtcNow))
                        {
                            for (int retry = 1; retry <= m_options.IngestionRetries; ++retry)
                            {
                                IKustoIngestionResult result;
                                m_nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval);
                                using var dataStream = CreateStreamFromLogEvents(payload);
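                                // Queue the gzip-compressed batch for ingestion and ask the service to
                                // report per-batch success or failure to the status table, so the outcome
                                // can be polled below by this batch's source id.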
                                try
                                {
                                    result = await m_ingestClient.IngestFromStreamAsync(
                                            dataStream,
                                            new KustoQueuedIngestionProperties(m_databaseName, m_tableName)
                                            {
                                                DatabaseName = m_databaseName,
                                                TableName = m_tableName,
                                                FlushImmediately = m_flushImmediately,
                                                Format = DataSourceFormat.multijson,
                                                IngestionMapping = m_ingestionMapping,
                                                ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
                                                ReportMethod = IngestionReportMethod.Table
                                            },
                                            new StreamSourceOptions
                                            {
                                                LeaveOpen = false,
                                                CompressionType = DataSourceCompressionType.GZip,
                                                SourceId = fileIdentifier
                                            }).ConfigureAwait(false);
                                    var ingestionStatus = result.GetIngestionStatusBySourceId(fileIdentifier);
                                    while (true) // loop until the status record is updated or we time out
                                    {
                                        // check if the record is updated
                                        if (ingestionStatus.Status != Status.Pending)
                                        {
                                            break; // the record is updated, so we can exit the loop!
                                        }

                                        // check if we have exceeded our timeout
                                        if (stopWatch.Elapsed > Timeout)
                                        {
                                            break; // break loop if we timed out
                                        }

                                        // The record has not been updated and we have not timed out, so the
                                        // loop repeats. To avoid querying the status table too often, we add
                                        // a delay between checks; the async delay works much like a timer
                                        // while avoiding re-entrancy issues.
                                        await Task.Delay(TimeBetweenChecks).ConfigureAwait(false);
                                        ingestionStatus = result.GetIngestionStatusBySourceId(fileIdentifier);
                                    }

                                    if (ingestionStatus.Status == Status.Succeeded)
                                    {
                                        m_connectionSchedule.MarkSuccess();
                                        bookmarkFile.WriteBookmark(position);
                                    }
                                    else
                                    {
                                        m_connectionSchedule.MarkFailure();
                                        if (m_bufferSizeLimitBytes.HasValue)
                                            m_fileSet.CleanUpBufferFiles(m_bufferSizeLimitBytes.Value, 2);

                                        break;
                                    }

                                }
                                catch (KustoClientApplicationAuthenticationException ex)
                                {
                                    if (m_options.FailOnError)
                                    {
                                        SelfLog.WriteLine("Auth failure on Kusto sink (EmitBatchAsync). Please check your credentials: {0}", ex);
                                        SelfLog.WriteLine("FailOnError is set to true; the exception will be rethrown for the caller to handle");
                                        throw new LoggingFailedException($"Auth failure on Kusto sink (EmitBatchAsync). Please check your credentials. {ex.Message}");
                                    }
                                    SelfLog.WriteLine("Auth failure on Kusto sink (EmitBatchAsync); FailOnError is not set, so the batch will be dropped. Please check your credentials: {0}", ex);
                                }
                                catch (IngestClientException ex)
                                {
                                    if (ex.IsPermanent)
                                    {
                                        // Permanent failures are never retried.
                                        if (m_options.FailOnError)
                                        {
                                            SelfLog.WriteLine("Permanent ingestion failure ingesting to Kusto. FailOnError is set to true; the exception will be rethrown for the caller to handle: {0}", ex);
                                            throw new LoggingFailedException($"Permanent ingestion failure ingesting to Kusto. {ex.Message}");
                                        }
                                        // The failure is permanent, so retrying is pointless; log the error and drop the batch.
                                        SelfLog.WriteLine("Permanent ingestion failure ingesting to Kusto. FailOnError is not set, so the batch will be dropped: {0}", ex);
                                        break; // no further retries for a permanent failure
                                    }
                                    else
                                    {
                                        SelfLog.WriteLine("Temporary failure writing to Kusto (retry attempt {0} of {1}): {2}", retry, m_options.IngestionRetries, ex);
                                    }
                                }
                            }
                        }
                        else if (position.File == null)
                        {
                            // No buffer file exists yet, so there is nothing to ship.
                            break;
                        }
                        else
                        {
                            // For whatever reason, there's nothing waiting to send. This means we should try connecting again at the
                            // regular interval, so mark the attempt as successful.
                            m_connectionSchedule.MarkSuccess();

                            // Only advance the bookmark if no other process has the
                            // current file locked, and its length is as we found it.
                            if (files.Length == 2 && files.First() == position.File &&
                                FileIsUnlockedAndUnextended(position))
                            {
                                bookmarkFile.WriteBookmark(new FileSetPosition(0, files[1]));
                            }

                            if (files.Length > 2)
                            {
                                // By this point, we expect writers to have relinquished locks
                                // on the oldest file.
                                IOFile.Delete(files[0]);
                            }
                        }
                    }
                } while (count == m_batchPostingLimit); // a full batch suggests more events are waiting, so drain again immediately
            }
            catch (Exception ex)
            {
                m_connectionSchedule.MarkFailure();
                SelfLog.WriteLine("Exception while emitting periodic batch from {0}: {1}", this, ex);

                if (m_bufferSizeLimitBytes.HasValue)
                    m_fileSet.CleanUpBufferFiles(m_bufferSizeLimitBytes.Value, 2);
            }
            finally
            {
                lock (m_stateLock)
                {
                    if (!m_unloading)
                        SetTimer();
                }
            }
        }
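
The finally block re-arms a one-shot timer rather than running a periodic one, so a slow ingestion round can never overlap the next tick. SetTimer() is not shown here; presumably it derives the next interval from m_connectionSchedule, which this method marks as succeeded or failed. A minimal sketch of that re-arm pattern, with hypothetical names (TickLoop, OnTickAsync, Interval) standing in for the shipper's own members:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical sketch of the one-shot timer re-arm pattern; the names here
    // do not match the sink's real members.
    sealed class TickLoop : IDisposable
    {
        static readonly TimeSpan Interval = TimeSpan.FromSeconds(2);

        readonly object _stateLock = new object();
        readonly Timer _timer;
        bool _unloading;

        public TickLoop()
        {
            // Create the timer disabled; every tick schedules the next one itself.
            _timer = new Timer(state => _ = OnTickAsync(), null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
            SetTimer();
        }

        void SetTimer() => _timer.Change(Interval, Timeout.InfiniteTimeSpan);

        async Task OnTickAsync()
        {
            try
            {
                // One unit of work per tick (read the buffer, ship a batch, ...).
                await Task.Delay(100).ConfigureAwait(false);
            }
            finally
            {
                lock (_stateLock)
                {
                    // Re-arm only while the loop has not been disposed, exactly as
                    // OnTick does with m_unloading and SetTimer().
                    if (!_unloading)
                        SetTimer();
                }
            }
        }

        public void Dispose()
        {
            lock (_stateLock)
            {
                _unloading = true;
            }
            _timer.Dispose();
        }
    }

Disposal flips the unloading flag under the same lock the tick uses before re-arming, which is why a tick that is already in flight finishes its work but never schedules another one.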