aios/catalog/service/CatalogController.cpp (946 lines of code) (raw):

/*
 * Copyright 2014-present Alibaba Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "catalog/service/CatalogController.h"

#include "autil/EnvUtil.h"
#include "catalog/tools/BSConfigMaker.h"
#include "catalog/util/ProtoUtil.h"
#include "fslib/fs/FileSystem.h"

namespace catalog {
AUTIL_LOG_SETUP(catalog, CatalogController);

CatalogController::CatalogController(const std::string &catalogName, const std::shared_ptr<IStore> &store)
    : _catalogName(catalogName), _store(store) {}

CatalogController::~CatalogController() {}

// Creates the reader and writer for this catalog from the store and starts the background
// syncLoop thread. The loop interval argument is 2 * 1000 * 1000 (presumably microseconds,
// i.e. every 2s; TODO confirm against autil::LoopThread).
// Returns false if any of the three resources cannot be created.
bool CatalogController::init() {
    _reader = _store->createReader(_catalogName);
    if (_reader == nullptr) {
        return false;
    }
    _writer = _store->createWriter(_catalogName);
    if (_writer == nullptr) {
        return false;
    }
    _syncThread =
        autil::LoopThread::createLoopThread(std::bind(&CatalogController::syncLoop, this), 2 * 1000 * 1000);
    if (!_syncThread) {
        AUTIL_LOG(ERROR, "create sync loop thread failed");
        return false;
    }
    return true;
}

// Reloads state from the store: the latest catalog snapshot and the persisted build list.
// Returns false (leaving _snapshot untouched) if either read fails.
bool CatalogController::recover() {
    CatalogVersion version = kInvalidCatalogVersion;
    const auto &status = _reader->getLatestVersion(version);
    if (!isOk(status)) {
        AUTIL_LOG(ERROR, "missing catalog:[%s] with error:[%s]", _catalogName.c_str(), status.message().c_str());
        return false;
    }
    auto snapshot = retrieveSnapshot(version);
    if (snapshot == nullptr) {
        return false;
    }
    {
        autil::ScopedWriteLock lock(_buildLock);
        auto readStatus = _reader->read(&_builds);
        if (!isOk(readStatus)) {
            AUTIL_LOG(ERROR,
                      "build of catalog [%s] read failed, with error:[%s]",
                      _catalogName.c_str(),
                      readStatus.message().c_str());
            return false;
        }
    }
    // set _snapshot in the end,
    // to make sure recover is finished when sync thread see a valid snapshot
    {
        autil::ScopedWriteLock lock(_lock);
        _snapshot = std::move(snapshot);
    }
    return true;
}

// Collects every partition of every table of every database in `catalog` into `partitions`,
// keyed by PartitionId. The stored pointers are non-owning and stay valid only while
// `catalog` is alive.
Status CatalogController::getCurrentPartitions(const Catalog *catalog,
                                               std::map<PartitionId, const Partition *> &partitions) {
    for (const auto &[_, database] : catalog->databases()) {
        for (const auto &[_, table] : database->tables()) {
            for (const auto &[_, partition] : table->partitions()) {
                const auto &id = partition->id();
                PartitionId partitionId = {id.partitionName, id.tableName, id.databaseName, id.catalogName};
                partitions[partitionId] = partition.get();
            }
        }
    }
    return StatusBuilder::ok();
}

// Replaces `buildIds` with the generation id of every known build of `catalogName`.
// If several builds exist for one partition, the last one listed wins.
Status CatalogController::getCurrentBuildIds(const std::string &catalogName,
                                             std::map<PartitionId, uint32_t> &buildIds) {
    proto::ListBuildRequest request;
    request.set_catalog_name(catalogName);
    proto::ListBuildResponse response;
    listBuild(&request, &response);
    CATALOG_CHECK_OK(response.status());
    buildIds.clear();
    for (const auto &buildId : response.build_ids()) {
        PartitionId partitionId = {
            buildId.partition_name(), buildId.table_name(), buildId.database_name(), buildId.catalog_name()};
        buildIds[partitionId] = buildId.generation_id();
    }
    return StatusBuilder::ok();
}

// Best-effort: creates a build for `partition` when its table structure is OFFLINE.
// Failures are logged, not returned. Partitions without a table structure, or with any other
// build type, are ignored.
void CatalogController::createBuild(const Partition *partition, const std::string &storeRoot) {
    const auto &tableStructure = partition->tableStructure();
    if (!tableStructure || tableStructure->tableStructureConfig().build_type() != proto::BuildType::OFFLINE) {
        return;
    }
    if (storeRoot.empty()) {
        AUTIL_LOG(ERROR, "store root is empty, create build failed");
        return;
    }
    proto::CreateBuildRequest request;
    if (auto status = partition->toProto(request.mutable_partition()); !isOk(status)) {
        AUTIL_LOG(ERROR, "create build failed, [%s]", status.message().c_str());
        return;
    }
    request.set_store_root(storeRoot);
    proto::CreateBuildResponse response;
    createBuild(&request, &response);
    if (response.status().code() != proto::ResponseStatus::OK) {
        AUTIL_LOG(ERROR, "create build failed, [%s]", response.status().message().c_str());
    }
}

// Best-effort: regenerates the BS config for `partition` and updates the target of build
// `generationId` to RUNNING with that config. Failures are logged, not returned.
void CatalogController::updateBuild(const Partition *partition, const std::string &storeRoot, uint32_t generationId) {
    const auto &tableStructure = partition->tableStructure();
    if (!tableStructure) {
        // Guard the dereference below; createBuild treats a missing structure the same way.
        AUTIL_LOG(ERROR, "update build failed, table structure is missing");
        return;
    }
    proto::UpdateBuildRequest request;
    auto build = request.mutable_build();
    auto buildId = build->mutable_build_id();
    const auto &partitionId = partition->id();
    buildId->set_generation_id(generationId);
    buildId->set_partition_name(partitionId.partitionName);
    buildId->set_table_name(partitionId.tableName);
    buildId->set_database_name(partitionId.databaseName);
    buildId->set_catalog_name(partitionId.catalogName);
    auto target = build->mutable_target();
    auto buildType = tableStructure->tableStructureConfig().build_type();
    if (buildType == proto::BuildType::OFFLINE) {
        target->set_type(proto::BuildTarget::BATCH_BUILD);
    }
    target->set_build_state(proto::BuildState::RUNNING);
    std::string templatePath;
    if (auto status = getBSTemplateConfigPath(&templatePath); !isOk(status)) {
        AUTIL_LOG(ERROR, "get template config path failed, [%s]", status.message().c_str());
        return;
    }
    std::string configPath;
    auto status = BSConfigMaker::Make(*partition, templatePath, storeRoot, &configPath);
    if (!isOk(status)) {
        AUTIL_LOG(ERROR, "make config failed, [%s]", status.message().c_str());
        return;
    }
    target->set_config_path(configPath);
    proto::BuildCommonResponse response;
    updateBuild(&request, &response);
    if (response.status().code() != proto::ResponseStatus::OK) {
        AUTIL_LOG(ERROR, "update build failed, [%s]", response.status().message().c_str());
    }
}

// Best-effort: drops every build of the given partition. The generation id is left unset, so
// isBuildIdMatch matches all generations. Failures are logged, not returned.
void CatalogController::dropBuild(const PartitionId &partitionId) {
    proto::DropBuildRequest request;
    auto buildId = request.mutable_build_id();
    buildId->set_partition_name(partitionId.partitionName);
    buildId->set_table_name(partitionId.tableName);
    buildId->set_database_name(partitionId.databaseName);
    buildId->set_catalog_name(partitionId.catalogName);
    proto::BuildCommonResponse response;
    dropBuild(&request, &response);
    if (response.status().code() != proto::ResponseStatus::OK) {
        AUTIL_LOG(ERROR, "drop build failed, [%s]", response.status().message().c_str());
    }
}

// Returns the store_root configured on the database that owns partition `id`, or "" if the
// database is not found in `catalog`.
std::string CatalogController::getStoreRoot(const std::unique_ptr<Catalog> &catalog, const PartitionId &id) {
    for (const auto &[name, database] : catalog->databases()) {
        if (name == id.databaseName) {
            return database->databaseConfig().store_root();
        }
    }
    return "";
}

// Periodic reconciliation between catalog partitions and builds. It runs on _syncThread.
// Each pass diffs the current partitions against those seen in the previous pass
// (_lastPartitions):
//   - unchanged version                          -> skip
//   - no build yet                               -> create one
//   - table structure missing or not OFFLINE     -> drop its builds
//   - schema changed since the last pass         -> create a new build
//   - otherwise                                  -> update the existing build's target
// Partitions that disappeared since the last pass get their builds dropped.
void CatalogController::syncLoop() {
    auto curCatalog = std::make_unique<Catalog>();
    std::map<PartitionId, const Partition *> partitions;
    if (auto status = getCatalog(curCatalog.get()); !isOk(status)) {
        // Deliberately silent: this runs every loop tick, and presumably fails routinely
        // until a catalog exists. TODO confirm before adding a log here.
        return;
    }
    if (!isOk(getCurrentPartitions(curCatalog.get(), partitions))) {
        AUTIL_LOG(ERROR, "get current partitions failed");
        return;
    }
    std::map<PartitionId, /*generation id*/ uint32_t> buildIds;
    if (!isOk(getCurrentBuildIds(curCatalog->id(), buildIds))) {
        AUTIL_LOG(ERROR, "get current builds failed");
        return;
    }
    for (const auto &[partitionId, partition] : partitions) {
        auto iter = _lastPartitions.find(partitionId);
        // not change
        if (iter != _lastPartitions.end() && partition->version() == iter->second->version()) {
            continue;
        }
        auto idIter = buildIds.find(partitionId);
        // add partition
        if (idIter == buildIds.end()) {
            createBuild(partition, getStoreRoot(curCatalog, partitionId));
            continue;
        }
        // A partition without a table structure cannot be an OFFLINE build; guard the
        // dereference instead of crashing the sync thread.
        const auto &tableStructure = partition->getTableStructure();
        if (!tableStructure || tableStructure->tableStructureConfig().build_type() != proto::BuildType::OFFLINE) {
            dropBuild(partitionId);
            continue;
        }
        if (iter != _lastPartitions.end() && isSchemaChanged(partition, iter->second)) {
            createBuild(partition, getStoreRoot(curCatalog, partitionId));
            continue;
        }
        // update partition
        updateBuild(partition, getStoreRoot(curCatalog, partitionId), idIter->second);
    }
    // drop partition
    for (const auto &[partitionId, _] : _lastPartitions) {
        if (partitions.count(partitionId) == 0) {
            dropBuild(partitionId);
        }
    }
    // _lastPartitions holds raw pointers into curCatalog; keep that catalog alive as _lastCatalog.
    _lastPartitions.swap(partitions);
    _lastCatalog = std::move(curCatalog);
}

