aios/catalog/service/CatalogServiceImpl.cpp (448 lines of code) (raw):

/*
 * Copyright 2014-present Alibaba Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "catalog/service/CatalogServiceImpl.h"

#include <memory>

#include "catalog/service/CatalogAccessLog.h"
#include "catalog/store/StoreFactory.h"
#include "catalog/util/StatusBuilder.h"

namespace catalog {
AUTIL_LOG_SETUP(catalog, CatalogServiceImpl);

/**
 * Scope guard that completes an RPC when it is destroyed.
 *
 * On destruction it:
 *   1. sets the response status code to Status::OK if the response has no
 *      status yet, and
 *   2. runs the completion closure, if one was supplied.
 *
 * Both pointers are borrowed (the guard never deletes them) and either may
 * be nullptr, in which case the corresponding step is skipped.
 *
 * @tparam T response message type; must provide has_status() and
 *           mutable_status().
 */
template <typename T>
class ServiceRpcGuard {
public:
    explicit ServiceRpcGuard(::google::protobuf::Closure *done, T *response) : _done(done), _response(response) {
        // TODO(chekong.ygm): report metrics
    }

    ~ServiceRpcGuard() {
        if (_response != nullptr && !_response->has_status()) {
            _response->mutable_status()->set_code(Status::OK);
        }
        if (_done != nullptr) {
            _done->Run();
        }
    }

    ServiceRpcGuard(const ServiceRpcGuard &) = delete;
    ServiceRpcGuard &operator=(const ServiceRpcGuard &) = delete;

    // Moving transfers responsibility for completing the RPC. The source is
    // disarmed so that its destructor neither touches the response nor runs
    // the closure; a defaulted move would leave both guards holding the same
    // pointers and the closure would be run twice.
    ServiceRpcGuard(ServiceRpcGuard &&other) noexcept : _done(other._done), _response(other._response) {
        other._done = nullptr;
        other._response = nullptr;
    }

public:
    ::google::protobuf::Closure *_done;
    T *_response;
};

// Common prologue of every RPC handler:
//   - creates `guard`, which completes the RPC when the handler returns;
//   - creates `catalogAccessLog` for this call (declared after `guard`, so it
//     is destroyed before the guard runs the closure);
//   - checks that the service has been started.
// NOTE(review): CATALOG_REQUIRES is defined outside this file; it appears to
// fill the response status and return from the enclosing function when the
// condition is false -- confirm against its definition.
#define RPC_GUARD(REQUEST, RESPONSE, DONE)                                                                             \
    auto guard = ServiceRpcGuard(DONE, RESPONSE);                                                                      \
    CatalogAccessLog catalogAccessLog(REQUEST, RESPONSE, __func__);                                                    \
    CATALOG_REQUIRES(RESPONSE, _serviceReady, Status::SERVICE_NOT_READY)

// RPC_GUARD plus catalog resolution: binds `catalogName` to CATALOG_NAME,
// requires it to be non-empty, looks up the matching controller and binds it
// to `catalogController`, requiring the lookup to succeed.
#define RPC_GUARD2(CATALOG_NAME, REQUEST, RESPONSE, DONE)                                                              \
    RPC_GUARD(REQUEST, RESPONSE, DONE);                                                                                \
    const auto &catalogName = (CATALOG_NAME);                                                                          \
    CATALOG_REQUIRES(RESPONSE, !catalogName.empty(), Status::INVALID_ARGUMENTS, "catalog_name is not specified");      \
    auto catalogController = _controllerManager->get(catalogName);                                                     \
    CATALOG_REQUIRES(RESPONSE, catalogController, Status::NOT_FOUND, "catalog:[", catalogName, "] not found")

/// Stores the store URI; the service is not ready until start() succeeds.
CatalogServiceImpl::CatalogServiceImpl(const std::string &storeUri) : _serviceReady(false), _storeUri(storeUri) {}

CatalogServiceImpl::~CatalogServiceImpl() {}

/**
 * Creates the store from the configured URI, builds the controller manager on
 * top of it and recovers it.
 *
 * @return true and marks the service ready on success; false (after logging
 *         an error) if the store cannot be created or recovery fails.
 */
