inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc (417 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "utils.h"
#include "logger.h"
#include <arpa/inet.h>
#include <ctime>
#include <curl/curl.h>
#include <errno.h>
#include <fstream>
#include <iostream>
#include <iterator>
#include <net/if.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <regex>
#include <sstream>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/sysinfo.h>
#include <sys/time.h>
#include "api_code.h"
#include "capi_constant.h"
namespace inlong {
uint16_t Utils::sequence = 0;
uint64_t Utils::last_msstamp = 0;
char Utils::snowflake_id[35] = {0};
AtomicUInt g_send_msgid{0};
AtomicInt user_exit_flag{0};
char Base64Table[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K',
'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g',
'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2',
'3', '4', '5', '6', '7', '8', '9', '+', '/'};
void Utils::taskWaitTime(int32_t sec) {
struct timeval tv;
tv.tv_sec = sec;
tv.tv_usec = 0;
int err;
do {
err = select(0, NULL, NULL, NULL, &tv);
} while (err < 0 && errno == EINTR);
}
uint64_t Utils::getCurrentMsTime() {
uint64_t ms_time = 0;
struct timeval tv;
gettimeofday(&tv, NULL);
ms_time = ((uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000);
return ms_time;
}
uint64_t Utils::getCurrentWsTime() {
uint64_t ws_time = 0;
struct timeval tv;
gettimeofday(&tv, NULL);
ws_time = ((uint64_t)tv.tv_sec * 1000000 + tv.tv_usec);
return ws_time;
}
std::string Utils::getFormatTime(uint64_t data_time) {
struct tm timeinfo;
char buffer[80];
// time(&rawtime);
time_t m_time = data_time / 1000;
localtime_r(&m_time, &timeinfo);
strftime(buffer, sizeof(buffer), "%Y%m%d%H%M%S", &timeinfo);
return std::string(buffer);
}
size_t Utils::zipData(const char *input, uint32_t input_len,
std::string &zip_res) {
// size_t zip_res_len = snappy::MaxCompressedLength(input_len);
size_t len_after_zip = snappy::Compress((char *)input, input_len, &zip_res);
// LOG_TRACE("data zip: input len is %u, output len is %u.", input_len,
// len_after_zip);
return len_after_zip;
}
char *Utils::getSnowflakeId() {
std::string local_host;
getFirstIpAddr(local_host);
uint64_t ipaddr = htonl(inet_addr(local_host.c_str()));
uint32_t pidid = static_cast<uint16_t>((getpid() & 0xFFFF));
uint32_t selfid = static_cast<uint16_t>((pthread_self() & 0xFFFF00) >> 8);
uint64_t sequence_mask = -1LL ^ (-1LL << 22);
uint64_t time_id = 0LL;
uint64_t local_id = (ipaddr << 32) | (pidid << 16) | (selfid);
uint64_t since_date = 1288834974657LL; // Thu, 04 Nov 2010 01:42:54 GMT
uint64_t msstamp = getCurrentMsTime();
uint64_t rand = 0;
uint64_t rand_mask = -1LL ^ (-1LL << (32 + 5 + 12));
if (msstamp < last_msstamp) {
LOG_ERROR("ms " << msstamp << " time less last:" << last_msstamp);
last_msstamp = msstamp;
srand(static_cast<uint32_t>(msstamp));
rand = random();
time_id = ((msstamp - since_date) << 22 | (rand & rand_mask));
snprintf(&snowflake_id[0], sizeof(snowflake_id), "0x%.16llx%.16llx",
local_id, time_id);
return &snowflake_id[0];
}
if (last_msstamp == msstamp) {
sequence = (sequence + 1) & sequence_mask;
if (0 == sequence) {
msstamp = waitNextMills(last_msstamp);
}
} else {
sequence = 0;
}
last_msstamp = msstamp;
time_id = (((msstamp - since_date) << 22) | sequence);
snprintf(&snowflake_id[0], sizeof(snowflake_id), "0x%.16llx%.16llx", local_id,
time_id);
return &snowflake_id[0];
}
int64_t Utils::waitNextMills(int64_t last_ms) {
int64_t msstamp = getCurrentMsTime();
while (msstamp <= last_ms) {
msstamp = getCurrentMsTime();
}
return msstamp;
}
bool Utils::getFirstIpAddr(std::string &local_host) {
int32_t sockfd;
int32_t ip_num = 0;
char buf[1024] = {0};
struct ifreq *ifreq;
struct ifreq if_flag;
struct ifconf ifconf;
ifconf.ifc_len = sizeof(buf);
ifconf.ifc_buf = buf;
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
LOG_ERROR("open the local socket(AF_INET, SOCK_DGRAM) failure!");
return false;
}
ioctl(sockfd, SIOCGIFCONF, &ifconf);
ifreq = (struct ifreq *)buf;
ip_num = ifconf.ifc_len / sizeof(struct ifreq);
for (int32_t i = 0; i < ip_num; i++, ifreq++) {
if (ifreq->ifr_flags != AF_INET) {
continue;
}
if (0 == strncmp(&ifreq->ifr_name[0], "lo", sizeof("lo"))) {
continue;
}
memcpy(&if_flag.ifr_name[0], &ifreq->ifr_name[0], sizeof(ifreq->ifr_name));
if ((ioctl(sockfd, SIOCGIFFLAGS, (char *)&if_flag)) < 0) {
continue;
}
if ((if_flag.ifr_flags & IFF_LOOPBACK) || !(if_flag.ifr_flags & IFF_UP)) {
continue;
}
if (!strncmp(
inet_ntoa(((struct sockaddr_in *)&(ifreq->ifr_addr))->sin_addr),
"127.0.0.1", 7)) {
continue;
}
local_host =
inet_ntoa(((struct sockaddr_in *)&(ifreq->ifr_addr))->sin_addr);
close(sockfd);
return true;
}
close(sockfd);
// local_host = "127.0.0.1";
return false;
}
bool Utils::bindCPU(int32_t cpu_id) {
int32_t cpunum = get_nprocs();
int32_t cpucore = cpu_id;
cpu_set_t mask;
if (abs(cpu_id) > cpunum) {
return false;
}
if (cpu_id < 0) {
cpucore = cpunum + cpu_id;
}
CPU_ZERO(&mask);
CPU_SET(cpucore, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
LOG_ERROR("set CPU affinity" << cpu_id << " cpunum:" << cpunum
<< " errno: " << errno);
}
return true;
}
bool Utils::parseHost(const std::string &host, std::string &ip) {
bool success = false;
struct addrinfo *res = NULL;
struct addrinfo hint;
char ipStr[17];
bzero(ipStr, 17);
bzero(&hint, sizeof(hint));
hint.ai_family = AF_INET;
hint.ai_protocol = SOCK_STREAM;
int32_t ret = getaddrinfo(host.c_str(), NULL, &hint, &res);
if (ret) {
freeaddrinfo(res);
LOG_ERROR("fail to resolve host:" << host);
return false;
}
for (struct addrinfo *ptr = res; ptr != NULL; ptr = ptr->ai_next) {
struct sockaddr_in *sa = (struct sockaddr_in *)ptr->ai_addr;
if (NULL != sa) {
inet_ntop(AF_INET, &sa->sin_addr.s_addr, ipStr, sizeof(ipStr));
ip = ipStr;
success = true;
break;
}
}
freeaddrinfo(res);
return success;
}
bool Utils::getUrlByDNS(const std::string &url, std::string &ipUrl) {
std::string host;
std::string ip;
size_t pos = url.find("://");
if (pos == std::string::npos) {
return false;
}
size_t sta = pos + 3;
size_t end = url.find(":", sta);
if (end != std::string::npos) {
host = url.substr(sta, end - sta);
if (!isalpha(host.at(0))) {
ipUrl = url;
return true;
}
if (parseHost(host, ip)) {
ipUrl = url;
ipUrl.replace(ipUrl.find(host), host.length(), ip);
return true;
}
}
return false;
}
int32_t Utils::requestUrl(const std::string &url, std::string &urlByDNS,
std::string &res, uint32_t timeout) {
if (!getUrlByDNS(url, urlByDNS)) {
LOG_ERROR("host resolve error, fail to request url " << url);
return SdkCode::kErrorCURL;
}
CURL *curl = NULL;
curl = curl_easy_init();
if (!curl) {
LOG_ERROR("failed to Init curl object");
return SdkCode::kErrorCURL;
}
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(curl, CURLOPT_URL, urlByDNS.c_str());
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Utils::getUrlResponse);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1L);
CURLcode ret = curl_easy_perform(curl);
LOG_INFO("request from tdm:" << res);
if (ret != 0) {
LOG_ERROR("failed to request data from " << urlByDNS);
if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
int32_t code;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
if (code != 200) {
LOG_ERROR("tdm responsed with code " << code);
if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
if (res.empty()) {
LOG_ERROR("tdm return empty data");
if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
if (curl) curl_easy_cleanup(curl);
return 0;
}
std::string Utils::Base64Encode(const std::string &data) {
size_t in_len = data.size();
size_t out_len = 4 * ((in_len + 2) / 3);
std::string ret(out_len, '\0');
size_t i;
char *p = const_cast<char *>(ret.c_str());
for (i = 0; i < in_len - 2; i += 3) {
*p++ = Base64Table[(data[i] >> 2) & 0x3F];
*p++ =
Base64Table[((data[i] & 0x3) << 4) | ((int)(data[i + 1] & 0xF0) >> 4)];
*p++ = Base64Table[((data[i + 1] & 0xF) << 2) |
((int)(data[i + 2] & 0xC0) >> 6)];
*p++ = Base64Table[data[i + 2] & 0x3F];
}
if (i < in_len) {
*p++ = Base64Table[(data[i] >> 2) & 0x3F];
if (i == (in_len - 1)) {
*p++ = Base64Table[((data[i] & 0x3) << 4)];
*p++ = '=';
} else {
*p++ = Base64Table[((data[i] & 0x3) << 4) |
((int)(data[i + 1] & 0xF0) >> 4)];
*p++ = Base64Table[((data[i + 1] & 0xF) << 2)];
}
*p++ = '=';
}
return ret;
}
std::string Utils::GenBasicAuthCredential(const std::string &id,
const std::string &key) {
std::string credential = id + constants::kBasicAuthJoiner + key;
std::string result = constants::kBasicAuthPrefix;
result.append(constants::kBasicAuthSeparator);
result.append(Base64Encode(credential));
return result;
}
int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) {
CURL *curl = NULL;
struct curl_slist *list = NULL;
curl = curl_easy_init();
if (!curl) {
LOG_ERROR("failed to init curl object");
return SdkCode::kErrorCURL;
}
// http header
list = curl_slist_append(list,"Content-Type: application/x-www-form-urlencoded");
if (request->need_auth && !request->auth_id.empty() &&
!request->auth_key.empty()) {
// Authorization: Basic xxxxxxxx
std::string auth = constants::kBasicAuthHeader;
auth.append(constants::kBasicAuthSeparator);
auth.append(GenBasicAuthCredential(request->auth_id, request->auth_key));
LOG_INFO("request manager, auth-header:" << auth.c_str());
list = curl_slist_append(list, auth.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
// set url
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(curl, CURLOPT_URL, request->url.c_str());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, request->post_data.c_str());
curl_easy_setopt(curl, CURLOPT_TIMEOUT, request->timeout);
// register callback and get res
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Utils::getUrlResponse);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
// execute curl request
CURLcode ret = curl_easy_perform(curl);
if (ret != 0) {
LOG_ERROR(curl_easy_strerror(ret));
LOG_ERROR("failed to request data from " << request->url.c_str());
if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
int32_t code;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
if (code != 200) {
if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
if (res.empty()) {
LOG_ERROR("Empty response");
if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
// Clean work
curl_easy_cleanup(curl);
return 0;
}
size_t Utils::getUrlResponse(void *buffer, size_t size, size_t count,
void *response) {
std::string *str = (std::string *)response;
(*str).append((char *)buffer, size * count);
return size * count;
}
bool Utils::readFile(const std::string &file_path, std::string &content) {
std::ifstream f(file_path.c_str());
if (f.fail()) {
LOG_ERROR("fail to read file:" << file_path << "please check file path");
return false;
}
std::stringstream ss;
ss << f.rdbuf();
content = ss.str();
return true;
}
static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
std::string Utils::trim(const std::string &source) {
std::string target = source;
if (!target.empty()) {
size_t foud_pos = target.find_first_not_of(kWhitespaceCharSet);
if (foud_pos != std::string::npos) {
target = target.substr(foud_pos);
}
foud_pos = target.find_last_not_of(kWhitespaceCharSet);
if (foud_pos != std::string::npos) {
target = target.substr(0, foud_pos + 1);
}
}
return target;
}
int32_t Utils::splitOperate(const std::string &source,
std::vector<std::string> &result,
const std::string &delimiter) {
std::string item_str;
std::string::size_type pos1 = 0;
std::string::size_type pos2 = 0;
result.clear();
if (!source.empty()) {
pos1 = 0;
pos2 = source.find(delimiter);
while (std::string::npos != pos2) {
item_str = trim(source.substr(pos1, pos2 - pos1));
pos1 = pos2 + delimiter.size();
pos2 = source.find(delimiter, pos1);
if (!item_str.empty()) {
result.push_back(item_str);
}
}
if (pos1 != source.length()) {
item_str = trim(source.substr(pos1));
if (!item_str.empty()) {
result.push_back(item_str);
}
}
}
return result.size();
}
std::string Utils::getVectorStr(std::vector<std::string> &vs) {
std::string res;
for (auto &it : vs) {
res += it + ", ";
}
return res;
}
} // namespace inlong