in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/impl/UpsertStreamImpl.java [285:377]
private void flush(boolean flushAll) throws TunnelException, IOException {
List<FlushResultHandler> handlers = new ArrayList<>();
boolean success;
int retry = 0;
// update slot map
Map<Integer, Slot> bucketMap = session.getBuckets();
if (bucketMap.size() != buckets.size()) {
throw new TunnelException("session slot map is changed");
} else {
buckets = bucketMap;
}
do {
success = true;
handlers.clear();
Channel channel = null;
try {
checkStatus();
latch = new CountDownLatch(bucketBuffer.size());
for (Map.Entry<Integer, ProtobufRecordPack> entry : bucketBuffer.entrySet()) {
ProtobufRecordPack pack = entry.getValue();
if (pack.getSize() > 0) {
if (pack.getTotalBytes() > slotBufferSize || flushAll) {
int bucketId = entry.getKey();
long bytes = pack.getTotalBytes();
pack.checkTransConsistency(false);
pack.complete();
bytes = pack.getTotalBytes() - bytes;
if (!flushAll) {
totalBufferSize += bytes;
}
Request request = session.buildRequest("PUT", bucketId, buckets.get(bucketId), pack.getTotalBytes(), pack.getSize(), compressOption);
channel = channelPool.acquire();
FlushResultHandler handler = new FlushResultHandler(pack, latch, listener, retry, bucketId);
channel.pipeline().addLast(handler);
handlers.add(handler);
ChannelFuture
channelFuture =
channel.writeAndFlush(buildFullHttpRequest(request, pack.getProtobufStream()));
channelFuture.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
latch.countDown();
channelPool.release(future.channel());
handler.setException(
new TunnelException("Connect : " + future.cause().getMessage(),
future.cause()));
future.channel().close();
} else {
future.channel().pipeline().addFirst(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
}
});
} else {
latch.countDown();
}
} else {
latch.countDown();
}
}
latch.await();
} catch (InterruptedException e) {
throw new TunnelException("flush interrupted", e);
}
for (FlushResultHandler handler : handlers) {
if (handler.getException() != null) {
success = false;
if (listener != null) {
if (!listener.onFlushFail(handler.getException(), retry)) {
status = Status.ERROR;
TunnelException e = new TunnelException(handler.getException().getErrorMsg(), handler.getException());
e.setRequestId(handler.getException().getRequestId());
e.setErrorCode(handler.getException().getErrorCode());
throw e;
}
} else {
TunnelException e = new TunnelException(handler.getException().getErrorMsg(), handler.getException());
e.setRequestId(handler.getException().getRequestId());
e.setErrorCode(handler.getException().getErrorCode());
throw e;
}
} else {
if (!flushAll) {
totalBufferSize -= handler.getFlushResult().flushSize;
}
}
}
++retry;
} while (!success);
if (flushAll) {
totalBufferSize = 0;
}
}