private void flush()

in flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java [99:141]


    private void flush() throws IllegalStateException {
        List<Event> events = new ArrayList<>();
        List<IN>  toFlushList;
        synchronized (this) {
            if (incomingList.isEmpty()) {
                return;
            }
            toFlushList = incomingList;
            incomingList = new ArrayList<>();
        }

        for (IN value: toFlushList) {
            Event event = this.eventBuilder.createFlumeEvent(value, getRuntimeContext());
            events.add(event);
        }

        int retries = 0;
        boolean flag = true;
        while (flag) {
            if (null != client || retries > maxRetryAttempts) {
                flag = false;
            }

            if (retries <= maxRetryAttempts && null == client) {
                LOG.info("Wait for {} ms before retry", waitTimeMs);
                try {
                    Thread.sleep(waitTimeMs);
                } catch (InterruptedException ignored) {
                    LOG.error("Interrupted while trying to connect {} on {}", hostname, port);
                }
                reconnect();
                LOG.info("Retry attempt number {}", retries);
                retries++;
            }
        }

        try {
            client.appendBatch(events);
        } catch (EventDeliveryException e) {
            LOG.info("Encountered exception while sending data to flume : {}", e.getMessage(), e);
        }

    }