public Response execute()

in mr/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java [572:707]


    public Response execute(Request request) throws IOException {
        HttpMethod http = null;

        switch (request.method()) {
        case DELETE:
            http = new DeleteMethodWithBody();
            break;
        case HEAD:
            http = new HeadMethod();
            break;
        case GET:
            http = (request.body() == null ? new GetMethod() : new GetMethodWithBody());
            break;
        case POST:
            http = new PostMethod();
            break;
        case PUT:
            http = new PutMethod();
            break;

        default:
            throw new EsHadoopTransportException("Unknown request method " + request.method());
        }

        CharSequence uri = request.uri();
        if (StringUtils.hasText(uri)) {
            if (String.valueOf(uri).contains("?")) {
                throw new EsHadoopInvalidRequest("URI has query portion on it: [" + uri + "]");
            }
            http.setURI(new URI(escapeUri(uri.toString(), sslEnabled), false));
        }

        // NB: initialize the path _after_ the URI otherwise the path gets reset to /
        // add node prefix (if specified)
        String path = pathPrefix + addLeadingSlashIfNeeded(request.path().toString());
        if (path.contains("?")) {
            throw new EsHadoopInvalidRequest("Path has query portion on it: [" + path + "]");
        }

        path = HttpEncodingTools.encodePath(path);

        http.setPath(path);

        try {
            // validate new URI
            uri = http.getURI().toString();
        } catch (URIException uriex) {
            throw new EsHadoopTransportException("Invalid target URI " + request, uriex);
        }

        CharSequence params = request.params();
        if (StringUtils.hasText(params)) {
            http.setQueryString(params.toString());
        }

        ByteSequence ba = request.body();
        if (ba != null && ba.length() > 0) {
            if (!(http instanceof EntityEnclosingMethod)) {
                throw new IllegalStateException(String.format("Method %s cannot contain body - implementation bug", request.method().name()));
            }
            EntityEnclosingMethod entityMethod = (EntityEnclosingMethod) http;
            entityMethod.setRequestEntity(new BytesArrayRequestEntity(ba));
            entityMethod.setContentChunked(false);
        }

        headers.applyTo(http);

        // We don't want a token added from a proxy user to collide with the
        // run_as mechanism from a real user impersonating said proxy, so
        // make these conditions mutually exclusive.
        if (runAsUser != null) {
            if (log.isDebugEnabled()) {
                log.debug("Performing request with runAs user set to ["+runAsUser+"]");
            }
            http.addRequestHeader("es-security-runas-user", runAsUser);
        } else if (userProvider != null && userProvider.getUser().getEsToken(clusterName) != null) {
            // If we are using token authentication, set the auth to be preemptive:
            if (log.isDebugEnabled()) {
                log.debug("Performing preemptive authentication with API Token");
            }
            http.getHostAuthState().setPreemptive();
            http.getHostAuthState().setAuthAttempted(true);
            http.getHostAuthState().setAuthScheme(new EsApiKeyAuthScheme());
            if (isProxied && !isSecure) {
                http.getProxyAuthState().setPreemptive();
                http.getProxyAuthState().setAuthAttempted(true);
            }
        }

        // Determine a user provider to use for executing, or if we even need one at all
        UserProvider executingProvider;
        if (proxyUserProvider != null) {
            log.debug("Using proxyUserProvider to wrap rest request");
            executingProvider = proxyUserProvider;
        } else if (userProvider != null) {
            log.debug("Using regular user provider to wrap rest request");
            executingProvider = userProvider;
        } else {
            log.debug("Skipping user provider request wrapping");
            executingProvider = null;
        }

        // when tracing, log everything
        if (log.isTraceEnabled()) {
            log.trace(String.format("Tx %s[%s]@[%s][%s]?[%s] w/ payload [%s]", proxyInfo, request.method().name(), httpInfo, request.path(), request.params(), request.body()));
        }

        if (executingProvider != null) {
            final HttpMethod method = http;
            executingProvider.getUser().doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
                    doExecute(method);
                    return null;
                }
            });
        } else {
            doExecute(http);
        }

        if (log.isTraceEnabled()) {
            Socket sk = ReflectionUtils.invoke(GET_SOCKET, conn, (Object[]) null);
            String addr = sk.getLocalAddress().getHostAddress();
            log.trace(String.format("Rx %s@[%s] [%s-%s] [%s]", proxyInfo, addr, http.getStatusCode(), HttpStatus.getStatusText(http.getStatusCode()), http.getResponseBodyAsString()));
        }

        // Parse headers
        Map<String, List<String>> headers = new HashMap<>();
        for (Header responseHeader : http.getResponseHeaders()) {
            List<String> headerValues = headers.computeIfAbsent(responseHeader.getName(), k -> new ArrayList<>());
            headerValues.add(responseHeader.getValue());
        }

        // the request URI is not set (since it is retried across hosts), so use the http info instead for source
        return new SimpleResponse(http.getStatusCode(), new ResponseInputStream(http), httpInfo, headers);
    }