wangle/channel/broadcast/ObservingHandler-inl.h (89 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once namespace wangle { template <typename T, typename R, typename P> ObservingHandler<T, R, P>::ObservingHandler( const R& routingData, BroadcastPool<T, R, P>* broadcastPool) : routingData_(routingData), broadcastPool_(CHECK_NOTNULL(broadcastPool)) {} template <typename T, typename R, typename P> ObservingHandler<T, R, P>::~ObservingHandler() { if (broadcastHandler_) { auto broadcastHandler = broadcastHandler_; broadcastHandler_ = nullptr; broadcastHandler->unsubscribe(subscriptionId_); } if (deleted_) { *deleted_ = true; } } template <typename T, typename R, typename P> void ObservingHandler<T, R, P>::transportActive(Context* ctx) { if (broadcastHandler_) { // Already connected return; } // Pause ingress until the remote connection is established and // broadcast handler is ready auto pipeline = dynamic_cast<ObservingPipeline<T>*>(ctx->getPipeline()); CHECK(pipeline); pipeline->transportInactive(); auto deleted = deleted_; broadcastPool_->getHandler(routingData_) .thenValue( [this, pipeline, deleted](BroadcastHandler<T, R>* broadcastHandler) { if (*deleted) { return; } broadcastHandler_ = broadcastHandler; subscriptionId_ = broadcastHandler_->subscribe(this); VLOG(10) << "Subscribed to a broadcast"; // Resume ingress pipeline->transportActive(); }) .thenError( folly::tag_t<std::exception>{}, [this, ctx, deleted](const std::exception& ex) { if (*deleted) { return; } LOG(ERROR) << "Error subscribing to a broadcast: " << ex.what(); this->close(ctx); }); } template <typename T, typename R, typename P> void ObservingHandler<T, R, P>::readEOF(Context* ctx) { this->close(ctx); } template <typename T, typename R, typename P> void ObservingHandler<T, R, P>::readException( Context* ctx, folly::exception_wrapper ex) { LOG(ERROR) << "Error on read: " << exceptionStr(ex); this->close(ctx); } template <typename T, typename R, typename P> void ObservingHandler<T, R, P>::onNext(const T& data) { auto ctx = this->getContext(); auto deleted = deleted_; this->write(ctx, data).thenError( folly::tag_t<std::exception>{}, [this, ctx, deleted](const std::exception& ex) { if (*deleted) { return; } LOG(ERROR) << "Error on write: " << ex.what(); this->close(ctx); }); } template <typename T, typename R, typename P> void ObservingHandler<T, R, P>::onError(folly::exception_wrapper ex) { LOG(ERROR) << "Error observing a broadcast: " << exceptionStr(ex); // broadcastHandler_ will clear its subscribers and delete itself broadcastHandler_ = nullptr; this->close(this->getContext()); } template <typename T, typename R, typename P> void ObservingHandler<T, R, P>::onCompleted() { // broadcastHandler_ will clear its subscribers and delete itself broadcastHandler_ = nullptr; this->close(this->getContext()); } template <typename T, typename R, typename P> R& ObservingHandler<T, R, P>::routingData() { return routingData_; } } // namespace wangle