extensions/couchbase/controllerservices/CouchbaseClusterService.cpp (214 lines of code) (raw):

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "CouchbaseClusterService.h"
#include "core/Resource.h"

#include "couchbase/codec/raw_binary_transcoder.hxx"
#include "couchbase/codec/raw_json_transcoder.hxx"
#include "couchbase/codec/raw_string_transcoder.hxx"
#include "utils/TimeUtil.h"

namespace org::apache::nifi::minifi::couchbase {

namespace {

// Couchbase "common" error codes that getErrorType() classifies as TEMPORARY.
// Every other error code is classified as FATAL.
constexpr auto temporary_connection_errors = std::to_array<::couchbase::errc::common>({
  ::couchbase::errc::common::temporary_failure,
  ::couchbase::errc::common::request_canceled,
  ::couchbase::errc::common::internal_server_failure,
  ::couchbase::errc::common::cas_mismatch,
  ::couchbase::errc::common::ambiguous_timeout,
  ::couchbase::errc::common::unambiguous_timeout,
  ::couchbase::errc::common::rate_limited,
  ::couchbase::errc::common::quota_limited
});

/**
 * Classifies an error code as TEMPORARY or FATAL.
 *
 * @param error_code error code to classify
 * @return CouchbaseErrorType::TEMPORARY if the numeric value of error_code matches one of the entries of
 *         temporary_connection_errors, CouchbaseErrorType::FATAL otherwise
 *
 * NOTE(review): only the numeric value is compared, the error category is not checked. This presumably relies on
 * error codes of other categories never sharing a value with the listed ones - confirm.
 */
CouchbaseErrorType getErrorType(const std::error_code& error_code) {
  for (const auto& temporary_error : temporary_connection_errors) {
    if (static_cast<int>(temporary_error) == error_code.value()) {
      return CouchbaseErrorType::TEMPORARY;
    }
  }
  return CouchbaseErrorType::FATAL;
}

}  // namespace

/**
 * Stores the connection string and logger, and builds the cluster options used later by establishConnection().
 * No connection is opened here.
 *
 * NOTE(review): buildClusterOptions() logs through logger_, so logger_ has to be declared before cluster_options_
 * in the class definition for it to be initialized in time - confirm in the header.
 *
 * @throws minifi::Exception if the authentication settings are invalid (see buildClusterOptions())
 */
CouchbaseClient::CouchbaseClient(std::string connection_string, std::string username, std::string password,
    minifi::controllers::SSLContextService* ssl_context_service, const std::shared_ptr<core::logging::Logger>& logger)
    : connection_string_(std::move(connection_string)),
      logger_(logger),
      cluster_options_(buildClusterOptions(std::move(username), std::move(password), ssl_context_service)) {
}

/**
 * Builds the Couchbase cluster options for either username/password or mTLS (client certificate) authentication.
 *
 * Exactly one of the two authentication methods has to be configured:
 *  - a non-empty username together with a non-empty password; the SSL context service may still supply a CA certificate
 *  - an SSL context service with a client certificate and private key, with no username set
 *
 * @param username username for password authentication, empty if mTLS is to be used
 * @param password password for password authentication
 * @param ssl_context_service optional (may be nullptr) source of the client certificate, private key and CA certificate
 * @return the cluster options configured for the selected authentication method
 * @throws minifi::Exception (PROCESS_SCHEDULE_EXCEPTION) if neither or both authentication methods are configured,
 *         if the password is missing, or if the private key or certificate path is empty
 */
::couchbase::cluster_options CouchbaseClient::buildClusterOptions(std::string username, std::string password, minifi::controllers::SSLContextService* ssl_context_service) {
  const bool has_client_certificate = ssl_context_service && !ssl_context_service->getCertificateFile().empty();

  if (username.empty() && !has_client_certificate) {
    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Neither username and password nor SSLContextService is provided for Couchbase authentication");
  }

  if (!username.empty() && has_client_certificate) {
    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Either username and password or mTLS certificate authentication should be used in the SSLContextService for Couchbase, "
      "but not both");
  }

  if (!username.empty()) {
    logger_->log_debug("Using username and password authentication for Couchbase server");
    if (password.empty()) {
      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Password missing for Couchbase server authentication");
    }

    ::couchbase::cluster_options cluster_options(std::move(username), std::move(password));
    if (ssl_context_service && !ssl_context_service->getCACertificate().empty()) {
      logger_->log_debug("Setting Couchbase client CA certificate path to '{}'", ssl_context_service->getCACertificate().string());
      cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string());
    }
    return cluster_options;
  }

  // From here on the username is empty, so the checks above guarantee that ssl_context_service is not null
  // and that it has a client certificate configured.
  logger_->log_debug("Using mTLS authentication for Couchbase server");
  logger_->log_debug("Setting Couchbase client SSL key file path to '{}'", ssl_context_service->getPrivateKeyFile().string());
  logger_->log_debug("Setting Couchbase client certificate file path to '{}'", ssl_context_service->getCertificateFile().string());
  if (ssl_context_service->getPrivateKeyFile().empty() || ssl_context_service->getCertificateFile().empty()) {
    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Couchbase client private key path or client certificate path is empty");
  }

  ::couchbase::cluster_options cluster_options(::couchbase::certificate_authenticator(ssl_context_service->getCertificateFile().string(), ssl_context_service->getPrivateKeyFile().string()));
  if (!ssl_context_service->getCACertificate().empty()) {
    logger_->log_debug("Setting Couchbase client CA certificate path to '{}'", ssl_context_service->getCACertificate().string());
    cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string());
  }
  cluster_options.security().tls_verify(::couchbase::tls_verify_mode::peer);
  return cluster_options;
}