// Returns the entity status of the currently loaded snapshot. Before any snapshot is loaded,
// it returns the default (empty) EntityStatus.
// NOTE(review): the returned reference points into the current snapshot and is used after the
// read lock is released. A concurrent checkAndUpdate that replaces _snapshot may invalidate it.
// Returning by value would fix this but requires a header change.
const proto::EntityStatus &CatalogController::status() {
    autil::ScopedReadLock lock(_lock);
    if (_snapshot == nullptr) {
        return proto::EntityStatus::default_instance();
    }
    return _snapshot->status();
}

// Declares `lock` (a read lock on _lock held until the end of the enclosing function) and
// `snapshot` (the snapshot for CATALOG_VERSION). It fails RESPONSE with NOT_FOUND when no such
// snapshot is available.
#define GET_SNAPSHOT(RESPONSE, CATALOG_VERSION)                                                                        \
    autil::ScopedReadLock lock(_lock);                                                                                  \
    auto snapshot = getSnapshot(_snapshot, (CATALOG_VERSION));                                                          \
    CATALOG_REQUIRES(RESPONSE,                                                                                          \
                     snapshot != nullptr,                                                                               \
                     Status::NOT_FOUND,                                                                                 \
                     "missing catalog:[",                                                                               \
                     _catalogName,                                                                                      \
                     "] with version:[",                                                                                \
                     (CATALOG_VERSION),                                                                                 \
                     "]")