bool CatalogServiceImpl::start() {
    _store = StoreFactory::getInstance()->create(_storeUri);
    if (!_store) {
        AUTIL_LOG(ERROR, "failed to create store with uri:[%s]", _storeUri.c_str());
        return false;
    }
    _controllerManager = std::make_shared<CatalogControllerManager>(_store);
    if (!_controllerManager->recover()) {
        AUTIL_LOG(ERROR, "failed to recover");
        return false;
    }
    AUTIL_LOG(INFO, "catalog service is ready");
    _serviceReady = true;
    return true;
}

/// Marks the service as not ready; RPC handlers check this flag in RPC_GUARD.
void CatalogServiceImpl::stop() { _serviceReady = false; }

// ---------------------------------------------------------------------------
// Catalog RPCs
// ---------------------------------------------------------------------------

/// Fills the response with the catalog names known to the controller manager.
void CatalogServiceImpl::listCatalog(::google::protobuf::RpcController *controller,
                                     const proto::ListCatalogRequest *request,
                                     proto::ListCatalogResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD(request, response, done);
    const auto &catalogNames = _controllerManager->list();
    *(response->mutable_catalog_names()) = {catalogNames.begin(), catalogNames.end()};
    // Set explicitly here, i.e. before the handler's locals (access log, then
    // guard) are destroyed; the guard would otherwise default it afterwards.
    if (!response->has_status()) {
        response->mutable_status()->set_code(Status::OK);
    }
}

/**
 * Forwards to the controller of the requested catalog.
 *
 * Unlike the RPC_GUARD2 handlers, a missing catalog is only reported as
 * Status::NOT_FOUND when the request does not set ignore_not_found_error.
 */
void CatalogServiceImpl::getCatalog(::google::protobuf::RpcController *controller,
                                    const proto::GetCatalogRequest *request,
                                    proto::GetCatalogResponse *response,
                                    ::google::protobuf::Closure *done) {
    RPC_GUARD(request, response, done);
    const auto &catalogName = request->catalog_name();
    CATALOG_REQUIRES(response, !catalogName.empty(), Status::INVALID_ARGUMENTS, "catalog_name is not specified");
    auto catalogController = _controllerManager->get(catalogName);
    if (catalogController == nullptr) {
        CATALOG_REQUIRES(
            response, request->ignore_not_found_error(), Status::NOT_FOUND, "catalog:[", catalogName, "] not found");
    } else {
        catalogController->getCatalog(request, response);
    }
}

/**
 * Creates a new catalog via the controller manager's tryAdd().
 *
 * The factory callback builds and initialises a controller, asks it to create
 * the catalog, and hands the controller over only if the response status is
 * OK; otherwise it returns nullptr.
 */
void CatalogServiceImpl::createCatalog(::google::protobuf::RpcController *controller,
                                       const proto::CreateCatalogRequest *request,
                                       proto::CommonResponse *response,
                                       ::google::protobuf::Closure *done) {
    RPC_GUARD(request, response, done);
    const auto &catalogName = request->catalog().catalog_name();
    CATALOG_REQUIRES(response, !catalogName.empty(), Status::INVALID_ARGUMENTS, "catalog_name is not specified");
    auto status = _controllerManager->tryAdd(catalogName, [&]() -> std::unique_ptr<CatalogController> {
        auto catalogController = std::make_unique<CatalogController>(catalogName, _store);
        if (!catalogController->init()) {
            return nullptr;
        }
        catalogController->createCatalog(request, response);
        if (isOk(response->status())) {
            return catalogController;
        } else {
            return nullptr;
        }
    });
    CATALOG_REQUIRES_OK(response, status);
}

/**
 * Drops a catalog via the controller manager's tryRemove().
 *
 * The callback asks the controller to drop the catalog and reports whether
 * the response status is OK.
 */
void CatalogServiceImpl::dropCatalog(::google::protobuf::RpcController *controller,
                                     const proto::DropCatalogRequest *request,
                                     proto::CommonResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD(request, response, done);
    const auto &catalogName = request->catalog_name();
    CATALOG_REQUIRES(response, !catalogName.empty(), Status::INVALID_ARGUMENTS, "catalog_name is not specified");
    auto status = _controllerManager->tryRemove(catalogName, [&](CatalogControllerPtr catalogController) {
        catalogController->dropCatalog(request, response);
        return isOk(response->status());
    });
    CATALOG_REQUIRES_OK(response, status);
}

