in contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java [343:483]
public InputStream getInputStream() {
Request.Builder requestBuilder = new Request.Builder()
.url(url);
final Pattern bodyParamsKeyPattern = Pattern.compile("^body\\..+$");
final Pattern headerParamsKeyPattern = Pattern.compile("^header\\..+$");
// The configuration does not allow for any other request types other than POST and GET.
if (apiConfig.getMethodType() == HttpMethod.POST) {
// Handle POST requests
FormBody.Builder formBodyBuilder;
// If the user wants filters pushed down to the POST body, do so here.
if (apiConfig.getPostLocation() == PostLocation.POST_BODY) {
formBodyBuilder = buildPostBody(filters, apiConfig.postBody());
requestBuilder.post(formBodyBuilder.build());
} else if (apiConfig.getPostLocation() == PostLocation.JSON_BODY
|| (apiConfig.getPostLocation() == PostLocation.QUERY_STRING
&& pluginConfig.enableEnhancedParamSyntax())) {
// Add static parameters from postBody
JSONObject json = buildJsonPostBody(apiConfig.postBody());
// Now add filters
if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
json.put(filter.getKey().substring(5), filter.getValue());
}
}
} else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
json.put(filter.getKey(), filter.getValue());
}
}
RequestBody requestBody = RequestBody.create(json.toJSONString(), JSON_MEDIA_TYPE);
requestBuilder.post(requestBody);
} else if (apiConfig.getPostLocation() == PostLocation.XML_BODY) {
StringBuilder xmlRequest = new StringBuilder();
xmlRequest.append("<request>");
if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
xmlRequest.append("<").append(filter.getKey().substring(5)).append(">");
xmlRequest.append(filter.getValue());
xmlRequest.append("</").append(filter.getKey().substring(5)).append(">");
}
}
} else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
xmlRequest.append("<").append(filter.getKey()).append(">");
xmlRequest.append(filter.getValue());
xmlRequest.append("</").append(filter.getKey()).append(">");
}
}
xmlRequest.append("</request>");
RequestBody requestBody = RequestBody.create(xmlRequest.toString(), XML_MEDIA_TYPE);
requestBuilder.post(requestBody);
} else {
formBodyBuilder = buildPostBody(apiConfig.postBody());
requestBuilder.post(formBodyBuilder.build());
}
}
// Log the URL and method to aid in debugging user issues.
logger.info("Connection: {}, Method {}, URL: {}",
connection,
apiConfig.getMethodType().name(), url());
// Add headers to request
if (apiConfig.headers() != null) {
for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
requestBuilder.addHeader(entry.getKey(), entry.getValue());
}
}
if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
for (Map.Entry<String, String> filter : filters.entrySet()) {
if (headerParamsKeyPattern.matcher(filter.getKey()).find()){
requestBuilder.addHeader(filter.getKey().substring(7), filter.getValue());
}
}
}
// Build the request object
Request request = requestBuilder.build();
Response response = null;
try {
logger.debug("Executing request: {}", request);
logger.debug("Headers: {}", request.headers());
// Execute the request
response = client.newCall(request).execute();
// Preserve the response
responseMessage = response.message();
responseCode = response.code();
responseProtocol = response.protocol().toString();
responseURL = response.request().url().toString();
// Case for pagination without limit
if (paginator != null && (
response.code() != 200 || response.body() == null ||
response.body().contentLength() == 0)) {
paginator.notifyPartialPage();
}
// If the request is unsuccessful clean up and throw a UserException
if (!isSuccessful(responseCode)) {
AutoCloseables.closeSilently(response);
throw UserException
.dataReadError()
.message("HTTP request failed")
.addContext("Response code", response.code())
.addContext("Response message", response.message())
.addContext(errorContext)
.build(logger);
}
logger.debug("HTTP Request for {} successful.", url());
logger.debug("Response Headers: {} ", response.headers());
// In the case of Header Index Pagination, send the header(s) to the paginator
if (paginator != null && paginator.getMode() == PaginatorMethod.HEADER_INDEX) {
((HeaderIndexPaginator)paginator).setResponseHeaders(response.headers());
}
// Return the InputStream of the response. Note that it is necessary and
// and sufficient that the caller invokes close() on the returned stream.
return Objects.requireNonNull(response.body()).byteStream();
} catch (IOException e) {
// response can only be null at this location so we do not attempt to close it.
throw UserException
.dataReadError(e)
.message("Failed to read the HTTP response body")
.addContext("Error message", e.getMessage())
.addContext(errorContext)
.build(logger);
}
}