public void handleSplitsChanges()

in flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java [89:117]


    /**
     * Records the assigned split and, on the first non-empty assignment, starts the embedded
     * HTTP server that accepts writes.
     *
     * <p>Only the first split of {@code splitsChange} is kept; an empty change is ignored. If a
     * server instance already exists, only the stored split is updated and the running server
     * (including the handler that was registered with the earlier split's id hash) is left
     * untouched.
     *
     * @param splitsChange the split assignment to apply
     * @throws RuntimeException if the HTTP server cannot be created on {@code defaultPort}; the
     *     underlying {@link IOException} is attached as the cause
     */
    public void handleSplitsChanges(final SplitsChange<InfluxDBSplit> splitsChange) {
        if (splitsChange.splits().isEmpty()) {
            return;
        }
        this.split = splitsChange.splits().get(0);

        // The server is created at most once; later split changes reuse it.
        if (this.server != null) {
            return;
        }
        try {
            // Backlog 0 lets the implementation pick its default value.
            this.server = HttpServer.create(new InetSocketAddress(this.defaultPort), 0);
        } catch (final IOException e) {
            // Keep the original exception as the cause so the stack trace and the concrete
            // exception type are not lost.
            throw new RuntimeException(
                    String.format(
                            "Unable to start HTTP Server on Port %d: %s",
                            this.defaultPort, e.getMessage()),
                    e);
        }

        this.server.createContext(
                "/api/v2/write",
                new WriteAPIHandler(
                        this.maximumLinesPerRequest,
                        this.ingestionQueue,
                        this.split.splitId().hashCode(),
                        this.enqueueWaitTime));
        this.server.createContext("/health", new HealthCheckHandler());
        this.server.setExecutor(null); // creates a default executor
        this.server.start();
    }