in Microsoft.Azure.Cosmos/FaultInjection/src/implementation/RntbdConnectionErrorInjector.cs [50:184]
public Task InjectConnectionErrorTask(FaultInjectionConnectionErrorRule rule)
{
TimeSpan delay = rule.GetResult().GetTimespan();
return Task.Delay(delay).ContinueWith(
t =>
{
//check to see if rule is valid
if (this.IsEffectiveRule(rule))
{
List<Channel> allChannels = this.channelStore.GetAllChannels();
Random random = new Random();
FaultInjectionConnectionErrorType connectionErrorType = rule.GetResult().GetConnectionErrorType();
//Case 1: Inject connection error for specific physical address
List<Uri> addresses = rule.GetAddresses();
if (addresses != null && addresses.Count > 0)
{
foreach (Uri addressUri in addresses)
{
foreach (Channel channel in allChannels)
{
Uri serverUri;
try
{
serverUri = channel.GetServerUri();
}
catch (Exception)
{
//Channel is alread disposed, there can sometimes be lag from when the rule is applied and when the channel is disposed
//and marked unhealthy
continue;
}
if (serverUri.Equals(addressUri))
{
rule.ApplyRule();
if (random.NextDouble() < rule.GetResult().GetThresholdPercentage())
{
rule.ApplyRule();
DefaultTrace.TraceInformation("FaultInjection: Injecting {0} connection error rule: {1}, for address {2}",
connectionErrorType,
rule.GetId(),
addressUri);
channel.InjectFaultInjectionConnectionError(this.GetTransportException(connectionErrorType, serverUri));
}
}
}
}
return Task.CompletedTask;
}
//Case 2: Inject connection error for all endpoints of one region when there is no specific physical address
List<Uri> regionEndpoints = rule.GetRegionEndpoints();
if (regionEndpoints != null && regionEndpoints.Count > 0)
{
foreach (Uri regionEndpoint in regionEndpoints)
{
foreach (Channel channel in allChannels)
{
Uri serverUri;
try
{
serverUri = channel.GetServerUri();
}
catch (Exception)
{
//Channel is alread disposed, there can sometimes be lag from when the rule is applied and when the channel is disposed
//and marked unhealthy
continue;
}
if(this.ParseRntbdEndpointForNormalizedRegion(serverUri).Equals(this.ParseRntbdEndpointForNormalizedRegion(regionEndpoint)))
{
if (random.NextDouble() < rule.GetResult().GetThresholdPercentage())
{
rule.ApplyRule();
DefaultTrace.TraceInformation("FaultInjection: Injecting {0} connection error rule: {1} for region {2}",
connectionErrorType,
rule.GetId(),
regionEndpoint);
channel.InjectFaultInjectionConnectionError(this.GetTransportException(connectionErrorType, serverUri));
}
}
}
}
return Task.CompletedTask;
}
//Case 3: Inject connection error for all endpoints of all regions when there is no specific physical address and region
foreach (Channel channel in allChannels)
{
Uri serverUri;
try
{
serverUri = channel.GetServerUri();
}
catch (Exception)
{
//Channel is alread disposed, there can sometimes be lag from when the rule is applied and when the channel is disposed
//and marked unhealthy
continue;
}
if (random.NextDouble() < rule.GetResult().GetThresholdPercentage())
{
rule.ApplyRule();
DefaultTrace.TraceInformation("FaultInjection: Injecting {0} connection error rule: {1}",
connectionErrorType,
rule.GetId());
channel.InjectFaultInjectionConnectionError(this.GetTransportException(connectionErrorType, serverUri));
}
}
return Task.CompletedTask;
}
return Task.CompletedTask;
}).ContinueWith(
t =>
{
//repeats rule injection if rule is still valid
if (this.IsEffectiveRule(rule))
{
this.InjectConnectionErrorTask(rule);
}
else
{
//removes rule from rule store one rule is no longer valid
this.ruleStore.RemoveRule(rule);
}
});
}