in Services/DataX.Config/DataX.Config.Input.EventHub/Processor/CreateEventHubConsumerGroup.cs [94:196]
/// <summary>
/// Prepares the event hub style input of a flow for deployment: publishes the input connection
/// string, consumer group, hub name(s) and checkpoint settings as tokens on the deployment session
/// and, when resource creation is enabled, creates a consumer group named after the flow.
/// </summary>
/// <param name="flowToDeploy">Deployment session whose config is read and whose tokens are populated.</param>
/// <returns>
/// <c>"done"</c> when all tokens were set, or a short message explaining why the step was skipped
/// (no input type defined, or an input type this step does not handle).
/// </returns>
/// <exception cref="ConfigGenerationException">
/// Thrown when consumer group creation is requested but the hub namespace/name could not be
/// obtained from the connection string, or the input type is unexpected at that point.
/// </exception>
public override async Task<string> Process(FlowDeploymentSession flowToDeploy)
{
    var flowConfig = flowToDeploy.Config;
    var config = flowConfig?.GetGuiConfig()?.Input;
    var inputType = config?.InputType?.ToLowerInvariant();
    if (inputType == null)
    {
        return "eventhub/iothub input not defined, skipped";
    }

    if (inputType != Constants.InputType_EventHub
        && inputType != Constants.InputType_IoTHub
        && inputType != Constants.InputType_KafkaEventHub
        && inputType != Constants.InputType_Kafka
        && inputType != Constants.InputType_Blob)
    {
        return $"unsupported inputtype '{inputType}', skipped.";
    }

    var props = config.Properties;
    Ensure.NotNull(props, "flowConfig.Gui.Input.Properties");

    // The token stores the connection string exactly as configured; only the local copy is
    // passed through ResolveSecretUriAsync before being parsed.
    var connectionString = props.InputEventhubConnection;
    flowToDeploy.SetStringToken(TokenName_InputEventHubConnectionString, connectionString);
    var resolvedConnectionString = await KeyVaultClient.ResolveSecretUriAsync(connectionString);
    var hubInfo = ConnectionStringParser.ParseEventHub(resolvedConnectionString);

    // For Kafka inputs the hub name comes from the explicit InputEventHubName property
    // instead of whatever was parsed out of the connection string.
    if (inputType == Constants.InputType_Kafka || inputType == Constants.InputType_KafkaEventHub)
    {
        hubInfo.Name = this.NormalizeEventNames(props.InputEventHubName);
    }

    var consumerGroupName = flowConfig.Name;

    // Create consumer group only if the resource creation flag is set. This is to support scenario where EventHub
    // is in a different subscription than where services are deployed.
    // string.Equals is null-tolerant, so a flag value of null counts as "not set" rather than
    // failing the whole step with a NullReferenceException.
    var resourceCreationEnabled = string.Equals(
        Configuration[Constants.ConfigSettingName_ResourceCreation],
        "true",
        System.StringComparison.OrdinalIgnoreCase);
    var supportsConsumerGroupCreation =
        inputType == Constants.InputType_EventHub || inputType == Constants.InputType_IoTHub;

    if (resourceCreationEnabled && supportsConsumerGroupCreation)
    {
        var serviceKeyVaultName = Configuration[Constants.ConfigSettingName_ServiceKeyVaultName];
        Ensure.NotNull(serviceKeyVaultName, "serviceKeyVaultName");

        var resolvedSecretKey = await KeyVaultClient.GetSecretFromKeyVaultAsync(
            serviceKeyVaultName,
            Configuration[Constants.ConfigSettingName_SecretPrefix] + "clientsecret");
        var clientId = Configuration[Constants.ConfigSettingName_ConfigGenClientId];
        var tenantId = Configuration[Constants.ConfigSettingName_ConfigGenTenantId];

        // Values set on the input properties win; otherwise fall back to the session tokens.
        var subscriptionIdSetting = string.IsNullOrEmpty(props.InputSubscriptionId)
            ? flowToDeploy.GetTokenString(TokenName_InputEventHubSubscriptionId)
            : props.InputSubscriptionId;
        var inputSubscriptionId = await KeyVaultClient.ResolveSecretUriAsync(subscriptionIdSetting);

        // NOTE(review): unlike the subscription id, the token fallback for the resource group is
        // not passed through ResolveSecretUriAsync - confirm this asymmetry is intentional.
        var inputResourceGroupName = string.IsNullOrEmpty(props.InputResourceGroup)
            ? flowToDeploy.GetTokenString(TokenName_InputEventHubResourceGroupName)
            : await KeyVaultClient.ResolveSecretUriAsync(props.InputResourceGroup);

        Result result = null;
        switch (inputType)
        {
            case Constants.InputType_EventHub:
            // The KafkaEventHub label cannot be reached while the enclosing guard only admits
            // EventHub and IoTHub; it is kept so the switch stays valid if that guard is widened.
            case Constants.InputType_KafkaEventHub:
                // Check for required parameters
                if (string.IsNullOrEmpty(hubInfo.Namespace) || string.IsNullOrEmpty(hubInfo.Name))
                {
                    throw new ConfigGenerationException("Could not parse Event Hub connection string; please check input.");
                }

                result = await EventHubUtil.CreateEventHubConsumerGroups(
                    clientId: clientId,
                    tenantId: tenantId,
                    secretKey: resolvedSecretKey,
                    subscriptionId: inputSubscriptionId,
                    resourceGroupName: inputResourceGroupName,
                    hubNamespace: hubInfo.Namespace,
                    hubNames: hubInfo.Name,
                    consumerGroupName: consumerGroupName);
                break;

            case Constants.InputType_IoTHub:
                // Check for required parameters
                if (string.IsNullOrEmpty(hubInfo.Name))
                {
                    throw new ConfigGenerationException("Could not parse IoT Hub connection string; please check input.");
                }

                result = await EventHubUtil.CreateIotHubConsumerGroup(
                    clientId: clientId,
                    tenantId: tenantId,
                    secretKey: resolvedSecretKey,
                    subscriptionId: inputSubscriptionId,
                    resourceGroupName: inputResourceGroupName,
                    hubName: hubInfo.Name,
                    consumerGroupName: consumerGroupName);
                break;

            default:
                throw new ConfigGenerationException($"unexpected inputtype '{inputType}'.");
        }

        Ensure.IsSuccessResult(result);
    }

    flowToDeploy.SetStringToken(TokenName_InputEventHubConsumerGroup, consumerGroupName);
    flowToDeploy.SetStringToken(TokenName_InputEventHubs, hubInfo.Name);

    // The checkpoint directory default depends on the Spark flavour; an explicitly configured
    // directory overrides it either way.
    var sparkType = Configuration.TryGet(Constants.ConfigSettingName_SparkType, out string configuredSparkType)
        ? configuredSparkType
        : null;
    var defaultCheckpointDir = (sparkType == Constants.SparkTypeDataBricks)
        ? "dbfs:/mycluster/datax/direct/${name}/"
        : "hdfs://mycluster/datax/direct/${name}/";
    var checkpointDir = Configuration.GetOrDefault(ConfigSettingName_InputEventHubCheckpointDir, defaultCheckpointDir);
    flowToDeploy.SetStringToken(TokenName_InputEventHubCheckpointDir, checkpointDir);

    // The input's window duration is used as the checkpoint interval.
    flowToDeploy.SetStringToken(TokenName_InputEventHubCheckpointInterval, props.WindowDuration);
    flowToDeploy.SetObjectToken(TokenName_InputEventHubMaxRate, props.MaxRate);

    // The TryParse result is deliberately ignored: an unparsable setting leaves the flag at false.
    var flushCheckpointsString = Configuration.GetOrDefault(ConfigSettingName_InputEventHubFlushExistingCheckpoints, "False");
    bool.TryParse(flushCheckpointsString, out bool flushExistingCheckpoints);
    flowToDeploy.SetObjectToken(TokenName_InputEventHubFlushExistingCheckpoints, flushExistingCheckpoints);

    return "done";
}