public void handle()

in flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java [67:117]


    public void handle(final HttpExchange t) throws IOException {
        final BufferedReader in =
                new BufferedReader(
                        new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));

        try {
            String line;
            final List<DataPoint> points = new ArrayList<>();
            int numberOfLinesParsed = 0;
            while ((line = in.readLine()) != null) {
                final DataPoint dataPoint = InfluxParser.parseToDataPoint(line);
                points.add(dataPoint);
                numberOfLinesParsed++;
                if (numberOfLinesParsed > this.maximumLinesPerRequest) {
                    throw new RequestTooLargeException(
                            String.format(
                                    "Payload too large. Maximum number of lines per request is %d.",
                                    this.maximumLinesPerRequest));
                }
            }

            final boolean result =
                    CompletableFuture.supplyAsync(
                                    () -> {
                                        try {
                                            return this.ingestionQueue.put(
                                                    this.threadIndex, points);
                                        } catch (final InterruptedException e) {
                                            return false;
                                        }
                                    })
                            .get(this.enqueueWaitTime, TimeUnit.SECONDS);

            if (!result) {
                throw new TimeoutException("Failed to enqueue");
            }

            t.sendResponseHeaders(HttpURLConnection.HTTP_NO_CONTENT, -1);
            this.ingestionQueue.notifyAvailable();
        } catch (final ParseException e) {
            Handler.sendResponse(t, HttpURLConnection.HTTP_BAD_REQUEST, e.getMessage());
        } catch (final RequestTooLargeException e) {
            Handler.sendResponse(t, HttpURLConnection.HTTP_ENTITY_TOO_LARGE, e.getMessage());
        } catch (final TimeoutException e) {
            Handler.sendResponse(t, HTTP_TOO_MANY_REQUESTS, "Server overloaded");
            LOG.error(e.getMessage());
        } catch (final ExecutionException | InterruptedException e) {
            Handler.sendResponse(t, HttpURLConnection.HTTP_INTERNAL_ERROR, "Server Error");
            LOG.error(e.getMessage());
        }
    }