in edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs [142:653]
/// <summary>
/// Registers the routing components of the Edge Hub (message converters, cloud
/// connection plumbing, router, twin manager, config source, ...) with the container.
/// </summary>
/// <remarks>
/// Components whose construction is asynchronous are registered as <c>Task&lt;T&gt;</c>.
/// Inside every async registration lambda all <c>c.Resolve</c> calls are issued BEFORE the
/// first <c>await</c>: the component context <c>c</c> is only guaranteed to be usable while
/// the resolve operation is in progress, and a continuation that runs after an await which
/// actually yields may execute once that operation has ended.
/// </remarks>
/// <param name="builder">The container builder that receives the registrations.</param>
protected override void Load(ContainerBuilder builder)
{
    // IMessageConverter<IRoutingMessage>
    builder.Register(c => new RoutingMessageConverter())
        .As<Core.IMessageConverter<IRoutingMessage>>()
        .SingleInstance();

    // IRoutingPerfCounter
    // Side effect: the factory also assigns the static Routing.PerfCounter.
    // NOTE(review): AutoActivate() presumably exists so the static is set when the container is built - confirm.
    builder.Register(
            c =>
            {
                Routing.PerfCounter = NullRoutingPerfCounter.Instance;
                return Routing.PerfCounter;
            })
        .As<IRoutingPerfCounter>()
        .AutoActivate()
        .SingleInstance();

    // IRoutingUserAnalyticsLogger
    // Side effect: the factory also assigns the static Routing.UserAnalyticsLogger.
    builder.Register(
            c =>
            {
                Routing.UserAnalyticsLogger = NullUserAnalyticsLogger.Instance;
                return Routing.UserAnalyticsLogger;
            })
        .As<IRoutingUserAnalyticsLogger>()
        .AutoActivate()
        .SingleInstance();

    // IRoutingUserMetricLogger
    // Side effect: the factory also assigns the static Routing.UserMetricLogger.
    builder.Register(
            c =>
            {
                Routing.UserMetricLogger = EdgeHubRoutingUserMetricLogger.Instance;
                return Routing.UserMetricLogger;
            })
        .As<IRoutingUserMetricLogger>()
        .AutoActivate()
        .SingleInstance();

    // IMessageConverter<Message>
    builder.Register(c => new DeviceClientMessageConverter())
        .As<Core.IMessageConverter<Message>>()
        .SingleInstance();

    // IMessageConverter<Twin>
    builder.Register(c => new TwinMessageConverter())
        .As<Core.IMessageConverter<Twin>>()
        .SingleInstance();

    // IMessageConverter<TwinCollection>
    builder.Register(c => new TwinCollectionMessageConverter())
        .As<Core.IMessageConverter<TwinCollection>>()
        .SingleInstance();

    // IMessageConverterProvider
    // Maps each supported payload type to the converter registered for it above.
    builder.Register(
            c => new MessageConverterProvider(
                new Dictionary<Type, IMessageConverter>()
                {
                    { typeof(Message), c.Resolve<Core.IMessageConverter<Message>>() },
                    { typeof(Twin), c.Resolve<Core.IMessageConverter<Twin>>() },
                    { typeof(TwinCollection), c.Resolve<Core.IMessageConverter<TwinCollection>>() }
                }))
        .As<IMessageConverterProvider>()
        .SingleInstance();

    // IDeviceConnectivityManager
    // A null implementation is used when the DisableConnectivityCheck experimental feature is set.
    builder.Register(
            c =>
            {
                var edgeHubCredentials = c.ResolveNamed<IClientCredentials>("EdgeHubCredentials");
                IDeviceConnectivityManager deviceConnectivityManager = this.experimentalFeatures.DisableConnectivityCheck
                    ? new NullDeviceConnectivityManager()
                    : new DeviceConnectivityManager(this.connectivityCheckFrequency, TimeSpan.FromMinutes(2), edgeHubCredentials.Identity) as IDeviceConnectivityManager;
                return deviceConnectivityManager;
            })
        .As<IDeviceConnectivityManager>()
        .SingleInstance();

    // IClientProvider
    // The plain ClientProvider is wrapped in a ConnectivityAwareClientProvider that is given the connectivity manager.
    builder.Register(
            c =>
            {
                IClientProvider underlyingClientProvider = new ClientProvider(this.gatewayHostname);
                IClientProvider connectivityAwareClientProvider = new ConnectivityAwareClientProvider(underlyingClientProvider, c.Resolve<IDeviceConnectivityManager>());
                return connectivityAwareClientProvider;
            })
        .As<IClientProvider>()
        .SingleInstance();

    // Task<ICloudConnectionProvider>
    builder.Register(
            async c =>
            {
                // Resolve everything from the context first; only then await.
                var metadataStoreTask = c.Resolve<Task<IMetadataStore>>();
                var messageConverterProvider = c.Resolve<IMessageConverterProvider>();
                var clientProvider = c.Resolve<IClientProvider>();
                var tokenProvider = c.ResolveNamed<ITokenProvider>("EdgeHubClientAuthTokenProvider");
                var credentialsCacheTask = c.Resolve<Task<ICredentialsCache>>();
                var edgeHubCredentials = c.ResolveNamed<IClientCredentials>("EdgeHubCredentials");
                var deviceScopeIdentitiesCacheTask = c.Resolve<Task<IDeviceScopeIdentitiesCache>>();
                var proxy = c.Resolve<Option<IWebProxy>>();

                var metadataStore = await metadataStoreTask;
                IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache = await deviceScopeIdentitiesCacheTask;
                ICredentialsCache credentialsCache = await credentialsCacheTask;

                ICloudConnectionProvider cloudConnectionProvider = new CloudConnectionProvider(
                    messageConverterProvider,
                    this.connectionPoolSize,
                    clientProvider,
                    this.upstreamProtocol,
                    tokenProvider,
                    deviceScopeIdentitiesCache,
                    credentialsCache,
                    edgeHubCredentials.Identity,
                    this.cloudConnectionIdleTimeout,
                    this.closeCloudConnectionOnIdleTimeout,
                    this.operationTimeout,
                    this.cloudConnectionHangingTimeout,
                    this.useServerHeartbeat,
                    proxy,
                    metadataStore,
                    scopeAuthenticationOnly: this.scopeAuthenticationOnly,
                    trackDeviceState: this.trackDeviceState,
                    nestedEdgeEnabled: this.nestedEdgeEnabled);
                return cloudConnectionProvider;
            })
        .As<Task<ICloudConnectionProvider>>()
        .SingleInstance();

    // IIdentityProvider
    builder.Register(_ => new IdentityProvider(this.iotHubName))
        .As<IIdentityProvider>()
        .SingleInstance();

    // Task<IConnectionManager>
    builder.Register(
            async c =>
            {
                var cloudConnectionProviderTask = c.Resolve<Task<ICloudConnectionProvider>>();
                var credentialsCacheTask = c.Resolve<Task<ICredentialsCache>>();
                var identityProvider = c.Resolve<IIdentityProvider>();
                var deviceConnectivityManager = c.Resolve<IDeviceConnectivityManager>();

                ICloudConnectionProvider cloudConnectionProvider = await cloudConnectionProviderTask;
                ICredentialsCache credentialsCache = await credentialsCacheTask;

                IConnectionManager connectionManager = new ConnectionManager(
                    cloudConnectionProvider,
                    credentialsCache,
                    identityProvider,
                    deviceConnectivityManager,
                    this.maxConnectedClients,
                    this.closeCloudConnectionOnDeviceDisconnect);
                return connectionManager;
            })
        .As<Task<IConnectionManager>>()
        .SingleInstance();

    // Task<IEndpointFactory>
    builder.Register(
            async c =>
            {
                var messageConverter = c.Resolve<Core.IMessageConverter<IRoutingMessage>>();
                IConnectionManager connectionManager = await c.Resolve<Task<IConnectionManager>>();
                // The 'as' cast makes the lambda's inferred result Task<IEndpointFactory>.
                return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId, this.maxUpstreamBatchSize, this.upstreamFanOutFactor, this.trackDeviceState) as IEndpointFactory;
            })
        .As<Task<IEndpointFactory>>()
        .SingleInstance();

    // Task<RouteFactory>
    builder.Register(async c => new EdgeRouteFactory(await c.Resolve<Task<IEndpointFactory>>()) as RouteFactory)
        .As<Task<RouteFactory>>()
        .SingleInstance();

    // RouterConfig
    // The router starts with an empty route set.
    builder.Register(c => new RouterConfig(Enumerable.Empty<Route>()))
        .As<RouterConfig>()
        .SingleInstance();

    if (!this.isStoreAndForwardEnabled)
    {
        // Store-and-forward disabled: synchronous endpoint executors, no message/checkpoint stores.

        // EndpointExecutorConfig
        builder.Register(
                c =>
                {
                    RetryStrategy defaultRetryStrategy = new FixedInterval(0, TimeSpan.FromSeconds(1));
                    TimeSpan defaultRevivePeriod = TimeSpan.FromHours(1);
                    TimeSpan defaultTimeout = TimeSpan.FromSeconds(60);
                    return new EndpointExecutorConfig(defaultTimeout, defaultRetryStrategy, defaultRevivePeriod, true);
                })
            .As<EndpointExecutorConfig>()
            .SingleInstance();

        // IEndpointExecutorFactory
        builder.Register(c => new SyncEndpointExecutorFactory(c.Resolve<EndpointExecutorConfig>()))
            .As<IEndpointExecutorFactory>()
            .SingleInstance();

        // Task<Router>
        builder.Register(
                async c =>
                {
                    var endpointExecutorFactory = c.Resolve<IEndpointExecutorFactory>();
                    var routerConfig = c.Resolve<RouterConfig>();
                    Router router = await Router.CreateAsync(Guid.NewGuid().ToString(), this.iotHubName, routerConfig, endpointExecutorFactory);
                    return router;
                })
            .As<Task<Router>>()
            .SingleInstance();

        // Task<ITwinManager>
        builder.Register(
                async c =>
                {
                    // Both twin manager flavours need the same two dependencies.
                    var messageConverterProvider = c.Resolve<IMessageConverterProvider>();
                    IConnectionManager connectionManager = await c.Resolve<Task<IConnectionManager>>();

                    if (this.useV1TwinManager)
                    {
                        // No store provider is supplied when store-and-forward is off.
                        return TwinManager.CreateTwinManager(connectionManager, messageConverterProvider, Option.None<IStoreProvider>());
                    }

                    ITwinManager twinManager = new PassThroughTwinManager(connectionManager, messageConverterProvider);
                    return twinManager;
                })
            .As<Task<ITwinManager>>()
            .SingleInstance();
    }
    else
    {
        // Store-and-forward enabled: messages are persisted and delivered by storing async executors.

        // EndpointExecutorConfig
        builder.Register(
                c =>
                {
                    // Endpoint executor config values -
                    // ExponentialBackoff - minBackoff = 1s, maxBackoff = 60s, delta (used to add randomness to backoff) - 1s (default)
                    // Num of retries = int.MaxValue (we want to keep retrying till the message is sent)
                    // Revive period - period for which the endpoint should be considered dead if it doesn't respond - 30s (we want to try continuously till the message expires)
                    // Timeout - time for which we wait for the ack from the endpoint = 30s
                    // TODO - Should the number of retries be tied to the Store and Forward ttl? Not
                    // doing that right now as that value can be changed at runtime, but these settings
                    // cannot. Need to make the number of retries dynamically configurable for that.
                    TimeSpan minWait = TimeSpan.FromSeconds(1);
                    TimeSpan maxWait = TimeSpan.FromSeconds(60);
                    TimeSpan delta = TimeSpan.FromSeconds(1);
                    int retries = int.MaxValue;
                    RetryStrategy retryStrategy = new ExponentialBackoff(retries, minWait, maxWait, delta);
                    TimeSpan timeout = TimeSpan.FromSeconds(30);
                    TimeSpan revivePeriod = TimeSpan.FromSeconds(30);
                    return new EndpointExecutorConfig(timeout, retryStrategy, revivePeriod);
                })
            .As<EndpointExecutorConfig>()
            .SingleInstance();

        // Task<ICheckpointStore>
        builder.Register(
                async c =>
                {
                    var dbStoreProvider = await c.Resolve<Task<IDbStoreProvider>>();
                    IStoreProvider storeProvider = new StoreProvider(dbStoreProvider);
                    ICheckpointStore checkpointStore = CheckpointStore.Create(storeProvider);
                    return checkpointStore;
                })
            .As<Task<ICheckpointStore>>()
            .SingleInstance();

        // Task<IMessageStore>
        builder.Register(
                async c =>
                {
                    // Resolve both tasks before awaiting either of them.
                    var checkpointStoreTask = c.Resolve<Task<ICheckpointStore>>();
                    var dbStoreProviderTask = c.Resolve<Task<IDbStoreProvider>>();

                    var checkpointStore = await checkpointStoreTask;
                    var dbStoreProvider = await dbStoreProviderTask;

                    IStoreProvider storeProvider = new StoreProvider(dbStoreProvider);
                    IMessageStore messageStore = new MessageStore(
                        storeProvider,
                        checkpointStore,
                        this.storeAndForwardConfiguration.TimeToLive,
                        this.checkEntireQueueOnCleanup,
                        this.messageCleanupIntervalSecs);
                    return messageStore;
                })
            .As<Task<IMessageStore>>()
            .SingleInstance();

        // Task<IEndpointExecutorFactory>
        builder.Register(
                async c =>
                {
                    var endpointExecutorConfig = c.Resolve<EndpointExecutorConfig>();
                    var messageStore = await c.Resolve<Task<IMessageStore>>();
                    IEndpointExecutorFactory endpointExecutorFactory = new StoringAsyncEndpointExecutorFactory(endpointExecutorConfig, new AsyncEndpointExecutorOptions(10, TimeSpan.FromSeconds(10)), messageStore);
                    return endpointExecutorFactory;
                })
            .As<Task<IEndpointExecutorFactory>>()
            .SingleInstance();

        // Task<Router>
        builder.Register(
                async c =>
                {
                    // Resolve everything from the context first; only then await.
                    var checkpointStoreTask = c.Resolve<Task<ICheckpointStore>>();
                    var routerConfig = c.Resolve<RouterConfig>();
                    var endpointExecutorFactoryTask = c.Resolve<Task<IEndpointExecutorFactory>>();

                    var checkpointStore = await checkpointStoreTask;
                    var endpointExecutorFactory = await endpointExecutorFactoryTask;

                    return await Router.CreateAsync(Guid.NewGuid().ToString(), this.iotHubName, routerConfig, endpointExecutorFactory, checkpointStore);
                })
            .As<Task<Router>>()
            .SingleInstance();

        // Task<ITwinManager>
        builder.Register(
                async c =>
                {
                    if (this.useV1TwinManager)
                    {
                        // Resolve everything from the context first; only then await.
                        var dbStoreProviderTask = c.Resolve<Task<IDbStoreProvider>>();
                        var messageConverterProvider = c.Resolve<IMessageConverterProvider>();
                        var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();

                        var dbStoreProvider = await dbStoreProviderTask;
                        IConnectionManager connectionManager = await connectionManagerTask;

                        return TwinManager.CreateTwinManager(connectionManager, messageConverterProvider, Option.Some<IStoreProvider>(new StoreProvider(dbStoreProvider)));
                    }
                    else
                    {
                        var messageConverterProvider = c.Resolve<IMessageConverterProvider>();
                        var deviceConnectivityManager = c.Resolve<IDeviceConnectivityManager>();
                        var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();

                        // GetTwinStore is handed the context before this lambda awaits anything.
                        IEntityStore<string, TwinStoreEntity> entityStore = await this.GetTwinStore(c);
                        IConnectionManager connectionManager = await connectionManagerTask;

                        ITwinManager twinManager = StoringTwinManager.Create(
                            connectionManager,
                            messageConverterProvider,
                            entityStore,
                            deviceConnectivityManager,
                            new ReportedPropertiesValidator(),
                            this.minTwinSyncPeriod,
                            this.reportedPropertiesSyncFrequency);
                        return twinManager;
                    }
                })
            .As<Task<ITwinManager>>()
            .SingleInstance();
    }

    // Task<IInvokeMethodHandler>
    builder.Register(
            async c =>
            {
                IConnectionManager connectionManager = await c.Resolve<Task<IConnectionManager>>();
                return new InvokeMethodHandler(connectionManager) as IInvokeMethodHandler;
            })
        .As<Task<IInvokeMethodHandler>>()
        .SingleInstance();

    // Task<ISubscriptionProcessor>
    // A local-only processor is used when the DisableCloudSubscriptions experimental feature is set.
    builder.Register(
            async c =>
            {
                var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
                if (this.experimentalFeatures.DisableCloudSubscriptions)
                {
                    return new LocalSubscriptionProcessor(await connectionManagerTask) as ISubscriptionProcessor;
                }
                else
                {
                    var invokeMethodHandlerTask = c.Resolve<Task<IInvokeMethodHandler>>();
                    var deviceConnectivityManager = c.Resolve<IDeviceConnectivityManager>();

                    IConnectionManager connectionManager = await connectionManagerTask;
                    IInvokeMethodHandler invokeMethodHandler = await invokeMethodHandlerTask;

                    return new SubscriptionProcessor(connectionManager, invokeMethodHandler, deviceConnectivityManager) as ISubscriptionProcessor;
                }
            })
        .As<Task<ISubscriptionProcessor>>()
        .SingleInstance();

    // Task<IEdgeHub>
    builder.Register(
            async c =>
            {
                var routingMessageConverter = c.Resolve<Core.IMessageConverter<IRoutingMessage>>();
                var routerTask = c.Resolve<Task<Router>>();
                var twinManagerTask = c.Resolve<Task<ITwinManager>>();
                var invokeMethodHandlerTask = c.Resolve<Task<IInvokeMethodHandler>>();
                var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
                var subscriptionProcessorTask = c.Resolve<Task<ISubscriptionProcessor>>();
                var deviceScopeIdentitiesCacheTask = c.Resolve<Task<IDeviceScopeIdentitiesCache>>();

                Router router = await routerTask;
                ITwinManager twinManager = await twinManagerTask;
                IConnectionManager connectionManager = await connectionManagerTask;
                IInvokeMethodHandler invokeMethodHandler = await invokeMethodHandlerTask;
                ISubscriptionProcessor subscriptionProcessor = await subscriptionProcessorTask;
                IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache = await deviceScopeIdentitiesCacheTask;

                IEdgeHub hub = new RoutingEdgeHub(
                    router,
                    routingMessageConverter,
                    connectionManager,
                    twinManager,
                    this.edgeDeviceId,
                    this.edgeModuleId,
                    invokeMethodHandler,
                    subscriptionProcessor,
                    deviceScopeIdentitiesCache);
                return hub;
            })
        .As<Task<IEdgeHub>>()
        .SingleInstance();

    // BrokerPropertiesValidator
    builder.Register(
            c =>
            {
                return new BrokerPropertiesValidator();
            })
        .As<BrokerPropertiesValidator>()
        .SingleInstance();

    // Task<EdgeHubConfigParser>
    builder.Register(
            async c =>
            {
                // The validator is resolved before awaiting so the context is not touched afterwards.
                BrokerPropertiesValidator validator = c.Resolve<BrokerPropertiesValidator>();
                RouteFactory routeFactory = await c.Resolve<Task<RouteFactory>>();
                var configParser = new EdgeHubConfigParser(routeFactory, validator);
                return configParser;
            })
        .As<Task<EdgeHubConfigParser>>()
        .SingleInstance();

    // Task<ConfigUpdater>
    builder.Register(
            async c =>
            {
                // Task<IMessageStore> is only registered when store-and-forward is enabled.
                Task<IMessageStore> messageStoreTask = this.isStoreAndForwardEnabled ? c.Resolve<Task<IMessageStore>>() : null;
                var storageSpaceChecker = c.Resolve<IStorageSpaceChecker>();
                var routeFactoryTask = c.Resolve<Task<RouteFactory>>();
                var routerTask = c.Resolve<Task<Router>>();
                var twinManagerTask = c.Resolve<Task<ITwinManager>>();

                IMessageStore messageStore = this.isStoreAndForwardEnabled ? await messageStoreTask : null;

                // The route factory and twin manager results are not used here; they are still
                // awaited so that initialization order and fault propagation match the previous behavior.
                await routeFactoryTask;
                Router router = await routerTask;
                await twinManagerTask;

                var configUpdater = new ConfigUpdater(router, messageStore, this.configUpdateFrequency, storageSpaceChecker);
                return configUpdater;
            })
        .As<Task<ConfigUpdater>>()
        .SingleInstance();

    // Task<IConfigSource>
    // Configuration comes from the Edge Hub twin when useTwinConfig is set, otherwise from local settings.
    builder.Register<Task<IConfigSource>>(
            async c =>
            {
                var routeFactoryTask = c.Resolve<Task<RouteFactory>>();
                var configParserTask = c.Resolve<Task<EdgeHubConfigParser>>();

                if (this.useTwinConfig)
                {
                    // Resolve everything from the context first; only then await.
                    var edgeHubCredentials = c.ResolveNamed<IClientCredentials>("EdgeHubCredentials");
                    var twinCollectionMessageConverter = c.Resolve<Core.IMessageConverter<TwinCollection>>();
                    var twinMessageConverter = c.Resolve<Core.IMessageConverter<Twin>>();
                    var twinManagerTask = c.Resolve<Task<ITwinManager>>();
                    var edgeHubTask = c.Resolve<Task<IEdgeHub>>();
                    var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
                    var deviceScopeIdentitiesCacheTask = c.Resolve<Task<IDeviceScopeIdentitiesCache>>();

                    RouteFactory routeFactory = await routeFactoryTask;
                    EdgeHubConfigParser configParser = await configParserTask;
                    ITwinManager twinManager = await twinManagerTask;
                    IEdgeHub edgeHub = await edgeHubTask;
                    IConnectionManager connectionManager = await connectionManagerTask;
                    IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache = await deviceScopeIdentitiesCacheTask;

                    var edgeHubConnection = await EdgeHubConnection.Create(
                        edgeHubCredentials.Identity,
                        edgeHub,
                        twinManager,
                        connectionManager,
                        routeFactory,
                        twinCollectionMessageConverter,
                        this.versionInfo,
                        deviceScopeIdentitiesCache);

                    return new TwinConfigSource(
                        edgeHubConnection,
                        edgeHubCredentials.Identity.Id,
                        this.versionInfo,
                        twinManager,
                        twinMessageConverter,
                        twinCollectionMessageConverter,
                        configParser,
                        this.manifestTrustBundle);
                }
                else
                {
                    RouteFactory routeFactory = await routeFactoryTask;

                    // The parser is not needed for local config, but it is still awaited (as before)
                    // so that a failure to build it surfaces through this task.
                    await configParserTask;

                    return new LocalConfigSource(
                        routeFactory,
                        this.routes,
                        this.storeAndForwardConfiguration);
                }
            })
        .As<Task<IConfigSource>>()
        .SingleInstance();

    // Task<IConnectionProvider>
    builder.Register(
            async c =>
            {
                var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
                var edgeHubTask = c.Resolve<Task<IEdgeHub>>();

                IConnectionManager connectionManager = await connectionManagerTask;
                IEdgeHub edgeHub = await edgeHubTask;

                IConnectionProvider connectionProvider = new ConnectionProvider(connectionManager, edgeHub, this.messageAckTimeout);
                return connectionProvider;
            })
        .As<Task<IConnectionProvider>>()
        .SingleInstance();

    // IUsernameParser
    builder.RegisterType<MqttUsernameParser>().As<IUsernameParser>().SingleInstance();

    base.Load(builder);
}