in src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java [77:144]
/**
 * Builds the plugin configuration from the plugin's section of the Gerrit config.
 *
 * <p>Every setting is looked up through {@code getStringParam}; when a setting is absent the
 * corresponding {@code DEFAULT_*} constant is used (the application name falls back to the
 * plugin name, while region and endpoint stay empty). A one-line summary of the resulting client
 * settings is logged at INFO level once all fields are populated.
 *
 * @param configFactory factory used to obtain the {@link PluginConfig} for this plugin
 * @param pluginName name of the plugin; selects the config section and is the default
 *     application name
 * @throws NumberFormatException if a numeric setting cannot be parsed by {@code
 *     Integer.parseInt} / {@code Long.parseLong}
 * @throws IllegalArgumentException if the configured initial position is rejected by {@code
 *     InitialPositionInStream.valueOf}
 */
public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) {
  PluginConfig pluginConfig = configFactory.getFromGerritConfig(pluginName);

  // Region and endpoint are optional: they remain empty when not configured.
  this.region =
      Optional.ofNullable(getStringParam(pluginConfig, REGION_FIELD, null)).map(Region::of);
  this.endpoint =
      Optional.ofNullable(getStringParam(pluginConfig, ENDPOINT_FIELD, null)).map(URI::create);

  this.streamEventsTopic =
      getStringParam(pluginConfig, STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
  // Boolean.parseBoolean has the same semantics as the deprecated Boolean(String) constructor:
  // true only for a case-insensitive "true", false for anything else.
  this.sendStreamEvents =
      Optional.ofNullable(getStringParam(pluginConfig, SEND_STREAM_EVENTS_FIELD, null))
          .map(Boolean::parseBoolean)
          .orElse(DEFAULT_SEND_STREAM_EVENTS);
  this.numberOfSubscribers =
      Integer.parseInt(
          getStringParam(
              pluginConfig, NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS));
  this.applicationName = getStringParam(pluginConfig, APPLICATION_NAME_FIELD, pluginName);

  // Upper-case with Locale.ROOT so the constant lookup does not depend on the JVM default
  // locale (e.g. under a Turkish locale 'i' would upper-case to a dotted capital I and the
  // lookup of an otherwise valid value would fail).
  this.initialPosition =
      InitialPositionInStream.valueOf(
          getStringParam(pluginConfig, INITIAL_POSITION_FIELD, DEFAULT_INITIAL_POSITION)
              .toUpperCase(java.util.Locale.ROOT));

  this.pollingIntervalMs =
      Optional.ofNullable(getStringParam(pluginConfig, POLLING_INTERVAL_MS_FIELD, null))
          .map(Long::parseLong)
          .orElse(DEFAULT_POLLING_INTERVAL_MS);
  this.maxRecords =
      Optional.ofNullable(getStringParam(pluginConfig, MAX_RECORDS_FIELD, null))
          .map(Integer::parseInt)
          .orElse(DEFAULT_MAX_RECORDS);
  this.publishSingleRequestTimeoutMs =
      Optional.ofNullable(
              getStringParam(pluginConfig, PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD, null))
          .map(Long::parseLong)
          .orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS);
  this.publishTimeoutMs =
      Optional.ofNullable(getStringParam(pluginConfig, PUBLISH_TIMEOUT_MS_FIELD, null))
          .map(Long::parseLong)
          .orElse(DEFAULT_PUBLISH_TIMEOUT_MS);
  this.shutdownTimeoutMs =
      Optional.ofNullable(getStringParam(pluginConfig, SHUTDOWN_MS_FIELD, null))
          .map(Long::parseLong)
          .orElse(DEFAULT_SHUTDOWN_TIMEOUT_MS);

  // The default level is also handed to Level.toLevel as its fallback argument.
  this.awsLibLogLevel =
      Optional.ofNullable(getStringParam(pluginConfig, AWS_LIB_LOG_LEVEL_FIELD, null))
          .map(l -> Level.toLevel(l, DEFAULT_AWS_LIB_LOG_LEVEL))
          .orElse(DEFAULT_AWS_LIB_LOG_LEVEL);
  this.sendAsync =
      Optional.ofNullable(getStringParam(pluginConfig, SEND_ASYNC_FIELD, null))
          .map(Boolean::parseBoolean)
          .orElse(DEFAULT_SEND_ASYNC);

  // Region and endpoint are appended to the summary only when they were configured.
  logger.atInfo().log(
      "Kinesis client. Application:'%s'|PollingInterval: %s|maxRecords: %s%s%s",
      applicationName,
      pollingIntervalMs,
      maxRecords,
      region.map(r -> String.format("|region: %s", r.id())).orElse(""),
      endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""));
}