protected override async Task RunAsync()

in Services/DataX.SimulatedData/DataX.SimulatedData.DataGenService/DataGenService.cs [51:143]


        /// <summary>
        /// Main loop of the data generation service. Reads the "InputConfig" section of the "Config"
        /// configuration package, resolves the configured secrets through <c>KeyVaultManager</c>, loads every
        /// configured data schema, and then, about once per minute, generates data for the schema entries
        /// that are due and sends it to the configured targets.
        /// </summary>
        /// <param name="cancellationToken">Checked at the start of every cycle and passed to the delay between cycles.</param>
        /// <returns>A task for the loop; the loop has no normal exit and only ends by throwing.</returns>
        /// <exception cref="OperationCanceledException">Thrown once <paramref name="cancellationToken"/> is cancelled.</exception>
        protected override async Task RunAsync(CancellationToken cancellationToken)
        {
            var configPackage = FabricRuntime.GetActivationContext().GetConfigurationPackageObject("Config");
            var inputConfig = configPackage.Settings.Sections["InputConfig"];

            // An empty key-vault key name means "target not configured": the secret lookup is skipped and "" is used.
            KeyVaultManager keyManager = new KeyVaultManager();
            string keyVaultName = inputConfig.Parameters["KeyVaultName"].Value;
            string iotDeviceConnectionStringKeyVaultKeyName = inputConfig.Parameters["IotDeviceConnectionStringKeyVaultKeyName"].Value;
            string eventhubConnectionStringKeyVaultKeyName = inputConfig.Parameters["EventhubConnectionStringKeyVaultKeyName"].Value;
            string dataSchemaStorageAccountKeyValueKeyVaultKeyName = inputConfig.Parameters["DataSchemaStorageAccountKeyValueKeyVaultKeyName"].Value;
            string iotDeviceConnectionString = (iotDeviceConnectionStringKeyVaultKeyName.Length > 0) ? await keyManager.GetSecretStringAsync(keyVaultName, iotDeviceConnectionStringKeyVaultKeyName) : "";
            string ehConnectionString = (eventhubConnectionStringKeyVaultKeyName.Length > 0) ? await keyManager.GetSecretStringAsync(keyVaultName, eventhubConnectionStringKeyVaultKeyName) : "";
            string dataSchemaStorageAccountName = inputConfig.Parameters["DataSchemaStorageAccountName"].Value;
            string dataSchemaStorageAccountKeyValue = (dataSchemaStorageAccountKeyValueKeyVaultKeyName.Length > 0) ? await keyManager.GetSecretStringAsync(keyVaultName, dataSchemaStorageAccountKeyValueKeyVaultKeyName) : "";
            string dataSchemaStorageContainerName = inputConfig.Parameters["DataSchemaStorageContainerName"].Value;

            // Comma-separated list of schema paths; each entry is trimmed.
            List<string> dataSchemaPathsWithinContainer = Array.ConvertAll(inputConfig.Parameters["DataSchemaPathWithinContainer"].Value.Split(','), p => p.Trim()).ToList();

            KafkaConnection kafkaEnabledEventHubConn = new KafkaConnection
            {
                Topics = (inputConfig.Parameters["KafkaTopics"].Value.Length > 0) ? Array.ConvertAll(inputConfig.Parameters["KafkaTopics"].Value.Split(','), p => p.Trim()).ToList() : new List<string>(),
                ConnectionString = (inputConfig.Parameters["KafkaConnectionStringKeyVaultKeyName"].Value.Length > 0) ? await keyManager.GetSecretStringAsync(keyVaultName, inputConfig.Parameters["KafkaConnectionStringKeyVaultKeyName"].Value) : ""
            };

            KafkaConnection kafkaHDInsightConn = new KafkaConnection
            {
                Topics = (inputConfig.Parameters["KafkaHDInsightTopics"].Value.Length > 0) ? Array.ConvertAll(inputConfig.Parameters["KafkaHDInsightTopics"].Value.Split(','), p => p.Trim()).ToList() : new List<string>(),
                BootstrapServers = (inputConfig.Parameters["KafkaHDInsightBrokersKeyVaultKeyName"].Value.Length > 0) ? await keyManager.GetSecretStringAsync(keyVaultName, inputConfig.Parameters["KafkaHDInsightBrokersKeyVaultKeyName"].Value) : ""
            };

            if (!string.IsNullOrEmpty(kafkaEnabledEventHubConn.ConnectionString))
            {
                // Extract the host from "sb://<host>/..." and append port 9093.
                // '-' is part of the character class because namespace host names may contain hyphens;
                // without it the host would be cut off at the first hyphen.
                Regex regex = new Regex(@"sb?://([\w\d\.\-]+).*");
                kafkaEnabledEventHubConn.BootstrapServers = regex.Match(kafkaEnabledEventHubConn.ConnectionString).Groups[1].Value + ":9093";

                // Fetch the CA bundle into the working directory without blocking the thread, and release
                // the WebClient afterwards.
                // NOTE(review): presumably consumed by the Kafka client in SendData - confirm.
                using (WebClient webClient = new WebClient())
                {
                    await webClient.DownloadFileTaskAsync("https://curl.haxx.se/ca/cacert.pem", @".\cacert.pem");
                }
            }

            List<DataSchema> dataSchemaList = new List<DataSchema>();
            foreach (var dataSchemaPathWithinContainer in dataSchemaPathsWithinContainer)
            {
                var dataSchemaFileContent = await GetDataSchemaAndRules(dataSchemaStorageAccountName, dataSchemaStorageAccountKeyValue, dataSchemaStorageContainerName, dataSchemaPathWithinContainer);
                dataSchemaFileContent.currentCounter = 1;
                dataSchemaList.Add(dataSchemaFileContent);
            }

            // Rules-triggering data is generated only on the node whose name ends with "0" to avoid data duplication.
            // The node name does not change while the loop runs, so the check is evaluated once.
            // NOTE(review): a name ending in "10", "20", ... also matches - confirm the cluster's node naming.
            string nodeName = this.Context.NodeContext.NodeName;
            bool isRulesNode = !string.IsNullOrEmpty(nodeName) && nodeName.EndsWith("0", StringComparison.Ordinal);

            Stopwatch stopwatchDelay = new Stopwatch();
            Stopwatch stopwatchThreshold = new Stopwatch();
            stopwatchThreshold.Start();
            DataGen dataGenInstance = new DataGen();
            while (true)
            {
                stopwatchDelay.Restart();
                cancellationToken.ThrowIfCancellationRequested();

                // Restart the period clock every 1440 minutes (24 hours).
                if (stopwatchThreshold.Elapsed.TotalMinutes >= 1440)
                {
                    stopwatchThreshold.Restart();
                }

                // Whole minutes since the period clock (re)started, sampled once so that every schema in this
                // cycle is evaluated against the same minute. TotalMinutes is required here: the Minutes
                // property is only the 0-59 component and wraps every hour, which breaks periods that do not
                // divide 60 (e.g. 90 or 120 minutes).
                int elapsedWholeMinutes = (int)stopwatchThreshold.Elapsed.TotalMinutes;

                foreach (var dataSchemaFileContent in dataSchemaList)
                {
                    // currentCounter starts at 1 and wraps back to 1 once it reaches rulesCounterRefreshInMinutes.
                    if (dataSchemaFileContent.currentCounter >= dataSchemaFileContent.rulesCounterRefreshInMinutes)
                    {
                        dataSchemaFileContent.currentCounter = 1;
                    }

                    List<JObject> dataStreams = new List<JObject>();
                    foreach (var ds in dataSchemaFileContent.dataSchema)
                    {
                        // A schema entry is due when the elapsed minute count is a multiple of its period.
                        if (elapsedWholeMinutes % ds.simulationPeriodInMinute == 0)
                        {
                            //generate random data
                            dataGenInstance.GenerateRandomData(dataStreams, ds);

                            //generate rules triggering data only on the designated node
                            if ((ds.rulesData != null) && isRulesNode)
                            {
                                dataGenInstance.GenerateDataRules(dataStreams, ds, dataSchemaFileContent.currentCounter);
                            }
                        }
                    }

                    if (dataStreams.Count > 0)
                    {
                        await SendData(dataStreams, ehConnectionString, iotDeviceConnectionString, kafkaEnabledEventHubConn, kafkaHDInsightConn);
                    }
                    dataSchemaFileContent.currentCounter++;
                }

                // Sleep for the remainder of the 60-second cycle; if the cycle already took 60 seconds or
                // longer, wait 1 second before starting the next one.
                double remainingSeconds = 60 - stopwatchDelay.Elapsed.TotalSeconds;
                var setDelay = (remainingSeconds > 0) ? remainingSeconds : 1;
                await Task.Delay(TimeSpan.FromSeconds(setDelay), cancellationToken);
            }
        }