lib/MessageBuilder.cc (123 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include <assert.h> #include <pulsar/MessageBuilder.h> #include <memory> #include <stdexcept> #include <utility> #include "KeyValueImpl.h" #include "LogUtils.h" #include "MessageImpl.h" #include "ObjectPool.h" #include "PulsarApi.pb.h" #include "SharedBuffer.h" #include "TimeUtils.h" DECLARE_LOG_OBJECT() using namespace pulsar; namespace pulsar { ObjectPool<MessageImpl, 100000> messagePool; std::shared_ptr<MessageImpl> MessageBuilder::createMessageImpl() { return messagePool.create(); } MessageBuilder::MessageBuilder() { impl_ = createMessageImpl(); } MessageBuilder& MessageBuilder::create() { impl_ = createMessageImpl(); return *this; } Message MessageBuilder::build() { return Message(impl_); } void MessageBuilder::checkMetadata() { if (!impl_.get()) { LOG_ERROR("Cannot reuse the same message builder to build a message"); abort(); } } MessageBuilder& MessageBuilder::setContent(const void* data, size_t size) { checkMetadata(); impl_->payload = SharedBuffer::copy((char*)data, size); return *this; } MessageBuilder& MessageBuilder::setAllocatedContent(void* data, size_t size) { checkMetadata(); impl_->payload = SharedBuffer::wrap((char*)data, size); return *this; } MessageBuilder& MessageBuilder::setContent(const std::string& data) { checkMetadata(); impl_->payload = SharedBuffer::copy((char*)data.c_str(), data.length()); return *this; } MessageBuilder& MessageBuilder::setContent(std::string&& data) { checkMetadata(); impl_->payload = SharedBuffer::take(std::move(data)); return *this; } MessageBuilder& MessageBuilder::setContent(const KeyValue& data) { impl_->keyValuePtr = data.impl_; return *this; } MessageBuilder& MessageBuilder::setProperty(const std::string& name, const std::string& value) { checkMetadata(); proto::KeyValue* keyValue = proto::KeyValue().New(); keyValue->set_key(name); keyValue->set_value(value); impl_->metadata.mutable_properties()->AddAllocated(keyValue); return *this; } MessageBuilder& MessageBuilder::setProperties(const StringMap& properties) { checkMetadata(); for (StringMap::const_iterator it = properties.begin(); it != properties.end(); it++) { setProperty(it->first, it->second); } return *this; } MessageBuilder& MessageBuilder::setPartitionKey(const std::string& partitionKey) { checkMetadata(); impl_->metadata.set_partition_key(partitionKey); return *this; } MessageBuilder& MessageBuilder::setOrderingKey(const std::string& orderingKey) { checkMetadata(); impl_->metadata.set_ordering_key(orderingKey); return *this; } MessageBuilder& MessageBuilder::setEventTimestamp(uint64_t eventTimestamp) { checkMetadata(); impl_->metadata.set_event_time(eventTimestamp); return *this; } MessageBuilder& MessageBuilder::setSequenceId(int64_t sequenceId) { if (sequenceId < 0) { throw std::invalid_argument("sequenceId needs to be >= 0"); } checkMetadata(); impl_->metadata.set_sequence_id(sequenceId); return *this; } MessageBuilder& MessageBuilder::setDeliverAfter(std::chrono::milliseconds delay) { return setDeliverAt(TimeUtils::currentTimeMillis() + delay.count()); } MessageBuilder& MessageBuilder::setDeliverAt(uint64_t deliveryTimestamp) { checkMetadata(); impl_->metadata.set_deliver_at_time(deliveryTimestamp); return *this; } MessageBuilder& MessageBuilder::setReplicationClusters(const std::vector<std::string>& clusters) { checkMetadata(); google::protobuf::RepeatedPtrField<std::string> r(clusters.begin(), clusters.end()); r.Swap(impl_->metadata.mutable_replicate_to()); return *this; } MessageBuilder& MessageBuilder::disableReplication(bool flag) { checkMetadata(); google::protobuf::RepeatedPtrField<std::string> r; if (flag) { r.AddAllocated(new std::string("__local__")); } r.Swap(impl_->metadata.mutable_replicate_to()); return *this; } const char* MessageBuilder::data() const { assert(impl_->payload.data()); return impl_->payload.data(); } size_t MessageBuilder::size() const { assert(impl_->payload.data()); return impl_->payload.readableBytes(); } } // namespace pulsar