in odps-console-dship/src/main/java/com/aliyun/odps/ship/upload/BlockUploader.java [98:151]
/**
 * Processes this block (upsert, scan or plain upload, depending on the
 * {@code isUpsert} / {@code isScan} flags) and prints a completion summary.
 *
 * <p>{@code doUpdate()} is attempted repeatedly: after a failed attempt the
 * failure is written to the session history and, as long as the number of
 * failures has not exceeded {@code Constants.RETRY_LIMIT}, the thread sleeps
 * for {@code Constants.RETRY_INTERNAL} and tries again. Once the limit is
 * exceeded the session status is set to {@code SessionStatus.resume}, the
 * context is saved and the last exception is rethrown to the caller.
 *
 * @throws TunnelException if {@code doUpdate()} still fails with a tunnel error after all retries
 * @throws IOException if {@code doUpdate()} still fails with an I/O error after all retries
 * @throws ParseException propagated from {@code doUpdate()} without retrying
 * @throws UserInterruptException if the thread is interrupted while waiting between retries
 */
public void upload()
    throws TunnelException, IOException, ParseException {
  startTime = System.currentTimeMillis();
  preTime = System.currentTimeMillis();
  String type = isUpsert ? "upsert" : (isScan ? "scan" : "upload");
  print(type + " block: '" + blockId + "'\n");
  sessionHistory.log("start " + type + " , blockid=" + blockId);
  sessionHistory.saveContext();

  // 'retry' is the 1-based number of the attempt currently being made; the
  // block is given up once more than Constants.RETRY_LIMIT attempts failed.
  int retry = 1;
  while (true) {
    try {
      doUpdate();
      break;
    } catch (TunnelException | IOException e) {
      // Both failure kinds are handled identically; 'e' keeps its precise
      // type, so the rethrow below still matches the declared throws clause.
      sessionHistory.log("retry:" + retry + " " + Util.getStack(e));
      if (retry > Constants.RETRY_LIMIT) {
        // Mark the session as resumable before giving up on this block.
        DshipContext.INSTANCE.put(Constants.STATUS, SessionStatus.resume.toString());
        sessionHistory.saveContext();
        throw e;
      }
      print("update block " + blockId + " fail, retry:" + retry + "\n");
    }
    retry++;
    try {
      Thread.sleep(Constants.RETRY_INTERNAL);
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can still
      // observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      throw new UserInterruptException(e.getMessage());
    }
  }

  sessionHistory.log(type + " complete, blockid=" + blockId);
  StringBuilder messageBuilder = new StringBuilder();
  messageBuilder.append(String.format("%s block complete, block id: %d%s",
                                      type,
                                      blockId,
                                      (badRecords > 0 ? " [bad " + badRecords + "]" : "")));
  if (!isScan) {
    // Timing summaries are only appended for upload/upsert, not for scan.
    messageBuilder.append(localIOStopWatch.getFormattedSummary());
    messageBuilder.append(tunnelIOStopWatch.getFormattedSummary());
  }
  messageBuilder.append("\n");
  print(messageBuilder.toString());
}