interface/rdbc/net_channel.cpp (179 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "interface/rdbc/net_channel.h" #include <glog/logging.h> #include "platform/common/data_comm/data_comm.h" #include "platform/common/network/tcp_socket.h" namespace resdb { NetChannel::NetChannel(const std::string& ip, int port) : ip_(ip), port_(port) { socket_ = std::make_unique<TcpSocket>(); socket_->SetSendTimeout(300); socket_->SetRecvTimeout(read_timeouts_); } void NetChannel::SetRecvTimeout(int microseconds) { read_timeouts_ = microseconds; socket_->SetRecvTimeout(read_timeouts_); } void NetChannel::SetAsyncSend(bool is_async_send) { is_async_send_ = is_async_send; } NetChannel::NetChannel(std::unique_ptr<Socket> socket, bool connected) { SetSocket(std::move(socket)); connected_ = connected; } void NetChannel::Close() { socket_->Close(); } void NetChannel::SetSignatureVerifier(SignatureVerifier* verifier) { verifier_ = verifier; } void NetChannel::SetSocket(std::unique_ptr<Socket> socket) { socket_ = std::move(socket); } void NetChannel::SetDestReplicaInfo(const ReplicaInfo& replica) { ip_ = replica.ip(); port_ = replica.port(); } void NetChannel::IsLongConnection(bool long_connect_tion) { long_connect_tion_ = long_connect_tion; } int NetChannel::Connect() { socket_->ReInit(); socket_->SetAsync(is_async_send_); return socket_->Connect(ip_, port_); } int NetChannel::SendDataInternal(const std::string& data) { return socket_->Send(data); } // Connect to the server if not connected and send data. int NetChannel::SendFromKeepAlive(const std::string& data) { for (int i = 0; i < max_retry_time_; ++i) { if (!long_connecting_) { if (Connect() == 0) { long_connecting_ = true; } if (!long_connecting_) { LOG(ERROR) << "connect fail:" << ip_ << " port:" << port_; continue; } } int ret = SendDataInternal(data); if (ret >= 0) { return ret; } long_connecting_ = false; } return -1; } int NetChannel::Send(const std::string& data) { if (long_connect_tion_) { return SendFromKeepAlive(data); } for (int i = 0; i < max_retry_time_; ++i) { if (!connected_) { if (Connect()) { LOG(ERROR) << "connect fail:" << ip_ << " port:" << port_; continue; } } int ret = SendDataInternal(data); if (ret >= 0) { return ret; } else if (ret == -1) { LOG(ERROR) << "send data fail:" << ip_ << " port:" << port_; } } return -1; } // Receive data from the server. int NetChannel::Recv(std::string* data) { std::unique_ptr<DataInfo> resp = std::make_unique<DataInfo>(); int ret = socket_->Recv(&resp->buff, &resp->data_len); if (ret > 0) { *data = std::string((char*)resp->buff, resp->data_len); } return ret; } // Sign the message if verifier has been provied and send the message to the // server. std::string NetChannel::GetRawMessageString( const google::protobuf::Message& message, SignatureVerifier* verifier) { ResDBMessage sig_message; if (!message.SerializeToString(sig_message.mutable_data())) { return ""; } if (verifier != nullptr) { auto signature_or = verifier->SignMessage(sig_message.data()); if (!signature_or.ok()) { LOG(ERROR) << "Sign message fail"; return ""; } sig_message.mutable_signature()->Swap(&(*signature_or)); } else { // LOG(ERROR) << " no verifier"; } std::string message_str; if (!sig_message.SerializeToString(&message_str)) { return ""; } return message_str; } int NetChannel::SendRawMessageData(const std::string& message_str) { return Send(message_str); } int NetChannel::RecvRawMessageData(std::string* message_str) { return Recv(message_str); } int NetChannel::SendRawMessage(const google::protobuf::Message& message) { ResDBMessage sig_message; if (!message.SerializeToString(sig_message.mutable_data())) { return -1; } if (verifier_ != nullptr) { auto signature_or = verifier_->SignMessage(sig_message.data()); if (!signature_or.ok()) { LOG(ERROR) << "Sign message fail"; return -1; } sig_message.mutable_signature()->Swap(&(*signature_or)); } std::string message_str; if (!sig_message.SerializeToString(&message_str)) { return -1; } return Send(message_str); } int NetChannel::RecvRawMessageStr(std::string* message) { std::string recv_str; int ret = Recv(&recv_str); if (ret <= 0) { return ret; } ResDBMessage sig_message; if (!sig_message.ParseFromString(recv_str)) { LOG(ERROR) << "parse to sig fail"; return -1; } *message = sig_message.data(); return 0; } int NetChannel::RecvRawMessage(google::protobuf::Message* message) { std::string resp_data; int ret = Recv(&resp_data); if (ret < 0) { LOG(ERROR) << "recv fail:" << ret; return -1; } ResDBMessage sig_message; if (!sig_message.ParseFromString(resp_data)) { LOG(ERROR) << "parse sig msg fail"; return -1; } if (!message->ParseFromString(sig_message.data())) { LOG(ERROR) << "parse response msg fail"; return -1; } return 0; } int NetChannel::SendRequest(const google::protobuf::Message& message, Request::Type type, bool need_response) { Request request; request.set_type(type); request.set_need_response(need_response); if (!message.SerializeToString(request.mutable_data())) { return -1; } return SendRawMessage(request); } } // namespace resdb