in Services/DataX.SimulatedData/DataX.SimulatedData.DataGenService/DataGenService.cs [51:143]
/// <summary>
/// Main loop of the simulated-data generator service.
/// Reads the "InputConfig" section of the "Config" configuration package, resolves the
/// configured secrets through <c>KeyVaultManager</c>, loads every configured data schema,
/// and then, roughly once per minute, generates data for each schema and passes it to
/// <c>SendData</c> until cancellation is requested.
/// </summary>
/// <param name="cancellationToken">Token checked at the top of every iteration and passed to the inter-iteration delay.</param>
/// <returns>A task that only completes by throwing (cancellation or an unhandled error); the loop itself never exits normally.</returns>
protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var configPackage = FabricRuntime.GetActivationContext().GetConfigurationPackageObject("Config");
    var inputConfig = configPackage.Settings.Sections["InputConfig"];
    KeyVaultManager keyManager = new KeyVaultManager();
    string keyVaultName = inputConfig.Parameters["KeyVaultName"].Value;

    // Small local helpers that replace the repeated lookup / ternary boilerplate.
    Func<string, string> readSetting = settingName => inputConfig.Parameters[settingName].Value;

    // Resolves the secret whose Key Vault key name is stored in the given setting;
    // an empty (or null) key name means "not configured" and yields an empty string.
    Func<string, Task<string>> readSecretAsync = async settingName =>
    {
        string secretKeyName = readSetting(settingName);
        return string.IsNullOrEmpty(secretKeyName)
            ? ""
            : await keyManager.GetSecretStringAsync(keyVaultName, secretKeyName);
    };

    // Splits a comma separated setting value into trimmed entries.
    Func<string, List<string>> splitAndTrim = csv => csv.Split(',').Select(p => p.Trim()).ToList();

    // Same as splitAndTrim, but an empty (or null) setting yields an empty list.
    Func<string, List<string>> readOptionalList = settingName =>
    {
        string csv = readSetting(settingName);
        return string.IsNullOrEmpty(csv) ? new List<string>() : splitAndTrim(csv);
    };

    string iotDeviceConnectionString = await readSecretAsync("IotDeviceConnectionStringKeyVaultKeyName");
    string ehConnectionString = await readSecretAsync("EventhubConnectionStringKeyVaultKeyName");
    string dataSchemaStorageAccountName = readSetting("DataSchemaStorageAccountName");
    string dataSchemaStorageAccountKeyValue = await readSecretAsync("DataSchemaStorageAccountKeyValueKeyVaultKeyName");
    string dataSchemaStorageContainerName = readSetting("DataSchemaStorageContainerName");
    List<string> dataSchemaPathsWithinContainer = splitAndTrim(readSetting("DataSchemaPathWithinContainer"));

    KafkaConnection kafkaEnabledEventHubConn = new KafkaConnection
    {
        Topics = readOptionalList("KafkaTopics"),
        ConnectionString = await readSecretAsync("KafkaConnectionStringKeyVaultKeyName")
    };
    KafkaConnection kafkaHDInsightConn = new KafkaConnection
    {
        Topics = readOptionalList("KafkaHDInsightTopics"),
        BootstrapServers = await readSecretAsync("KafkaHDInsightBrokersKeyVaultKeyName")
    };

    if (!string.IsNullOrEmpty(kafkaEnabledEventHubConn.ConnectionString))
    {
        // Derive "<host>:9093" from the sb:// endpoint contained in the connection string.
        Regex regex = new Regex(@"sb?://([\w\d\.]+).*");
        kafkaEnabledEventHubConn.BootstrapServers = regex.Match(kafkaEnabledEventHubConn.ConnectionString).Groups[1].Value + ":9093";

        // Download the CA bundle next to the process. The client is disposed afterwards and
        // the download is awaited instead of blocking the calling thread.
        using (WebClient webClient = new WebClient())
        {
            await webClient.DownloadFileTaskAsync("https://curl.haxx.se/ca/cacert.pem", @".\cacert.pem");
        }
    }

    List<DataSchema> dataSchemaList = new List<DataSchema>();
    foreach (var dataSchemaPathWithinContainer in dataSchemaPathsWithinContainer)
    {
        var loadedSchema = await GetDataSchemaAndRules(dataSchemaStorageAccountName, dataSchemaStorageAccountKeyValue, dataSchemaStorageContainerName, dataSchemaPathWithinContainer);
        loadedSchema.currentCounter = 1;
        dataSchemaList.Add(loadedSchema);
    }

    // Rules-triggering data is generated only on the node whose name ends with "0" to avoid
    // duplicated data. EndsWith is also safe for an empty node name, where Substring would throw.
    string nodeName = this.Context.NodeContext.NodeName;
    bool isRulesNode = nodeName != null && nodeName.EndsWith("0", StringComparison.Ordinal);

    Stopwatch stopwatchDelay = new Stopwatch();
    Stopwatch stopwatchThreshold = new Stopwatch();
    stopwatchThreshold.Start();
    DataGen dataGenInstance = new DataGen();

    while (true)
    {
        stopwatchDelay.Restart();
        cancellationToken.ThrowIfCancellationRequested();

        // Wrap the period clock once a day (1440 minutes).
        if (stopwatchThreshold.Elapsed.TotalMinutes >= 1440)
        {
            stopwatchThreshold.Restart();
        }

        foreach (var dataSchemaFileContent in dataSchemaList)
        {
            if (dataSchemaFileContent.currentCounter >= dataSchemaFileContent.rulesCounterRefreshInMinutes)
            {
                dataSchemaFileContent.currentCounter = 1;
            }

            List<JObject> dataStreams = new List<JObject>();
            foreach (var ds in dataSchemaFileContent.dataSchema)
            {
                // Use whole elapsed minutes since the last wrap. TimeSpan.Minutes is only the
                // 0-59 component and would break periods that do not divide 60.
                long elapsedMinutes = (long)stopwatchThreshold.Elapsed.TotalMinutes;

                // A period of 0 is skipped so it cannot cause a division by zero.
                if (ds.simulationPeriodInMinute != 0 && elapsedMinutes % ds.simulationPeriodInMinute == 0)
                {
                    // Generate random data.
                    dataGenInstance.GenerateRandomData(dataStreams, ds);

                    if (ds.rulesData != null && isRulesNode)
                    {
                        dataGenInstance.GenerateDataRules(dataStreams, ds, dataSchemaFileContent.currentCounter);
                    }
                }
            }

            if (dataStreams.Count > 0)
            {
                await SendData(dataStreams, ehConnectionString, iotDeviceConnectionString, kafkaEnabledEventHubConn, kafkaHDInsightConn);
            }

            dataSchemaFileContent.currentCounter++;
        }

        // Sleep for the remainder of the minute; if the iteration already took 60 s or more,
        // wait one second before starting the next one.
        double remainingSeconds = 60 - stopwatchDelay.Elapsed.TotalSeconds;
        var setDelay = (remainingSeconds > 0) ? remainingSeconds : 1;
        await Task.Delay(TimeSpan.FromSeconds(setDelay), cancellationToken);
    }
}