src/ApiForFhirMigrationTool.Function/MigrationOrchestrator.cs (188 lines of code) (raw):

// ------------------------------------------------------------------------------------------------- // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- using ApiForFhirMigrationTool.Function.Configuration; using ApiForFhirMigrationTool.Function.FhirOperation; using ApiForFhirMigrationTool.Function.Models; using ApiForFhirMigrationTool.Function.OrchestrationHelper; using ApiForFhirMigrationTool.Function.Processors; using Azure; using Azure.Data.Tables; using Microsoft.ApplicationInsights; using Microsoft.Azure.Functions.Worker; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.DurableTask; using Microsoft.DurableTask.Client; using Microsoft.Extensions.Logging; namespace ApiForFhirMigrationTool.Function { public class MigrationOrchestrator { private readonly MigrationOptions _options; private readonly ILogger _logger; private readonly IOrchestrationHelper _orchestrationHelper; private readonly IAzureTableClientFactory _azureTableClientFactory; private readonly IFhirProcessor _exportProcessor; private readonly IFhirClient _fhirClient; private readonly TelemetryClient _telemetryClient; private readonly IMetadataStore _azureTableMetadataStore; public MigrationOrchestrator(MigrationOptions options, ILoggerFactory loggerFactory, IOrchestrationHelper orchestrationHelper, IMetadataStore azureTableMetadataStore, IAzureTableClientFactory azureTableClientFactory, IFhirProcessor exportProcessor, IFhirClient fhirClient, TelemetryClient telemetryClient) { _options = options; _logger = loggerFactory.CreateLogger<MigrationOrchestrator>(); _orchestrationHelper = orchestrationHelper; _azureTableClientFactory = azureTableClientFactory; _exportProcessor = exportProcessor; _fhirClient = fhirClient; _telemetryClient = telemetryClient; _azureTableMetadataStore = azureTableMetadataStore; } [Function(nameof(MigrationOrchestration))] public async Task<List<string>> MigrationOrchestration( [OrchestrationTrigger] TaskOrchestrationContext context) { ILogger logger = context.CreateReplaySafeLogger(nameof(MigrationOrchestration)); var outputs = new List<string>(); try { bool shouldRun = true; bool continueRun = true; if (_options.StopDm) { if (_options.StartTime < 0 || _options.StartTime > 23 || _options.EndTime < 0 || _options.EndTime > 23) { throw new ArgumentOutOfRangeException("StartTime and EndTime should be between 0 and 23."); } logger.LogInformation("Data Migration Tool will pause during business hours."); var currentTime = DateTime.UtcNow; var startHour = new TimeSpan((_options.StartTime == 0 ? 23 : _options.StartTime - 1), 0, 0); var endHour = new TimeSpan(_options.EndTime, 30, 0); logger.LogInformation($" Current time : ({currentTime}), startHour :({startHour}), endHour :({endHour})"); bool isWithinSkipWindow = startHour < endHour ? (currentTime.TimeOfDay > startHour && currentTime.TimeOfDay < endHour) : (currentTime.TimeOfDay > startHour || currentTime.TimeOfDay < endHour); if (isWithinSkipWindow) { shouldRun = false; logger.LogInformation("Execution skipped: Current time is within the restricted window"); } } _options.ValidateConfig(); logger.LogInformation("Creating table client"); TableClient chunktableClient = _azureTableClientFactory.Create(_options.ChunkTableName); logger.LogInformation("Table client created successfully."); if (_options.IsParallel == true) { Pageable<TableEntity> jobList = chunktableClient.Query<TableEntity>(); if (jobList.Count() <= 0) { var tableEntity = new TableEntity(_options.PartitionKey, _options.RowKey) { { "JobId", 0 }, {"SurfaceJobId",0 }, {"DeepJobId",0 }, {"ImportId",0 }, {"SearchParameterMigration", false } }; logger.LogInformation("Starting update of the chunk table."); _azureTableMetadataStore.AddEntity(chunktableClient, tableEntity); logger.LogInformation("Completed update of the chunk table."); } } else { Pageable<TableEntity> jobList = chunktableClient.Query<TableEntity>(); if (jobList.Count() <= 0) { var tableEntity = new TableEntity(_options.PartitionKey, _options.RowKey) { { "JobId", 0 }, {"SurfaceJobId",0 }, {"DeepJobId",0 }, { "globalSinceExportType", "" }, { "globalTillExportType", "" }, { "noOfResources", _options.ResourceTypes?.Count() }, { "resourceTypeIndex", 0 }, { "multiExport", "" }, {"ImportId",0 }, {"SearchParameterMigration", false } }; logger.LogInformation("Starting update of the chunk table."); _azureTableMetadataStore.AddEntity(chunktableClient, tableEntity); logger.LogInformation("Completed update of the chunk table."); } } TableEntity qEntity = _azureTableMetadataStore.GetEntity(chunktableClient, _options.PartitionKey, _options.RowKey); var since = _options.IsParallel == true ? (string)qEntity["since"] : (string)qEntity["globalSinceExportType"]; if (_options.SpecificRun && !string.IsNullOrEmpty(since)) { logger.LogInformation("Data Migration Tool checking for specific time range."); var currentTime = DateTime.UtcNow; var startDate = _options.StartDate; var endDate = _options.EndDate; logger.LogInformation($" Current time : ({currentTime}), startDate :({startDate}), endDate :({endDate})"); if (endDate <= DateTime.Parse(since)) { continueRun = false; logger.LogInformation("Execution skipped: Specific time range date is reached"); } } if (continueRun) { var options = TaskOptions.FromRetryPolicy(new RetryPolicy( maxNumberOfAttempts: 3, firstRetryInterval: TimeSpan.FromSeconds(5))); if (shouldRun) { logger.LogInformation("Start MigrationOrchestration."); logger.LogInformation("Starting SearchParameter migration activities."); // Run sub orchestration for search parameter var searchParameter = await context.CallSubOrchestratorAsync<string>("SearchParameterOrchestration", options: options); logger.LogInformation("SearchParameter migration activities ended"); //Run sub orchestration for export and export status logger.LogInformation("Starting Export migration activities."); var exportContent = await context.CallSubOrchestratorAsync<string>("ExportOrchestration", options: options); logger.LogInformation("Export migration activities ended."); logger.LogInformation("Starting Export Status activities"); var exportStatusContent = await context.CallSubOrchestratorAsync<string>("ExportStatusOrchestration", options: options); logger.LogInformation("Export Status activities ended."); // Run sub orchestration for Import and Import status logger.LogInformation("Starting Import migration activities."); var import = await context.CallSubOrchestratorAsync<string>("ImportOrchestration", options: options); logger.LogInformation("Import migration activities ended."); logger.LogInformation("Starting Import Status activities."); var importStatus = await context.CallSubOrchestratorAsync<string>("ImportStatusOrchestration", options: options); logger.LogInformation("Import Status activities ended."); } else if (_options.ContinueLastImportDuringPause) { //Only run export status and import activity to process any completed export after the tool stopped. //Run sub orchestration for export status logger.LogInformation("Starting Export Status activities"); var exportStatusContent = await context.CallSubOrchestratorAsync<string>("ExportStatusOrchestration", options: options); logger.LogInformation("Export Status activities ended."); // Run sub orchestration for Import and Import status logger.LogInformation("Starting Import migration activities."); var import = await context.CallSubOrchestratorAsync<string>("ImportOrchestration", options: options); logger.LogInformation("Import migration activities ended."); logger.LogInformation("Starting Import Status activities."); var importStatus = await context.CallSubOrchestratorAsync<string>("ImportStatusOrchestration", options: options); logger.LogInformation("Import Status activities ended."); } } } catch (Exception ex) { logger.LogError($"Error occurred during migration process: {ex.Message}"); } return outputs; } [Function("TimerOrchestration")] public async Task Run( [TimerTrigger("0 */5 * * * *")] TimerInfo myTimer, [DurableClient] DurableTaskClient client, FunctionContext executionContext) { string instanceId_new = "FhirMigrationTool"; StartOrchestrationOptions options = new StartOrchestrationOptions(instanceId_new); try { var instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(MigrationOrchestration), options); _logger.LogInformation("Started: Timed {instanceId}...", instanceId); } catch (Exception ex) { _logger.LogInformation($"Error in starting instance due to {ex.Message}"); } } } }