in src/main/java/com/google/cloud/run/kafkascaler/KafkaScaler.java [62:98]
/**
 * Assembles the {@code Scaler} together with every collaborator it is constructed from.
 *
 * <p>The steps run in a fixed order: configuration is loaded through a
 * {@code ConfigurationProvider} built from a {@code SystemEnvProvider} and
 * {@code SCALING_CONFIG_FILE}; a {@code Kafka} instance is created over an admin client
 * configured from {@code KAFKA_CLIENT_PROPERTIES_FILE}; the workload info is resolved from the
 * scaling config; a Cloud Run client wrapper is created for the workload's project and
 * location; the current instance count is read; and finally the metrics service and scaling
 * stabilizer are created and handed to the {@code Scaler} constructor.
 *
 * @return a new {@code Scaler} wired with the Kafka, Cloud Run, and metrics collaborators, the
 *     workload info, the static config, and the configuration provider
 * @throws IOException propagated from the configuration or client setup calls made here
 */
private Scaler initializeScaler() throws IOException {
  // --- Configuration ---
  ConfigurationProvider provider =
      new ConfigurationProvider(
          new ConfigurationProvider.SystemEnvProvider(), SCALING_CONFIG_FILE);
  ConfigurationProvider.StaticConfig fixedConfig = provider.staticConfig();
  // Only needed below to resolve the workload info; it is not passed to the Scaler directly.
  ScalingConfig scaling = provider.scalingConfig();

  // --- Kafka ---
  Kafka kafkaFacade =
      new Kafka(
          new KafkaAdminClientWrapper(
              provider.kafkaClientProperties(KAFKA_CLIENT_PROPERTIES_FILE)));

  // --- Cloud Run ---
  WorkloadInfoParser.WorkloadInfo workload = provider.workloadInfo(scaling);
  CloudRunClientWrapper runClient =
      new CloudRunClientWrapper(
          CloudRunClientWrapper.cloudRunClient(APPLICATION_NAME),
          workload.projectId(),
          workload.location());
  // Read once, up front; this value is what the stabilizer below is constructed with.
  int startingInstances = InstanceCountProvider.getInstanceCount(runClient, workload);

  // --- Cloud Monitoring ---
  MetricsService metrics =
      new MetricsService(
          new CloudMonitoringClientWrapper(CloudMonitoringClientWrapper.metricServiceClient()),
          workload.projectId());

  // --- Final assembly ---
  return new Scaler(
      kafkaFacade,
      new ScalingStabilizer(startingInstances),
      runClient,
      metrics,
      workload,
      fixedConfig,
      provider);
}