cppcache/src/EventId.cpp (141 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "EventId.hpp" #include <atomic> #include <cstdint> #include <cstring> #include <geode/DataInput.hpp> #include "Utils.hpp" #include "util/Log.hpp" namespace apache { namespace geode { namespace client { class ThreadIdCounter { public: static std::atomic<int64_t>& instance() { static std::atomic<int64_t> threadId_(0); return threadId_; } static int64_t next() { return ++instance(); } }; class EventIdTSS { public: static EventIdTSS& instance() { thread_local EventIdTSS eventId_; return eventId_; } int64_t nextSequenceId() { sequenceId_++; return sequenceId_; } int64_t currentSequenceId() { return sequenceId_; } int64_t threadId() { return threadId_; } private: EventIdTSS(); int64_t threadId_; int64_t sequenceId_; }; EventIdTSS::EventIdTSS() : threadId_(ThreadIdCounter::next()), sequenceId_(0) { LOGDEBUG("EventIdTSS::EventIdTSS(%p): threadId_=%" PRId64 ", sequenceId_=%" PRId64, this, threadId_, sequenceId_); } void EventId::toData(DataOutput& output) const { // This method is always expected to write out nonstatic distributed // memberid. Note that binary representation of EventId is NOT THE // SAME here as when serialized into part of a message (via the writeIdsData // method). LOGDEBUG("EventId::toData(%p) - called", this); output.writeBytes(reinterpret_cast<const int8_t*>(clientId_), clientIdLength_); output.writeArrayLen(18); char longCode = 3; output.write(static_cast<uint8_t>(longCode)); output.writeInt(threadId_); output.write(static_cast<uint8_t>(longCode)); output.writeInt(sequenceId_); output.writeInt(bucketId_); output.write(breadcrumbCounter_); } void EventId::fromData(DataInput& input) { LOGDEBUG("EventId::fromData(%p) - called", this); clientIdLength_ = input.readArrayLength(); input.readBytesOnly(reinterpret_cast<int8_t*>(clientId_), clientIdLength_); input.readArrayLength(); threadId_ = getEventIdData(input, input.read()); sequenceId_ = getEventIdData(input, input.read()); bucketId_ = input.readInt32(); breadcrumbCounter_ = input.read(); } const char* EventId::clientId() const { return clientId_; } int32_t EventId::clientIdLength() const { return clientIdLength_; } int64_t EventId::threadId() const { return threadId_; } int64_t EventId::sequenceNumber() const { return sequenceId_; } int64_t EventId::getEventIdData(DataInput& input, char numberCode) { int64_t retVal = 0; LOGDEBUG("EventId::getEventIdData(%p) - called", this); // Read number based on numeric code written by java server. if (numberCode == 0) { return input.read(); } else if (numberCode == 1) { retVal = input.readInt16(); } else if (numberCode == 2) { int32_t intVal; intVal = input.readInt32(); retVal = intVal; } else if (numberCode == 3) { int64_t longVal; longVal = input.readInt64(); retVal = longVal; } return retVal; } std::shared_ptr<Serializable> EventId::createDeserializable() { LOGDEBUG("EventId::createDeserializable - called"); // use false since we dont want to inc sequence // (for de-serialization) return std::make_shared<EventId>(false); } EventId::EventId(char* memId, uint32_t memIdLen, int64_t thr, int64_t seq) { LOGDEBUG("EventId::EventId(%p) - memId=%s, memIdLen=%d, thr=%" PRId64 ", seq=%" PRId64, this, Utils::convertBytesToString(memId, memIdLen).c_str(), memIdLen, thr, seq); // TODO: statics being assigned; not thread-safe?? std::memcpy(clientId_, memId, memIdLen); clientIdLength_ = memIdLen; threadId_ = thr; sequenceId_ = seq; bucketId_ = -1; breadcrumbCounter_ = 0; } EventId::EventId(bool doInit, uint32_t reserveSize, bool fullValueAfterDeltaFail) : /* adongre * CID 28934: Uninitialized scalar field (UNINIT_CTOR) */ clientIdLength_(0), threadId_(0), sequenceId_(0), bucketId_(-1), breadcrumbCounter_(0) { LOGDEBUG( "EventId::EventId(%p) - doInit=%s, reserveSize=%d, " "fullValueAfterDeltaFail=%s", this, doInit ? "true" : "false", reserveSize, fullValueAfterDeltaFail ? "true" : "false"); if (!doInit) return; if (fullValueAfterDeltaFail) { /// need to send old sequence id initFromTSS_SameThreadIdAndSameSequenceId(); } else { initFromTSS(); } for (uint32_t i = 0; i < reserveSize; i++) { EventIdTSS::instance().nextSequenceId(); } } void EventId::initFromTSS() { threadId_ = EventIdTSS::instance().threadId(); sequenceId_ = EventIdTSS::instance().nextSequenceId(); LOGDEBUG("EventId::initFromTSS(%p) - called, tid=%" PRId64 ", seqid=%" PRId64, this, threadId_, sequenceId_); } void EventId::initFromTSS_SameThreadIdAndSameSequenceId() { threadId_ = EventIdTSS::instance().threadId(); sequenceId_ = EventIdTSS::instance().currentSequenceId(); LOGDEBUG( "EventId::initFromTSS_SameThreadIdAndSameSequenceId(%p) - called, " "tid=%" PRId64 ", seqid=%" PRId64, this, threadId_, sequenceId_); } } // namespace client } // namespace geode } // namespace apache