in adb3client/src/main/java/com/alibaba/cloud/analyticdb/adb3client/impl/collector/RecordCollector.java [110:160]
public BatchState getBatchState() {
long afterLastCommit = System.currentTimeMillis() - startTimeMs;
// 行数够多少条
boolean isSizeEnough = size >= maxRecords;
if (isSizeEnough) {
return BatchState.SizeEnough;
}
// 大小够多少条
boolean isByteSizeEnough = byteSize >= maxByteSize;
if (isByteSizeEnough) {
return BatchState.ByteSizeEnough;
}
boolean isTimeWaitEnough = startTimeMs > -1 && afterLastCommit >= maxWaitTime;
if (isTimeWaitEnough) {
return BatchState.TimeWaitEnough;
}
boolean isEarlyCommit = false;
//当已经凑够2的指数时
if (enableEarlyCommit && size > 0 && (size & (size - 1)) == 0) {
// 已经过去了maxWaitTime 40%的时间,统计上来说,不能再翻倍,那就提早commit
boolean timeCondition = startTimeMs > -1 && afterLastCommit * 5 > maxWaitTime * 2;
if (timeCondition) {
return BatchState.TimeCondition;
}
//当前行数的数据已经超过40%maxByteSize,可能不能再翻倍,那就提早commit
boolean byteSizeCondition = byteSize * 5 > maxByteSize * 2;
if (byteSizeCondition) {
return BatchState.ByteSizeCondition;
}
//当当前的行数的数据超过1/4的剩余avaliable
long availableByteSize = pool.getAvailableByteSize();
boolean totalByteSizeCondition = byteSize * shardCount > availableByteSize;
if (totalByteSizeCondition) {
return BatchState.TotalByteSizeCondition;
}
isEarlyCommit = timeCondition
|| byteSizeCondition
|| totalByteSizeCondition;
if (isEarlyCommit) {
if (timeCondition) {
LOGGER.debug("table {} earlyCommit[timeCondition].afterLastCommit({}) > 40% maxWaitTime({})", afterLastCommit, maxWaitTime);
} else if (byteSizeCondition) {
LOGGER.debug("table {} earlyCommit[byteSizeCondition].byteSize({}) > 40% maxByteSize({})", byteSize, maxByteSize);
} else {
LOGGER.debug("table {} earlyCommit[totalByteSizeCondition].afterLastCommit({}) > 40% availableByteSize({})", afterLastCommit, maxWaitTime);
}
}
}
return BatchState.NotEnough;
//return isSizeEnough || isByteSizeEnough || isTimeWaitEnough || isEarlyCommit;
}