// Every handler below follows the same shape: RPC_GUARD2 resolves the
// controller for the catalog named in the request, then the call is forwarded
// to the controller method of the same name. Only the location of the catalog
// name inside the request differs between handlers.

void CatalogServiceImpl::updateCatalog(::google::protobuf::RpcController *controller,
                                       const proto::UpdateCatalogRequest *request,
                                       proto::CommonResponse *response,
                                       ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog().catalog_name(), request, response, done);
    catalogController->updateCatalog(request, response);
}

void CatalogServiceImpl::updateCatalogStatus(::google::protobuf::RpcController *controller,
                                             const proto::UpdateCatalogStatusRequest *request,
                                             proto::CommonResponse *response,
                                             ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->updateCatalogStatus(request, response);
}

// ---------------------------------------------------------------------------
// Database RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::listDatabase(::google::protobuf::RpcController *controller,
                                      const proto::ListDatabaseRequest *request,
                                      proto::ListDatabaseResponse *response,
                                      ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listDatabase(request, response);
}

void CatalogServiceImpl::getDatabase(::google::protobuf::RpcController *controller,
                                     const proto::GetDatabaseRequest *request,
                                     proto::GetDatabaseResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->getDatabase(request, response);
}

void CatalogServiceImpl::createDatabase(::google::protobuf::RpcController *controller,
                                        const proto::CreateDatabaseRequest *request,
                                        proto::CommonResponse *response,
                                        ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->database().catalog_name(), request, response, done);
    catalogController->createDatabase(request, response);
}

void CatalogServiceImpl::dropDatabase(::google::protobuf::RpcController *controller,
                                      const proto::DropDatabaseRequest *request,
                                      proto::CommonResponse *response,
                                      ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropDatabase(request, response);
}

void CatalogServiceImpl::updateDatabase(::google::protobuf::RpcController *controller,
                                        const proto::UpdateDatabaseRequest *request,
                                        proto::CommonResponse *response,
                                        ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->database().catalog_name(), request, response, done);
    catalogController->updateDatabase(request, response);
}

// ---------------------------------------------------------------------------
// Table RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::listTable(::google::protobuf::RpcController *controller,
                                   const proto::ListTableRequest *request,
                                   proto::ListTableResponse *response,
                                   ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listTable(request, response);
}

void CatalogServiceImpl::getTable(::google::protobuf::RpcController *controller,
                                  const proto::GetTableRequest *request,
                                  proto::GetTableResponse *response,
                                  ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->getTable(request, response);
}

void CatalogServiceImpl::createTable(::google::protobuf::RpcController *controller,
                                     const proto::CreateTableRequest *request,
                                     proto::CommonResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->table().catalog_name(), request, response, done);
    catalogController->createTable(request, response);
}

void CatalogServiceImpl::dropTable(::google::protobuf::RpcController *controller,
                                   const proto::DropTableRequest *request,
                                   proto::CommonResponse *response,
                                   ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropTable(request, response);
}

void CatalogServiceImpl::updateTable(::google::protobuf::RpcController *controller,
                                     const proto::UpdateTableRequest *request,
                                     proto::CommonResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->table().catalog_name(), request, response, done);
    catalogController->updateTable(request, response);
}

void CatalogServiceImpl::listTableRelatedTableGroup(::google::protobuf::RpcController *controller,
                                                    const proto::ListTableRelatedTableGroupRequest *request,
                                                    proto::ListTableRelatedTableGroupResponse *response,
                                                    ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listTableRelatedTableGroup(request, response);
}

// ---------------------------------------------------------------------------
// Table structure, column and index RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::getTableStructure(::google::protobuf::RpcController *controller,
                                           const proto::GetTableStructureRequest *request,
                                           proto::GetTableStructureResponse *response,
                                           ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->getTableStructure(request, response);
}

void CatalogServiceImpl::updateTableStructure(::google::protobuf::RpcController *controller,
                                              const proto::UpdateTableStructureRequest *request,
                                              proto::CommonResponse *response,
                                              ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->table_structure().catalog_name(), request, response, done);
    catalogController->updateTableStructure(request, response);
}

void CatalogServiceImpl::addColumn(::google::protobuf::RpcController *controller,
                                   const proto::AddColumnRequest *request,
                                   proto::CommonResponse *response,
                                   ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->addColumn(request, response);
}

