public override async Task Process()

in Services/DataX.Config/DataX.Config.Input.EventHub/Processor/CreateEventHubConsumerGroup.cs [94:196]


        /// <summary>
        /// Sets the input event hub tokens (connection string, consumer group, hub names, checkpoint
        /// settings, max rate) on the deployment session and, when the resource creation flag is "true"
        /// and the input is an Event Hub or IoT Hub, creates a consumer group named after the flow.
        /// </summary>
        /// <param name="flowToDeploy">Deployment session whose GUI input config is read and whose tokens are set.</param>
        /// <returns>
        /// "done" on completion, or a "skipped" message when no input type is defined or the input type
        /// is not one of the types handled by this step.
        /// </returns>
        /// <exception cref="ConfigGenerationException">
        /// Thrown when the hub namespace/name required for consumer group creation is missing from the
        /// parsed connection string.
        /// </exception>
        public override async Task<string> Process(FlowDeploymentSession flowToDeploy)
        {
            var flowConfig = flowToDeploy.Config;
            var config = flowConfig?.GetGuiConfig()?.Input;

            var inputType = config?.InputType?.ToLowerInvariant();
            if (inputType == null)
            {
                return "eventhub/iothub input not defined, skipped";
            }

            if (inputType != Constants.InputType_EventHub && inputType != Constants.InputType_IoTHub && inputType != Constants.InputType_KafkaEventHub && inputType != Constants.InputType_Kafka && inputType != Constants.InputType_Blob)
            {
                return $"unsupported inputtype '{inputType}', skipped.";
            }

            // A non-null inputType implies flowConfig and config are non-null from here on.
            var props = config.Properties;
            Ensure.NotNull(props, "flowConfig.Gui.Input.Properties");

            // The token keeps the connection string as configured; the resolved value is only used for parsing.
            var connectionString = props.InputEventhubConnection;
            flowToDeploy.SetStringToken(TokenName_InputEventHubConnectionString, connectionString);

            var resolvedConnectionString = await KeyVaultClient.ResolveSecretUriAsync(connectionString);
            var hubInfo = ConnectionStringParser.ParseEventHub(resolvedConnectionString);

            // For Kafka inputs the hub name comes from the explicit property, not the connection string.
            if (inputType == Constants.InputType_Kafka || inputType == Constants.InputType_KafkaEventHub)
            {
                hubInfo.Name = this.NormalizeEventNames(props.InputEventHubName);
            }

            var consumerGroupName = flowConfig.Name;

            // Create consumer group only if the resource creation flag is set. This is to support scenario where EventHub
            // is in a different subscription than where services are deployed.
            // The comparison is null-safe: a missing flag is treated as "not set" rather than failing on a null value.
            var resourceCreationEnabled = string.Equals(
                Configuration[Constants.ConfigSettingName_ResourceCreation],
                "true",
                System.StringComparison.OrdinalIgnoreCase);
            if (resourceCreationEnabled && (inputType == Constants.InputType_EventHub || inputType == Constants.InputType_IoTHub))
            {
                var serviceKeyVaultName = Configuration[Constants.ConfigSettingName_ServiceKeyVaultName];
                Ensure.NotNull(serviceKeyVaultName, "serviceKeyVaultName");
                var resolvedSecretKey = await KeyVaultClient.GetSecretFromKeyVaultAsync(serviceKeyVaultName, Configuration[Constants.ConfigSettingName_SecretPrefix] + "clientsecret");
                var clientId = Configuration[Constants.ConfigSettingName_ConfigGenClientId];
                var tenantId = Configuration[Constants.ConfigSettingName_ConfigGenTenantId];

                // Subscription id: the input property wins over the session token; either source is resolved as a secret URI.
                var subscriptionIdSource = string.IsNullOrEmpty(props.InputSubscriptionId)
                    ? flowToDeploy.GetTokenString(TokenName_InputEventHubSubscriptionId)
                    : props.InputSubscriptionId;
                var inputSubscriptionId = await KeyVaultClient.ResolveSecretUriAsync(subscriptionIdSource);

                // Resource group: only the property value is resolved as a secret URI; the session token is used as-is.
                var inputResourceGroupName = string.IsNullOrEmpty(props.InputResourceGroup)
                    ? flowToDeploy.GetTokenString(TokenName_InputEventHubResourceGroupName)
                    : await KeyVaultClient.ResolveSecretUriAsync(props.InputResourceGroup);

                Result result = null;
                switch (inputType)
                {
                    case Constants.InputType_EventHub:
                    // NOTE: the guard above only admits EventHub and IoTHub, so the KafkaEventHub label is currently never reached.
                    case Constants.InputType_KafkaEventHub:
                        //Check for required parameters
                        if (string.IsNullOrEmpty(hubInfo.Namespace) || string.IsNullOrEmpty(hubInfo.Name))
                        {
                            throw new ConfigGenerationException("Could not parse Event Hub connection string; please check input.");
                        }
                        result = await EventHubUtil.CreateEventHubConsumerGroups(
                                                        clientId: clientId,
                                                        tenantId: tenantId,
                                                        secretKey: resolvedSecretKey,
                                                        subscriptionId: inputSubscriptionId,
                                                        resourceGroupName: inputResourceGroupName,
                                                        hubNamespace: hubInfo.Namespace,
                                                        hubNames: hubInfo.Name,
                                                        consumerGroupName: consumerGroupName);
                        break;
                    case Constants.InputType_IoTHub:
                        //Check for required parameters
                        if (string.IsNullOrEmpty(hubInfo.Name))
                        {
                            throw new ConfigGenerationException("Could not parse IoT Hub connection string; please check input.");
                        }
                        result = await EventHubUtil.CreateIotHubConsumerGroup(
                                                        clientId: clientId,
                                                        tenantId: tenantId,
                                                        secretKey: resolvedSecretKey,
                                                        subscriptionId: inputSubscriptionId,
                                                        resourceGroupName: inputResourceGroupName,
                                                        hubName: hubInfo.Name,
                                                        consumerGroupName: consumerGroupName);
                        break;
                    default:
                        throw new ConfigGenerationException($"unexpected inputtype '{inputType}'.");
                }

                Ensure.IsSuccessResult(result);
            }

            flowToDeploy.SetStringToken(TokenName_InputEventHubConsumerGroup, consumerGroupName);
            flowToDeploy.SetStringToken(TokenName_InputEventHubs, hubInfo.Name);

            // The default checkpoint location depends on the spark type; an explicit setting overrides either default.
            var sparkType = Configuration.TryGet(Constants.ConfigSettingName_SparkType, out string value) ? value : null;
            var defaultCheckpointDir = (sparkType == Constants.SparkTypeDataBricks)
                ? "dbfs:/mycluster/datax/direct/${name}/"
                : "hdfs://mycluster/datax/direct/${name}/";
            var checkpointDir = Configuration.GetOrDefault(ConfigSettingName_InputEventHubCheckpointDir, defaultCheckpointDir);
            flowToDeploy.SetStringToken(TokenName_InputEventHubCheckpointDir, checkpointDir);

            var intervalInSeconds = props.WindowDuration;
            flowToDeploy.SetStringToken(TokenName_InputEventHubCheckpointInterval, intervalInSeconds);
            flowToDeploy.SetObjectToken(TokenName_InputEventHubMaxRate, props.MaxRate);

            // TryParse leaves the flag false when the setting is not a valid boolean, so the result is intentionally ignored.
            var flushCheckpointsString = Configuration.GetOrDefault(ConfigSettingName_InputEventHubFlushExistingCheckpoints, "False");
            bool.TryParse(flushCheckpointsString, out bool flushExistingCheckpoints);
            flowToDeploy.SetObjectToken(TokenName_InputEventHubFlushExistingCheckpoints, flushExistingCheckpoints);

            return "done";
        }