in data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java [69:129]
/**
 * Starts the HTTP source, building the underlying server first if it has not been built yet.
 *
 * <p>When {@code server} is {@code null}, a new server is configured from {@code sourceConfig}
 * (TLS or plain HTTP listener, authentication decorator, connection limit, request timeout,
 * blocking task executor, throttling decorator and the log ingestion service) and stored in
 * {@code server}. On later calls the existing {@code server} instance is started again as-is.
 * The call blocks until the server's start future completes.
 *
 * @param buffer the buffer handed to the log ingestion service; must not be {@code null}
 * @throws IllegalStateException if {@code buffer} is {@code null}
 * @throws RuntimeException if server startup fails (the original {@link RuntimeException} cause
 *         is rethrown directly, any other failure is wrapped) or if the calling thread is
 *         interrupted while waiting for startup
 */
public void start(final Buffer<Record<Log>> buffer) {
    if (buffer == null) {
        throw new IllegalStateException("Buffer provided is null");
    }
    if (server == null) {
        final ServerBuilder sb = Server.builder();
        sb.disableServerHeader();
        if (sourceConfig.isSsl()) {
            LOG.info("Creating http source with SSL/TLS enabled.");
            final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
            final Certificate certificate = certificateProvider.getCertificate();
            // TODO: enable encrypted key with password
            // The certificate and private key are held as strings, so they are exposed to the
            // builder as in-memory UTF-8 streams.
            sb.https(sourceConfig.getPort()).tls(
                    new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
                    new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8))
            );
        } else {
            LOG.warn("Creating http source without SSL/TLS. This is not secure.");
            LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl");
            sb.http(sourceConfig.getPort());
        }
        authenticationProvider.addAuthenticationDecorator(sb);
        sb.maxNumConnections(sourceConfig.getMaxConnectionCount());

        final int requestTimeoutInMillis = sourceConfig.getRequestTimeoutInMillis();
        // Allow 2*requestTimeoutInMillis to accommodate non-blocking operations other than buffer writing.
        // The doubling is done in long arithmetic so a large configured timeout cannot wrap
        // around to a negative value before it reaches Duration.ofMillis(long).
        sb.requestTimeout(Duration.ofMillis(2L * requestTimeoutInMillis));

        final int threads = sourceConfig.getThreadCount();
        final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
        // NOTE(review): the boolean is Armeria's shutdownOnStop flag, i.e. the executor is
        // expected to be shut down together with the server - confirm against the Armeria version in use.
        sb.blockingTaskExecutor(blockingTaskExecutor, true);

        // The throttling strategy is given the executor's own work queue together with the
        // configured pending-request limit.
        final int maxPendingRequests = sourceConfig.getMaxPendingRequests();
        final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
                maxPendingRequests, blockingTaskExecutor.getQueue());
        final LogThrottlingRejectHandler logThrottlingRejectHandler =
                new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
        // TODO: allow customization on URI path for log ingestion
        sb.decorator(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI,
                ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));

        final LogHTTPService logHTTPService = new LogHTTPService(requestTimeoutInMillis, buffer, pluginMetrics);
        sb.annotatedService(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, logHTTPService);
        // TODO: attach HealthCheckService
        server = sb.build();
    }
    try {
        // Block until startup has finished so failures surface to the caller of start().
        server.start().get();
    } catch (ExecutionException ex) {
        // Rethrow the underlying runtime failure directly; instanceof is false for a null
        // cause, so no separate null check is needed.
        final Throwable cause = ex.getCause();
        if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
        }
        throw new RuntimeException(ex);
    } catch (InterruptedException ex) {
        // Restore the interrupt flag before propagating so callers can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    }
    LOG.info("Started http source...");
}