// Serializes the requested catalog version at the requested detail level.
void CatalogController::getCatalog(const proto::GetCatalogRequest *request, proto::GetCatalogResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    CATALOG_REQUIRES_OK(response, snapshot->toProto(response->mutable_catalog(), request->detail_level()));
}

// Creates the catalog. This is only allowed when no snapshot is loaded. The new version is
// persisted through _writer and reported with source version kInvalidCatalogVersion.
void CatalogController::createCatalog(const proto::CreateCatalogRequest *request, proto::CommonResponse *response) {
    auto status = checkAndUpdate(
        [](CatalogSnapshot *snapshot) {
            CATALOG_CHECK(snapshot == nullptr, Status::INTERNAL_ERROR, "create catalog check failed");
            return StatusBuilder::ok();
        },
        [&](Catalog &catalog) {
            CATALOG_CHECK_OK(catalog.create(request->catalog()));
            return StatusBuilder::ok();
        },
        [&](CatalogSnapshot *oldSnapshot, CatalogSnapshot *newSnapshot) {
            AUTIL_LOG(
                INFO, "catalog:[%s] installing new version:[%ld]", _catalogName.c_str(), newSnapshot->version());
            CATALOG_CHECK_OK(_writer->write(oldSnapshot, newSnapshot));
            AUTIL_LOG(
                INFO, "catalog:[%s] new version:[%ld] installed", _catalogName.c_str(), newSnapshot->version());
            return StatusBuilder::ok();
        },
        [&](CatalogSnapshot *oldSnapshot, CatalogSnapshot *newSnapshot) {
            response->set_source_catalog_version(kInvalidCatalogVersion);
            response->set_target_catalog_version(newSnapshot->version());
            return StatusBuilder::ok();
        });
    CATALOG_REQUIRES_OK(response, status);
}

// The mutation RPCs below delegate to execute(). It checks that the request's catalog_version
// equals the current snapshot version, applies the lambda to a clone, persists the new version
// through _writer and installs it as the current snapshot.
void CatalogController::dropCatalog(const proto::DropCatalogRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.drop(); });
}

void CatalogController::updateCatalog(const proto::UpdateCatalogRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.update(request->catalog()); });
}

// Updates the catalog status. Only PUBLISHED is accepted, and the request version must match
// the current snapshot. A new version is produced and persisted through _writer->updateStatus.
// NOTE(review): the update lambda leaves the cloned catalog unchanged. Whether the in-memory
// snapshot reflects the new status depends on _writer->updateStatus; verify.
void CatalogController::updateCatalogStatus(const proto::UpdateCatalogStatusRequest *request,
                                            proto::CommonResponse *response) {
    auto status = checkAndUpdate(
        [&](CatalogSnapshot *snapshot) {
            CATALOG_CHECK(snapshot != nullptr, Status::INTERNAL_ERROR, "old snapshot not exists");
            auto inputVersion = request->catalog_version();
            auto currentVersion = snapshot->version();
            CATALOG_CHECK(currentVersion == inputVersion,
                          Status::VERSION_EXPIRED,
                          "version:[",
                          inputVersion,
                          "] for catalog:[",
                          snapshot->catalogName(),
                          "] expired, expected version:[",
                          currentVersion,
                          "]");
            CATALOG_CHECK(request->status().code() == proto::EntityStatus::PUBLISHED,
                          Status::INVALID_ARGUMENTS,
                          "unsupported status: [",
                          proto::EntityStatus::Code_Name(request->status().code()),
                          "] to update");
            return StatusBuilder::ok();
        },
        [&](Catalog &catalog) { return StatusBuilder::ok(); },
        [&](CatalogSnapshot *oldSnapshot, CatalogSnapshot *newSnapshot) {
            AUTIL_LOG(INFO,
                      "catalog:[%s] updating status with version:[%ld]",
                      _catalogName.c_str(),
                      newSnapshot->version());
            CATALOG_CHECK_OK(_writer->updateStatus(newSnapshot, request->status()));
            AUTIL_LOG(INFO,
                      "catalog:[%s] status updated with version:[%ld]",
                      _catalogName.c_str(),
                      newSnapshot->version());
            return StatusBuilder::ok();
        },
        [&](CatalogSnapshot *oldSnapshot, CatalogSnapshot *newSnapshot) {
            response->set_source_catalog_version(oldSnapshot->version());
            response->set_target_catalog_version(newSnapshot->version());
            return StatusBuilder::ok();
        });
    CATALOG_REQUIRES_OK(response, status);
}

// ---- databases ----

void CatalogController::listDatabase(const proto::ListDatabaseRequest *request, proto::ListDatabaseResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    const auto &names = snapshot->listDatabase();
    *(response->mutable_database_names()) = {names.begin(), names.end()};
}

// With ignore_not_found_error, a missing database yields an empty successful response.
void CatalogController::getDatabase(const proto::GetDatabaseRequest *request, proto::GetDatabaseResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    if (request->ignore_not_found_error() && !snapshot->hasDatabase(request->database_name())) {
        return;
    }
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    CATALOG_REQUIRES_OK(response, database->toProto(response->mutable_database(), request->detail_level()));
}

void CatalogController::createDatabase(const proto::CreateDatabaseRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.createDatabase(request->database()); });
}

