public void run()

in kinesis-data-analytics-sse/src/main/java/com/amazonaws/services/kinesisanalytics/SSESource.java [45:108]


    public void run(SourceContext<String> sourceContext) throws Exception {
        String url = configProps.getProperty(CONFIG_PROPERTY_URL);
        List<String> collectTypes = null;
        //Allow the end user to change the type of events we collect and send into the kinesis stream
        if (configProps.containsKey(CONFIG_PROPERTY_TYPES)) {
            collectTypes = Arrays.asList(configProps.getProperty(CONFIG_PROPERTY_TYPES).split("\\|"));
        }
        // Allow the end user to adjust the SSE read timeout value in milliseconds. Note in most cases adjusting this to anything but zero will cause the system to not connect.
        if (configProps.containsKey(CONFIG_PROPERTY_READ_TIMEOUT_MS)) {
            readTimeoutMS = Integer.parseInt(configProps.getProperty(CONFIG_PROPERTY_READ_TIMEOUT_MS));
        }
        // Allow the end user to specify how often to report the number of messages received over a given period of time in milliseconds
        if (configProps.containsKey(CONFIG_PROPERTY_REPORT_MESSAGE_RECEIVED_MS)) {
            reportMessagesReceivedMS = Integer.parseInt(configProps.getProperty(CONFIG_PROPERTY_REPORT_MESSAGE_RECEIVED_MS));
        }
        logger.info("SSESource read timeout: " + readTimeoutMS);
        logger.info("SSESource report messages received MS: " + reportMessagesReceivedMS);

        while (isRunning) {
            messagesReceived = 0L; //reset the number of messages received since last connect
            OkHttpClient client = new OkHttpClient.Builder()
                    .readTimeout(readTimeoutMS, TimeUnit.MILLISECONDS)
                    .retryOnConnectionFailure(true)
                    .build();
            EventSourceListener listener = new EventSourceSender(sourceContext, logger, collectTypes);
            Request.Builder requestBuilder = new Request.Builder();

            // Allow the end user to specify url headers to send during the initial sse connection
            if (configProps.containsKey(CONFIG_PROPERTY_HEADERS)) {
                String headers = configProps.getProperty(CONFIG_PROPERTY_HEADERS);
                String[] splitHeaders = headers.split("\\|");
                requestBuilder = requestBuilder.headers(Headers.of(splitHeaders));
            }

            // Create a request and connect using the standard headers for SSE endpoints
            Request request = requestBuilder
                    .url(url)
                    .header("Accept-Encoding", "")
                    .header("Accept", "text/event-stream")
                    .header("Cache-Control", "no-cache")
                    .build();
            logger.info("SSESource Request created to: " + url);
            final EventSource eventSource = EventSources.createFactory(client).newEventSource(request, listener);
            isConnected = true;
            logger.info("SSESource connected");
            try {
                long startTime = System.currentTimeMillis();
                // while we are connected and running we need to hold this thread and report messages received if that option is enabled.
                // SSE events are sent via a callback in another thread
                while (isRunning && isConnected) {
                    Thread.sleep(100);
                    long endTime = System.currentTimeMillis();
                    if (reportMessagesReceivedMS > 0 && (endTime - startTime > reportMessagesReceivedMS)) {
                        logger.info("SSERate received [" + messagesReceived + "] events between [" + startTime + "] and [" + endTime + "]");
                        startTime = endTime;
                    }
                }
            } catch (InterruptedException e) {
                logger.error("Sleep timer interrupted");
            }
            eventSource.cancel();
            logger.info("SSESource Stopping event source");
        }
    }