public void start()

in data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java [69:129]


    public void start(final Buffer<Record<Log>> buffer) {
        if (buffer == null) {
            throw new IllegalStateException("Buffer provided is null");
        }
        if (server == null) {
            final ServerBuilder sb = Server.builder();

            sb.disableServerHeader();

            if (sourceConfig.isSsl()) {
                LOG.info("Creating http source with SSL/TLS enabled.");
                final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
                final Certificate certificate = certificateProvider.getCertificate();
                // TODO: enable encrypted key with password
                sb.https(sourceConfig.getPort()).tls(
                        new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
                        new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
                        )
                );
            } else {
                LOG.warn("Creating http source without SSL/TLS. This is not secure.");
                LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl");
                sb.http(sourceConfig.getPort());
            }

            authenticationProvider.addAuthenticationDecorator(sb);

            sb.maxNumConnections(sourceConfig.getMaxConnectionCount());
            final int requestTimeoutInMillis = sourceConfig.getRequestTimeoutInMillis();
            // Allow 2*requestTimeoutInMillis to accommodate non-blocking operations other than buffer writing.
            sb.requestTimeout(Duration.ofMillis(2*requestTimeoutInMillis));
            final int threads = sourceConfig.getThreadCount();
            final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
            sb.blockingTaskExecutor(blockingTaskExecutor, true);
            final int maxPendingRequests = sourceConfig.getMaxPendingRequests();
            final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
                    maxPendingRequests, blockingTaskExecutor.getQueue());
            final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
            // TODO: allow customization on URI path for log ingestion
            sb.decorator(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
            final LogHTTPService logHTTPService = new LogHTTPService(requestTimeoutInMillis, buffer, pluginMetrics);
            sb.annotatedService(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, logHTTPService);
            // TODO: attach HealthCheckService

            server = sb.build();
        }

        try {
            server.start().get();
        } catch (ExecutionException ex) {
            if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
                throw (RuntimeException) ex.getCause();
            } else {
                throw new RuntimeException(ex);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        LOG.info("Started http source...");
    }