be/src/http/http_channel.cpp (125 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "http/http_channel.h" #include <absl/strings/str_split.h> #include <event2/buffer.h> #include <event2/bufferevent.h> #include <event2/http.h> #include <event2/http_struct.h> #include <sstream> #include <string> #include <vector> #include "common/logging.h" #include "common/status.h" #include "http/http_headers.h" #include "http/http_request.h" #include "http/http_status.h" #include "util/slice.h" #include "util/zlib.h" namespace doris { // Send Unauthorized status with basic challenge void HttpChannel::send_basic_challenge(HttpRequest* req, const std::string& realm) { static std::string s_prompt_str = "Please provide your userid and password\n"; std::stringstream ss; ss << "Basic realm=\"" << realm << "\""; req->add_output_header(HttpHeaders::WWW_AUTHENTICATE, ss.str().c_str()); send_reply(req, HttpStatus::UNAUTHORIZED, s_prompt_str); } void HttpChannel::send_error(HttpRequest* request, HttpStatus status) { evhttp_send_error(request->get_evhttp_request(), status, default_reason(status).c_str()); } void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) { evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), nullptr); } void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) { auto* evb = evbuffer_new(); std::string compressed_content; if (compress_content(request->header(HttpHeaders::ACCEPT_ENCODING), content, &compressed_content)) { request->add_output_header(HttpHeaders::CONTENT_ENCODING, "gzip"); evbuffer_add(evb, compressed_content.c_str(), compressed_content.size()); } else { evbuffer_add(evb, content.c_str(), content.size()); } evhttp_send_reply(request->get_evhttp_request(), status, default_reason(status).c_str(), evb); evbuffer_free(evb); } void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size, bufferevent_rate_limit_group* rate_limit_group) { auto* evb = evbuffer_new(); evbuffer_add_file(evb, fd, off, size); auto* evhttp_request = request->get_evhttp_request(); if (rate_limit_group) { auto* evhttp_connection = evhttp_request_get_connection(evhttp_request); auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection); bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group); } evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb); evbuffer_free(evb); } void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir, std::vector<std::string> local_files, bufferevent_rate_limit_group* rate_limit_group) { if (rate_limit_group) { auto* evhttp_request = request->get_evhttp_request(); auto* evhttp_connection = evhttp_request_get_connection(evhttp_request); auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection); bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group); } send_files(request, root_dir, std::move(local_files)); } void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir, std::vector<std::string> local_files) { std::unique_ptr<evbuffer, decltype(&evbuffer_free)> evb(evbuffer_new(), &evbuffer_free); for (const std::string& file : local_files) { std::string file_path = fmt::format("{}/{}", root_dir, file); int fd = open(file_path.c_str(), O_RDONLY); if (fd < 0) { std::string error_msg = "Failed to open file: " + file_path; LOG(WARNING) << "http channel send files: " << error_msg; HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg); return; } struct stat st; auto res = fstat(fd, &st); if (res < 0) { close(fd); std::string error_msg = "Failed to open file: " + file_path; LOG(WARNING) << "http channel send files: " << error_msg; HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg); return; } int64_t file_size = st.st_size; VLOG_DEBUG << "http channel send file " << file_path << ", size: " << file_size; evbuffer_add_printf(evb.get(), "File-Name: %s\r\n", file.c_str()); evbuffer_add_printf(evb.get(), "Content-Length: %" PRIi64 "\r\n", file_size); evbuffer_add_printf(evb.get(), "\r\n"); if (file_size > 0) { evbuffer_add_file(evb.get(), fd, 0, file_size); } } evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb.get()); } bool HttpChannel::compress_content(const std::string& accept_encoding, const std::string& input, std::string* output) { // Don't bother compressing empty content. if (input.empty()) { return false; } // Check if gzip compression is accepted by the caller. If so, compress the // content and replace the prerendered output. bool is_compressed = false; std::vector<std::string> encodings = absl::StrSplit(accept_encoding, ","); for (auto& encoding : encodings) { absl::StripAsciiWhitespace(&encoding); if (encoding == "gzip") { std::ostringstream oss; Status s = zlib::CompressLevel(Slice(input), 1, &oss); if (s.ok()) { *output = oss.str(); is_compressed = true; } else { LOG(WARNING) << "Could not compress output: " << s.to_string(); } break; } } return is_compressed; } } // namespace doris