public InputStream getInputStream()

in contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java [343:483]


  /**
   * Builds the HTTP request described by {@code apiConfig}, executes it and returns
   * the response body as a stream.
   * <p>
   * For POST requests the body is assembled according to the configured post location:
   * a form body, a JSON body or an XML body. When the enhanced parameter syntax is
   * enabled, only filters whose key starts with {@code body.} are written to JSON/XML
   * bodies (with the prefix removed), and filters whose key starts with {@code header.}
   * are sent as request headers (with the prefix removed).
   * <p>
   * As a side effect, the response message, code, protocol and URL are stored on this
   * object, and the paginator (if any) is notified of partial pages and, in header
   * index mode, given the response headers.
   * <p>
   * The caller owns the returned stream and must close it. On every failure path the
   * underlying response is closed here before the exception propagates.
   *
   * @return the byte stream of the response body
   * @throws UserException if the response code is not considered successful, or if an
   *   {@link IOException} occurs while executing the request
   */
  public InputStream getInputStream() {

    Request.Builder requestBuilder = new Request.Builder()
      .url(url);
    final Pattern bodyParamsKeyPattern = Pattern.compile("^body\\..+$");
    final Pattern headerParamsKeyPattern = Pattern.compile("^header\\..+$");
    // Lengths of the prefixes stripped from enhanced-syntax filter keys.
    final int bodyPrefixLength = "body.".length();
    final int headerPrefixLength = "header.".length();

    // The configuration does not allow for any other request types other than POST and GET.
    if (apiConfig.getMethodType() == HttpMethod.POST) {
      // Handle POST requests
      FormBody.Builder formBodyBuilder;

      // If the user wants filters pushed down to the POST body, do so here.
      if (apiConfig.getPostLocation() == PostLocation.POST_BODY) {
        formBodyBuilder = buildPostBody(filters, apiConfig.postBody());
        requestBuilder.post(formBodyBuilder.build());
      } else if (apiConfig.getPostLocation() == PostLocation.JSON_BODY
          || (apiConfig.getPostLocation() == PostLocation.QUERY_STRING
          && pluginConfig.enableEnhancedParamSyntax())) {
        // Add static parameters from postBody
        JSONObject json = buildJsonPostBody(apiConfig.postBody());
        // Now add filters
        if (filters != null) {
          if (pluginConfig.enableEnhancedParamSyntax()) {
            // Enhanced syntax: only "body."-prefixed filters belong in the body.
            for (Map.Entry<String, String> filter : filters.entrySet()) {
              if (bodyParamsKeyPattern.matcher(filter.getKey()).find()) {
                json.put(filter.getKey().substring(bodyPrefixLength), filter.getValue());
              }
            }
          } else {
            for (Map.Entry<String, String> filter : filters.entrySet()) {
              json.put(filter.getKey(), filter.getValue());
            }
          }
        }

        RequestBody requestBody = RequestBody.create(json.toJSONString(), JSON_MEDIA_TYPE);
        requestBuilder.post(requestBody);
      } else if (apiConfig.getPostLocation() == PostLocation.XML_BODY) {
        // NOTE(review): keys and values are written verbatim, without XML escaping.
        StringBuilder xmlRequest = new StringBuilder();
        xmlRequest.append("<request>");
        if (filters != null) {
          if (pluginConfig.enableEnhancedParamSyntax()) {
            // Enhanced syntax: only "body."-prefixed filters belong in the body.
            for (Map.Entry<String, String> filter : filters.entrySet()) {
              if (bodyParamsKeyPattern.matcher(filter.getKey()).find()) {
                String elementName = filter.getKey().substring(bodyPrefixLength);
                xmlRequest.append("<").append(elementName).append(">");
                xmlRequest.append(filter.getValue());
                xmlRequest.append("</").append(elementName).append(">");
              }
            }
          } else {
            for (Map.Entry<String, String> filter : filters.entrySet()) {
              xmlRequest.append("<").append(filter.getKey()).append(">");
              xmlRequest.append(filter.getValue());
              xmlRequest.append("</").append(filter.getKey()).append(">");
            }
          }
        }
        xmlRequest.append("</request>");
        RequestBody requestBody = RequestBody.create(xmlRequest.toString(), XML_MEDIA_TYPE);
        requestBuilder.post(requestBody);

      } else {
        formBodyBuilder = buildPostBody(apiConfig.postBody());
        requestBuilder.post(formBodyBuilder.build());
      }
    }

    // Log the URL and method to aid in debugging user issues.
    logger.info("Connection: {}, Method {}, URL: {}",
      connection,
      apiConfig.getMethodType().name(), url());

    // Add headers to request
    if (apiConfig.headers() != null) {
      for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
        requestBuilder.addHeader(entry.getKey(), entry.getValue());
      }
    }

    // With the enhanced syntax, "header."-prefixed filters become request headers.
    if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
      for (Map.Entry<String, String> filter : filters.entrySet()) {
        if (headerParamsKeyPattern.matcher(filter.getKey()).find()) {
          requestBuilder.addHeader(filter.getKey().substring(headerPrefixLength), filter.getValue());
        }
      }
    }

    // Build the request object
    Request request = requestBuilder.build();
    Response response = null;
    // Becomes true only once the body stream is about to be returned; until then this
    // method owns the response and must close it on any exit.
    boolean streamHandedOff = false;

    try {
      logger.debug("Executing request: {}", request);
      logger.debug("Headers: {}", request.headers());

      // Execute the request
      response = client.newCall(request).execute();

      // Preserve the response
      responseMessage = response.message();
      responseCode = response.code();
      responseProtocol = response.protocol().toString();
      responseURL = response.request().url().toString();

      // Case for pagination without limit
      if (paginator != null && (
        response.code() != 200 || response.body() == null ||
        response.body().contentLength() == 0)) {
        paginator.notifyPartialPage();
      }

      // If the request is unsuccessful throw a UserException; the finally block
      // below releases the response.
      if (!isSuccessful(responseCode)) {
        throw UserException
          .dataReadError()
          .message("HTTP request failed")
          .addContext("Response code", response.code())
          .addContext("Response message", response.message())
          .addContext(errorContext)
          .build(logger);
      }
      logger.debug("HTTP Request for {} successful.", url());
      logger.debug("Response Headers: {} ", response.headers());

      // In the case of Header Index Pagination, send the header(s) to the paginator
      if (paginator != null && paginator.getMode() == PaginatorMethod.HEADER_INDEX) {
        ((HeaderIndexPaginator)paginator).setResponseHeaders(response.headers());
      }

      // Return the InputStream of the response. Note that it is necessary
      // and sufficient that the caller invokes close() on the returned stream.
      InputStream bodyStream = Objects.requireNonNull(response.body()).byteStream();
      streamHandedOff = true;
      return bodyStream;

    } catch (IOException e) {
      // Any response obtained before the failure is released by the finally block.
      throw UserException
        .dataReadError(e)
        .message("Failed to read the HTTP response body")
        .addContext("Error message", e.getMessage())
        .addContext(errorContext)
        .build(logger);
    } finally {
      // Release the connection on every path that does not hand the stream to the
      // caller, including unchecked exceptions thrown after execute().
      if (!streamHandedOff && response != null) {
        AutoCloseables.closeSilently(response);
      }
    }
  }