interface/rdbc/net_channel.cpp (179 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "interface/rdbc/net_channel.h"
#include <glog/logging.h>
#include "platform/common/data_comm/data_comm.h"
#include "platform/common/network/tcp_socket.h"
namespace resdb {
NetChannel::NetChannel(const std::string& ip, int port) : ip_(ip), port_(port) {
socket_ = std::make_unique<TcpSocket>();
socket_->SetSendTimeout(300);
socket_->SetRecvTimeout(read_timeouts_);
}
void NetChannel::SetRecvTimeout(int microseconds) {
read_timeouts_ = microseconds;
socket_->SetRecvTimeout(read_timeouts_);
}
void NetChannel::SetAsyncSend(bool is_async_send) {
is_async_send_ = is_async_send;
}
NetChannel::NetChannel(std::unique_ptr<Socket> socket, bool connected) {
SetSocket(std::move(socket));
connected_ = connected;
}
void NetChannel::Close() { socket_->Close(); }
void NetChannel::SetSignatureVerifier(SignatureVerifier* verifier) {
verifier_ = verifier;
}
void NetChannel::SetSocket(std::unique_ptr<Socket> socket) {
socket_ = std::move(socket);
}
void NetChannel::SetDestReplicaInfo(const ReplicaInfo& replica) {
ip_ = replica.ip();
port_ = replica.port();
}
void NetChannel::IsLongConnection(bool long_connect_tion) {
long_connect_tion_ = long_connect_tion;
}
int NetChannel::Connect() {
socket_->ReInit();
socket_->SetAsync(is_async_send_);
return socket_->Connect(ip_, port_);
}
int NetChannel::SendDataInternal(const std::string& data) {
return socket_->Send(data);
}
// Connect to the server if not connected and send data.
int NetChannel::SendFromKeepAlive(const std::string& data) {
for (int i = 0; i < max_retry_time_; ++i) {
if (!long_connecting_) {
if (Connect() == 0) {
long_connecting_ = true;
}
if (!long_connecting_) {
LOG(ERROR) << "connect fail:" << ip_ << " port:" << port_;
continue;
}
}
int ret = SendDataInternal(data);
if (ret >= 0) {
return ret;
}
long_connecting_ = false;
}
return -1;
}
int NetChannel::Send(const std::string& data) {
if (long_connect_tion_) {
return SendFromKeepAlive(data);
}
for (int i = 0; i < max_retry_time_; ++i) {
if (!connected_) {
if (Connect()) {
LOG(ERROR) << "connect fail:" << ip_ << " port:" << port_;
continue;
}
}
int ret = SendDataInternal(data);
if (ret >= 0) {
return ret;
} else if (ret == -1) {
LOG(ERROR) << "send data fail:" << ip_ << " port:" << port_;
}
}
return -1;
}
// Receive data from the server.
int NetChannel::Recv(std::string* data) {
std::unique_ptr<DataInfo> resp = std::make_unique<DataInfo>();
int ret = socket_->Recv(&resp->buff, &resp->data_len);
if (ret > 0) {
*data = std::string((char*)resp->buff, resp->data_len);
}
return ret;
}
// Sign the message if verifier has been provied and send the message to the
// server.
std::string NetChannel::GetRawMessageString(
const google::protobuf::Message& message, SignatureVerifier* verifier) {
ResDBMessage sig_message;
if (!message.SerializeToString(sig_message.mutable_data())) {
return "";
}
if (verifier != nullptr) {
auto signature_or = verifier->SignMessage(sig_message.data());
if (!signature_or.ok()) {
LOG(ERROR) << "Sign message fail";
return "";
}
sig_message.mutable_signature()->Swap(&(*signature_or));
} else {
// LOG(ERROR) << " no verifier";
}
std::string message_str;
if (!sig_message.SerializeToString(&message_str)) {
return "";
}
return message_str;
}
int NetChannel::SendRawMessageData(const std::string& message_str) {
return Send(message_str);
}
int NetChannel::RecvRawMessageData(std::string* message_str) {
return Recv(message_str);
}
int NetChannel::SendRawMessage(const google::protobuf::Message& message) {
ResDBMessage sig_message;
if (!message.SerializeToString(sig_message.mutable_data())) {
return -1;
}
if (verifier_ != nullptr) {
auto signature_or = verifier_->SignMessage(sig_message.data());
if (!signature_or.ok()) {
LOG(ERROR) << "Sign message fail";
return -1;
}
sig_message.mutable_signature()->Swap(&(*signature_or));
}
std::string message_str;
if (!sig_message.SerializeToString(&message_str)) {
return -1;
}
return Send(message_str);
}
int NetChannel::RecvRawMessageStr(std::string* message) {
std::string recv_str;
int ret = Recv(&recv_str);
if (ret <= 0) {
return ret;
}
ResDBMessage sig_message;
if (!sig_message.ParseFromString(recv_str)) {
LOG(ERROR) << "parse to sig fail";
return -1;
}
*message = sig_message.data();
return 0;
}
int NetChannel::RecvRawMessage(google::protobuf::Message* message) {
std::string resp_data;
int ret = Recv(&resp_data);
if (ret < 0) {
LOG(ERROR) << "recv fail:" << ret;
return -1;
}
ResDBMessage sig_message;
if (!sig_message.ParseFromString(resp_data)) {
LOG(ERROR) << "parse sig msg fail";
return -1;
}
if (!message->ParseFromString(sig_message.data())) {
LOG(ERROR) << "parse response msg fail";
return -1;
}
return 0;
}
int NetChannel::SendRequest(const google::protobuf::Message& message,
Request::Type type, bool need_response) {
Request request;
request.set_type(type);
request.set_need_response(need_response);
if (!message.SerializeToString(request.mutable_data())) {
return -1;
}
return SendRawMessage(request);
}
} // namespace resdb