in Services/DataX.Flow/DataX.Flow.DeleteHelper/ConfigDeleter.cs [82:236]
public async Task<ApiResult> DeleteFlow(JObject jObject)
{
var diag = jObject.ToObject<InteractiveQueryObject>();
ConfigName = diag.Name;
bool errorExists = false;
var response = await _engineEnvironment.GetEnvironmentVariables();
if (response.Error.HasValue && response.Error.Value)
{
_logger.LogError(response.Message);
return ApiResult.CreateError(response.Message);
}
///Delete consumer group
_logger.LogInformation($"For FlowId: {ConfigName} Deleting flow specific consumer group.. ");
var inputEventhubConnection = Helper.GetSecretFromKeyvaultIfNeeded(diag.EventhubConnectionString);
_inputEventhubConnectionStringRef = Helper.IsKeyVault(diag.EventhubConnectionString) ? diag.EventhubConnectionString : Helper.GenerateNewSecret(_keySecretList, _engineEnvironment.EngineFlowConfig.SparkKeyVaultName, ConfigName + "-input-eventhubconnectionstring", _engineEnvironment.EngineFlowConfig.SparkType, diag.EventhubConnectionString, false);
diag.EventhubConnectionString = _inputEventhubConnectionStringRef;
if (diag.InputType == Constants.InputType_EventHub && !string.IsNullOrEmpty(inputEventhubConnection))
{
var ehName = Helper.ParseEventHub(inputEventhubConnection);
_eventHubNamespace = Helper.ParseEventHubNamespace(inputEventhubConnection);
_eventHubNameRole = Helper.ParseEventHubPolicyName(inputEventhubConnection);
_eventHubPrimaryKeyListener = Helper.ParseEventHubAccessKey(inputEventhubConnection);
if (string.IsNullOrWhiteSpace(ehName) || string.IsNullOrWhiteSpace(_eventHubNamespace) || string.IsNullOrWhiteSpace(_eventHubNameRole) || string.IsNullOrWhiteSpace(_eventHubPrimaryKeyListener))
{
string error = "The connection string for Event Hub input type must contain Endpoint, SharedAccessKeyName, SharedAccessKey, and EntityPath";
_logger.LogError(error);
errorExists = true;
}
_eventHubNames = new List<string>() { ehName };
}
else if (diag.InputType == Constants.InputType_IoTHub && !string.IsNullOrEmpty(diag.EventhubNames) && !string.IsNullOrEmpty(inputEventhubConnection))
{
_eventHubNames = Helper.ParseEventHubNames(diag.EventhubNames);
_eventHubNamespace = Helper.ParseEventHubNamespace(inputEventhubConnection);
_eventHubNameRole = Helper.ParseEventHubPolicyName(inputEventhubConnection);
_eventHubPrimaryKeyListener = Helper.ParseEventHubAccessKey(inputEventhubConnection);
if (_eventHubNames.Count < 1)
{
string error = "The event hub-compatible name for IoT Hub input type must be defined";
_logger.LogError(error);
errorExists = true;
}
if (string.IsNullOrWhiteSpace(_eventHubNamespace) || string.IsNullOrWhiteSpace(_eventHubNameRole) || string.IsNullOrWhiteSpace(_eventHubPrimaryKeyListener))
{
string error = "The event hub-compatible endpoint for IoT Hub input type must contain Endpoint, SharedAccessKeyName, and SharedAccessKey";
_logger.LogError(error);
errorExists = true;
}
}
// ResourceCreation is one of the environment variables.
// If you don't want to create resource, you can set this to false.
if (_engineEnvironment.ResourceCreation && _eventHubNames != null && (diag.InputType == Constants.InputType_EventHub || diag.InputType == Constants.InputType_IoTHub))
{
var inputSubscriptionId = string.IsNullOrEmpty(diag.InputSubscriptionId) ? Helper.GetSecretFromKeyvaultIfNeeded(_engineEnvironment.EngineFlowConfig.SubscriptionId) : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputSubscriptionId);
var inputResourceGroup = string.IsNullOrEmpty(diag.InputResourceGroup) ? _engineEnvironment.EngineFlowConfig.EventHubResourceGroupName : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputResourceGroup);
foreach (string ehName in _eventHubNames)
{
var result = EventHub.DeleteConsumerGroup(inputSubscriptionId, _engineEnvironment.EngineFlowConfig.ServiceKeyVaultName, inputResourceGroup, _engineEnvironment.EngineFlowConfig.EventHubResourceGroupLocation, _eventHubNamespace, ehName, ConsumerGroupName, diag.InputType, _engineEnvironment.EngineFlowConfig.ConfiggenClientId, _engineEnvironment.EngineFlowConfig.ConfiggenTenantId, _engineEnvironment.EngineFlowConfig.ConfiggenSecretPrefix);
if (result.Error.HasValue && result.Error.Value)
{
_logger.LogError(result.Message);
errorExists = true;
}
else
{
_logger.LogInformation($"For FlowId: {ConfigName} Successfully deleted flow specific consumer group");
}
}
}
///Delete cosmosDB document related to a flow
response = await CosmosDB.DeleteConfigFromDocumentDB(_engineEnvironment.CosmosDBDatabaseName, _engineEnvironment.CosmosDBEndPoint, _engineEnvironment.CosmosDBUserName, _engineEnvironment.CosmosDBPassword, "flows", ConfigName);
if (response.Error.HasValue && response.Error.Value)
{
_logger.LogError(response.Message);
errorExists = true;
}
else
{
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific cosmosDB entry");
}
///Delete configs stored in blobs
// ruleDefinitions
response = await BlobHelper.DeleteBlob(_engineEnvironment.FlowBlobConnectionString, Path.Combine(RuleDefinitionPath, RuleDefinitionFileName));
if (response.Error.HasValue && response.Error.Value)
{
_logger.LogError(response.Message);
errorExists = true;
}
else
{
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific rules definition blob");
}
// outputTemplates
response = await BlobHelper.DeleteBlob(_engineEnvironment.FlowBlobConnectionString, Path.Combine(OutputTemplatePath, OutputTemplateFileName));
if (response.Error.HasValue && response.Error.Value)
{
_logger.LogError(response.Message);
errorExists = true;
}
else
{
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific output template blob");
}
string resourceGroupLocation = _engineEnvironment.EngineFlowConfig.ResourceGroupLocation;
string resourceGroupName = _engineEnvironment.EngineFlowConfig.ResourceGroupName;
string storageAccountName = _engineEnvironment.EngineFlowConfig.StorageAccountName;
string containerPath = Path.Combine(_flowContainerName, _engineEnvironment.EngineFlowConfig.EnvironmentType, ConfigName);
string subscriptionId = Helper.GetSecretFromKeyvaultIfNeeded(_engineEnvironment.EngineFlowConfig.SubscriptionId);
BlobStorage.DeleteAllConfigsFromBlobStorage(subscriptionId, _engineEnvironment.EngineFlowConfig.ServiceKeyVaultName, resourceGroupName, resourceGroupLocation, storageAccountName, _Centralprocessing, Path.Combine(_engineEnvironment.EngineFlowConfig.ContainerPath, ConfigName), _engineEnvironment.EngineFlowConfig.ConfiggenClientId, _engineEnvironment.EngineFlowConfig.ConfiggenTenantId, _engineEnvironment.EngineFlowConfig.ConfiggenSecretPrefix);
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific blobs under the folder {ConfigName} under container {_Centralprocessing}");
BlobStorage.DeleteAllConfigsFromBlobStorage(subscriptionId, _engineEnvironment.EngineFlowConfig.ServiceKeyVaultName, resourceGroupName, resourceGroupLocation, storageAccountName, _Centralprocessing, Path.Combine(_engineEnvironment.EngineFlowConfig.ContainerPath, ConfigName), _engineEnvironment.EngineFlowConfig.ConfiggenClientId, _engineEnvironment.EngineFlowConfig.ConfiggenTenantId, _engineEnvironment.EngineFlowConfig.ConfiggenSecretPrefix);
BlobStorage.DeleteAllConfigsFromBlobStorage(subscriptionId, _engineEnvironment.EngineFlowConfig.ServiceKeyVaultName, resourceGroupName, resourceGroupLocation, storageAccountName, _flowContainerName, Path.Combine(_engineEnvironment.EngineFlowConfig.EnvironmentType, ConfigName), _engineEnvironment.EngineFlowConfig.ConfiggenClientId, _engineEnvironment.EngineFlowConfig.ConfiggenTenantId, _engineEnvironment.EngineFlowConfig.ConfiggenSecretPrefix);
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific productconfig: {ProductConfigName} and {JobConfigName} for {ConfigName}.");
/// Delete sample data and the checkpoints folder if it exists for that flow
var hashValue = Helper.GetHashCode(diag.UserName);
await BlobHelper.DeleteBlob(_engineEnvironment.OpsBlobConnectionString, Path.Combine(_engineEnvironment.OpsSamplePath, $"{ConfigName}-{hashValue}.json"));
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific sampledata file: {ConfigName}-{ hashValue}.json");
await BlobHelper.DeleteAllBlobsInAContainer(_engineEnvironment.OpsBlobConnectionString, $"{_engineEnvironment.CheckPointContainerNameHelper(ConfigName)}-checkpoints", _engineEnvironment.EngineFlowConfig.OpsBlobDirectory);
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific checkpoints for {ConfigName}.");
_logger.LogInformation("Deleting flow specific secrets..");
///Delete secrets specific to a flow from KeyVault
KeyVault.GetSecretsAndDeleteFromKeyvault(_engineEnvironment.EngineFlowConfig.SparkKeyVaultName, ConfigName);
_logger.LogInformation($"For FlowId: {ConfigName} Successfully Deleted flow specific secrets");
if (!errorExists)
{
return ApiResult.CreateSuccess("Deleted!");
}
else
{
return ApiResult.CreateError("Deleted but with some error. Please check logs for details");
}
}