void CatalogController::dropDatabase(const proto::DropDatabaseRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) {
        return catalog.dropDatabase({request->database_name(), request->catalog_name()});
    });
}

void CatalogController::updateDatabase(const proto::UpdateDatabaseRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updateDatabase(request->database()); });
}

// ---- tables ----

void CatalogController::listTable(const proto::ListTableRequest *request, proto::ListTableResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    const auto &names = database->listTable();
    *(response->mutable_table_names()) = {names.begin(), names.end()};
}

// With ignore_not_found_error, a missing table (but not a missing database) yields an empty
// successful response.
void CatalogController::getTable(const proto::GetTableRequest *request, proto::GetTableResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    if (request->ignore_not_found_error() && !database->hasTable(request->table_name())) {
        return;
    }
    Table *table = nullptr;
    CATALOG_REQUIRES_OK(response, database->getTable(request->table_name(), table));
    CATALOG_REQUIRES_OK(response, table->toProto(response->mutable_table(), request->detail_level()));
}

// Optionally validates the table schema with BSConfigMaker before creating the table.
void CatalogController::createTable(const proto::CreateTableRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) {
        if (request->validate_table_structure()) {
            CATALOG_CHECK_OK(BSConfigMaker::validateSchema(request->table()));
        }
        return catalog.createTable(request->table());
    });
}

void CatalogController::dropTable(const proto::DropTableRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) {
        return catalog.dropTable({
            request->table_name(),
            request->database_name(),
            request->catalog_name(),
        });
    });
}

void CatalogController::updateTable(const proto::UpdateTableRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updateTable(request->table()); });
}

// Lists the table groups of the database whose load strategies contain the requested table.
// The table itself must exist.
void CatalogController::listTableRelatedTableGroup(const proto::ListTableRelatedTableGroupRequest *request,
                                                   proto::ListTableRelatedTableGroupResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    const auto &tableName = request->table_name();
    Table *table = nullptr;
    CATALOG_REQUIRES_OK(response, database->getTable(tableName, table));
    for (const auto &[groupName, tableGroup] : database->tableGroups()) {
        const auto &loadStrategies = tableGroup->loadStrategies();
        if (loadStrategies.find(tableName) != loadStrategies.end()) {
            response->add_table_group_names(groupName);
        }
    }
}

// Returns NOT_FOUND when the table exists but has no table structure.
void CatalogController::getTableStructure(const proto::GetTableStructureRequest *request,
                                          proto::GetTableStructureResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    Table *table = nullptr;
    CATALOG_REQUIRES_OK(response, database->getTable(request->table_name(), table));
    const auto &tableStructure = table->tableStructure();
    CATALOG_REQUIRES(response,
                     tableStructure != nullptr,
                     Status::NOT_FOUND,
                     "no table_structure for table:[",
                     request->database_name(),
                     ".",
                     request->table_name(),
                     "]");
    CATALOG_REQUIRES_OK(response,
                        tableStructure->toProto(response->mutable_table_structure(), request->detail_level()));
}

void CatalogController::updateTableStructure(const proto::UpdateTableStructureRequest *request,
                                             proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) {
        return catalog.updateTableStructure(request->table_structure());
    });
}

void CatalogController::addColumn(const proto::AddColumnRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.addColumn(*request); });
}

void CatalogController::updateColumn(const proto::UpdateColumnRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updateColumn(*request); });
}

void CatalogController::dropColumn(const proto::DropColumnRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.dropColumn(*request); });
}

void CatalogController::createIndex(const proto::CreateIndexRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.createIndex(*request); });
}

void CatalogController::updateIndex(const proto::UpdateIndexRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updateIndex(*request); });
}

void CatalogController::dropIndex(const proto::DropIndexRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.dropIndex(*request); });
}

// ---- partitions ----

// Only TABLE_PARTITION is supported; any other partition_type fails with INVALID_ARGUMENTS.
void CatalogController::listPartition(const proto::ListPartitionRequest *request,
                                      proto::ListPartitionResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    auto partitionType = request->partition_type();
    if (partitionType == proto::PartitionType::TABLE_PARTITION) {
        Table *table = nullptr;
        CATALOG_REQUIRES_OK(response, database->getTable(request->table_name(), table));
        const auto &names = table->listPartition();
        *(response->mutable_partition_names()) = {names.begin(), names.end()};
    } else {
        CATALOG_REQUIRES(response,
                         false,
                         Status::INVALID_ARGUMENTS,
                         "unsupported partition_type:[",
                         proto::PartitionType_Code_Name(partitionType),
                         "]");
    }
}

// Only TABLE_PARTITION is supported. With ignore_not_found_error, a missing partition yields
// an empty successful response.
void CatalogController::getPartition(const proto::GetPartitionRequest *request, proto::GetPartitionResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    auto partitionType = request->partition_type();
    if (partitionType == proto::PartitionType::TABLE_PARTITION) {
        Table *table = nullptr;
        CATALOG_REQUIRES_OK(response, database->getTable(request->table_name(), table));
        if (request->ignore_not_found_error() && !table->hasPartition(request->partition_name())) {
            return;
        }
        Partition *partition = nullptr;
        CATALOG_REQUIRES_OK(response, table->getPartition(request->partition_name(), partition));
        CATALOG_REQUIRES_OK(response, partition->toProto(response->mutable_partition(), request->detail_level()));
    } else {
        CATALOG_REQUIRES(response,
                         false,
                         Status::INVALID_ARGUMENTS,
                         "unsupported partition_type:[",
                         proto::PartitionType_Code_Name(partitionType),
                         "]");
    }
}

void CatalogController::createPartition(const proto::CreatePartitionRequest *request,
                                        proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.createPartition(*request); });
}

void CatalogController::dropPartition(const proto::DropPartitionRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.dropPartition(*request); });
}