void CatalogServiceImpl::updateColumn(::google::protobuf::RpcController *controller,
                                      const proto::UpdateColumnRequest *request,
                                      proto::CommonResponse *response,
                                      ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->updateColumn(request, response);
}

void CatalogServiceImpl::dropColumn(::google::protobuf::RpcController *controller,
                                    const proto::DropColumnRequest *request,
                                    proto::CommonResponse *response,
                                    ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropColumn(request, response);
}

void CatalogServiceImpl::createIndex(::google::protobuf::RpcController *controller,
                                     const proto::CreateIndexRequest *request,
                                     proto::CommonResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->createIndex(request, response);
}

void CatalogServiceImpl::updateIndex(::google::protobuf::RpcController *controller,
                                     const proto::UpdateIndexRequest *request,
                                     proto::CommonResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->updateIndex(request, response);
}

void CatalogServiceImpl::dropIndex(::google::protobuf::RpcController *controller,
                                   const proto::DropIndexRequest *request,
                                   proto::CommonResponse *response,
                                   ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropIndex(request, response);
}

// ---------------------------------------------------------------------------
// Partition RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::listPartition(::google::protobuf::RpcController *controller,
                                       const proto::ListPartitionRequest *request,
                                       proto::ListPartitionResponse *response,
                                       ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listPartition(request, response);
}

void CatalogServiceImpl::getPartition(::google::protobuf::RpcController *controller,
                                      const proto::GetPartitionRequest *request,
                                      proto::GetPartitionResponse *response,
                                      ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->getPartition(request, response);
}

void CatalogServiceImpl::createPartition(::google::protobuf::RpcController *controller,
                                         const proto::CreatePartitionRequest *request,
                                         proto::CommonResponse *response,
                                         ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->partition().catalog_name(), request, response, done);
    catalogController->createPartition(request, response);
}

void CatalogServiceImpl::dropPartition(::google::protobuf::RpcController *controller,
                                       const proto::DropPartitionRequest *request,
                                       proto::CommonResponse *response,
                                       ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropPartition(request, response);
}

void CatalogServiceImpl::updatePartition(::google::protobuf::RpcController *controller,
                                         const proto::UpdatePartitionRequest *request,
                                         proto::CommonResponse *response,
                                         ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->partition().catalog_name(), request, response, done);
    catalogController->updatePartition(request, response);
}

void CatalogServiceImpl::updatePartitionTableStructure(::google::protobuf::RpcController *controller,
                                                       const proto::UpdatePartitionTableStructureRequest *request,
                                                       proto::CommonResponse *response,
                                                       ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->updatePartitionTableStructure(request, response);
}

// ---------------------------------------------------------------------------
// Table group RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::listTableGroup(::google::protobuf::RpcController *controller,
                                        const proto::ListTableGroupRequest *request,
                                        proto::ListTableGroupResponse *response,
                                        ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listTableGroup(request, response);
}

void CatalogServiceImpl::getTableGroup(::google::protobuf::RpcController *controller,
                                       const proto::GetTableGroupRequest *request,
                                       proto::GetTableGroupResponse *response,
                                       ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->getTableGroup(request, response);
}

void CatalogServiceImpl::createTableGroup(::google::protobuf::RpcController *controller,
                                          const proto::CreateTableGroupRequest *request,
                                          proto::CommonResponse *response,
                                          ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->table_group().catalog_name(), request, response, done);
    catalogController->createTableGroup(request, response);
}

void CatalogServiceImpl::dropTableGroup(::google::protobuf::RpcController *controller,
                                        const proto::DropTableGroupRequest *request,
                                        proto::CommonResponse *response,
                                        ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropTableGroup(request, response);
}

void CatalogServiceImpl::updateTableGroup(::google::protobuf::RpcController *controller,
                                          const proto::UpdateTableGroupRequest *request,
                                          proto::CommonResponse *response,
                                          ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->table_group().catalog_name(), request, response, done);
    catalogController->updateTableGroup(request, response);
}

// ---------------------------------------------------------------------------
// Load strategy RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::listLoadStrategy(::google::protobuf::RpcController *controller,
                                          const proto::ListLoadStrategyRequest *request,
                                          proto::ListLoadStrategyResponse *response,
                                          ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listLoadStrategy(request, response);
}

