in flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactory.java [106:139]
/**
 * Collects the async-sink tuning options that are set in {@code config} into a
 * {@link Properties} object keyed by each option's configuration key.
 *
 * <p>Options whose configured value is {@code null} are omitted. Every value is stored in its
 * string form via {@link Properties#setProperty(String, String)}: {@link
 * Properties#getProperty(String)} returns {@code null} for entries whose value is not a
 * {@code String}, so inserting the raw numeric objects with {@code put} would make them
 * invisible to any consumer that reads them through {@code getProperty}.
 *
 * <p>NOTE(review): assumes downstream code reads these entries with {@code getProperty} and
 * parses the text; confirm no caller relies on {@code get(...)} returning a boxed number.
 *
 * @param config the table configuration to read the async-sink options from
 * @return a new {@code Properties} holding the string form of every option that is present
 */
private Properties getAsyncSinkOptions(ReadableConfig config) {
    Properties properties = new Properties();
    setAsyncOptionIfPresent(
            properties,
            AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(),
            config.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE));
    setAsyncOptionIfPresent(
            properties,
            AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(),
            config.get(AsyncSinkConnectorOptions.MAX_BATCH_SIZE));
    setAsyncOptionIfPresent(
            properties,
            AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS.key(),
            config.get(AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS));
    setAsyncOptionIfPresent(
            properties,
            AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS.key(),
            config.get(AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS));
    setAsyncOptionIfPresent(
            properties,
            AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(),
            config.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT));
    return properties;
}

/** Stores the string form of {@code value} under {@code key}; does nothing when it is null. */
private static void setAsyncOptionIfPresent(Properties target, String key, Object value) {
    Optional.ofNullable(value)
            .map(Object::toString)
            .ifPresent(text -> target.setProperty(key, text));
}