void CatalogController::updatePartition(const proto::UpdatePartitionRequest *request,
                                        proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updatePartition(*request); });
}

void CatalogController::updatePartitionTableStructure(const proto::UpdatePartitionTableStructureRequest *request,
                                                      proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updatePartitionTableStructure(*request); });
}

// ---- table groups ----

void CatalogController::listTableGroup(const proto::ListTableGroupRequest *request,
                                       proto::ListTableGroupResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    const auto &names = database->listTableGroup();
    *(response->mutable_table_group_names()) = {names.begin(), names.end()};
}

// With ignore_not_found_error, a missing table group yields an empty successful response.
void CatalogController::getTableGroup(const proto::GetTableGroupRequest *request,
                                      proto::GetTableGroupResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    if (request->ignore_not_found_error() && !database->hasTableGroup(request->table_group_name())) {
        return;
    }
    TableGroup *tableGroup = nullptr;
    CATALOG_REQUIRES_OK(response, database->getTableGroup(request->table_group_name(), tableGroup));
    CATALOG_REQUIRES_OK(response, tableGroup->toProto(response->mutable_table_group(), request->detail_level()));
}

void CatalogController::createTableGroup(const proto::CreateTableGroupRequest *request,
                                         proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.createTableGroup(request->table_group()); });
}

void CatalogController::dropTableGroup(const proto::DropTableGroupRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) {
        return catalog.dropTableGroup({
            request->table_group_name(),
            request->database_name(),
            request->catalog_name(),
        });
    });
}

void CatalogController::updateTableGroup(const proto::UpdateTableGroupRequest *request,
                                         proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updateTableGroup(request->table_group()); });
}

// ---- load strategies ----

void CatalogController::listLoadStrategy(const proto::ListLoadStrategyRequest *request,
                                         proto::ListLoadStrategyResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    TableGroup *tableGroup = nullptr;
    CATALOG_REQUIRES_OK(response, database->getTableGroup(request->table_group_name(), tableGroup));
    const auto &names = tableGroup->listLoadStrategy();
    *(response->mutable_table_names()) = {names.begin(), names.end()};
}

// With ignore_not_found_error, a missing load strategy yields an empty successful response.
void CatalogController::getLoadStrategy(const proto::GetLoadStrategyRequest *request,
                                        proto::GetLoadStrategyResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    TableGroup *tableGroup = nullptr;
    CATALOG_REQUIRES_OK(response, database->getTableGroup(request->table_group_name(), tableGroup));
    if (request->ignore_not_found_error() && !tableGroup->hasLoadStrategy(request->table_name())) {
        return;
    }
    LoadStrategy *loadStrategy = nullptr;
    CATALOG_REQUIRES_OK(response, tableGroup->getLoadStrategy(request->table_name(), loadStrategy));
    CATALOG_REQUIRES_OK(response,
                        loadStrategy->toProto(response->mutable_load_strategy(), request->detail_level()));
}

void CatalogController::createLoadStrategy(const proto::CreateLoadStrategyRequest *request,
                                           proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.createLoadStrategy(request->load_strategy()); });
}

void CatalogController::dropLoadStrategy(const proto::DropLoadStrategyRequest *request,
                                         proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) {
        return catalog.dropLoadStrategy({
            request->table_name(),
            request->table_group_name(),
            request->database_name(),
            request->catalog_name(),
        });
    });
}

void CatalogController::updateLoadStrategy(const proto::UpdateLoadStrategyRequest *request,
                                           proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updateLoadStrategy(request->load_strategy()); });
}

// ---- functions ----

void CatalogController::listFunction(const proto::ListFunctionRequest *request, proto::ListFunctionResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    const auto &names = database->listFunction();
    *(response->mutable_function_names()) = {names.begin(), names.end()};
}

// With ignore_not_found_error, a missing function yields an empty successful response.
void CatalogController::getFunction(const proto::GetFunctionRequest *request, proto::GetFunctionResponse *response) {
    GET_SNAPSHOT(response, request->catalog_version());
    Database *database = nullptr;
    CATALOG_REQUIRES_OK(response, snapshot->getDatabase(request->database_name(), database));
    if (request->ignore_not_found_error() && !database->hasFunction(request->function_name())) {
        return;
    }
    Function *function = nullptr;
    CATALOG_REQUIRES_OK(response, database->getFunction(request->function_name(), function));
    CATALOG_REQUIRES_OK(response, function->toProto(response->mutable_function(), request->detail_level()));
}

void CatalogController::createFunction(const proto::CreateFunctionRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.createFunction(request->function()); });
}

void CatalogController::dropFunction(const proto::DropFunctionRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) {
        return catalog.dropFunction({
            request->function_name(),
            request->database_name(),
            request->catalog_name(),
        });
    });
}

void CatalogController::updateFunction(const proto::UpdateFunctionRequest *request, proto::CommonResponse *response) {
    execute(request, response, [&](Catalog &catalog) { return catalog.updateFunction(request->function()); });
}

// ---- builds (guarded by _buildLock, persisted through _writer) ----

// Lists the id and target of every build matching the request's catalog/database/table/
// partition prefix (see isBuildIdMatch). catalog_name is mandatory.
void CatalogController::listBuild(const proto::ListBuildRequest *request, proto::ListBuildResponse *response) {
    CATALOG_REQUIRES(
        response, !request->catalog_name().empty(), Status::INVALID_ARGUMENTS, "catalog name is not specified");
    proto::BuildId targetBuildId;
    targetBuildId.set_catalog_name(request->catalog_name());
    targetBuildId.set_database_name(request->database_name());
    targetBuildId.set_table_name(request->table_name());
    targetBuildId.set_partition_name(request->partition_name());
    {
        autil::ScopedReadLock lock(_buildLock);
        for (const auto &build : _builds) {
            if (isBuildIdMatch(targetBuildId, build.build_id())) {
                *response->add_build_ids() = build.build_id();
                *response->add_build_targets() = build.target();
            }
        }
    }
}

