in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java [356:406]
/**
 * Requests the internal stage upload address for the given file.
 *
 * <p>Builds an authenticated PUT request with an empty entity against {@code uploadUrl} and
 * executes it through {@link BackoffAndRetryUtils#backoffAndRetry}. The server is expected to
 * answer with HTTP 307; the value of its {@code location} header is returned as the upload
 * address. Any other status, or a 307 without a {@code location} header, is logged and
 * reported as a failure.
 *
 * @param fileName name of the file to be uploaded, attached to the request
 * @return the address taken from the {@code location} header of the 307 response
 * @throws CopyLoadException if the address cannot be obtained for any reason
 */
public String getUploadAddress(String fileName) throws CopyLoadException {
    HttpPutBuilder putBuilder = new HttpPutBuilder();
    putBuilder
            .setUrl(uploadUrl)
            .addFileName(fileName)
            .addCommonHeader()
            .setEmptyEntity()
            .baseAuth(username, password);
    try {
        Object address =
                BackoffAndRetryUtils.backoffAndRetry(
                        BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS,
                        () -> {
                            // A fresh client and request are created on every invocation of
                            // this callback; both are closed (response first) on exit.
                            try (CloseableHttpClient httpClient = httpClientBuilder.build();
                                    CloseableHttpResponse response =
                                            httpClient.execute(putBuilder.build())) {
                                int statusCode = response.getStatusLine().getStatusCode();
                                String reason = response.getStatusLine().getReasonPhrase();
                                if (statusCode == 307) {
                                    Header location = response.getFirstHeader("location");
                                    // getFirstHeader returns null when the header is absent;
                                    // fail with a clear message instead of a bare NPE.
                                    if (location == null) {
                                        LOG.error(
                                                "Failed to get internalStage address, status {}, reason {}, location header is missing",
                                                statusCode,
                                                reason);
                                        throw new CopyLoadException(
                                                "Failed get internalStage address, location header is missing");
                                    }
                                    return location.getValue();
                                }
                                HttpEntity entity = response.getEntity();
                                String result =
                                        entity == null ? null : EntityUtils.toString(entity);
                                LOG.error(
                                        "Failed to get internalStage address, status {}, reason {}, response {}",
                                        statusCode,
                                        reason,
                                        result);
                                throw new CopyLoadException("Failed get internalStage address");
                            }
                        });
        Preconditions.checkNotNull(address, "internalStage address is null");
        return address.toString();
    } catch (Exception e) {
        // NOTE(review): the cause is only logged here, not chained onto the rethrown
        // exception. If CopyLoadException offers a (String, Throwable) constructor, prefer
        // it - confirm against the class definition.
        LOG.error("Get internalStage address error,", e);
        throw new CopyLoadException("Get internalStage address error, " + e.getMessage());
    }
}