in pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java [109:157]
/**
 * Writes the currently buffered records to Redis with a single {@code MSET} call and
 * acks or fails each record according to the outcome.
 *
 * <p>The buffer ({@code incomingList}) is swapped for a fresh list while holding the monitor
 * on {@code this}; all further work happens on the detached batch outside the lock.
 *
 * <p>Per-record handling:
 * <ul>
 *   <li>A record whose key is absent or whose value is {@code null} cannot be stored
 *       (the backing {@link ConcurrentHashMap} rejects nulls); it is failed and left out
 *       of the write.</li>
 *   <li>A record whose key/value extraction throws is failed and left out of the write.</li>
 *   <li>All remaining records are acked if the {@code MSET} completes without error within
 *       {@code operationTimeoutMs}, and failed if it errors, times out, or the wait is
 *       interrupted.</li>
 * </ul>
 *
 * <p>If the waiting thread is interrupted, its interrupt status is restored before returning.
 */
private void flush() {
    final List<Record<byte[]>> recordsToFlush;
    synchronized (this) {
        if (incomingList.isEmpty()) {
            return;
        }
        recordsToFlush = incomingList;
        incomingList = Lists.newArrayList();
    }

    // Note: byte[] keys hash by identity, so every record contributes its own map entry.
    final Map<byte[], byte[]> recordsToSet = new ConcurrentHashMap<>();
    // Records that made it into recordsToSet. Tracked in a separate list instead of removing
    // rejected records from recordsToFlush while iterating over it, which would throw
    // ConcurrentModificationException (or silently skip the last record).
    final List<Record<byte[]>> recordsToWrite = Lists.newArrayList();

    for (Record<byte[]> record : recordsToFlush) {
        try {
            byte[] key = record.getKey().isPresent()
                    ? record.getKey().get().getBytes(StandardCharsets.UTF_8)
                    : null;
            byte[] value = record.getValue();
            if (key == null || value == null) {
                // Nothing to write for this record; fail it explicitly rather than relying
                // on a NullPointerException from the map.
                record.fail();
                log.warn("Record with null key or value can not be written to Redis, failing it");
                continue;
            }
            recordsToSet.put(key, value);
            recordsToWrite.add(record);
        } catch (Exception e) {
            record.fail();
            log.warn("Record flush thread was exception ", e);
        }
    }

    if (recordsToWrite.isEmpty()) {
        // Every record of the batch was rejected above and has already been failed.
        return;
    }

    try {
        if (log.isDebugEnabled()) {
            log.debug("Calling mset with {} values", recordsToSet.size());
        }
        RedisFuture<?> future = redisSession.asyncCommands().mset(recordsToSet);
        if (!future.await(operationTimeoutMs, TimeUnit.MILLISECONDS) || future.getError() != null) {
            log.warn("Operation failed with error {} or timeout {} is exceeded",
                    future.getError(), operationTimeoutMs);
            recordsToWrite.forEach(tRecord -> tRecord.fail());
            return;
        }
        recordsToWrite.forEach(tRecord -> tRecord.ack());
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
        recordsToWrite.forEach(tRecord -> tRecord.fail());
        log.error("Redis mset data interrupted exception ", e);
    }
}