in storage/ndb/src/kernel/vm/SimulatedBlock.hpp [420:2053]
class alignas(NDB_CL) SimulatedBlock
: public SegmentUtils /* SimulatedBlock implements the Interface */
{
friend class TraceLCP;
friend class SafeCounter;
friend class SafeCounterManager;
friend class AsyncFile;
friend class PosixAsyncFile; // FIXME
friend class Win32AsyncFile;
friend class Pgman;
friend class Page_cache_client;
friend class Lgman;
friend class Logfile_client;
friend class Tablespace_client;
friend class Dbtup_client;
friend struct Pool_context;
friend struct SectionHandle;
friend class LockQueue;
friend class SimplePropertiesSectionWriter;
friend class SegmentedSectionGuard;
friend class DynArr256Pool; // for cerrorInsert
friend struct thr_data;
public:
friend class BlockComponent;
~SimulatedBlock() override;
static void *operator new(size_t sz) {
void *ptr = NdbMem_AlignedAlloc(NDB_CL, sz);
require(ptr != NULL);
#ifdef VM_TRACE
#ifndef NDB_PURIFY
#ifdef VM_TRACE
const int initValue = 0xf3;
#else
const int initValue = 0x0;
#endif
char *charptr = (char *)ptr;
const int p = (sz / 4096);
const int r = (sz % 4096);
for (int i = 0; i < p; i++) memset(charptr + (i * 4096), initValue, 4096);
if (r > 0) memset(charptr + p * 4096, initValue, r);
#endif
#endif
return ptr;
}
static void operator delete(void *ptr) { NdbMem_AlignedFree(ptr); }
static const Uint32 BOUNDED_DELAY = 0xFFFFFF00;
protected:
/**
* Constructor
*/
SimulatedBlock(BlockNumber blockNumber, struct Block_context &ctx,
Uint32 instanceNumber = 0);
/**********************************************************
* Handling of execFunctions
*/
typedef void (SimulatedBlock::*ExecFunction)(Signal *signal);
void addRecSignalImpl(GlobalSignalNumber g, ExecFunction fun, bool f = false);
void installSimulatedBlockFunctions();
void handle_execute_error(GlobalSignalNumber gsn);
void initCommon();
inline void executeFunction(GlobalSignalNumber gsn, Signal *signal,
ExecFunction f);
inline void executeFunction(GlobalSignalNumber gsn, Signal *signal,
ExecFunction f, BlockReference ref, Uint32 len);
/*
Signal scope management, see signal classes for declarations
*/
struct FunctionAndScope {
ExecFunction m_execFunction;
SignalScope m_signalScope;
};
FunctionAndScope theSignalHandlerArray[MAX_GSN + 1];
void addSignalScopeImpl(GlobalSignalNumber gsn, SignalScope scope);
void checkSignalSender(GlobalSignalNumber gsn, Signal *signal,
SignalScope scope);
[[noreturn]] void handle_sender_error(GlobalSignalNumber gsn, Signal *signal,
SignalScope scope);
public:
typedef void (SimulatedBlock::*CallbackFunction)(Signal *,
Uint32 callbackData,
Uint32 returnCode);
struct Callback {
CallbackFunction m_callbackFunction;
Uint32 m_callbackData;
};
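/*
* Illustrative sketch (MyBlock / doneMyOp are hypothetical names; safe_cast
* is the cast helper used elsewhere in the kernel for callback functions):
* a block fills in a Callback with one of its own member functions plus
* caller data and later fires it via execute():
*
*   Callback cb;
*   cb.m_callbackFunction = safe_cast(&MyBlock::doneMyOp);
*   cb.m_callbackData = opPtr.i;
*   ...
*   execute(signal, cb, returnCode);
*   // invokes doneMyOp(signal, opPtr.i, returnCode)
*/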
inline void executeFunction(GlobalSignalNumber gsn, Signal *signal);
inline void executeFunction_async(GlobalSignalNumber gsn, Signal *signal);
/* Multiple block instances */
Uint32 instance() const { return theInstance; }
ExecFunction getExecuteFunction(GlobalSignalNumber gsn) {
return theSignalHandlerArray[gsn].m_execFunction;
}
SimulatedBlock *getInstance(Uint32 instanceNumber) {
ndbrequire(theInstance == 0); // valid only on main instance
if (instanceNumber == 0) return this;
ndbrequire(instanceNumber < MaxInstances);
if (theInstanceList != 0) return theInstanceList[instanceNumber];
return 0;
}
void addInstance(SimulatedBlock *b, Uint32 theInstanceNo);
virtual void loadWorkers() {}
virtual void prepare_scan_ctx(Uint32 scanPtrI) {}
struct ThreadContext {
Uint32 threadId;
EmulatedJamBuffer *jamBuffer;
Uint32 *watchDogCounter;
SectionSegmentPool::Cache *sectionPoolCache;
NDB_TICKS *pHighResTimer;
};
/* Setup state of a block object for executing in a particular thread. */
void assignToThread(ThreadContext ctx);
/* For multithreaded ndbd, get the id of owning thread. */
uint32 getThreadId() const { return m_threadId; }
/**
* To call EXECUTE_DIRECT on THRMAN we need to get its instance number.
* Its instance number is always 1 higher than the thread id since 0
* is used for the proxy instance and then there is one instance per
* thread.
*/
Uint32 getThrmanInstance() const {
if (isNdbMt()) {
return m_threadId + 1;
} else {
return 0;
}
}
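/*
* Illustrative sketch (GSN_SOME_REQ and len are hypothetical): the typical
* use is to address the THRMAN instance belonging to the current thread:
*
*   EXECUTE_DIRECT_MT(THRMAN, GSN_SOME_REQ, signal, len, getThrmanInstance());
*/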
static bool isMultiThreaded();
/* Configuration based alternative. Applies only to this node */
static bool isNdbMt() { return globalData.isNdbMt; }
static bool isNdbMtLqh() { return globalData.isNdbMtLqh; }
static Uint32 getLqhWorkers() { return globalData.ndbMtLqhWorkers; }
/**
* Assert that thread calling this function is "owner" of block instance
*/
#ifdef VM_TRACE
void assertOwnThread();
#else
void assertOwnThread() {}
#endif
/*
* Instance key (1-4) is used only when sending a signal. Receiver
* maps it to actual instance (0, if receiver is not MT LQH).
*
* For performance reason, DBTC gets instance key directly from DBDIH
* via DI*GET*NODES*REQ signals.
*/
static Uint32 getInstance(Uint32 tableId, Uint32 fragId);
static Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
static Uint32 getInstanceKeyCanFail(Uint32 tabId, Uint32 fragId);
static Uint32 getInstanceFromKey(Uint32 instanceKey); // local use only
static Uint32 getInstanceNoCanFail(Uint32 tableId, Uint32 fragId);
Uint32 getInstanceNo(Uint32 nodeId, Uint32 instanceKey);
Uint32 getInstanceNo(Uint32 nodeId, Uint32 tableId, Uint32 fragId);
Uint32 getInstanceFromKey(Uint32 nodeId, Uint32 instanceKey);
#if defined(ERROR_INSERT)
Uint32 getErrorInsertValue() const { return ERROR_INSERT_VALUE; }
Uint32 getErrorInsertExtra() const { return ERROR_INSERT_EXTRA; }
#endif
/**
* This method will make sure that, by the time the callback is called,
* each thread running an instance of any of the blocks in blocks[]
* will have executed a signal.
*/
void synchronize_threads(Signal *signal, const BlockThreadBitmask &threads,
const Callback &cb, JobBufferLevel req_prio,
JobBufferLevel conf_prio);
void synchronize_threads_for_blocks(
Signal *, const Uint32 blocks[], const Callback &,
JobBufferLevel req_prio = JBB,
JobBufferLevel conf_prio = ILLEGAL_JB_LEVEL);
/**
* This method will make sure that all external signals from nodes handled by
* transporters in the current thread are processed.
* Should be called from a TRPMAN-worker.
*/
void synchronize_external_signals(Signal *signal, const Callback &cb);
/**
* This method makes sure that the path specified in blocks[]
* will be traversed before returning
*/
void synchronize_path(Signal *, const Uint32 blocks[], const Callback &,
JobBufferLevel = JBB);
/**
* These methods are used to assist blocks in using the TIME_SIGNAL to
* generate drum beats with a regular delay. elapsed_time will report
* back the elapsed time since the last call but will never report more
* than max_delay. max_delay = 2 * delay here.
*/
void init_elapsed_time(Signal *signal, NDB_TICKS &latestTIME_SIGNAL);
void sendTIME_SIGNAL(Signal *signal, const NDB_TICKS currentTime,
Uint32 delay);
Uint64 elapsed_time(Signal *signal, const NDB_TICKS currentTime,
NDB_TICKS &latestTIME_SIGNAL, Uint32 max_delay);
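/*
* Illustrative sketch of one plausible drum-beat pattern (MyBlock,
* c_latestTIME_SIGNAL and c_delay are hypothetical; the exact pattern
* varies per block):
*
*   // At startup:
*   init_elapsed_time(signal, c_latestTIME_SIGNAL);
*   sendTIME_SIGNAL(signal, getHighResTimer(), c_delay);
*
*   // In execTIME_SIGNAL:
*   const NDB_TICKS now = getHighResTimer();
*   Uint64 elapsed = elapsed_time(signal, now, c_latestTIME_SIGNAL,
*                                 2 * c_delay);
*   // ... use 'elapsed' to drive periodic work ...
*   sendTIME_SIGNAL(signal, now, c_delay);
*/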
private:
struct SyncThreadRecord {
BlockThreadBitmask m_threads;
Callback m_callback;
Uint32 m_cnt;
Uint32 m_next;
Uint32 nextPool;
};
typedef ArrayPool<SyncThreadRecord> SyncThreadRecord_pool;
SyncThreadRecord_pool c_syncThreadPool;
void execSYNC_THREAD_REQ(Signal *);
void execSYNC_THREAD_CONF(Signal *);
void sendSYNC_THREAD_REQ(Signal *, Ptr<SimulatedBlock::SyncThreadRecord>);
void execSYNC_REQ(Signal *);
void execSYNC_PATH_REQ(Signal *);
void execSYNC_PATH_CONF(Signal *);
public:
virtual const char *get_filename(Uint32 fd) const { return ""; }
void EXECUTE_DIRECT_FN(ExecFunction f, Signal *signal);
protected:
static Callback TheEmptyCallback;
void TheNULLCallbackFunction(class Signal *, Uint32, Uint32);
static Callback TheNULLCallback;
void execute(Signal *signal, Callback &c, Uint32 returnCode);
/**
* Various methods to get data from ndbd/ndbmtd such as time
* spent in sleep, sending and executing, number of signals
* in queue, and send buffer level.
*
* Also retrieving a thread name (this name must point to static
* storage since it will be stored and kept for a long time, so the
* pointer cannot be changed).
*
* Finally also the ability to query for send thread information.
*/
// void getSendBufferLevel(TrpId trp_id, SB_LevelType &level);
Uint32 getEstimatedJobBufferLevel();
Uint32 getCPUSocket(Uint32 thr_no);
void setOverloadStatus(OverloadStatus new_status);
void setWakeupThread(Uint32 wakeup_instance);
void setNodeOverloadStatus(OverloadStatus new_status);
void setSendNodeOverloadStatus(OverloadStatus new_status);
void startChangeNeighbourNode();
void setNeighbourNode(NodeId node);
void setNoSend();
void endChangeNeighbourNode();
void getPerformanceTimers(Uint64 &micros_sleep, Uint64 &spin_time,
Uint64 &buffer_full_micros_sleep,
Uint64 &micros_send);
void getSendPerformanceTimers(Uint32 send_instance, Uint64 &exec_time,
Uint64 &sleep_time, Uint64 &spin_time,
Uint64 &user_time_os, Uint64 &kernel_time_os,
Uint64 &elapsed_time_os);
Uint32 getConfiguredSpintime();
void setSpintime(Uint32 new_spintime);
Uint32 getWakeupLatency();
void setWakeupLatency(Uint32);
Uint32 getNumSendThreads();
Uint32 getNumThreads();
Uint32 getMainThrmanInstance();
const char *getThreadName();
const char *getThreadDescription();
void flush_send_buffers();
void set_watchdog_counter();
void assign_recv_thread_new_trp(TrpId trp_id);
void assign_multi_trps_to_send_threads();
bool epoll_add_trp(TrpId trp_id);
bool is_recv_thread_for_new_trp(TrpId trp_id);
NDB_TICKS getHighResTimer() const { return *m_pHighResTimer; }
/**********************************************************
* Send signal - dialects
*/
template <typename Recv, typename... Args>
void sendSignal(Recv recv, GlobalSignalNumber gsn, Signal *signal,
Args... args) const {
sendSignal(recv, gsn, reinterpret_cast<Signal25 *>(signal), args...);
}
void sendSignal(BlockReference ref, GlobalSignalNumber gsn, Signal25 *signal,
Uint32 length, JobBufferLevel jbuf) const;
void sendSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 length, JobBufferLevel jbuf) const;
void sendSignal(BlockReference ref, GlobalSignalNumber gsn, Signal25 *signal,
Uint32 length, JobBufferLevel jbuf,
SectionHandle *sections) const;
void sendSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 length, JobBufferLevel jbuf,
SectionHandle *sections) const;
void sendSignal(BlockReference ref, GlobalSignalNumber gsn, Signal25 *signal,
Uint32 length, JobBufferLevel jbuf, LinearSectionPtr ptr[3],
Uint32 noOfSections) const;
void sendSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 length, JobBufferLevel jbuf,
LinearSectionPtr ptr[3], Uint32 noOfSections) const;
/* NoRelease sendSignal variants do not release sections as
* a side-effect of sending. This requires extra
* copying for local sends
*/
template <typename Recv, typename... Args>
void sendSignalNoRelease(Recv recv, GlobalSignalNumber gsn, Signal *signal,
Args... args) const {
sendSignalNoRelease(recv, gsn, reinterpret_cast<Signal25 *>(signal),
args...);
}
void sendSignalNoRelease(BlockReference ref, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 length, JobBufferLevel jbuf,
SectionHandle *sections) const;
void sendSignalNoRelease(NodeReceiverGroup rg, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 length, JobBufferLevel jbuf,
SectionHandle *sections) const;
// Send signals with delay. In this VM the job buffer level has
// no effect on delayed signals
//
template <typename Recv, typename... Args>
void sendSignalWithDelay(Recv recv, GlobalSignalNumber gsn, Signal *signal,
Args... args) const {
sendSignalWithDelay(recv, gsn, reinterpret_cast<Signal25 *>(signal),
args...);
}
void sendSignalWithDelay(BlockReference ref, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 delayInMilliSeconds,
Uint32 length) const;
void sendSignalWithDelay(BlockReference ref, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 delayInMilliSeconds,
Uint32 length, SectionHandle *sections) const;
void sendSignalOverAllLinks(BlockReference ref, GlobalSignalNumber gsn,
Signal25 *signal, Uint32 length,
JobBufferLevel jbuf) const;
/**
* EXECUTE_DIRECT comes in five variants.
*
* EXECUTE_DIRECT_FN/2 with explicit function, not signal number, see above.
*
* EXECUTE_DIRECT/4 calls another block within same thread.
*
* EXECUTE_DIRECT_MT/5 used when other block may be in another thread.
*
* EXECUTE_DIRECT_WITH_RETURN/4 calls another block within same thread and
* expects that result is passed in signal using prepareRETURN_DIRECT.
*
* EXECUTE_DIRECT_WITH_SECTIONS/5 with sections to block in same thread.
*/
void EXECUTE_DIRECT(Uint32 block, Uint32 gsn, Signal *signal, Uint32 len);
/*
* Instance defaults to instance of sender. Using explicit
* instance argument asserts that the call is thread-safe.
*/
void EXECUTE_DIRECT_MT(Uint32 block, Uint32 gsn, Signal *signal, Uint32 len,
Uint32 givenInstanceNo);
void EXECUTE_DIRECT_WITH_RETURN(Uint32 block, Uint32 gsn, Signal *signal,
Uint32 len);
void EXECUTE_DIRECT_WITH_SECTIONS(Uint32 block, Uint32 gsn, Signal *signal,
Uint32 len, SectionHandle *sections);
/**
* prepareRETURN_DIRECT is used to pass a return signal
* direct back to caller of EXECUTE_DIRECT_WITH_RETURN.
*
* The call to prepareRETURN_DIRECT should be immediately followed by
* return and bring back control to caller of EXECUTE_DIRECT_WITH_RETURN.
*/
void prepareRETURN_DIRECT(Uint32 gsn, Signal *signal, Uint32 len);
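/*
* Illustrative sketch of the WITH_RETURN pattern (the GSNs and lengths are
* hypothetical placeholders):
*
*   // Caller side:
*   EXECUTE_DIRECT_WITH_RETURN(DBLQH, GSN_SOME_REQ, signal, reqLen);
*   // On return, signal->theData[] holds the reply prepared by the callee.
*
*   // Callee side, at the end of execSOME_REQ():
*   prepareRETURN_DIRECT(GSN_SOME_CONF, signal, confLen);
*   return;   // control goes straight back to the caller
*/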
class SectionSegmentPool &getSectionSegmentPool();
void release(SegmentedSectionPtr &ptr);
void release(SegmentedSectionPtrPOD &ptr) {
SegmentedSectionPtr tmp(ptr);
release(tmp);
ptr.setNull();
}
void releaseSection(Uint32 firstSegmentIVal);
void releaseSections(struct SectionHandle &);
bool import(Ptr<SectionSegment> &first, const Uint32 *src, Uint32 len);
bool import(SegmentedSectionPtr &ptr, const Uint32 *src, Uint32 len) const;
bool import(SectionHandle *dst, LinearSectionPtr src[3], Uint32 cnt);
bool appendToSection(Uint32 &firstSegmentIVal, const Uint32 *src, Uint32 len);
bool dupSection(Uint32 &copyFirstIVal, Uint32 srcFirstIVal);
bool writeToSection(Uint32 firstSegmentIVal, Uint32 offset, const Uint32 *src,
Uint32 len);
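/*
* Illustrative sketch (MyBlock, execSOME_REQ, forwarding and fwdRef are
* hypothetical): a common pattern when a handler receives a long signal is
* to take over its sections with a SectionHandle and either forward or
* release them explicitly so the segments are not leaked:
*
*   void MyBlock::execSOME_REQ(Signal *signal) {
*     SectionHandle handle(this, signal);   // detach sections from signal
*     if (forwarding) {
*       sendSignal(fwdRef, GSN_SOME_REQ, signal, signal->getLength(), JBB,
*                  &handle);                // sections released by send
*     } else {
*       releaseSections(handle);
*     }
*   }
*/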
void handle_invalid_sections_in_send_signal(const Signal25 *) const;
void handle_lingering_sections_after_execute(const Signal *) const;
void handle_invalid_fragmentInfo(Signal25 *) const;
template <typename SecPtr>
void handle_send_failed(SendStatus, Signal25 *, Uint32, SecPtr[]) const;
void handle_out_of_longsignal_memory(Signal25 *) const;
/**
* Send routed signals (ONLY LOCALLY)
*
* NOTE: Only localhost is allowed!
*/
struct RoutePath {
Uint32 ref;
JobBufferLevel prio;
};
void sendRoutedSignal(RoutePath path[],
Uint32 pathcnt, // #hops
Uint32 dst[], // Final destination(s)
Uint32 dstcnt, // #final destination(s)
Uint32 gsn, // Final GSN
Signal *, Uint32 len,
JobBufferLevel prio, // Final prio
SectionHandle *handle = 0);
/**
* Check that a signal sent from a remote node
* is guaranteed to be correctly serialized w.r.t. NODE_FAILREP
*/
bool checkNodeFailSequence(Signal *);
#ifdef ERROR_INSERT
void setDelayedPrepare();
#endif
/**********************************************************
* Fragmented signals
*/
/**
* Assemble fragments
*
* @return true if all fragments have arrived
* false otherwise
*/
bool assembleFragments(Signal *signal);
/**
* Assemble dropped fragments
*
* Should be called at the start of a Dropped Signal Report
* (GSN_DROPPED_SIGNAL_REP) handler when it is expected that
* the block could receive fragmented signals.
* No dropped signal handling should be done until this method
* returns true.
*
* @return true if all fragments have arrived and dropped signal
* handling can proceed.
* false otherwise
*/
bool assembleDroppedFragments(Signal *signal);
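/*
* Illustrative sketch (MyBlock / execSOME_REQ are hypothetical): handlers
* for potentially fragmented signals start with assembleFragments(), and
* the dropped-signal handler with assembleDroppedFragments(), returning
* early until the last fragment has arrived:
*
*   void MyBlock::execSOME_REQ(Signal *signal) {
*     if (!assembleFragments(signal)) {
*       jam();
*       return;              // wait for remaining fragments
*     }
*     // ... all fragments assembled, sections available as usual ...
*   }
*/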
/* If send size is > FRAGMENT_WORD_SIZE, fragments of this size
* will be sent by the sendFragmentedSignal variants
*/
static constexpr Uint32 FRAGMENT_WORD_SIZE = 240;
static constexpr Uint32 BATCH_FRAGMENT_WORD_SIZE = 240 * 8;
void sendFragmentedSignal(BlockReference ref, GlobalSignalNumber gsn,
Signal *signal, Uint32 length, JobBufferLevel jbuf,
SectionHandle *sections,
Callback & = TheEmptyCallback,
Uint32 messageSize = FRAGMENT_WORD_SIZE);
void sendFragmentedSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn,
Signal *signal, Uint32 length, JobBufferLevel jbuf,
SectionHandle *sections,
Callback & = TheEmptyCallback,
Uint32 messageSize = FRAGMENT_WORD_SIZE);
void sendFragmentedSignal(BlockReference ref, GlobalSignalNumber gsn,
Signal *signal, Uint32 length, JobBufferLevel jbuf,
LinearSectionPtr ptr[3], Uint32 noOfSections,
Callback & = TheEmptyCallback,
Uint32 messageSize = FRAGMENT_WORD_SIZE);
void sendFragmentedSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn,
Signal *signal, Uint32 length, JobBufferLevel jbuf,
LinearSectionPtr ptr[3], Uint32 noOfSections,
Callback & = TheEmptyCallback,
Uint32 messageSize = FRAGMENT_WORD_SIZE);
void sendBatchedFragmentedSignal(
BlockReference ref, GlobalSignalNumber gsn, Signal *signal, Uint32 length,
JobBufferLevel jbuf, SectionHandle *sections, bool noRelease,
Callback & = TheEmptyCallback,
Uint32 messageSize = BATCH_FRAGMENT_WORD_SIZE);
void sendBatchedFragmentedSignal(
NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal *signal,
Uint32 length, JobBufferLevel jbuf, SectionHandle *sections,
bool noRelease, Callback & = TheEmptyCallback,
Uint32 messageSize = BATCH_FRAGMENT_WORD_SIZE);
void sendBatchedFragmentedSignal(
BlockReference ref, GlobalSignalNumber gsn, Signal *signal, Uint32 length,
JobBufferLevel jbuf, LinearSectionPtr ptr[3], Uint32 noOfSections,
Callback & = TheEmptyCallback,
Uint32 messageSize = BATCH_FRAGMENT_WORD_SIZE);
void sendBatchedFragmentedSignal(
NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal *signal,
Uint32 length, JobBufferLevel jbuf, LinearSectionPtr ptr[3],
Uint32 noOfSections, Callback & = TheEmptyCallback,
Uint32 messageSize = BATCH_FRAGMENT_WORD_SIZE);
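/*
* Illustrative sketch (ref, handle and the GSN are hypothetical): the call
* shape matches sendSignal, with an optional callback invoked once all
* fragments have been sent:
*
*   sendFragmentedSignal(ref, GSN_SOME_REQ, signal, len, JBB, &handle,
*                        TheEmptyCallback);  // FRAGMENT_WORD_SIZE fragments
*/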
/**
* simBlockNodeFailure
*
* Method must be called by blocks that send or receive
* remote Fragmented Signals when they detect a node
* (NDBD or API) failure.
* If the block needs to acknowledge or perform further
* processing after completing block-level node failure
* handling, it can supply a Callback which will be invoked
* when block-level node failure handling has completed.
* Otherwise TheEmptyCallback is used.
* If TheEmptyCallback is used, all failure handling is
* performed in the current timeslice, to avoid any
* races.
*
* Parameters
* signal : Current signal*
* failedNodeId : Node id of failed node
* cb : Callback to be executed when block-level
* node failure handling completed.
* TheEmptyCallback is passed if no further
* processing is required.
* Returns
* Number of 'resources' cleaned up in call.
* Callback return code is total resources cleaned up.
*
*/
Uint32 simBlockNodeFailure(Signal *signal, Uint32 failedNodeId,
Callback &cb = TheEmptyCallback);
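/*
* Illustrative sketch of block-level node failure handling (MyBlock and
* nodeFailCleanupDone are hypothetical):
*
*   void MyBlock::execNODE_FAILREP(Signal *signal) {
*     ...
*     Callback cb = {safe_cast(&MyBlock::nodeFailCleanupDone), failedNodeId};
*     simBlockNodeFailure(signal, failedNodeId, cb);
*   }
*   // nodeFailCleanupDone(signal, failedNodeId, elementsCleaned) is invoked
*   // once fragment assembly/send state for the node has been cleaned up.
*/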
/**********************************************************
* Fragmented signals structures
*/
/**
* Struct used when assembling fragmented long signals at receiver side
*/
struct FragmentInfo {
FragmentInfo(Uint32 fragId, Uint32 sender);
Uint32 m_senderRef;
Uint32 m_fragmentId;
Uint32 m_sectionPtrI[3];
union {
Uint32 nextPool;
Uint32 nextHash;
};
Uint32 prevHash;
inline bool equal(FragmentInfo const &p) const {
return m_senderRef == p.m_senderRef && m_fragmentId == p.m_fragmentId;
}
inline Uint32 hashValue() const { return m_senderRef + m_fragmentId; }
inline bool isDropped() const {
/* IsDropped when entry in hash, but no segments stored */
return ((m_sectionPtrI[0] == RNIL) && (m_sectionPtrI[1] == RNIL) &&
(m_sectionPtrI[2] == RNIL));
}
}; // sizeof() = 32 bytes
typedef ArrayPool<FragmentInfo> FragmentInfo_pool;
typedef DLHashTable<FragmentInfo_pool, FragmentInfo> FragmentInfo_hash;
/**
* Struct used when sending fragmented signals
*/
struct FragmentSendInfo {
FragmentSendInfo();
enum Status { SendNotComplete = 0, SendComplete = 1, SendCancelled = 2 };
Uint8 m_status;
Uint8 m_prio;
Uint8 m_fragInfo;
enum Flags { SendNoReleaseSeg = 0x1 };
Uint8 m_flags;
Uint16 m_gsn;
Uint16 m_messageSize; // Size of each fragment
Uint32 m_fragmentId;
union {
// Similar to Ptr<SectionSegment> but a POD, as needed in a union.
struct {
SectionSegment *p;
Uint32 i;
} m_segmented;
LinearSectionPtr m_linear;
} m_sectionPtr[3];
LinearSectionPtr m_theDataSection;
NodeReceiverGroup m_nodeReceiverGroup; // 3
Callback m_callback;
union {
Uint32 nextPool;
Uint32 nextList;
};
Uint32 prevList;
};
typedef ArrayPool<FragmentSendInfo> FragmentSendInfo_pool;
typedef DLList<FragmentSendInfo_pool> FragmentSendInfo_list;
/**
* sendFirstFragment
* Used by sendFragmentedSignal
* noRelease can only be used if the caller can guarantee
* not to free the supplied sections until all fragments
* have been sent.
*/
bool sendFirstFragment(FragmentSendInfo &info, NodeReceiverGroup rg,
GlobalSignalNumber gsn, Signal *signal, Uint32 length,
JobBufferLevel jbuf, SectionHandle *sections,
bool noRelease,
Uint32 messageSize = FRAGMENT_WORD_SIZE);
bool sendFirstFragment(FragmentSendInfo &info, NodeReceiverGroup rg,
GlobalSignalNumber gsn, Signal *signal, Uint32 length,
JobBufferLevel jbuf, LinearSectionPtr ptr[3],
Uint32 noOfSections,
Uint32 messageSize = FRAGMENT_WORD_SIZE);
/**
* Send signal fragment
*
* @see sendFragmentedSignal
*/
void sendNextSegmentedFragment(Signal *signal, FragmentSendInfo &info);
/**
* Send signal fragment
*
* @see sendFragmentedSignal
*/
void sendNextLinearFragment(Signal *signal, FragmentSendInfo &info);
BlockNumber number() const;
/**
* Ensure that signal's sender is same node
*/
void LOCAL_SIGNAL(Signal *signal) const {
ndbrequire(refToNode(signal->getSendersBlockRef()) == theNodeId);
}
/**
* Is reference for our node?
*/
bool local_ref(BlockReference ref) const {
return (refToNode(ref) == theNodeId || refToNode(ref) == 0);
}
public:
/* Must be public so that we can jam() outside of block scope. */
EmulatedJamBuffer *jamBuffer() const;
protected:
BlockReference reference() const;
NodeId getOwnNodeId() const;
/**
* Refresh Watch Dog in initialising code
*
*/
void refresh_watch_dog(Uint32 place = 1);
volatile Uint32 *get_watch_dog();
void update_watch_dog_timer(Uint32 interval);
/**
* Prog error
* This function should be called when this node should be shut down.
* If the cause of the shutdown is known, use extradata to add an
* error message describing the problem.
*/
[[noreturn]] void progError(int line, int err_code,
const char *extradata = NULL,
const char *check = "") const;
private:
[[noreturn]] void signal_error(Uint32, Uint32, Uint32, const char *,
int) const;
const NodeId theNodeId;
const BlockNumber theNumber;
const Uint32 theInstance;
const BlockReference theReference;
/*
* Instance 0 is the main instance. It creates/owns other instances.
* In MT LQH the main instance is the LQH proxy and the others ("workers")
* are real LQHs run by multiple threads.
*/
protected:
enum { MaxInstances = NDBMT_MAX_BLOCK_INSTANCES };
private:
SimulatedBlock **theInstanceList; // set in main, indexed by instance
SimulatedBlock *theMainInstance; // set in all
/*
Thread id currently executing this block.
Not used in singlethreaded ndbd.
*/
Uint32 m_threadId;
/*
Jam buffer reference.
In multithreaded ndbd, this is different in each thread, and must be
updated if migrating the block to another thread.
*/
EmulatedJamBuffer *m_jamBuffer;
/* For multithreaded ndb, the thread-specific watchdog counter. */
Uint32 *m_watchDogCounter;
/* Read-only high res timer pointer */
const NDB_TICKS *m_pHighResTimer;
SectionSegmentPool::Cache *m_sectionPoolCache;
Uint32 doNodeFailureCleanup(Signal *signal, Uint32 failedNodeId,
Uint32 resource, Uint32 cursor,
Uint32 elementsCleaned, Callback &cb);
bool doCleanupFragInfo(Uint32 failedNodeId, Uint32 &cursor,
Uint32 &rtUnitsUsed, Uint32 &elementsCleaned);
bool doCleanupFragSend(Uint32 failedNodeId, Uint32 &cursor,
Uint32 &rtUnitsUsed, Uint32 &elementsCleaned);
protected:
Block_context m_ctx;
NewVARIABLE *allocateBat(int batSize);
void freeBat();
static const NewVARIABLE *getBatVar(BlockNumber blockNo, Uint32 instanceNo,
Uint32 varNo);
static BlockReference calcTcBlockRef(NodeId aNode);
static BlockReference calcLqhBlockRef(NodeId aNode);
static BlockReference calcQlqhBlockRef(NodeId aNode);
static BlockReference calcAccBlockRef(NodeId aNode);
static BlockReference calcTupBlockRef(NodeId aNode);
static BlockReference calcTuxBlockRef(NodeId aNode);
static BlockReference calcDihBlockRef(NodeId aNode);
static BlockReference calcQmgrBlockRef(NodeId aNode);
static BlockReference calcDictBlockRef(NodeId aNode);
static BlockReference calcNdbCntrBlockRef(NodeId aNode);
static BlockReference calcTrixBlockRef(NodeId aNode);
static BlockReference calcBackupBlockRef(NodeId aNode);
static BlockReference calcSumaBlockRef(NodeId aNode);
static BlockReference calcApiClusterMgrBlockRef(NodeId aNode);
// matching instance on same node e.g. LQH-ACC-TUP
BlockReference calcInstanceBlockRef(BlockNumber aBlock);
// matching instance on another node e.g. LQH-LQH
// valid only if receiver has same number of workers
BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode);
/**
* allocRecord
* Allocates memory for the data structures where ndb keeps the data
*
*/
void *allocRecord(const char *type, size_t s, size_t n, bool clear = true,
Uint32 paramId = 0);
void *allocRecordAligned(const char *type, size_t s, size_t n,
void **unaligned_buffer,
Uint32 align = NDB_O_DIRECT_WRITE_ALIGNMENT,
bool clear = true, Uint32 paramId = 0);
/**
* Deallocate record
*
* NOTE: Also resets pointer
*/
void deallocRecord(void **, const char *type, size_t s, size_t n);
/**
* Allocate memory from global pool,
* returns #chunks used
*
* Typically used by parts of the code that have not been converted to
* use the global pool directly, but instead allocate everything during
* startup
*/
struct AllocChunk {
Uint32 ptrI;
Uint32 cnt;
};
Uint32 allocChunks(AllocChunk dst[], Uint32 /* size of dst */ arraysize,
Uint32 /* resource group */ rg,
Uint32 /* no of pages to allocate */ pages,
Uint32 paramId /* for error message if failing */);
static int sortchunks(const void *, const void *);
/**
* General info event (sent to cluster log)
*/
void infoEvent(const char *msg, ...) const ATTRIBUTE_FORMAT(printf, 2, 3);
void warningEvent(const char *msg, ...) ATTRIBUTE_FORMAT(printf, 2, 3);
/**
* Get node state
*/
const NodeState &getNodeState() const;
/**
* Get node info
*/
const NodeInfo &getNodeInfo(NodeId nodeId) const;
NodeInfo &setNodeInfo(NodeId);
const NodeVersionInfo &getNodeVersionInfo() const;
NodeVersionInfo &setNodeVersionInfo();
Uint32 change_and_get_io_laggers(int change);
/**********************
* Xfrm stuff
*
* xfrm the attr / key for **hash** generation.
* - Keys being equal should generate identical xfrm'ed strings.
* - Uniqueness of two non-equal keys is preferred, but not required.
*/
/**
* @return length
*/
Uint32 xfrm_key_hash(Uint32 tab, const Uint32 *src, Uint32 *dst,
Uint32 dstSize,
Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const;
Uint32 xfrm_attr_hash(Uint32 attrDesc, const CHARSET_INFO *cs,
const Uint32 *src, Uint32 &srcPos, Uint32 *dst,
Uint32 &dstPos, Uint32 dstSize) const;
/*******************
* Compare either a full (non-NULL) key, or a single attr.
*
* Character strings are compared taking their normalized
* 'weight' into consideration, as defined by their collation.
*
* No intermediate xfrm'ed strings are produced during the compare.
*
* return '<0', '==0' or '>0' for 's1<s2', 's1==s2', 's1>s2' resp.
*/
int cmp_key(Uint32 tab, const Uint32 *s1, const Uint32 *s2) const;
int cmp_attr(Uint32 attrDesc, const CHARSET_INFO *cs, const Uint32 *s1,
Uint32 s1Len, const Uint32 *s2, Uint32 s2Len) const;
/**
*
*/
Uint32 create_distr_key(Uint32 tableId, const Uint32 *src, Uint32 *dst,
const Uint32 keyPaLen[MAX_ATTRIBUTES_IN_INDEX]) const;
/**
* if ndbd,
* wakeup main-loop if sleeping on IO
* if ndbmtd
* wakeup thread running block-instance
*/
void wakeup();
/**
* setup struct for wakeup
*/
void setup_wakeup();
/**
* Get receiver thread index for node
* MAX_NODES == no receiver thread
*/
Uint32 get_recv_thread_idx(TrpId trp_id);
private:
NewVARIABLE *NewVarRef; /* New Base Address Table for block */
Uint16 theBATSize; /* # entries in BAT */
protected:
GlobalPage_safepool &m_global_page_pool;
GlobalPage_pool &m_shared_page_pool;
void execNDB_TAMPER(Signal *signal);
void execNODE_STATE_REP(Signal *signal);
void execCHANGE_NODE_STATE_REQ(Signal *signal);
void execSIGNAL_DROPPED_REP(Signal *signal);
void execCONTINUE_FRAGMENTED(Signal *signal);
void execSTOP_FOR_CRASH(Signal *signal);
void execAPI_START_REP(Signal *signal);
void execNODE_START_REP(Signal *signal);
void execLOCAL_ROUTE_ORD(Signal *);
public:
void execSEND_PACKED(Signal *signal);
private:
/**
* Node state
*/
NodeState theNodeState;
Uint32 c_fragmentIdCounter;
FragmentInfo_pool c_fragmentInfoPool;
FragmentInfo_hash c_fragmentInfoHash;
bool c_fragSenderRunning;
FragmentSendInfo_pool c_fragmentSendPool;
FragmentSendInfo_list c_linearFragmentSendList;
FragmentSendInfo_list c_segmentedFragmentSendList;
protected:
Uint32 debugPrintFragmentCounts();
public:
class MutexManager {
friend class Mutex;
friend class SimulatedBlock;
friend class DbUtil;
public:
MutexManager(class SimulatedBlock &);
bool setSize(Uint32 maxNoOfActiveMutexes);
Uint32 getSize() const; // Get maxNoOfActiveMutexes
private:
/**
* core interface
*/
struct ActiveMutex {
ActiveMutex() {}
Uint32 m_gsn; // state
Uint32 m_mutexId;
Callback m_callback;
union {
Uint32 nextPool;
Uint32 nextList;
};
Uint32 prevList;
};
typedef Ptr<ActiveMutex> ActiveMutexPtr;
typedef ArrayPool<ActiveMutex> ActiveMutex_pool;
typedef DLList<ActiveMutex_pool> ActiveMutex_list;
bool seize(ActiveMutexPtr &ptr);
void release(Uint32 activeMutexPtrI);
void getPtr(ActiveMutexPtr &ptr) const;
void create(Signal *, ActiveMutexPtr &);
void destroy(Signal *, ActiveMutexPtr &);
void lock(Signal *, ActiveMutexPtr &, Uint32 flags);
void unlock(Signal *, ActiveMutexPtr &);
private:
void execUTIL_CREATE_LOCK_REF(Signal *signal);
void execUTIL_CREATE_LOCK_CONF(Signal *signal);
void execUTIL_DESTORY_LOCK_REF(Signal *signal);
void execUTIL_DESTORY_LOCK_CONF(Signal *signal);
void execUTIL_LOCK_REF(Signal *signal);
void execUTIL_LOCK_CONF(Signal *signal);
void execUTIL_UNLOCK_REF(Signal *signal);
void execUTIL_UNLOCK_CONF(Signal *signal);
SimulatedBlock &m_block;
ActiveMutex_pool m_mutexPool;
ActiveMutex_list m_activeMutexes;
BlockReference reference() const;
[[noreturn]] void progError(int line, int err_code, const char *extra = 0,
const char *check = "");
};
friend class MutexManager;
MutexManager c_mutexMgr;
void ignoreMutexUnlockCallback(Signal *signal, Uint32 ptrI, Uint32 retVal);
virtual bool getParam(const char *param, Uint32 *retVal) { return false; }
SafeCounterManager c_counterMgr;
private:
void execUTIL_CREATE_LOCK_REF(Signal *signal);
void execUTIL_CREATE_LOCK_CONF(Signal *signal);
void execUTIL_DESTORY_LOCK_REF(Signal *signal);
void execUTIL_DESTORY_LOCK_CONF(Signal *signal);
void execUTIL_LOCK_REF(Signal *signal);
void execUTIL_LOCK_CONF(Signal *signal);
void execUTIL_UNLOCK_REF(Signal *signal);
void execUTIL_UNLOCK_CONF(Signal *signal);
void check_sections(Signal25 *signal, Uint32 oldSecCount,
Uint32 newSecCount) const;
protected:
void fsRefError(Signal *signal, Uint32 line, const char *msg);
void execFSWRITEREF(Signal *signal);
void execFSREADREF(Signal *signal);
void execFSOPENREF(Signal *signal);
void execFSCLOSEREF(Signal *signal);
void execFSREMOVEREF(Signal *signal);
void execFSSYNCREF(Signal *signal);
void execFSAPPENDREF(Signal *signal);
// MT LQH callback CONF via signal
public:
struct CallbackPtr {
Uint32 m_callbackIndex;
Uint32 m_callbackData;
};
protected:
enum CallbackFlags {
CALLBACK_DIRECT = 0x0001, // use EXECUTE_DIRECT (assumed thread safe)
CALLBACK_ACK = 0x0002 // send ack at the end of callback timeslice
};
struct CallbackEntry {
CallbackFunction m_function;
Uint32 m_flags;
};
struct CallbackTable {
Uint32 m_count;
CallbackEntry *m_entry; // array
};
CallbackTable *m_callbackTableAddr; // set by block if used
enum {
THE_NULL_CALLBACK = 0 // must assign TheNULLCallbackFunction
};
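/*
* Illustrative sketch of how a block can wire up a callback table
* (MyBlock, CALLBACK_DONE and doneCallback are hypothetical; real blocks
* define their own tables and indexes):
*
*   enum { CALLBACK_DONE = 1, CALLBACK_COUNT = 2 };
*   CallbackEntry m_callbackEntry[CALLBACK_COUNT];
*   CallbackTable m_callbackTable;
*
*   // in the block constructor:
*   m_callbackEntry[THE_NULL_CALLBACK].m_function =
*       safe_cast(&MyBlock::TheNULLCallbackFunction);
*   m_callbackEntry[THE_NULL_CALLBACK].m_flags = 0;
*   m_callbackEntry[CALLBACK_DONE].m_function =
*       safe_cast(&MyBlock::doneCallback);
*   m_callbackEntry[CALLBACK_DONE].m_flags = CALLBACK_ACK;
*   m_callbackTable.m_count = CALLBACK_COUNT;
*   m_callbackTable.m_entry = m_callbackEntry;
*   m_callbackTableAddr = &m_callbackTable;
*
*   // later:
*   CallbackPtr cptr = {CALLBACK_DONE, opPtr.i};
*   execute(signal, cptr, returnCode);
*/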
void block_require(void);
void execute(Signal *signal, CallbackPtr &cptr, Uint32 returnCode);
const CallbackEntry &getCallbackEntry(Uint32 ci);
void sendCallbackConf(Signal *signal, Uint32 fullBlockNo, CallbackPtr &cptr,
Uint32 senderData, Uint32 callbackInfo,
Uint32 returnCode);
void execCALLBACK_CONF(Signal *signal);
// Variable for storing inserted errors, see pc.H
ERROR_INSERT_VARIABLE;
#ifdef VM_TRACE_TIME
public:
void clearTimes();
void printTimes(FILE *output);
void addTime(Uint32 gsn, Uint64 time);
void subTime(Uint32 gsn, Uint64 time);
struct TimeTrace {
Uint32 cnt;
Uint64 sum, sub;
} m_timeTrace[MAX_GSN + 1];
Uint32 m_currentGsn;
#endif
#if defined(USE_INIT_GLOBAL_VARIABLES)
void init_global_ptrs(void **tmp, size_t cnt);
void init_global_uint32_ptrs(void **tmp, size_t cnt);
void init_global_uint32(void **tmp, size_t cnt);
void init_global_block();
void disable_global_variables();
void enable_global_variables();
#endif
#ifdef VM_TRACE
public:
FileOutputStream debugOutFile;
NdbOut debugOut;
NdbOut &debugOutStream() { return debugOut; }
bool debugOutOn();
void debugOutLock() { globalSignalLoggers.lock(); }
void debugOutUnlock() { globalSignalLoggers.unlock(); }
const char *debugOutTag(char *buf, int line);
#endif
const char *getPartitionBalanceString(Uint32 fct) {
switch (fct) {
case NDB_PARTITION_BALANCE_SPECIFIC:
return "NDB_PARTITION_BALANCE_SPECIFIC";
case NDB_PARTITION_BALANCE_FOR_RA_BY_NODE:
return "NDB_PARTITION_BALANCE_FOR_RA_BY_NODE";
case NDB_PARTITION_BALANCE_FOR_RP_BY_NODE:
return "NDB_PARTITION_BALANCE_FOR_RP_BY_NODE";
case NDB_PARTITION_BALANCE_FOR_RP_BY_LDM:
return "NDB_PARTITION_BALANCE_FOR_RP_BY_LDM";
case NDB_PARTITION_BALANCE_FOR_RA_BY_LDM:
return "NDB_PARTITION_BALANCE_FOR_RA_BY_LDM";
case NDB_PARTITION_BALANCE_FOR_RA_BY_LDM_X_2:
return "NDB_PARTITION_BALANCE_FOR_RA_BY_LDM_X_2";
case NDB_PARTITION_BALANCE_FOR_RA_BY_LDM_X_3:
return "NDB_PARTITION_BALANCE_FOR_RA_BY_LDM_X_3";
case NDB_PARTITION_BALANCE_FOR_RA_BY_LDM_X_4:
return "NDB_PARTITION_BALANCE_FOR_RA_BY_LDM_X_4";
default:
ndbabort();
}
return NULL;
}
void ndbinfo_send_row(Signal *signal, const DbinfoScanReq &req,
const Ndbinfo::Row &row, Ndbinfo::Ratelimit &rl) const;
void ndbinfo_send_scan_break(Signal *signal, DbinfoScanReq &req,
const Ndbinfo::Ratelimit &rl, Uint32 data1,
Uint32 data2 = 0, Uint32 data3 = 0,
Uint32 data4 = 0) const;
void ndbinfo_send_scan_conf(Signal *signal, DbinfoScanReq &req,
const Ndbinfo::Ratelimit &rl) const;
#ifdef NDB_DEBUG_RES_OWNERSHIP
/* Utils to lock and unlock the global section segment pool */
void lock_global_ssp();
void unlock_global_ssp();
#endif
/* Needs to be defined in mt.hpp as well to work */
// #define DEBUG_SCHED_STATS 1
#define AVERAGE_SIGNAL_SIZE 16
#define MIN_QUERY_INSTANCES_PER_RR_GROUP 4
#define MAX_QUERY_INSTANCES_PER_RR_GROUP 18
#define LOAD_SCAN_FRAGREQ 5
#define RR_LOAD_REFRESH_COUNT 48
#define NUM_LQHKEYREQ_COUNTS 4
#define NUM_SCAN_FRAGREQ_COUNTS 1
#define MAX_LDM_THREAD_GROUPS_PER_RR_GROUP 8
#define MAX_RR_GROUPS \
((MAX_NDBMT_QUERY_THREADS + (MIN_QUERY_INSTANCES_PER_RR_GROUP - 1)) / \
MIN_QUERY_INSTANCES_PER_RR_GROUP)
#define MAX_DISTRIBUTION_WEIGHT 16
#define MAX_NUM_DISTR_SIGNAL \
(MAX_DISTRIBUTION_WEIGHT * MAX_QUERY_INSTANCES_PER_RR_GROUP)
#define MAX_LDM_DISTRIBUTION_WEIGHT 100
#define MAX_DISTR_THREADS (MAX_NDBMT_LQH_THREADS + MAX_NDBMT_QUERY_THREADS)
public:
struct RoundRobinInfo {
/**
* These variables are used to perform round robin scheduling for
* not fully partitioned tables whose work is executed outside of
* their own LDM thread group. Our own LDM thread group is part of
* this round robin group.
*
* It is also used separately for LDM groups.
*
* The first two variables control this for key lookup and the
* next two for table scans and range scans.
*/
Uint32 m_load_indicator_counter;
Uint32 m_lqhkeyreq_to_same_thread;
Uint32 m_lqhkeyreq_distr_signal_index;
Uint32 m_scan_fragreq_to_same_thread;
Uint32 m_scan_distr_signal_index;
/**
* m_distribution_signal_size is the current size of the round robin
* array m_distribution_signal. It is updated by
* calculate_distribution_signal once every 100ms.
* m_distribution_signal contains block references to the LDM
* and query threads (DBLQH/DBQLQH).
*/
Uint32 m_distribution_signal_size;
Uint32 m_distribution_signal[MAX_NUM_DISTR_SIGNAL];
};
static Uint32 m_rr_group[MAX_DISTR_THREADS];
static Uint32 m_num_lqhkeyreq_counts;
static Uint32 m_num_scan_fragreq_counts;
static Uint32 m_rr_load_refresh_count;
static Uint32 m_num_rr_groups;
static Uint32 m_num_query_thread_per_ldm;
static Uint32 m_num_distribution_threads;
static bool m_inited_rr_groups;
struct NextRoundInfo {
Uint32 m_next_pos;
Uint32 m_next_index;
};
struct LdmThreadState {
Uint32 m_current_weight;
Uint32 m_load_indicator;
Uint32 m_lqhkeyreq_counter;
Uint32 m_scan_fragreq_counter;
Uint32 m_current_weight_lqhkeyreq_count;
Uint32 m_current_weight_scan_fragreq_count;
};
struct QueryThreadState {
Uint32 m_load_indicator;
Uint32 m_max_lqhkeyreq_count;
Uint32 m_max_scan_fragreq_count;
Uint32 m_current_stolen_lqhkeyreq;
Uint32 m_current_stolen_scan_fragreq;
};
class DistributionHandler {
friend class SimulatedBlock;
public:
Uint32 m_weights[MAX_DISTR_THREADS];
struct NextRoundInfo m_next_round[MAX_DISTR_THREADS];
Uint32 m_distr_references[MAX_DISTR_THREADS];
struct RoundRobinInfo m_rr_info[MAX_RR_GROUPS];
struct LdmThreadState m_ldm_state[MAX_NDBMT_LQH_THREADS];
struct QueryThreadState m_query_state[MAX_NDBMT_QUERY_THREADS];
#ifdef DEBUG_SCHED_STATS
Uint64 m_lqhkeyreq_ldm;
Uint64 m_lqhkeyreq_lq;
Uint64 m_lqhkeyreq_rr;
Uint64 m_scan_fragreq_ldm;
Uint64 m_scan_fragreq_lq;
Uint64 m_scan_fragreq_rr;
Uint32 m_lqhkeyreq_ldm_count[MAX_NDBMT_LQH_THREADS];
Uint32 m_lqhkeyreq_qt_count[MAX_NDBMT_QUERY_THREADS];
Uint32 m_scan_fragreq_ldm_count[MAX_NDBMT_LQH_THREADS];
Uint32 m_scan_fragreq_qt_count[MAX_NDBMT_QUERY_THREADS];
#endif
};
void print_static_distr_info(DistributionHandler *handle);
void print_debug_sched_stats(DistributionHandler *const);
void get_load_indicators(DistributionHandler *const, Uint32);
/**
* 100ms have passed and it is time to update the DistributionInfo and
* RoundRobinInfo to reflect the current CPU load in the node.
*/
void calculate_distribution_signal(DistributionHandler *handle) {
Uint32 num_ldm_instances = getNumLDMInstances();
if (globalData.ndbMtQueryThreads == 0) {
jam();
return;
}
jam();
jamLine(m_num_rr_groups);
ndbrequire(m_inited_rr_groups);
ndbrequire(m_num_distribution_threads);
for (Uint32 rr_group = 0; rr_group < m_num_rr_groups; rr_group++) {
jam();
for (Uint32 thr_no = 0; thr_no < m_num_distribution_threads; thr_no++) {
/**
* We only use the Query threads for round robin groups. Thus
* scalable access to tables with only a few partitions is only
* handled by query threads. This has a number of advantages.
* 1) We protect the LDM threads from being overloaded. This means
* that we always have bandwidth for scalable writes even if
* there are a lot of complex queries being processed.
* 2) We can maintain statistics for traffic towards fragments.
* These statistics will only be maintained by the LDM threads
* since these can access those variables without having to
* protect the variables for concurrent access. This gives us
* a view on fragment usage without causing bottlenecks in
* query execution.
* Query threads will not maintain any type of fragment stats.
* There are some table stats that must be maintained
* to ensure that we can drop tables and alter tables in a safe
* manner.
* 3) Query threads have to do a bit of work to set up a number of
* variables for access to metadata at each real-time break. Since
* LDM threads only execute their own fragments, the LDM threads
* can execute more efficiently and thus guarantee consistently
* good performance.
*
* Historically the advice has been to place LDM threads on their
* own CPU core without using any hyperthreading. Modern CPUs can
* make increasingly good use of hyperthreading. By removing the
* dependency between LDM threads and our partitioning scheme we
* ensure that increasing the number of LDM threads doesn't have
* a negative effect on scalability. By creating Query threads to
* assist LDM threads we ensure that we can maintain the good
* characteristics of LDM threads while still providing increased
* read scalability.
*
* Query threads give us a simple way to make very good use of
* the other hyperthread(s) in the CPU core by mostly scheduling
* queries towards the query thread that would normally have been
* sent to the LDM thread in the same CPU core. This gives us a
* possibility to make efficient use of modern CPU cores.
*
* Thus LDM threads are free to change statistics variables
* even when executing as query threads; they don't require
* exclusive access even though they will be updating some shared
* data structures, since query threads will neither write nor
* read those. We still have to be careful
* with data placement to avoid false CPU cache sharing.
*
* We accomplish this here by setting next round to 0xFFFFFFFF which
* means the same as giving the LDM threads weight 0. However the
* weight of LDM threads is still used for handling the first
* level of scheduling. Thus LDM threads will still be heavily
* involved in handling queries towards their own fragments, including the
* READ COMMITTED queries.
*/
if (thr_no < num_ldm_instances || m_rr_group[thr_no] != rr_group) {
handle->m_next_round[thr_no].m_next_pos = Uint32(~0);
} else {
jam();
jamLine(thr_no);
handle->m_next_round[thr_no].m_next_pos = 0;
}
}
struct RoundRobinInfo *rr_info = &handle->m_rr_info[rr_group];
calculate_rr_distribution(handle, rr_info);
Uint32 q_inx = 0;
Uint32 num_distr_threads = m_num_distribution_threads;
for (Uint32 i = num_ldm_instances; i < num_distr_threads; i++) {
jam();
Uint32 q_weight = handle->m_weights[i];
struct QueryThreadState *q_state = &handle->m_query_state[q_inx];
/**
* Give the query thread in the same LDM group a bit of priority,
* but only to a certain degree.
*/
q_state->m_max_lqhkeyreq_count = q_weight;
q_state->m_max_scan_fragreq_count = q_weight / 2;
q_inx++;
}
}
jam();
jamLine(num_ldm_instances);
for (Uint32 i = 0; i < num_ldm_instances; i++) {
struct LdmThreadState *ldm_state = &handle->m_ldm_state[i];
ldm_state->m_current_weight = handle->m_weights[i];
}
}
void calculate_rr_distribution(DistributionHandler *handle,
struct RoundRobinInfo *rr_info) {
Uint32 dist_pos = 0;
for (Uint32 curr_pos = 0; curr_pos <= MAX_DISTRIBUTION_WEIGHT; curr_pos++) {
for (Uint32 thr_no = 0; thr_no < m_num_distribution_threads; thr_no++) {
if (handle->m_next_round[thr_no].m_next_pos == curr_pos) {
count_next_round(handle, thr_no);
if (curr_pos != 0) {
ndbrequire(dist_pos < MAX_NUM_DISTR_SIGNAL);
rr_info->m_distribution_signal[dist_pos] =
handle->m_distr_references[thr_no];
dist_pos++;
}
}
}
}
/**
* All round robin groups must have at least one query thread assigned
* to handle their work. Actually all query threads should do at least
* some work for their round robin group, even at high load.
*/
ndbrequire(dist_pos);
rr_info->m_distribution_signal_size = dist_pos;
rr_info->m_lqhkeyreq_to_same_thread = NUM_LQHKEYREQ_COUNTS;
rr_info->m_scan_fragreq_to_same_thread = NUM_SCAN_FRAGREQ_COUNTS;
}
void count_next_round(DistributionHandler *handle, Uint32 thr_no) {
Uint32 weight = handle->m_weights[thr_no];
if (weight == 0) {
/**
* Weight 0 means that we will not use this thread at all since it is
* overloaded.
*/
handle->m_next_round[thr_no].m_next_pos = Uint32(~0);
}
Uint32 curr_pos = handle->m_next_round[thr_no].m_next_pos;
if (curr_pos == 0) {
/* Initialise index */
handle->m_next_round[thr_no].m_next_index = 0;
}
/**
* The idea with the mathematics here is to spread the block reference
* in an even way. We can put it in 16 positions, if weight is 16 we
* should use all positions, if weight is 8 we should use every second
* position. When weight is e.g. 3 we want to use 3 positions.
* For numbers like 9 it becomes a bit more complex, here we need to
* use alternative 1 or 2 steps forward such that we end up with 9
* positions in total.
*
* One algorithm that solves this is to always move
* (MAX - curr_pos) / (weight - move_step) steps forward
* (using integer division).
* E.g. for weight 9 this becomes:
* (16 - 0) / (9 - 0) = 1
* (16 - 1) / (9 - 1) = 1
* (16 - 2) / (9 - 2) = 2
* (16 - 4) / (9 - 3) = 2 ...
* Thus using positions 1, 2, 4, 6, 8, 10, 12, 14 and 16.
*
* and weight 11 gives
* (16 - 0) / (11 - 0) = 1
* (16 - 1) / (11 - 1) = 1
* (16 - 2) / (11 - 2) = 1
* (16 - 3) / (11 - 3) = 1
* (16 - 4) / (11 - 4) = 1
* (16 - 5) / (11 - 5) = 1
* (16 - 6) / (11 - 6) = 2
* (16 - 8) / (11 - 7) = 2
* (16 - 10) / (11 - 8) = 2
* (16 - 12) / (11 - 9) = 2
* (16 - 14) / (11 - 10) = 2
* Thus positions 1, 2, 3, 4, 5, 6, 8, 10, 12, 14 and 16 are the ones
* used by weight 11.
*
* Weight 3 gives
* (16 - 0) / (3 - 0) = 5
* (16 - 5) / (3 - 1) = 5
* (16 - 10) / (3 - 2) = 6
* Thus weight 3 uses positions 5, 10 and 16.
*
* The current position we derive from the position in the loop, but it
* is harder to derive the step that we are moving (move_step above).
* Thus we have to ensure that this is tracked with a variable outside
* of count_next_round. It is probably possible to derive it using the
* weight and current position, but it is easy enough to keep track of
* it as well.
*/
Uint32 move_step = handle->m_next_round[thr_no].m_next_index;
if (move_step < weight) {
Uint32 forward_steps =
(MAX_DISTRIBUTION_WEIGHT - curr_pos) / (weight - move_step);
handle->m_next_round[thr_no].m_next_pos = curr_pos + forward_steps;
} else {
handle->m_next_round[thr_no].m_next_pos = Uint32(~0);
}
handle->m_next_round[thr_no].m_next_index = move_step + 1;
}
void init_rr_groups() {
/**
* Round robin groups are created in a simple fashion where
* each LDM group is assigned to a Round Robin group, and the
* next LDM group is assigned to the next Round Robin group.
* The number of Round Robin groups is determined when we
* configure the thread configuration.
*
* Given that thread configuration is performed before this step,
* and this includes also CPU locking, the thread configuration
* will assume that this simple algorithm is used here to decide
* on the Round Robin groups. Thus these modules have an implicit
* relationship to each other that must be maintained.
*
* The only outputs of the thread configuration are
* globalData.ndbMtLqhWorkers
* globalData.ndbMtQueryThreads
* globalData.ndbRRGroups
* globalData.QueryThreadsPerLdm
*/
Uint32 num_ldm_instances = globalData.ndbMtLqhWorkers;
Uint32 num_rr_groups = globalData.ndbRRGroups;
Uint32 num_query_thread_per_ldm = globalData.QueryThreadsPerLdm;
Uint32 num_distr_threads =
num_ldm_instances * (1 + globalData.QueryThreadsPerLdm);
m_num_query_thread_per_ldm = num_query_thread_per_ldm;
m_num_rr_groups = num_rr_groups;
m_num_distribution_threads = num_distr_threads;
Uint32 rr_group = 0;
Uint32 next_query_instance = num_ldm_instances;
for (Uint32 i = 0; i < MAX_DISTR_THREADS; i++) {
m_rr_group[i] = 0xFFFFFFFF; // Ensure group not valid as init value
}
if (num_query_thread_per_ldm == 0) {
return;
}
for (Uint32 i = 0; i < num_ldm_instances; i++) {
m_rr_group[i] = rr_group;
for (Uint32 j = 0; j < num_query_thread_per_ldm; j++) {
m_rr_group[next_query_instance] = rr_group;
next_query_instance++;
}
rr_group++;
if (rr_group == num_rr_groups) {
rr_group = 0;
}
}
}
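/*
* Worked example of the assignment above (values are illustrative, not a
* required configuration): with ndbMtLqhWorkers = 4, QueryThreadsPerLdm = 1
* and ndbRRGroups = 2, the loop produces
*
*   m_rr_group[0..3] = 0, 1, 0, 1   (LDM instances)
*   m_rr_group[4..7] = 0, 1, 0, 1   (query threads, one per LDM)
*
* i.e. LDM 0/2 plus query threads 4/6 form round robin group 0, and
* LDM 1/3 plus query threads 5/7 form round robin group 1.
*/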
void fill_distr_references(DistributionHandler *handle) {
Uint32 num_query_thread_per_ldm = globalData.QueryThreadsPerLdm;
Uint32 num_ldm_instances = getNumLDMInstances();
Uint32 num_distr_threads = num_ldm_instances + globalData.ndbMtQueryThreads;
memset(handle, 0, sizeof(*handle));
if (num_query_thread_per_ldm == 0) {
/* No scheduling required with no query threads */
return;
}
/**
* Initialise variables that are static over the lifetime of
* this node.
*/
ndbrequire(globalData.ndbMtQueryThreads ==
(num_query_thread_per_ldm * num_ldm_instances));
/**
* Set up quick access to the LQH in the query threads and the LDM
* threads.
*
* m_distr_references lists references to the LQH of the LDM
* threads first, followed by the query threads.
* This array is used by calculate_distribution_signal to fill
* the m_distribution_signal array based on the CPU usage of
* the query threads.
*
* So these two arrays contain the same information, but accessed
* in different ways.
*/
for (Uint32 i = 0; i < num_ldm_instances; i++) {
handle->m_distr_references[i] = numberToRef(DBLQH, i + 1, getOwnNodeId());
struct LdmThreadState *ldm_state = &handle->m_ldm_state[i];
ldm_state->m_load_indicator = 1;
ldm_state->m_current_weight = 33;
ldm_state->m_lqhkeyreq_counter = 0;
ldm_state->m_scan_fragreq_counter = 0;
ldm_state->m_current_weight_lqhkeyreq_count = 0;
ldm_state->m_current_weight_scan_fragreq_count = 0;
}
ndbrequire(num_ldm_instances + globalData.ndbMtQueryThreads <=
MAX_DISTR_THREADS);
for (Uint32 i = 0; i < globalData.ndbMtQueryThreads; i++) {
handle->m_distr_references[i + num_ldm_instances] =
numberToRef(DBQLQH, (i + 1), getOwnNodeId());
struct QueryThreadState *q_state = &handle->m_query_state[i];
q_state->m_load_indicator = 1;
q_state->m_max_lqhkeyreq_count = 0;
q_state->m_max_scan_fragreq_count = 0;
q_state->m_current_stolen_lqhkeyreq = 0;
q_state->m_current_stolen_scan_fragreq = 0;
}
for (Uint32 i = 0; i < num_ldm_instances; i++) {
handle->m_weights[i] = 33;
}
for (Uint32 i = num_ldm_instances; i < num_distr_threads; i++) {
handle->m_weights[i] = 8;
}
/* m_rr_info initialised to all 0s by above memset which is ok */
}
Uint32 get_query_block_no(Uint32 nodeId) {
Uint32 blockNo = DBLQH;
Uint32 num_query_threads = getNodeInfo(nodeId).m_query_threads;
if (num_query_threads > 0) {
/**
* There are query threads in the receiving node; this means that we will
* send the query to a virtual block with the same instance number in the
* same node id. The receiver will figure out how to map this to a
* real block number (could be our own node if nodeId = ownNodeId).
*/
blockNo = V_QUERY;
}
return blockNo;
}
Uint32 get_lqhkeyreq_ref(DistributionHandler *handle, Uint32 instance_no);
Uint32 get_scan_fragreq_ref(DistributionHandler *handle, Uint32 instance_no);
/**
* A number of support routines to avoid spreading out a lot of
* if-statements all over the code base.
*/
Uint32 getFirstLDMThreadInstance() {
if (unlikely(!isNdbMtLqh()))
return 0;
else if (unlikely(globalData.ndbMtLqhThreads == 0)) {
return globalData.ndbMtMainThreads + globalData.ndbMtQueryThreads +
globalData.ndbMtTcThreads;
} else {
/* Adjust for proxy instance with + 1 */
return globalData.ndbMtMainThreads + 1;
}
}
Uint32 getNumLDMInstances() {
if (unlikely(!isNdbMtLqh())) return 1;
return globalData.ndbMtLqhWorkers;
}
Uint32 getNumTCInstances() {
if (unlikely(!isNdbMtLqh()))
return 1;
else if (unlikely(globalData.ndbMtTcThreads == 0))
return 1;
else
return globalData.ndbMtTcThreads;
}
void query_thread_memory_barrier() {
if (globalData.ndbMtQueryThreads > 0) {
mb();
}
}
#if defined(USE_INIT_GLOBAL_VARIABLES)
/**
* Optional method to check / init global variables between
* job buffer signal executions
*/
virtual void checkInitGlobalVariables();
#endif
protected:
/**
* SegmentUtils methods
*/
SectionSegment *getSegmentPtr(Uint32 iVal) override;
bool seizeSegment(Ptr<SectionSegment> &p) override;
void releaseSegment(Uint32 iVal) override;
void releaseSegmentList(Uint32 firstSegmentIVal) override;
/** End of SegmentUtils methods */
};