in flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java [99:141]
/**
 * Sends every buffered element to Flume as one batch.
 *
 * <p>The pending buffer is swapped for a fresh list while holding this object's monitor;
 * the swapped-out elements are then converted to Flume events and sent outside the lock.
 * When no client is available, a reconnect is attempted after sleeping {@code waitTimeMs},
 * repeating until a client exists or the attempt counter exceeds {@code maxRetryAttempts}.
 * If the thread is interrupted while waiting, the interrupt status is restored and no
 * further reconnect attempts are made.
 *
 * <p>An {@code EventDeliveryException} raised by the client is logged and not rethrown.
 *
 * @throws IllegalStateException if no client is available once the reconnect attempts end
 */
private void flush() throws IllegalStateException {
    List<IN> toFlushList;
    synchronized (this) {
        if (incomingList.isEmpty()) {
            return;
        }
        // Swap the buffer under the lock so the batch can be built and sent without holding it.
        toFlushList = incomingList;
        incomingList = new ArrayList<>();
    }

    List<Event> events = new ArrayList<>(toFlushList.size());
    for (IN value : toFlushList) {
        events.add(this.eventBuilder.createFlumeEvent(value, getRuntimeContext()));
    }

    int retries = 0;
    while (null == client && retries <= maxRetryAttempts) {
        LOG.info("Wait for {} ms before retry", waitTimeMs);
        try {
            Thread.sleep(waitTimeMs);
        } catch (InterruptedException e) {
            LOG.error("Interrupted while trying to connect {} on {}", hostname, port);
            // Restore the interrupt status for the caller and stop retrying: with the flag
            // set, every further sleep would throw immediately and the loop would spin.
            Thread.currentThread().interrupt();
            break;
        }
        reconnect();
        LOG.info("Retry attempt number {}", retries);
        retries++;
    }

    if (null == client) {
        // Fail with the declared exception type instead of a NullPointerException below.
        throw new IllegalStateException(
                "No Flume client available for " + hostname + ":" + port
                        + " after " + retries + " reconnect attempt(s)");
    }

    try {
        client.appendBatch(events);
    } catch (EventDeliveryException e) {
        // NOTE(review): the batch is not re-queued here, so a delivery failure appears to
        // drop these events; confirm that best-effort delivery is intended.
        LOG.info("Encountered exception while sending data to flume : {}", e.getMessage(), e);
    }
}