void CatalogServiceImpl::getLoadStrategy(::google::protobuf::RpcController *controller,
                                         const proto::GetLoadStrategyRequest *request,
                                         proto::GetLoadStrategyResponse *response,
                                         ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->getLoadStrategy(request, response);
}

void CatalogServiceImpl::createLoadStrategy(::google::protobuf::RpcController *controller,
                                            const proto::CreateLoadStrategyRequest *request,
                                            proto::CommonResponse *response,
                                            ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->load_strategy().catalog_name(), request, response, done);
    catalogController->createLoadStrategy(request, response);
}

void CatalogServiceImpl::dropLoadStrategy(::google::protobuf::RpcController *controller,
                                          const proto::DropLoadStrategyRequest *request,
                                          proto::CommonResponse *response,
                                          ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropLoadStrategy(request, response);
}

void CatalogServiceImpl::updateLoadStrategy(::google::protobuf::RpcController *controller,
                                            const proto::UpdateLoadStrategyRequest *request,
                                            proto::CommonResponse *response,
                                            ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->load_strategy().catalog_name(), request, response, done);
    catalogController->updateLoadStrategy(request, response);
}

// ---------------------------------------------------------------------------
// Function RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::listFunction(::google::protobuf::RpcController *controller,
                                      const proto::ListFunctionRequest *request,
                                      proto::ListFunctionResponse *response,
                                      ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listFunction(request, response);
}

void CatalogServiceImpl::getFunction(::google::protobuf::RpcController *controller,
                                     const proto::GetFunctionRequest *request,
                                     proto::GetFunctionResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->getFunction(request, response);
}

void CatalogServiceImpl::createFunction(::google::protobuf::RpcController *controller,
                                        const proto::CreateFunctionRequest *request,
                                        proto::CommonResponse *response,
                                        ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->function().catalog_name(), request, response, done);
    catalogController->createFunction(request, response);
}

void CatalogServiceImpl::dropFunction(::google::protobuf::RpcController *controller,
                                      const proto::DropFunctionRequest *request,
                                      proto::CommonResponse *response,
                                      ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->dropFunction(request, response);
}

void CatalogServiceImpl::updateFunction(::google::protobuf::RpcController *controller,
                                        const proto::UpdateFunctionRequest *request,
                                        proto::CommonResponse *response,
                                        ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->function().catalog_name(), request, response, done);
    catalogController->updateFunction(request, response);
}

// ---------------------------------------------------------------------------
// Build RPCs
// ---------------------------------------------------------------------------

void CatalogServiceImpl::listBuild(::google::protobuf::RpcController *controller,
                                   const proto::ListBuildRequest *request,
                                   proto::ListBuildResponse *response,
                                   ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->catalog_name(), request, response, done);
    catalogController->listBuild(request, response);
}

void CatalogServiceImpl::getBuild(::google::protobuf::RpcController *controller,
                                  const proto::GetBuildRequest *request,
                                  proto::GetBuildResponse *response,
                                  ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->build_id().catalog_name(), request, response, done);
    catalogController->getBuild(request, response);
}

void CatalogServiceImpl::createBuild(::google::protobuf::RpcController *controller,
                                     const proto::CreateBuildRequest *request,
                                     proto::CreateBuildResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->partition().catalog_name(), request, response, done);
    catalogController->createBuild(request, response);
}

void CatalogServiceImpl::dropBuild(::google::protobuf::RpcController *controller,
                                   const proto::DropBuildRequest *request,
                                   proto::BuildCommonResponse *response,
                                   ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->build_id().catalog_name(), request, response, done);
    catalogController->dropBuild(request, response);
}

void CatalogServiceImpl::updateBuild(::google::protobuf::RpcController *controller,
                                     const proto::UpdateBuildRequest *request,
                                     proto::BuildCommonResponse *response,
                                     ::google::protobuf::Closure *done) {
    RPC_GUARD2(request->build().build_id().catalog_name(), request, response, done);
    catalogController->updateBuild(request, response);
}

/// Returns the controller manager; it is empty until start() has created it.
std::shared_ptr<CatalogControllerManager> CatalogServiceImpl::getManager() { return _controllerManager; }

#undef RPC_GUARD2
#undef RPC_GUARD

} // namespace catalog