/**
 * Connects to the cluster if there is no connection yet and returns a handle to the requested collection.
 *
 * @param collection bucket, scope and collection names identifying the collection
 * @return the collection handle, or the connection error type if the connection could not be established.
 *         CouchbaseErrorType::TEMPORARY is also returned if the connection was closed by another thread
 *         between establishing it and accessing the collection.
 */
nonstd::expected<::couchbase::collection, CouchbaseErrorType> CouchbaseClient::getCollection(const CouchbaseCollection& collection) {
  auto connection_result = establishConnection();
  if (!connection_result) {
    return nonstd::make_unexpected(connection_result.error());
  }

  std::lock_guard<std::mutex> lock(cluster_mutex_);
  // establishConnection() releases cluster_mutex_ before we take it again, so a concurrent close() may have
  // reset cluster_ in the meantime. Dereferencing the empty optional would be undefined behaviour.
  if (!cluster_) {
    logger_->log_warn("Couchbase cluster connection was closed before the collection could be accessed");
    return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
  }
  return cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
}

/**
 * Upserts a document into the given collection and waits for the result.
 *
 * @param collection target collection
 * @param document_type selects the transcoder: Json -> raw JSON, String -> raw string, anything else -> raw binary
 * @param document_id id of the document to upsert
 * @param buffer document content
 * @param options upsert options passed on to the Couchbase client
 * @return the upsert result (bucket name, CAS and mutation token fields; the token fields are 0 if no mutation token
 *         was returned), or the error type on failure. Temporary errors are reported as TEMPORARY, except
 *         ambiguous_timeout which is reported as FATAL together with all other errors.
 */
nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id,
    const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
  auto collection_result = getCollection(collection);
  if (!collection_result.has_value()) {
    return nonstd::make_unexpected(collection_result.error());
  }

  std::pair<::couchbase::error, ::couchbase::mutation_result> result;
  if (document_type == CouchbaseValueType::Json) {
    result = collection_result->upsert<::couchbase::codec::raw_json_transcoder>(document_id, buffer, options).get();
  } else if (document_type == CouchbaseValueType::String) {
    // The raw string transcoder is given a std::string, so the bytes are copied into one first
    std::string data_str(reinterpret_cast<const char*>(buffer.data()), buffer.size());
    result = collection_result->upsert<::couchbase::codec::raw_string_transcoder>(document_id, data_str, options).get();
  } else {
    result = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
  }

  auto& [upsert_err, upsert_resp] = result;
  if (upsert_err.ec()) {
    // ambiguous_timeout should not be retried as we do not know if the insert was successful or not
    if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && upsert_err.ec().value() != static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
      logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
          document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message());
      return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
    }
    logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' with error code: '{}', message: '{}'",
        document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message());
    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
  }

  const auto& mutation_token = upsert_resp.mutation_token();
  return CouchbaseUpsertResult {
    {
      collection.bucket_name,
      upsert_resp.cas().value(),
    },
    (mutation_token.has_value() ? mutation_token->sequence_number() : 0),
    (mutation_token.has_value() ? mutation_token->partition_uuid() : 0),
    gsl::narrow<uint16_t>(mutation_token.has_value() ? mutation_token->partition_id() : 0)
  };
}

/**
 * Fetches a document (including its expiry time) from the given collection and waits for the result.
 *
 * @param collection collection to read from
 * @param document_id id of the document to fetch
 * @param return_type selects how the content is decoded: Json -> raw JSON, String -> raw string, anything else -> raw binary
 * @return the bucket name, CAS, decoded content and (if present) the formatted expiry time, or the error type:
 *         TEMPORARY for errors classified as temporary by getErrorType(), FATAL for every other error and for
 *         exceptions thrown while decoding the content
 */
nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> CouchbaseClient::get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type) {
  auto collection_result = getCollection(collection);
  if (!collection_result.has_value()) {
    return nonstd::make_unexpected(collection_result.error());
  }

  ::couchbase::get_options options;
  options.with_expiry(true);
  auto [get_err, resp] = collection_result->get(document_id, options).get();
  if (get_err.ec()) {
    if (getErrorType(get_err.ec()) == CouchbaseErrorType::TEMPORARY) {
      // Not every temporary error is a timeout, so the actual error code and message are logged
      logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
          document_id, collection.bucket_name, collection.scope_name, collection.collection_name, get_err.ec(), get_err.message());
      return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
    }
    logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' with error code: '{}', message: '{}'",
        document_id, collection.bucket_name, collection.scope_name, collection.collection_name, get_err.ec(), get_err.message());
    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
  }

  try {
    CouchbaseGetResult result;
    result.bucket_name = collection.bucket_name;
    result.cas = resp.cas().value();
    if (return_type == CouchbaseValueType::Json) {
      result.value = resp.content_as<::couchbase::codec::binary, ::couchbase::codec::raw_json_transcoder>();
    } else if (return_type == CouchbaseValueType::String) {
      result.value = resp.content_as<::couchbase::codec::raw_string_transcoder>();
    } else {
      result.value = resp.content_as<::couchbase::codec::raw_binary_transcoder>();
    }
    if (resp.expiry_time().has_value()) {
      result.expiry = utils::timeutils::getTimeStr(*resp.expiry_time());
    }
    return result;
  } catch (const std::exception& ex) {
    logger_->log_error("Failed to get content for document '{}' from collection '{}.{}.{}' with the following exception: '{}'",
        document_id, collection.bucket_name, collection.scope_name, collection.collection_name, ex.what());
    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
  }
}

/**
 * Closes the cluster connection, if there is one, waits for the close to finish and resets the stored cluster.
 */
void CouchbaseClient::close() {
  std::lock_guard<std::mutex> lock(cluster_mutex_);
  if (cluster_) {
    cluster_->close().wait();
  }
  cluster_ = std::nullopt;
}

/**
 * Connects to the Couchbase cluster unless a connection is already stored.
 *
 * @return nothing on success (or if already connected), otherwise the error type of the failed connection attempt
 */
nonstd::expected<void, CouchbaseErrorType> CouchbaseClient::establishConnection() {
  std::lock_guard<std::mutex> lock(cluster_mutex_);
  if (cluster_) {
    return {};
  }

  auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, cluster_options_).get();
  if (connect_err.ec()) {
    logger_->log_error("Failed to connect to Couchbase cluster with error code: '{}' and message: '{}'", connect_err.ec(), connect_err.message());
    return nonstd::make_unexpected(getErrorType(connect_err.ec()));
  }

  cluster_ = std::move(cluster);
  return {};
}

namespace controllers {

/** Registers the properties supported by this controller service. */
void CouchbaseClusterService::initialize() {
  setSupportedProperties(Properties);
}

/**
 * Reads the connection properties, creates the Couchbase client and attempts the initial connection.
 *
 * A temporary connection error is only logged as a warning; the client connects again the next time a collection
 * is requested from it.
 *
 * @throws minifi::Exception (PROCESS_SCHEDULE_EXCEPTION) if the connection string is empty, if neither
 *         username and password nor a linked service is configured, if the first linked service is not an
 *         SSLContextService, or if the initial connection fails with a fatal error.
 *         Errors from the "required property" lookup and from the CouchbaseClient constructor also propagate.
 */
void CouchbaseClusterService::onEnable() {
  std::string connection_string = getProperty(ConnectionString.name) | utils::orThrow("required property");
  std::string username = getProperty(UserName.name).value_or("");
  std::string password = getProperty(UserPassword.name).value_or("");

  if (connection_string.empty()) {
    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing connection string");
  }

  if ((username.empty() || password.empty()) && linked_services_.empty()) {
    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing username and password or SSLContextService as a linked service");
  }

  // Only the first linked service is considered, and it has to be an SSLContextService
  minifi::controllers::SSLContextService* ssl_context_service_ptr = nullptr;
  if (!linked_services_.empty()) {
    auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(linked_services_[0]);
    if (!ssl_context_service) {
      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Linked service is not an SSLContextService");
    }
    ssl_context_service_ptr = ssl_context_service.get();
  }

  client_ = std::make_unique<CouchbaseClient>(connection_string, username, password, ssl_context_service_ptr, logger_);
  auto result = client_->establishConnection();
  if (!result) {
    if (result.error() == CouchbaseErrorType::FATAL) {
      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Failed to connect to Couchbase cluster with fatal error");
    }
    logger_->log_warn("Failed to connect to Couchbase cluster with temporary error, will retry connection when a Couchbase processor is triggered");
  }
}

/**
 * Looks up the CouchbaseClusterService whose name is stored in the given property of the process context.
 *
 * @param context process context used to read the property and to look up the controller service
 * @param property property holding the name of the controller service
 * @return the controller service, never null
 * @throws minifi::Exception (PROCESS_SCHEDULE_EXCEPTION) if the property is not set, or the named controller service
 *         does not exist or is not a CouchbaseClusterService
 */
gsl::not_null<std::shared_ptr<CouchbaseClusterService>> CouchbaseClusterService::getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property) {
  std::shared_ptr<CouchbaseClusterService> couchbase_cluster_service;
  if (auto connection_controller_name = context.getProperty(property)) {
    couchbase_cluster_service = std::dynamic_pointer_cast<CouchbaseClusterService>(context.getControllerService(*connection_controller_name, context.getProcessor().getUUID()));
  }
  if (!couchbase_cluster_service) {
    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing Couchbase Cluster Service");
  }
  return gsl::make_not_null(couchbase_cluster_service);
}

REGISTER_RESOURCE(CouchbaseClusterService, ControllerService);

}  // namespace controllers
}  // namespace org::apache::nifi::minifi::couchbase