in src/main/java/org/apache/doris/kafka/connector/writer/load/CopyLoad.java [212:255]
public String getUploadAddress(String fileName) {
HttpPutBuilder putBuilder = new HttpPutBuilder();
putBuilder
.setUrl(loadUrlStr)
.addFileName(fileName)
.addCommonHeader()
.setEmptyEntity()
.baseAuth(dorisOptions.getUser(), dorisOptions.getPassword());
AtomicReference<String> uploadAddress = new AtomicReference<>();
try {
BackoffAndRetryUtils.backoffAndRetry(
LoadOperation.GET_UPLOAD_ADDRESS,
() -> {
try (CloseableHttpResponse execute =
httpClient.execute(putBuilder.build())) {
int statusCode = execute.getStatusLine().getStatusCode();
String reason = execute.getStatusLine().getReasonPhrase();
if (statusCode == 307) {
Header location = execute.getFirstHeader("location");
uploadAddress.set(location.getValue());
LOG.info("redirect to s3:{}", uploadAddress.get());
return true;
}
HttpEntity entity = execute.getEntity();
String result = entity == null ? null : EntityUtils.toString(entity);
LOG.error(
"Failed get the redirected address, status {}, reason {}, response {}",
statusCode,
reason,
result);
throw new UploadException("Could not get the redirected address.");
}
});
} catch (Exception e) {
String errMsg =
"Failed to get redirected upload address, fileName="
+ fileName
+ ", loadUrlStr="
+ loadUrlStr;
throw new UploadException(errMsg, e);
}
return uploadAddress.get();
}