example/client/KatranSimpleClient.cpp (473 lines of code) (raw):
/* Copyright (C) 2018-present, Facebook, Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <memory>
#include <string>
#include <vector>
#include <folly/Conv.h>
#include <folly/Format.h>
#include <folly/IPAddress.h>
#include <folly/String.h>
#include <folly/init/Init.h>
#include <folly/io/async/AsyncSocket.h>
#include <re2/re2.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include "KatranSimpleClient.h"
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using apache::thrift::HeaderClientChannel;
using folly::AsyncSocket;
namespace {
constexpr uint64_t IPPROTO_TCP = 6;
constexpr uint64_t IPPROTO_UDP = 17;
constexpr uint64_t DEFAULT_FLAG = 0;
constexpr uint64_t NO_SPORT = 1;
constexpr uint64_t NO_LRU = 2;
constexpr uint64_t QUIC_VIP = 4;
constexpr uint64_t DPORT_HASH = 8;
constexpr uint64_t LOCAL_VIP = 32;
constexpr uint32_t LOCAL_REAL = 2;
const std::map<std::string, uint64_t> vipFlagTranslationTable = {
{"", DEFAULT_FLAG},
{"NO_SPORT", NO_SPORT},
{"NO_LRU", NO_LRU},
{"QUIC_VIP", QUIC_VIP},
{"DPORT_HASH", DPORT_HASH},
{"LOCAL_VIP", LOCAL_VIP},
};
const std::map<std::string, uint32_t> realFlagTranslationTable = {
{"LOCAL_REAL", LOCAL_REAL},
};
}; // namespace
namespace lb {
namespace katran {
KatranSimpleClient::KatranSimpleClient(const std::string& host, int port)
: host_(host), port_(port) {
auto addr = folly::SocketAddress(host_, port_);
client_ = createKatranClient(addr);
}
void KatranSimpleClient::changeMac(const std::string& mac) {
Mac newMac;
newMac.mac = mac;
if (client_->sync_changeMac(std::move(newMac))) {
LOG(INFO) << folly::sformat("Mac address changed to {}.", mac);
} else {
LOG(ERROR) << "ERROR: Mac address could not be changed.";
}
}
void KatranSimpleClient::getMac() {
::lb::katran::Mac mac;
client_->sync_getMac(mac);
LOG(INFO) << folly::sformat("Mac address is: {}", mac.mac);
}
void KatranSimpleClient::addOrModifyService(
const std::string& address,
const std::string& flags,
int proto,
bool modify,
bool setFlags) {
LOG(INFO) << folly::sformat("Adding service: {} {}", address, proto);
auto vip = parseToVip(address, proto);
const auto& it = vipFlagTranslationTable.find(flags);
if (it == vipFlagTranslationTable.cend()) {
LOG(ERROR) << folly::sformat("ERROR: unrecognized flag: {}", flags);
return;
}
VipMeta vipMeta;
vipMeta.vip = std::move(vip);
vipMeta.flags = it->second;
vipMeta.setFlag = setFlags;
if (modify) {
if (client_->sync_modifyVip(std::move(vipMeta))) {
LOG(INFO) << folly::sformat("Vip: {} modified", address);
} else {
LOG(ERROR) << "ERROR: Vip not modified";
}
} else {
if (client_->sync_addVip(std::move(vipMeta))) {
LOG(INFO) << "Vip added";
} else {
LOG(ERROR) << "ERROR: Vip not added";
}
}
}
void KatranSimpleClient::delService(const std::string& address, int proto) {
LOG(INFO) << folly::sformat("Deleting service: {} {}", address, proto);
auto vip = parseToVip(address, proto);
if (client_->sync_delVip(std::move(vip))) {
LOG(INFO) << "Vip deleted";
} else {
LOG(ERROR) << "ERROR: Vip not deleted";
}
}
void KatranSimpleClient::updateReal(
const std::string& address,
uint32_t flags,
bool setFlags) {
LOG(INFO) << folly::sformat("Updating real: {} {}", address, proto);
RealMeta realMeta;
realMeta.address = address;
realMeta.flags = flags;
realMeta.setFlags = setFlags;
if (client_->sync_modifyReal(std::move(realMeta))) {
LOG(INFO) << "Real updated";
} else {
LOG(ERROR) << "ERROR: Real not updated";
}
}
void KatranSimpleClient::updateServerForVip(
const std::string& vipAddr,
int proto,
const std::string& realAddr,
uint64_t weight,
const std::string& flags,
bool del) {
auto vip = parseToVip(vipAddr, proto);
const auto& it = realFlagTranslationTable.find(flags);
if (it == realFlagTranslationTable.cend()) {
LOG(ERROR) << folly::sformat("ERROR: unrecognized flag: {}", flags);
return;
}
auto real = parseToReal(realAddr, weight, it->second);
Action action;
if (del) {
action = Action::DEL;
} else {
action = Action::ADD;
}
Reals reals;
reals.push_back(real);
if (client_->sync_modifyRealsForVip(
action, std::move(reals), std::move(vip))) {
LOG(INFO) << folly::sformat("Reals for vip: {} modified", vipAddr);
} else {
LOG(INFO) << folly::sformat("Reals for vip: {} not modified", vipAddr);
}
}
void KatranSimpleClient::modifyQuicMappings(
const std::string& mapping,
bool del) {
Action action;
if (del) {
action = Action::DEL;
} else {
action = Action::ADD;
}
QuicReal quicReal = parseToQuicReal(mapping);
QuicReals reals;
reals.push_back(std::move(quicReal));
if (client_->sync_modifyQuicRealsMapping(action, std::move(reals))) {
LOG(INFO) << "Modified Quic Mappings";
} else {
LOG(ERROR) << "Error encountered while modifying the given Quic mappings";
}
}
std::vector<::lb::katran::Vip> KatranSimpleClient::getAllVips() {
std::vector<::lb::katran::Vip> vips;
client_->sync_getAllVips(vips);
return vips;
}
hcMap KatranSimpleClient::getAllHcs() {
hcMap retMap;
client_->sync_getHealthcheckersDst(retMap);
return retMap;
}
Reals KatranSimpleClient::getRealsForVip(const Vip& vip) {
Reals reals;
client_->sync_getRealsForVip(reals, vip);
return reals;
}
uint64_t KatranSimpleClient::getFlags(const Vip& vip) {
return client_->sync_getVipFlags(vip);
}
std::string KatranSimpleClient::parseVipFlags(uint64_t flags) {
std::string flagsStr = "";
if ((flags & NO_SPORT) > 0) {
flagsStr += " NO_SPORT ";
}
if ((flags & NO_LRU) > 0) {
flagsStr += " NO_LRU ";
}
if ((flags & QUIC_VIP) > 0) {
flagsStr += " QUIC_VIP ";
}
if ((flags & DPORT_HASH) > 0) {
flagsStr += " DPORT_HASH ";
}
if ((flags & LOCAL_VIP) > 0) {
flagsStr += " LOCAL_VIP ";
}
return flagsStr;
}
std::string KatranSimpleClient::parseRealFlags(uint32_t flags) {
std::string flagsStr = "";
if ((flags & LOCAL_REAL) > 0) {
flagsStr += " LOCAL_REAL ";
}
return flagsStr;
}
void KatranSimpleClient::list(const std::string& address, int proto) {
auto vips = getAllVips();
LOG(INFO) << folly::sformat("vips len: {}", vips.size());
for (const auto& vip : vips) {
listVipAndReals(vip);
}
}
void KatranSimpleClient::listVipAndReals(const Vip& vip) {
Reals reals = getRealsForVip(vip);
std::string proto;
if (vip.protocol == ::IPPROTO_TCP) {
proto = "tcp";
} else {
proto = "udp";
}
LOG(INFO) << folly::sformat(
"VIP: {:<20} Port: {:06d}, Protocol: {}", vip.address, vip.port, proto);
uint64_t flags = getFlags(vip);
LOG(INFO) << folly::sformat("Vip's flags: {}", parseVipFlags(flags));
for (auto real : reals) {
LOG(INFO) << folly::sformat(
"-> {:<20} weight {} flags {}",
real.address,
real.weight,
parseRealFlags(real.flags));
}
}
void KatranSimpleClient::clearAll() {
LOG(INFO) << "Deleting Vips";
auto vips = getAllVips();
for (auto& vip : vips) {
if (client_->sync_delVip(vip)) {
LOG(INFO) << "All vips deleted";
} else {
LOG(ERROR) << "Error encountered while deleting all vips";
}
}
LOG(INFO) << "Deleting HealthChecks";
hcMap hcs = getAllHcs();
for (auto& it : hcs) {
uint32_t somark = it.first;
if (client_->sync_delHealthcheckerDst(somark)) {
LOG(INFO) << folly::sformat("Delelted hc w/ somark: {}", somark);
} else {
LOG(ERROR) << folly::sformat(
"error while deleting hc w/ somark: {}", somark);
}
}
}
void KatranSimpleClient::listQm() {
LOG(INFO) << "printing address to quic's connection id mapping";
QuicReals qreals;
client_->sync_getQuicRealsMapping(qreals);
for (const auto& qr : qreals) {
LOG(INFO) << folly::sformat(
"real: {} = connection id: {}", qr.address, qr.id);
}
}
void KatranSimpleClient::addHc(const std::string& address, uint32_t somark) {
Healthcheck hc;
hc.address = address;
hc.somark = somark;
if (client_->sync_addHealthcheckerDst(std::move(hc))) {
LOG(INFO) << folly::sformat(
"added hc w/ somark: {} and addr {}", somark, address);
} else {
LOG(ERROR) << folly::sformat(
"error while add hc w/ somark: {} and addr {}", somark, address);
}
}
void KatranSimpleClient::delHc(uint32_t somark) {
if (client_->sync_delHealthcheckerDst(somark)) {
LOG(INFO) << folly::sformat("Deleted hc w/ somark: {}", somark);
} else {
LOG(ERROR) << folly::sformat(
"error while deleting hc w/ somark: {}", somark);
}
}
void KatranSimpleClient::listHc() {
hcMap hcs = getAllHcs();
for (const auto& it : hcs) {
LOG(INFO) << folly::sformat("somark: {} address: {}", it.first, it.second);
}
}
void KatranSimpleClient::showSumStats() {
uint64_t oldPkts = 0;
uint64_t oldBytes = 0;
auto vips = getAllVips();
LOG(INFO) << folly::sformat("vips len {}", vips.size());
while (true) {
uint64_t pkts = 0;
uint64_t bytes = 0;
for (const auto& vip : vips) {
Stats stats;
client_->sync_getStatsForVip(stats, vip);
pkts += stats.v1;
bytes += stats.v2;
}
auto diffPkts = pkts - oldPkts;
auto diffBytes = bytes - oldBytes;
LOG(INFO) << folly::sformat(
"summary: {} pkts/sec, {} bytes/sec", diffPkts, diffBytes);
oldPkts = pkts;
oldBytes = bytes;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void KatranSimpleClient::showIcmpStats() {
int64_t oldIcmpV4 = 0;
int64_t oldIcmpV6 = 0;
while (true) {
Stats stats;
client_->sync_getIcmpTooBigStats(stats);
auto IcmpV4 = stats.v1 - oldIcmpV4;
auto IcmpV6 = stats.v2 - oldIcmpV6;
LOG(INFO) << folly::sformat(
"ICMP \"packet too big\": v4 {} pkts/sec, v6 {} pkts/sec",
IcmpV4,
IcmpV6);
oldIcmpV4 = IcmpV4;
oldIcmpV6 = IcmpV6;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void KatranSimpleClient::showLruStats() {
uint64_t oldTotalPkts = 0;
uint64_t oldMiss = 0;
uint64_t oldTcpMiss = 0;
uint64_t oldTcpNonSynMiss = 0;
uint64_t oldFallbackLru = 0;
while (true) {
float lruMiss = 0;
float tcpMiss = 0;
float tcpNonSynMiss = 0;
float udpMiss = 0;
float lruHit = 0;
Stats stats;
client_->sync_getLruStats(stats);
Stats missStats;
client_->sync_getLruMissStats(missStats);
Stats fallbackStats;
client_->sync_getLruFallbackStats(fallbackStats);
uint64_t diffTotal = stats.v1 - oldTotalPkts;
uint64_t diffMiss = stats.v2 - oldMiss;
uint64_t diffTcpMiss = missStats.v1 - oldTcpMiss;
uint64_t diffTcpNonSynMiss = missStats.v2 - oldTcpNonSynMiss;
uint64_t diffFallbackLru = fallbackStats.v1 - oldFallbackLru;
if (diffTotal != 0) {
lruMiss = float(diffMiss) / float(diffTotal);
tcpMiss = float(diffTcpMiss) / float(diffTotal);
tcpNonSynMiss = float(diffTcpNonSynMiss) / float(diffTotal);
udpMiss = 1 - (tcpMiss + tcpNonSynMiss);
lruHit = 1 - lruMiss;
}
LOG(INFO) << folly::sformat(
"summary: {:08d} pkts/sec. lru hit: {:.2f}, lru miss: {:.2f}",
diffTotal,
lruHit * 100,
lruMiss * 100);
LOG(INFO) << folly::sformat(
"(tcp syn: {:.2f}, tcp non-syn: {:.2f}, udp: {:.2f})",
tcpMiss,
tcpNonSynMiss,
udpMiss);
LOG(INFO) << folly::sformat(
" fallback lru hit: {:08d} pkts/sec", diffFallbackLru);
oldTotalPkts = stats.v1;
oldMiss = stats.v2;
oldTcpMiss = missStats.v1;
oldTcpNonSynMiss = missStats.v2;
oldFallbackLru = fallbackStats.v1;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void KatranSimpleClient::showPerVipStats() {
auto vips = getAllVips();
std::map<std::string, uint64_t> statsMap;
// initialize per vip keys
for (const auto& vip : vips) {
Stats stats;
client_->sync_getStatsForVip(stats, vip);
auto pktKey = folly::to<std::string>(
vip.address, ":", vip.port, ":", vip.protocol, ":pkts");
auto bytesKey = folly::to<std::string>(
vip.address, ":", vip.port, ":", vip.protocol, ":bytes");
statsMap[pktKey] = 0;
statsMap[bytesKey] = 0;
}
while (true) {
for (const auto& vip : vips) {
auto pktKey = folly::to<std::string>(
vip.address, ":", vip.port, ":", vip.protocol, ":pkts");
auto bytesKey = folly::to<std::string>(
vip.address, ":", vip.port, ":", vip.protocol, ":bytes");
Stats stats;
client_->sync_getStatsForVip(stats, vip);
auto diffPkts = stats.v1 - statsMap[pktKey];
auto diffBytes = stats.v2 - statsMap[bytesKey];
LOG(INFO) << folly::sformat(
"vip: {:<20} {:08d} pkts/sec, {:08d} bytes/sec",
vip.address,
diffPkts,
diffBytes);
statsMap[pktKey] = stats.v1;
statsMap[bytesKey] = stats.v2;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
Vip KatranSimpleClient::parseToVip(
const std::string& address,
uint32_t protocol) {
Vip vip;
// v6 address, format: [<addr>]:<port>
std::string host;
std::string port;
if (address.find("[") != std::string::npos) {
std::string regex("\\[(.*?)\\]:(.*)");
if (!RE2::FullMatch(address, regex, &host, &port)) {
LOG(ERROR) << folly::sformat("ERROR: invalid v6 address: {}", address);
}
} else {
// v4 address. format <addr>:<port>
std::vector<std::string> pair;
folly::split(":", address, pair);
CHECK_EQ(pair.size(), 2)
<< "Invalid ipv4 format. Expected format is <addr>:<port>";
host = pair[0];
port = pair[1];
}
auto parsedIPAddr = folly::IPAddress::tryFromString(host);
if (!parsedIPAddr.hasValue()) {
LOG(ERROR) << folly::sformat(
"ERROR: Invalid IP address provided: {}", host);
}
vip.protocol = protocol;
vip.address = host;
vip.port = folly::to<int>(port);
return vip;
}
Real KatranSimpleClient::parseToReal(
const std::string& address,
uint32_t weight,
uint32_t flags) {
Real real;
real.address = address;
real.weight = weight;
real.flags = flags;
return real;
}
QuicReal KatranSimpleClient::parseToQuicReal(const std::string& mapping) {
std::vector<std::string> mappings;
folly::split("=", mapping, mappings);
if (mappings.size() != 2) {
LOG(ERROR) << "ERROR: quic mapping must be in <addr>=<id> format";
}
QuicReal real;
real.address = mappings[0];
real.id = folly::to<uint32_t>(mappings[1]);
return real;
}
std::unique_ptr<KatranServiceAsyncClient>
KatranSimpleClient::createKatranClient(const folly::SocketAddress& addr) {
AsyncSocket::UniquePtr sock(new AsyncSocket(&evb_, addr));
sock->setZeroCopy(true);
auto channel = HeaderClientChannel::newChannel(std::move(sock));
return std::make_unique<KatranServiceAsyncClient>(std::move(channel));
}
} // namespace katran
} // namespace lb