dsn::error_code server_state::sync_apps_from_remote_storage()

in src/meta/server_state.cpp [612:770]


// Rebuilds the in-memory app tables (`_all_apps` / `_exist_apps`) from the remote meta state
// service.
//
//   1. Synchronously read `_apps_root`. If it does not exist, ERR_OBJECT_NOT_FOUND is returned
//      immediately; any other error aborts via CHECK. Its data must be empty or `unlock_state`.
//   2. Asynchronously list the children of `_apps_root`; each child is an app node.
//   3. For each app node, decode its `app_info`, remap its status (AS_AVAILABLE -> AS_CREATING,
//      AS_DROPPED -> AS_DROPPING, anything else aborts), then asynchronously fetch every
//      partition node `<app_path>/<partition_index>`.
//
// All asynchronous reads are registered on the local `tracker`, which is drained before the
// function returns, so the by-reference captures of the locals below stay valid.
//
// Returns ERR_OK if no read failed and at least one app was loaded; ERR_OBJECT_NOT_FOUND if the
// apps root is missing or contains no apps; otherwise the error recorded by a failed read (if
// several reads fail, whichever was recorded last is returned).
dsn::error_code server_state::sync_apps_from_remote_storage()
{
    dsn::error_code err;
    dsn::task_tracker tracker;

    // NOTE: the callbacks below already synchronize on `_lock` for app state, i.e. they may run
    // concurrently. Every write to `err` from inside a callback is therefore also done under
    // `_lock` to avoid a data race; `err` is only read once all tracked tasks have finished.

    dist::meta_state_service *storage = _meta_svc->get_remote_storage();

    // Fetches one partition node and reconciles it with the in-memory state of `app`.
    auto sync_partition = [this, storage, &err, &tracker](
        std::shared_ptr<app_state> &app, int partition_id, const std::string &partition_path) {
        storage->get_data(
            partition_path,
            LPC_META_CALLBACK,
            [this, app, partition_id, partition_path, &err](error_code ec,
                                                            const blob &value) mutable {
                if (ec == ERR_OK) {
                    partition_configuration pc;
                    dsn::json::json_forwarder<partition_configuration>::decode(value, pc);

                    CHECK(pc.pid.get_app_id() == app->app_id &&
                              pc.pid.get_partition_index() == partition_id,
                          "invalid partition config");
                    {
                        zauto_write_lock l(_lock);
                        app->partitions[partition_id] = pc;
                        for (const dsn::rpc_address &addr : pc.last_drops) {
                            app->helpers->contexts[partition_id].record_drop_history(addr);
                        }

                        // Reconcile the partition's `dropped` flag with the app status assigned
                        // in sync_app: a dropped partition of a creating app is recalled, a
                        // non-dropped partition of a dropping app is dropped, and everything
                        // else goes through the normal per-partition processing.
                        if (app->status == app_status::AS_CREATING &&
                            (pc.partition_flags & pc_flags::dropped) != 0) {
                            recall_partition(app, partition_id);
                        } else if (app->status == app_status::AS_DROPPING &&
                                   (pc.partition_flags & pc_flags::dropped) == 0) {
                            drop_partition(app, partition_id);
                        } else {
                            process_one_partition(app);
                        }

                        // check consistency between app bulk_loading flag and app bulk load dir
                        // (only once no partition of the app is still in progress and the app
                        // has become available).
                        if (app->helpers->partitions_in_progress.load() == 0 &&
                            app->status == app_status::AS_AVAILABLE &&
                            _meta_svc->get_bulk_load_service()) {
                            bool is_bulk_loading = app->is_bulk_loading;
                            // `app` is this callback's own copy and is not used afterwards.
                            _meta_svc->get_bulk_load_service()->check_app_bulk_load_states(
                                std::move(app), is_bulk_loading);
                        }
                    }
                } else if (ec == ERR_OBJECT_NOT_FOUND) {
                    // Partitions below the initial partition count (or below partition_count
                    // when init_partition_count is not positive) are re-created; a missing
                    // partition in the upper half is treated as an interrupted split.
                    auto init_partition_count = app->init_partition_count > 0
                                                    ? app->init_partition_count
                                                    : app->partition_count;
                    if (partition_id < init_partition_count) {
                        LOG_WARNING(
                            "partition node {} not exist on remote storage, may half create before",
                            partition_path);
                        init_app_partition_node(app, partition_id, nullptr);
                    } else if (partition_id >= app->partition_count / 2) {
                        LOG_WARNING(
                            "partition node {} not exist on remote storage, may half split before",
                            partition_path);
                        zauto_write_lock l(_lock);
                        app->helpers->split_states.status[partition_id - app->partition_count / 2] =
                            split_status::SPLITTING;
                        app->helpers->split_states.splitting_count++;
                        app->partitions[partition_id].ballot = invalid_ballot;
                        app->partitions[partition_id].pid = gpid(app->app_id, partition_id);
                        process_one_partition(app);
                    } else {
                        // Neither recovery path applies; the partition is left untouched, but
                        // make the missing node visible instead of ignoring it silently.
                        LOG_ERROR("partition node {} not exist on remote storage and is neither "
                                  "an initial partition nor a split child, ignore it",
                                  partition_path);
                    }
                } else {
                    LOG_ERROR("get partition node failed, reason({})", ec);
                    zauto_write_lock l(_lock);
                    err = ec;
                }
            },
            &tracker);
    };

    // Fetches one app node, registers the app and then fetches all of its partitions.
    auto sync_app = [&](const std::string &app_path) {
        storage->get_data(
            app_path,
            LPC_META_CALLBACK,
            [this, app_path, &err, &sync_partition](error_code ec, const blob &value) {
                if (ec == ERR_OK) {
                    app_info info;
                    CHECK(dsn::json::json_forwarder<app_info>::decode(value, info),
                          "invalid json data");
                    std::shared_ptr<app_state> app = app_state::create(info);
                    {
                        zauto_write_lock l(_lock);
                        _all_apps.emplace(app->app_id, app);
                        if (app->status == app_status::AS_AVAILABLE) {
                            app->status = app_status::AS_CREATING;
                            _exist_apps.emplace(app->app_name, app);
                        } else if (app->status == app_status::AS_DROPPED) {
                            app->status = app_status::AS_DROPPING;
                        } else {
                            CHECK(false,
                                  "invalid status({}) for app({}) in remote storage",
                                  enum_to_string(app->status),
                                  app->get_logname());
                        }
                    }
                    app->helpers->split_states.splitting_count = 0;
                    for (int i = 0; i < app->partition_count; i++) {
                        std::string partition_path =
                            app_path + "/" + boost::lexical_cast<std::string>(i);
                        sync_partition(app, i, partition_path);
                    }
                } else {
                    LOG_ERROR("get app info from meta state service failed, path = {}, err = {}",
                              app_path,
                              ec);
                    zauto_write_lock l(_lock);
                    err = ec;
                }
            },
            &tracker);
    };

    _all_apps.clear();
    _exist_apps.clear();

    // Synchronous read of the apps root: `wait()` guarantees the callback has completed before
    // `err` / `transaction_state` are inspected below.
    std::string transaction_state;
    storage
        ->get_data(_apps_root,
                   LPC_META_CALLBACK,
                   [&err, &transaction_state](error_code ec, const blob &value) {
                       err = ec;
                       if (ec == dsn::ERR_OK) {
                           transaction_state.assign(value.data(), value.length());
                       }
                   })
        ->wait();

    if (ERR_OBJECT_NOT_FOUND == err)
        return err;
    CHECK_EQ_MSG(ERR_OK, err, "can't handle this error");
    CHECK(transaction_state == std::string(unlock_state) || transaction_state.empty(),
          "invalid transaction state({})",
          transaction_state);

    storage->get_children(
        _apps_root,
        LPC_META_CALLBACK,
        [&](error_code ec, const std::vector<std::string> &apps) {
            if (ec == ERR_OK) {
                for (const auto &appid_str : apps) {
                    sync_app(_apps_root + "/" + appid_str);
                }
            } else {
                LOG_ERROR("get app list from meta state service failed, path = {}, err = {}",
                          _apps_root,
                          ec);
                zauto_write_lock l(_lock);
                err = ec;
            }
        },
        &tracker);

    // Drain every tracked read (apps and partitions) before touching `err` or `_all_apps`.
    tracker.wait_outstanding_tasks();
    if (err == ERR_OK) {
        return _all_apps.empty() ? ERR_OBJECT_NOT_FOUND : ERR_OK;
    }
    return err;
}