// Returns every build matching request->build_id(). When a partition name is given and
// nothing matches, it fails with NOT_FOUND.
void CatalogController::getBuild(const proto::GetBuildRequest *request, proto::GetBuildResponse *response) {
    CATALOG_REQUIRES(response,
                     !request->build_id().catalog_name().empty(),
                     Status::INVALID_ARGUMENTS,
                     "catalog name is not specified in build id");
    {
        autil::ScopedReadLock lock(_buildLock);
        for (const auto &build : _builds) {
            if (isBuildIdMatch(request->build_id(), build.build_id())) {
                *response->add_builds() = build;
            }
        }
    }
    if (!request->build_id().partition_name().empty()) {
        CATALOG_REQUIRES(response,
                         response->builds_size() > 0,
                         Status::NOT_FOUND,
                         "build id [",
                         request->build_id().ShortDebugString(),
                         "] not found");
    }
}

// Creates a new build for the given partition and store root. It fails if an identical build
// id already exists. _builds is only replaced after the full list is persisted successfully.
void CatalogController::createBuild(const proto::CreateBuildRequest *request, proto::CreateBuildResponse *response) {
    CATALOG_REQUIRES(response, request->has_partition(), Status::INVALID_ARGUMENTS, "partition is missing");
    CATALOG_REQUIRES(response, !request->store_root().empty(), Status::INVALID_ARGUMENTS, "store root is empty");
    proto::Build newBuild;
    CATALOG_REQUIRES_OK(response, createBuild(request->partition(), request->store_root(), &newBuild));
    CATALOG_REQUIRES_OK(response, checkBuildId(newBuild));
    {
        autil::ScopedWriteLock lock(_buildLock);
        // Check for duplicates before paying for a copy of the whole build list.
        for (const auto &build : _builds) {
            CATALOG_REQUIRES(response,
                             !ProtoUtil::compareProto(build.build_id(), newBuild.build_id()),
                             Status::INTERNAL_ERROR,
                             "build id [",
                             newBuild.build_id().ShortDebugString(),
                             "] already exist, try it later after 1s");
        }
        auto builds = _builds;
        builds.push_back(newBuild);
        auto status = _writer->write(builds);
        if (isOk(status)) {
            _builds.swap(builds);
        }
        CATALOG_REQUIRES_OK(response, status);
    }
    *response->mutable_build_id() = newBuild.build_id();
}

// Removes every build matching request->build_id() (prefix match, see isBuildIdMatch).
// _builds is only replaced after the remaining list is persisted successfully.
void CatalogController::dropBuild(const proto::DropBuildRequest *request, proto::BuildCommonResponse *response) {
    CATALOG_REQUIRES(response,
                     !request->build_id().catalog_name().empty(),
                     Status::INVALID_ARGUMENTS,
                     "catalog name is not specified in build id");
    std::vector<proto::Build> builds;
    {
        autil::ScopedWriteLock lock(_buildLock);
        for (const auto &build : _builds) {
            if (!isBuildIdMatch(request->build_id(), build.build_id())) {
                builds.push_back(build);
            }
        }
        auto status = _writer->write(builds);
        if (isOk(status)) {
            _builds.swap(builds);
        }
        CATALOG_REQUIRES_OK(response, status);
    }
}

// Merges the non-empty / non-NONE fields of request->build() into the build with exactly the
// same build id. The target type may not change. It fails with NOT_FOUND when no such build
// exists, instead of silently reporting success and rewriting an unchanged list.
void CatalogController::updateBuild(const proto::UpdateBuildRequest *request, proto::BuildCommonResponse *response) {
    const auto &newBuild = request->build();
    CATALOG_REQUIRES_OK(response, checkBuildId(newBuild));
    {
        autil::ScopedWriteLock lock(_buildLock);
        auto builds = _builds;
        bool found = false;
        for (auto &build : builds) {
            if (!ProtoUtil::compareProto(newBuild.build_id(), build.build_id())) {
                continue;
            }
            found = true;
            if (newBuild.has_target()) {
                const auto &requestTarget = newBuild.target();
                auto target = build.mutable_target();
                if (requestTarget.type() != proto::BuildTarget::NONE) {
                    CATALOG_REQUIRES(response,
                                     target->type() == requestTarget.type(),
                                     Status::INVALID_ARGUMENTS,
                                     "type of build cannot be modified, [",
                                     target->type(),
                                     "] to [",
                                     requestTarget.type(),
                                     "]");
                }
                if (requestTarget.build_state() != proto::BuildState::NONE) {
                    target->set_build_state(requestTarget.build_state());
                }
                if (!requestTarget.config_path().empty()) {
                    target->set_config_path(requestTarget.config_path());
                }
            }
            if (newBuild.has_current()) {
                const auto &requestCurrent = newBuild.current();
                auto current = build.mutable_current();
                if (requestCurrent.build_state() != proto::BuildState::NONE) {
                    current->set_build_state(requestCurrent.build_state());
                }
                if (!requestCurrent.config_path().empty()) {
                    current->set_config_path(requestCurrent.config_path());
                }
                if (!requestCurrent.last_error().empty()) {
                    current->set_last_error(requestCurrent.last_error());
                }
                if (!requestCurrent.index_root().empty()) {
                    current->set_index_root(requestCurrent.index_root());
                }
                // Shards are replaced wholesale, not merged, when any are supplied.
                if (requestCurrent.shards_size() > 0) {
                    current->clear_shards();
                    for (int i = 0; i < requestCurrent.shards_size(); ++i) {
                        *current->add_shards() = requestCurrent.shards(i);
                    }
                }
            }
            break;
        }
        CATALOG_REQUIRES(response,
                         found,
                         Status::NOT_FOUND,
                         "build id [",
                         newBuild.build_id().ShortDebugString(),
                         "] not found");
        auto status = _writer->write(builds);
        if (isOk(status)) {
            _builds.swap(builds);
        }
        CATALOG_REQUIRES_OK(response, status);
    }
}

