in flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java [89:117]
public void handleSplitsChanges(final SplitsChange<InfluxDBSplit> splitsChange) {
if (splitsChange.splits().isEmpty()) {
return;
}
this.split = splitsChange.splits().get(0);
if (this.server != null) {
return;
}
try {
this.server = HttpServer.create(new InetSocketAddress(this.defaultPort), 0);
} catch (final IOException e) {
throw new RuntimeException(
String.format(
"Unable to start HTTP Server on Port %d: %s",
this.defaultPort, e.getMessage()));
}
this.server.createContext(
"/api/v2/write",
new WriteAPIHandler(
this.maximumLinesPerRequest,
this.ingestionQueue,
this.split.splitId().hashCode(),
this.enqueueWaitTime));
this.server.createContext("/health", new HealthCheckHandler());
this.server.setExecutor(null); // creates a default executor
this.server.start();
}