in src/meta/server_state.cpp [612:770]
// Rebuilds the in-memory app tables (_all_apps / _exist_apps) from the remote meta
// state storage.
//
// Steps, as performed below:
//   1. Clear _all_apps and _exist_apps.
//   2. Synchronously read the node at _apps_root; its value is treated as a
//      transaction state that must be empty or equal to unlock_state.
//   3. List the children of _apps_root and, for each child, read its app_info and
//      then every partition node "<app_path>/<index>" through asynchronous reads
//      registered on a local task tracker.
//   4. Wait on the tracker until all of those reads have finished.
//
// Returns:
//   - ERR_OBJECT_NOT_FOUND if the read of _apps_root reports it, or if no app was
//     loaded into _all_apps;
//   - ERR_OK if at least one app was loaded and no read reported an error;
//   - otherwise an error code recorded by a failed read (each failing callback
//     overwrites `err`, so only one of them is reported).
//
// Aborts via CHECK on: app or partition JSON that fails to decode, a partition config
// whose pid does not match its node, an app status other than AS_AVAILABLE/AS_DROPPED,
// an unexpected transaction state, or any root-read error other than
// ERR_OBJECT_NOT_FOUND.
dsn::error_code server_state::sync_apps_from_remote_storage()
{
    // Shared by every callback below (captured by reference). The callbacks are
    // registered on `tracker`, which is waited on before this function returns.
    dsn::error_code err;
    dsn::task_tracker tracker;

    dist::meta_state_service *storage = _meta_svc->get_remote_storage();

    // Reads one partition node and merges it into `app`.
    auto sync_partition = [this, storage, &err, &tracker](
        std::shared_ptr<app_state> &app, int partition_id, const std::string &partition_path) {
        storage->get_data(
            partition_path,
            LPC_META_CALLBACK,
            // `app` is captured by value and the lambda is mutable so that the copy can
            // be moved into check_app_bulk_load_states() below.
            [this, app, partition_id, partition_path, &err](error_code ec,
                                                            const blob &value) mutable {
                if (ec == ERR_OK) {
                    partition_configuration pc;
                    // The decode result must not be ignored: a failed or partial decode
                    // would otherwise install a half-filled configuration. This mirrors
                    // the checked app_info decode in sync_app.
                    CHECK(dsn::json::json_forwarder<partition_configuration>::decode(value, pc),
                          "invalid json data for partition node {}",
                          partition_path);
                    CHECK(pc.pid.get_app_id() == app->app_id &&
                              pc.pid.get_partition_index() == partition_id,
                          "invalid partition config");
                    {
                        // Write lock held for the rest of this scope while app state is
                        // mutated.
                        zauto_write_lock l(_lock);
                        app->partitions[partition_id] = pc;
                        for (const dsn::rpc_address &addr : pc.last_drops) {
                            app->helpers->contexts[partition_id].record_drop_history(addr);
                        }

                        // AS_CREATING / AS_DROPPING are the transient statuses assigned
                        // in sync_app to apps stored as AS_AVAILABLE / AS_DROPPED. A
                        // partition whose dropped flag disagrees with that status is
                        // recalled or dropped; otherwise it is simply marked processed.
                        if (app->status == app_status::AS_CREATING &&
                            (pc.partition_flags & pc_flags::dropped) != 0) {
                            recall_partition(app, partition_id);
                        } else if (app->status == app_status::AS_DROPPING &&
                                   (pc.partition_flags & pc_flags::dropped) == 0) {
                            drop_partition(app, partition_id);
                        } else {
                            process_one_partition(app);
                        }

                        // check consistency between app bulk_loading flag and app bulk load dir
                        // Only done when no partition is still in progress, the app is
                        // AS_AVAILABLE and a bulk load service exists.
                        if (app->helpers->partitions_in_progress.load() == 0 &&
                            app->status == app_status::AS_AVAILABLE &&
                            _meta_svc->get_bulk_load_service()) {
                            // Read the flag before `app` is moved from.
                            bool is_bulk_loading = app->is_bulk_loading;
                            _meta_svc->get_bulk_load_service()->check_app_bulk_load_states(
                                std::move(app), is_bulk_loading);
                        }
                    }
                } else if (ec == ERR_OBJECT_NOT_FOUND) {
                    // Fall back to partition_count when init_partition_count is unset
                    // (not positive).
                    auto init_partition_count = app->init_partition_count > 0
                                                    ? app->init_partition_count
                                                    : app->partition_count;
                    if (partition_id < init_partition_count) {
                        LOG_WARNING(
                            "partition node {} not exist on remote storage, may half create before",
                            partition_path);
                        init_app_partition_node(app, partition_id, nullptr);
                    } else if (partition_id >= app->partition_count / 2) {
                        LOG_WARNING(
                            "partition node {} not exist on remote storage, may half split before",
                            partition_path);
                        zauto_write_lock l(_lock);
                        // NOTE(review): partition_id - partition_count / 2 is presumably
                        // the index of the parent partition being split - confirm.
                        app->helpers->split_states.status[partition_id - app->partition_count / 2] =
                            split_status::SPLITTING;
                        app->helpers->split_states.splitting_count++;
                        app->partitions[partition_id].ballot = invalid_ballot;
                        app->partitions[partition_id].pid = gpid(app->app_id, partition_id);
                        process_one_partition(app);
                    }
                    // NOTE(review): a missing node with
                    // init_partition_count <= partition_id < partition_count / 2 is
                    // ignored silently here - confirm this is intended.
                } else {
                    LOG_ERROR("get partition node failed, reason({})", ec);
                    err = ec;
                }
            },
            &tracker);
    };

    // Reads one app node, registers the app, then starts a read for each partition.
    auto sync_app = [&](const std::string &app_path) {
        storage->get_data(
            app_path,
            LPC_META_CALLBACK,
            [this, app_path, &err, &sync_partition](error_code ec, const blob &value) {
                if (ec == ERR_OK) {
                    app_info info;
                    CHECK(dsn::json::json_forwarder<app_info>::decode(value, info),
                          "invalid json data");
                    std::shared_ptr<app_state> app = app_state::create(info);
                    {
                        zauto_write_lock l(_lock);
                        _all_apps.emplace(app->app_id, app);
                        // Stored statuses are replaced by their transient counterparts
                        // while partitions are being loaded. Only apps stored as
                        // AS_AVAILABLE are added to _exist_apps.
                        if (app->status == app_status::AS_AVAILABLE) {
                            app->status = app_status::AS_CREATING;
                            _exist_apps.emplace(app->app_name, app);
                        } else if (app->status == app_status::AS_DROPPED) {
                            app->status = app_status::AS_DROPPING;
                        } else {
                            CHECK(false,
                                  "invalid status({}) for app({}) in remote storage",
                                  enum_to_string(app->status),
                                  app->get_logname());
                        }
                    }
                    // Reset before the partition reads, which may increment it again.
                    app->helpers->split_states.splitting_count = 0;
                    for (int i = 0; i < app->partition_count; i++) {
                        std::string partition_path =
                            app_path + "/" + boost::lexical_cast<std::string>(i);
                        sync_partition(app, i, partition_path);
                    }
                } else {
                    LOG_ERROR("get app info from meta state service failed, path = {}, err = {}",
                              app_path,
                              ec);
                    err = ec;
                }
            },
            &tracker);
    };

    _all_apps.clear();
    _exist_apps.clear();

    // Blocking read of the root node; the returned task is waited on directly instead
    // of being registered on `tracker`.
    std::string transaction_state;
    storage
        ->get_data(_apps_root,
                   LPC_META_CALLBACK,
                   [&err, &transaction_state](error_code ec, const blob &value) {
                       err = ec;
                       if (ec == dsn::ERR_OK) {
                           transaction_state.assign(value.data(), value.length());
                       }
                   })
        ->wait();

    if (ERR_OBJECT_NOT_FOUND == err) {
        return err;
    }
    CHECK_EQ_MSG(ERR_OK, err, "can't handle this error");
    CHECK(transaction_state == std::string(unlock_state) || transaction_state.empty(),
          "invalid transaction state({})",
          transaction_state);

    storage->get_children(
        _apps_root,
        LPC_META_CALLBACK,
        [&](error_code ec, const std::vector<std::string> &apps) {
            if (ec == ERR_OK) {
                for (const auto &appid_str : apps) {
                    sync_app(_apps_root + "/" + appid_str);
                }
            } else {
                LOG_ERROR("get app list from meta state service failed, path = {}, err = {}",
                          _apps_root,
                          ec);
                err = ec;
            }
        },
        &tracker);

    // The callbacks above capture locals of this frame by reference, so they must all
    // have finished before returning.
    tracker.wait_outstanding_tasks();
    if (err == ERR_OK) {
        return _all_apps.empty() ? ERR_OBJECT_NOT_FOUND : ERR_OK;
    }
    return err;
}