// Shared implementation of versioned catalog mutations. It requires an existing snapshot whose
// version equals request->catalog_version() (VERSION_EXPIRED otherwise), then applies
// `doUpdate`, persists via _writer->write and reports source/target versions in `response`.
template <typename T>
void CatalogController::execute(const T *request, proto::CommonResponse *response, UpdateFunction doUpdate) {
    auto status = checkAndUpdate(
        [&](CatalogSnapshot *snapshot) {
            CATALOG_CHECK(snapshot != nullptr, Status::INTERNAL_ERROR, "old snapshot not exists");
            auto inputVersion = request->catalog_version();
            auto currentVersion = snapshot->version();
            CATALOG_CHECK(currentVersion == inputVersion,
                          Status::VERSION_EXPIRED,
                          "version:[",
                          inputVersion,
                          "] for catalog:[",
                          snapshot->catalogName(),
                          "] expired, expected version:[",
                          currentVersion,
                          "]");
            return StatusBuilder::ok();
        },
        std::move(doUpdate),
        [&](CatalogSnapshot *oldSnapshot, CatalogSnapshot *newSnapshot) {
            AUTIL_LOG(
                INFO, "catalog:[%s] installing new version:[%ld]", _catalogName.c_str(), newSnapshot->version());
            CATALOG_CHECK_OK(_writer->write(oldSnapshot, newSnapshot));
            AUTIL_LOG(
                INFO, "catalog:[%s] new version:[%ld] installed", _catalogName.c_str(), newSnapshot->version());
            return StatusBuilder::ok();
        },
        [&](CatalogSnapshot *oldSnapshot, CatalogSnapshot *newSnapshot) {
            response->set_source_catalog_version(oldSnapshot->version());
            response->set_target_catalog_version(newSnapshot->version());
            return StatusBuilder::ok();
        });
    CATALOG_REQUIRES_OK(response, status);
}

// Under the _lock write lock: check -> clone (or create) -> update -> assign next version ->
// write -> finish. The new snapshot is installed only if every step succeeds; on any failure
// _snapshot is left unchanged.
Status CatalogController::checkAndUpdate(CheckFunction doCheck,
                                         UpdateFunction doUpdate,
                                         WriteFunction doWrite,
                                         FinishFunction doFinish) {
    autil::ScopedWriteLock lock(_lock);
    // check
    CATALOG_CHECK_OK(doCheck(_snapshot.get()));
    // create or copy
    auto catalog = _snapshot == nullptr ? std::make_unique<Catalog>() : _snapshot->clone();
    CATALOG_CHECK(catalog != nullptr, Status::INTERNAL_ERROR, "failed to clone snapshot");
    // update
    CATALOG_CHECK_OK(doUpdate(*catalog));
    CATALOG_CHECK_OK(alignVersion(_snapshot.get(), *catalog));
    // write
    CATALOG_CHECK_OK(doWrite(_snapshot.get(), catalog.get()));
    // finish
    CATALOG_CHECK_OK(doFinish(_snapshot.get(), catalog.get()));
    _snapshot = std::move(catalog);
    return StatusBuilder::ok();
}

// Assigns the next version to `newSnapshot`: old version + 1 when a snapshot is loaded,
// otherwise the latest stored version + 1, or kInitCatalogVersion when nothing is stored.
Status CatalogController::alignVersion(CatalogSnapshot *oldSnapshot, Catalog &newSnapshot) {
    CatalogVersion nextVersion = kInitCatalogVersion;
    if (oldSnapshot != nullptr) {
        nextVersion = oldSnapshot->version() + 1;
    } else {
        CatalogVersion latestVersion = kInvalidCatalogVersion;
        CATALOG_CHECK_OK(_reader->getLatestVersion(latestVersion));
        if (latestVersion != kInvalidCatalogVersion) {
            nextVersion = latestVersion + 1;
        }
    }
    newSnapshot.alignVersion(nextVersion);
    return StatusBuilder::ok();
}

// Resolves the snapshot to serve for `targetVersion`. Non-positive versions select
// `currentSnapshot` (possibly nullptr), as does a version equal to the current one. Any other
// version is loaded from the store (nullptr on failure).
std::shared_ptr<CatalogSnapshot> CatalogController::getSnapshot(std::shared_ptr<CatalogSnapshot> currentSnapshot,
                                                                 CatalogVersion targetVersion) {
    if (targetVersion <= 0) {
        return currentSnapshot;
    }
    // currentSnapshot is nullptr until recover()/createCatalog() install one; guard the
    // dereference instead of crashing on an explicit version request.
    if (currentSnapshot != nullptr && targetVersion == currentSnapshot->version()) {
        return currentSnapshot;
    }
    // TODO(chekong.ygm): consider introducing a snapshot cache later.
    return retrieveSnapshot(targetVersion);
}

// Reads catalog `version` from the store and parses it into a new Catalog.
// Logs and returns nullptr on read or parse failure.
std::unique_ptr<CatalogSnapshot> CatalogController::retrieveSnapshot(CatalogVersion version) {
    proto::Catalog catalogProto;
    const auto &s1 = _reader->read(version, &catalogProto);
    if (!isOk(s1)) {
        AUTIL_LOG(ERROR,
                  "failed to read catalog:[%s] with version:[%ld] and error:[%s]",
                  _catalogName.c_str(),
                  version,
                  s1.message().c_str());
        return nullptr;
    }
    auto catalog = std::make_unique<Catalog>();
    const auto &s2 = catalog->fromProto(catalogProto);
    if (!isOk(s2)) {
        AUTIL_LOG(ERROR,
                  "failed to parse catalog:[%s] with version:[%ld] and error:[%s]",
                  _catalogName.c_str(),
                  version,
                  s2.message().c_str());
        return nullptr;
    }
    return catalog;
}

