in holo-client/src/main/java/com/alibaba/hologres/client/impl/collector/TableShardCollector.java [60:128]
/**
 * Adds {@code record} to this collector's buffer, flushing beforehand and committing afterwards
 * when the conditions below require it.
 *
 * <p>The buffer is flushed before the record is appended when any of the following holds
 * (checked in this order, each check seeing the effect of the previous flush):
 * <ol>
 *   <li>the buffer is non-empty and the record's schema is not equal to {@code tableSchemaInBuffer};</li>
 *   <li>the buffer is non-empty and the record's check-and-put condition is not equal to
 *       {@code checkAndPutConditionInBuffer} (a record that is not a {@code CheckAndPutRecord}
 *       is treated as having a {@code null} condition);</li>
 *   <li>deduplication is disabled, or the record carries a check-and-put condition, and the buffer
 *       reports that the record's key already exists.</li>
 * </ol>
 *
 * <p>Exceptions raised by those flushes and by the action-status check are held back and thrown
 * only at the end of the method, so the record is appended to the buffer even when one of those
 * steps failed. Exceptions raised by {@code commit} are not held back.
 *
 * @param record the record to append
 * @throws HoloClientException the exception retained from the flush / action-status steps, if any,
 *     or one thrown directly by a step that is not guarded here
 */
public synchronized void append(Record record) throws HoloClientException {
    HoloClientException deferred = null;

    // The incoming record uses a different TableSchema than the buffered ones: commit first, then append.
    boolean schemaChanged = buffer.size() > 0 && !Objects.equals(record.getSchema(), tableSchemaInBuffer);
    if (schemaChanged) {
        deferred = flushKeepingFailure(deferred);
    }

    // The check-and-put condition differs from the buffered one (this includes the case where the
    // buffered records were not CheckAndPut at all): commit first, then append.
    CheckAndPutCondition condition = record instanceof CheckAndPutRecord
            ? ((CheckAndPutRecord) record).getCheckAndPutCondition()
            : null;
    boolean conditionChanged = buffer.size() > 0 && !Objects.equals(condition, checkAndPutConditionInBuffer);
    if (conditionChanged) {
        deferred = flushKeepingFailure(deferred);
    }

    // Deduplication is not allowed (always the case for a CheckAndPut record) and the primary key
    // is already buffered: commit first, then append.
    boolean mustNotDeduplicate = !enableDeduplication || condition != null;
    if (mustNotDeduplicate && buffer.isKeyExists(new RecordKey(record))) {
        deferred = flushKeepingFailure(deferred);
    }

    // Any held-back exception is thrown at the end; the current record is buffered regardless.
    boolean bufferFull = buffer.append(record);
    setRecordInfoInBuffer(record);

    if (bufferFull) {
        try {
            waitActionDone();
        } catch (HoloClientException failure) {
            deferred = foldActionFailure(deferred, failure);
        }
        commit(buffer.getBatchState());
    } else {
        boolean idle = false;
        try {
            idle = isActionDone();
        } catch (HoloClientException failure) {
            deferred = foldActionFailure(deferred, failure);
        }
        // Aggressive mode: commit right away once the previous action is found to be done.
        if (enableAggressive && idle) {
            commit(buffer.getBatchState());
        }
    }

    if (deferred != null) {
        throw deferred;
    }
}

/**
 * Runs {@code flush(true, false, null)} and reports which exception should be held back afterwards:
 * the one thrown by this flush if it failed, otherwise {@code earlier} unchanged.
 */
private HoloClientException flushKeepingFailure(HoloClientException earlier) throws HoloClientException {
    try {
        flush(true, false, null);
        return earlier;
    } catch (HoloClientException failure) {
        return failure;
    }
}

/**
 * Decides which exception to hold back after the action-status step threw {@code latest}.
 *
 * <p>A {@code latest} that is not a {@code HoloClientWithDetailsException} always replaces
 * {@code earlier}. A details exception becomes the held-back one when nothing was pending, is
 * merged into {@code earlier} when that is also a details exception, and is otherwise not
 * retained ({@code earlier} stays as is).
 */
private HoloClientException foldActionFailure(HoloClientException earlier, HoloClientException latest) {
    if (!(latest instanceof HoloClientWithDetailsException)) {
        return latest;
    }
    if (earlier == null) {
        return latest;
    }
    if (earlier instanceof HoloClientWithDetailsException) {
        ((HoloClientWithDetailsException) earlier).merge((HoloClientWithDetailsException) latest);
    }
    return earlier;
}