src/cluster/sync_migrate_context.cc (45 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "cluster/sync_migrate_context.h"
void SyncMigrateContext::Suspend() {
auto bev = conn_->GetBufferEvent();
SetCB(bev);
if (timeout_ > 0) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
timeval tm = {timeout_, 0};
evtimer_add(timer_.get(), &tm);
}
}
void SyncMigrateContext::Resume(const Status &migrate_result) {
migrate_result_ = migrate_result;
auto s = conn_->Owner()->EnableWriteEvent(conn_->GetFD());
if (!s.IsOK()) {
error("[server] Failed to enable write event on the sync migrate connection {}: {}", conn_->GetFD(), s.Msg());
}
}
// Bufferevent event callback.
//
// @param bev     The bufferevent the event was raised on.
// @param events  Bitmask of BEV_EVENT_* flags.
//
// If the mask contains BEV_EVENT_EOF or BEV_EVENT_ERROR, timer_ is reset and
// the slot migrator's CancelSyncCtx() is called. In every case the event is
// then forwarded to the connection's own OnEvent handler.
void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
  constexpr auto kTerminalEvents = BEV_EVENT_EOF | BEV_EVENT_ERROR;

  const bool connection_ended = (events & kTerminalEvents) != 0;
  if (connection_ended) {
    timer_.reset();
    srv_->slot_migrator->CancelSyncCtx();
  }

  // Forward unconditionally, whether or not the cleanup above ran.
  conn_->OnEvent(bev, events);
}
// Timer callback; both parameters are unused.
//
// Replies to the connection with conn_->NilString(), resets timer_, and calls
// the slot migrator's CancelSyncCtx(). Afterwards the connection's bufferevent
// is passed to conn_->SetCB (presumably restoring the connection's own
// callbacks -- confirm) and reading is re-enabled on it.
void SyncMigrateContext::TimerCB(int, [[maybe_unused]] int16_t events) {
  auto &migrator = srv_->slot_migrator;

  // Send the nil reply before tearing anything down.
  conn_->Reply(conn_->NilString());
  timer_.reset();

  migrator->CancelSyncCtx();

  auto client_bev = conn_->GetBufferEvent();
  conn_->SetCB(client_bev);
  bufferevent_enable(client_bev, EV_READ);
}
// Bufferevent write callback: sends the reply for the stored migration result.
//
// @param bev  The connection's bufferevent.
//
// Replies with redis::RESP_OK when migrate_result_ converts to true, and with
// redis::Error(migrate_result_) otherwise. timer_ is then reset, the
// bufferevent is passed to conn_->SetCB (presumably restoring the connection's
// own callbacks -- confirm), reading is re-enabled, and a read event is
// triggered on it regardless of watermarks.
void SyncMigrateContext::OnWrite(bufferevent *bev) {
  const bool migration_ok = static_cast<bool>(migrate_result_);
  if (!migration_ok) {
    conn_->Reply(redis::Error(migrate_result_));
  } else {
    conn_->Reply(redis::RESP_OK);
  }

  timer_.reset();

  conn_->SetCB(bev);
  bufferevent_enable(bev, EV_READ);

  // NOTE(review): presumably this lets input that is already buffered be
  // processed without waiting for new data from the socket -- confirm.
  bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}