// Validates that `build` carries a fully specified build id: a positive generation id and
// non-empty partition, table, database and catalog names.
Status CatalogController::checkBuildId(const proto::Build &build) {
    CATALOG_CHECK(build.has_build_id(), Status::INVALID_ARGUMENTS, "build id not specified");
    const auto &buildId = build.build_id();
    CATALOG_CHECK(buildId.generation_id() > 0,
                  Status::INVALID_ARGUMENTS,
                  "invalid generation id [",
                  buildId.generation_id(),
                  "] in build id");
    CATALOG_CHECK(!buildId.partition_name().empty(), Status::INVALID_ARGUMENTS, "partition name is empty in build id");
    CATALOG_CHECK(!buildId.table_name().empty(), Status::INVALID_ARGUMENTS, "table name is empty in build id");
    CATALOG_CHECK(!buildId.database_name().empty(), Status::INVALID_ARGUMENTS, "database name is empty in build id");
    CATALOG_CHECK(!buildId.catalog_name().empty(), Status::INVALID_ARGUMENTS, "catalog name is empty in build id");
    return StatusBuilder::ok();
}

// Hierarchical prefix match: catalog_name must always match. Each further level (database,
// table, partition, generation id) is compared only when it and every level above it are set
// in `targetBuildId`. An empty name or generation id 0 acts as a wildcard for that level and
// everything below it.
bool CatalogController::isBuildIdMatch(const proto::BuildId &targetBuildId, const proto::BuildId &buildId) {
    if (buildId.catalog_name() != targetBuildId.catalog_name()) {
        return false;
    }
    if (targetBuildId.database_name().empty()) {
        return true;
    }
    if (buildId.database_name() != targetBuildId.database_name()) {
        return false;
    }
    if (targetBuildId.table_name().empty()) {
        return true;
    }
    if (buildId.table_name() != targetBuildId.table_name()) {
        return false;
    }
    if (targetBuildId.partition_name().empty()) {
        return true;
    }
    if (buildId.partition_name() != targetBuildId.partition_name()) {
        return false;
    }
    if (targetBuildId.generation_id() == 0) {
        return true;
    }
    return buildId.generation_id() == targetBuildId.generation_id();
}

Status CatalogController::createBuild(const
proto::Partition &part, const std::string &storeRoot, proto::Build *build) { Partition partition; CATALOG_CHECK_OK(partition.fromProto(part)); const auto &partitionId = partition.id(); auto buildId = build->mutable_build_id(); buildId->set_generation_id(autil::TimeUtility::currentTimeInSeconds()); buildId->set_partition_name(partitionId.partitionName); buildId->set_table_name(partitionId.tableName); buildId->set_database_name(partitionId.databaseName); buildId->set_catalog_name(partitionId.catalogName); auto buildType = partition.getTableStructure()->tableStructureConfig().build_type(); CATALOG_CHECK(buildType == proto::BuildType::OFFLINE, Status::INVALID_ARGUMENTS, "build_type [", buildType, "] is not OFFLINE, do not create build"); auto target = build->mutable_target(); target->set_type(proto::BuildTarget::BATCH_BUILD); target->set_build_state(proto::BuildState::RUNNING); std::string templatePath; CATALOG_CHECK_OK(getBSTemplateConfigPath(&templatePath)); std::string configPath; CATALOG_CHECK_OK(BSConfigMaker::Make(partition, templatePath, storeRoot, &configPath)); target->set_config_path(configPath); return StatusBuilder::ok(); } Status CatalogController::getBSTemplateConfigPath(std::string *templatePath) { *templatePath = autil::EnvUtil::getEnv("BS_TEMPLATE_CONFIG_PATH", std::string("/usr/local/etc/template")); CATALOG_CHECK(fslib::fs::FileSystem::isExist(*templatePath) == fslib::EC_TRUE, Status::INTERNAL_ERROR, "bs template config path [", *templatePath, "] not exist"); return StatusBuilder::ok(); } Status CatalogController::getCatalog(Catalog *catalog) { autil::ScopedReadLock lock(_lock); proto::Catalog proto; CATALOG_CHECK(_snapshot != nullptr, Status::INTERNAL_ERROR, "catalog snapshot is nullptr"); auto s = _snapshot->toProto(&proto); if (!isOk(s)) { return s; } catalog->fromProto(proto); return StatusBuilder::ok(); } std::map<PartitionId, std::map<uint32_t, proto::Build>> CatalogController::getCurrentBuilds() { autil::ScopedReadLock lock(_buildLock); 
std::map<PartitionId, std::map<uint32_t, proto::Build>> buildMap; for (const auto &build : _builds) { const auto &buildId = build.build_id(); PartitionId id = {buildId.partition_name(), buildId.table_name(), buildId.database_name(), buildId.catalog_name(), proto::PartitionType::TABLE_PARTITION}; auto iter = buildMap.find(id); if (iter != buildMap.end()) { iter->second.emplace(buildId.generation_id(), build); } else { buildMap.emplace(id, std::map<uint32_t, proto::Build>{{buildId.generation_id(), build}}); } } return buildMap; } bool CatalogController::isSchemaChanged(const Partition *newPart, const Partition *oldPart) { // check table structure const TableStructure *newTableStructure = newPart->getTableStructure(); const TableStructure *oldTableStructure = oldPart->getTableStructure(); if (newTableStructure && oldTableStructure) { proto::TableStructure newProto; proto::TableStructure oldProto; newTableStructure->toProto(&newProto); oldTableStructure->toProto(&oldProto); bool noDiff = ProtoUtil::compareProto(newProto, oldProto); if (!noDiff) { return true; } } // check data source bool noDiff = ProtoUtil::compareProto(newPart->dataSource(), oldPart->dataSource()); if (!noDiff) { return true; } return false; } } // namespace catalog