in flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java [67:117]
public void handle(final HttpExchange t) throws IOException {
final BufferedReader in =
new BufferedReader(
new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));
try {
String line;
final List<DataPoint> points = new ArrayList<>();
int numberOfLinesParsed = 0;
while ((line = in.readLine()) != null) {
final DataPoint dataPoint = InfluxParser.parseToDataPoint(line);
points.add(dataPoint);
numberOfLinesParsed++;
if (numberOfLinesParsed > this.maximumLinesPerRequest) {
throw new RequestTooLargeException(
String.format(
"Payload too large. Maximum number of lines per request is %d.",
this.maximumLinesPerRequest));
}
}
final boolean result =
CompletableFuture.supplyAsync(
() -> {
try {
return this.ingestionQueue.put(
this.threadIndex, points);
} catch (final InterruptedException e) {
return false;
}
})
.get(this.enqueueWaitTime, TimeUnit.SECONDS);
if (!result) {
throw new TimeoutException("Failed to enqueue");
}
t.sendResponseHeaders(HttpURLConnection.HTTP_NO_CONTENT, -1);
this.ingestionQueue.notifyAvailable();
} catch (final ParseException e) {
Handler.sendResponse(t, HttpURLConnection.HTTP_BAD_REQUEST, e.getMessage());
} catch (final RequestTooLargeException e) {
Handler.sendResponse(t, HttpURLConnection.HTTP_ENTITY_TOO_LARGE, e.getMessage());
} catch (final TimeoutException e) {
Handler.sendResponse(t, HTTP_TOO_MANY_REQUESTS, "Server overloaded");
LOG.error(e.getMessage());
} catch (final ExecutionException | InterruptedException e) {
Handler.sendResponse(t, HttpURLConnection.HTTP_INTERNAL_ERROR, "Server Error");
LOG.error(e.getMessage());
}
}