// platform/networkstrate/async_replica_client.cpp
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/networkstrate/async_replica_client.h"
#include <boost/bind/bind.hpp>
#include "platform/common/queue/lock_free_queue.h"
#include "platform/proto/replica_info.pb.h"
namespace resdb {
// Builds a client bound to `io_service` that targets `ip`:`port`.
//
// Nothing is connected here: the constructor only stores the endpoint, binds
// the socket to the io_service and marks the sender as idle.
//
// `io_service` is dereferenced unconditionally, so it must be non-null.
// `port` is converted to the 16-bit port type of the endpoint.
// `is_use_long_conn` is accepted but not read anywhere in this definition.
AsyncReplicaClient::AsyncReplicaClient(boost::asio::io_service* io_service,
                                       const std::string& ip, int port,
                                       bool is_use_long_conn)
    : socket_(*io_service),
      endpoint_(boost::asio::ip::address::from_string(ip),
                static_cast<unsigned short>(port)),
      in_process_(false) {}
// No explicit teardown: members release their own resources.
// NOTE(review): the completion handlers started by OnSend()/ReConnect() refer
// back to this object, so it presumably has to outlive any operation still
// pending on the io_service - confirm against the owner of this client.
AsyncReplicaClient::~AsyncReplicaClient() = default;
// Queues a copy of `data` for transmission and, if no send chain is currently
// running, starts one on the calling thread.
//
// Returns 0 unconditionally; delivery problems are handled later by the send
// chain itself (see OnSend/ReConnect), not reported here.
//
// NOTE(review): if this can run concurrently with the send chain, there is a
// window in which OnSendNewMessage() has already seen an empty queue but has
// not yet cleared `in_process_`. A message pushed in that window is not sent
// until the next SendMessage() call - confirm whether that is acceptable.
int AsyncReplicaClient::SendMessage(const std::string& data) {
  queue_.Push(std::make_unique<std::string>(data));

  // Cheap pre-check so the common "already sending" case skips the CAS.
  bool expected = false;
  if (!in_process_.load() &&
      // The failure ordering of a compare-exchange must not be release or
      // acq_rel; acquire is the strongest ordering allowed for the pure load
      // performed on failure.
      in_process_.compare_exchange_strong(expected, true,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
    // This caller won the right to drive the send chain.
    OnSendNewMessage();
  }
  return 0;
}
// Takes the next queued message and starts sending it.
//
// When the queue yields nothing, or yields an empty string, the send chain
// stops and `in_process_` is cleared so that a later SendMessage() call can
// restart it. An empty string is therefore dropped without being sent.
void AsyncReplicaClient::OnSendNewMessage() {
  std::unique_ptr<std::string> next = queue_.Pop(0);

  const bool has_payload = (next != nullptr) && !next->empty();
  if (has_payload) {
    pending_data_ = std::move(next);
    status_ = 0;  // restart the per-message state machine at the length header
    OnSendMessage();
    return;
  }

  // Nothing to send: go idle.
  in_process_ = false;
}
// Advances the per-message state machine held in `status_`:
//   0     -> send the length header (the raw bytes of `data_size_`)
//   1     -> send the message body
//   other -> the message is complete; move on to the next queued one
// Each sending phase sets up the (pointer, size, index) cursor consumed by
// OnSend() and records the phase that follows it.
void AsyncReplicaClient::OnSendMessage() {
  switch (status_) {
    case 0:
      // Phase 1: the size prefix, taken straight from the in-memory
      // representation of `data_size_`.
      data_size_ = pending_data_->size();
      sending_data_ptr_ = reinterpret_cast<char*>(&data_size_);
      sending_data_size_ = sizeof(data_size_);
      sending_data_idx_ = 0;
      status_ = 1;
      OnSend();
      break;

    case 1:
      // Phase 2: the payload itself.
      sending_data_ptr_ = pending_data_->c_str();
      sending_data_size_ = data_size_;
      sending_data_idx_ = 0;
      status_ = 2;
      OnSend();
      break;

    default:
      // Header and body are both out; fetch the next message.
      status_ = 0;
      OnSendNewMessage();
      break;
  }
}
void AsyncReplicaClient::OnSend() {
socket_.async_write_some(
boost::asio::buffer(sending_data_ptr_ + sending_data_idx_,
sending_data_size_ - sending_data_idx_),
[&](const boost::system::error_code& error, size_t send_size) {
if (error) {
ReConnect();
} else {
sending_data_idx_ += send_size;
if (sending_data_idx_ >= sending_data_size_) {
OnSendMessage();
} else {
OnSend();
}
}
});
}
void AsyncReplicaClient::ReConnect() {
socket_.async_connect(endpoint_, [&](const boost::system::error_code& error) {
if (!error) {
status_ = 0;
OnSendMessage();
} else {
usleep(10000);
ReConnect();
}
});
}
} // namespace resdb