storage/ndb/src/kernel/vm/SimulatedBlock.cpp (3,395 lines of code) (raw):

/* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ /* This file is used to build both the multithreaded and the singlethreaded ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the macro not defined). */ #include <ndb_global.h> #include "SimulatedBlock.hpp" #include <NdbOut.hpp> #include <OutputStream.hpp> #include <GlobalData.hpp> #include <Emulator.hpp> #include <WatchDog.hpp> #include <ErrorHandlingMacros.hpp> #include <TimeQueue.hpp> #include <TransporterRegistry.hpp> #include <SignalLoggerManager.hpp> #include <FastScheduler.hpp> #include "ndbd_malloc.hpp" #include <signaldata/EventReport.hpp> #include <signaldata/ContinueFragmented.hpp> #include <signaldata/NodeStateSignalData.hpp> #include <signaldata/FsRef.hpp> #include <signaldata/SignalDroppedRep.hpp> #include <signaldata/LocalRouteOrd.hpp> #include <signaldata/TransIdAI.hpp> #include <signaldata/Sync.hpp> #include <DebuggerNames.hpp> #include "LongSignal.hpp" #include <Properties.hpp> #include "Configuration.hpp" #include <AttributeDescriptor.hpp> #include <NdbSqlUtil.hpp> #include "../blocks/dbdih/Dbdih.hpp" #include <signaldata/CallbackSignal.hpp> #include "LongSignalImpl.hpp" #include <EventLogger.hpp> extern EventLogger * g_eventLogger; #define ljamEntry() jamEntryLine(30000 + __LINE__) #define ljam() jamLine(30000 + __LINE__) // // Constructor, Destructor // SimulatedBlock::SimulatedBlock(BlockNumber blockNumber, struct Block_context & ctx, Uint32 instanceNumber) : theNodeId(globalData.ownId), theNumber(blockNumber), theInstance(instanceNumber), theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)), theInstanceList(0), theMainInstance(0), m_ctx(ctx), m_global_page_pool(globalData.m_global_page_pool), m_shared_page_pool(globalData.m_shared_page_pool), c_fragmentInfoHash(c_fragmentInfoPool), c_linearFragmentSendList(c_fragmentSendPool), c_segmentedFragmentSendList(c_fragmentSendPool), c_mutexMgr(* this), c_counterMgr(* this) #ifdef VM_TRACE ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream()))) #endif #ifdef VM_TRACE_TIME ,m_currentGsn(0) #endif { m_threadId = 0; m_watchDogCounter = NULL; m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM); NewVarRef = 0; SimulatedBlock* mainBlock = globalData.getBlock(blockNumber); if (theInstance == 0) { ndbrequire(mainBlock == 0); mainBlock = this; globalData.setBlock(blockNumber, mainBlock); } else { ndbrequire(mainBlock != 0); mainBlock->addInstance(this, theInstance); } theMainInstance = mainBlock; c_fragmentIdCounter = 1; c_fragSenderRunning = false; #ifdef VM_TRACE_TIME clearTimes(); #endif for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++) theExecArray[i] = 0; installSimulatedBlockFunctions(); m_callbackTableAddr = 0; CLEAR_ERROR_INSERT_VALUE; #ifdef VM_TRACE m_global_variables = new Ptr<void> * [1]; m_global_variables[0] = 0; m_global_variables_save = 0; #endif } void SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance) { ndbrequire(theMainInstance == this); ndbrequire(number() == b->number()); if (theInstanceList == 0) { theInstanceList = new SimulatedBlock* [MaxInstances]; ndbrequire(theInstanceList != 0); for (Uint32 i = 0; i < MaxInstances; i++) theInstanceList[i] = 0; } ndbrequire(theInstance < MaxInstances); ndbrequire(theInstanceList[theInstance] == 0); theInstanceList[theInstance] = b; } void SimulatedBlock::initCommon() { Uint32 count = 10; this->getParam("FragmentSendPool", &count); c_fragmentSendPool.setSize(count); count = 10; this->getParam("FragmentInfoPool", &count); c_fragmentInfoPool.setSize(count); count = 10; this->getParam("FragmentInfoHash", &count); c_fragmentInfoHash.setSize(count); count = 5; this->getParam("ActiveMutexes", &count); c_mutexMgr.setSize(count); count = 5; this->getParam("ActiveCounters", &count); c_counterMgr.setSize(count); count = 5; this->getParam("ActiveThreadSync", &count); c_syncThreadPool.setSize(count); } SimulatedBlock::~SimulatedBlock() { freeBat(); #ifdef VM_TRACE_TIME printTimes(stdout); #endif #ifdef VM_TRACE delete [] m_global_variables; #endif if (theInstanceList != 0) { Uint32 i; for (i = 0; i < MaxInstances; i++) delete theInstanceList[i]; delete [] theInstanceList; } theInstanceList = 0; } void SimulatedBlock::installSimulatedBlockFunctions(){ ExecFunction * a = theExecArray; a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP; a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ; a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER; a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP; a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED; a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH; a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF; a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF; a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF; a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF; a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF; a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF; a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF; a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF; a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF; a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF; a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF; a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF; a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF; a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF; a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF; a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP; a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP; a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED; a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF; a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ; a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF; a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD; a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ; a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ; a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF; } void SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn, ExecFunction f, bool force){ if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){ char errorMsg[255]; BaseString::snprintf(errorMsg, 255, "GSN %d(%d))", gsn, MAX_GSN); ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg); } theExecArray[gsn] = f; } void SimulatedBlock::assignToThread(ThreadContext ctx) { m_threadId = ctx.threadId; m_jamBuffer = ctx.jamBuffer; m_watchDogCounter = ctx.watchDogCounter; m_sectionPoolCache = ctx.sectionPoolCache; } Uint32 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId) { Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH); Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId); return instanceKey; } Uint32 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey) { Uint32 lqhWorkers = globalData.ndbMtLqhWorkers; Uint32 instanceNo; if (lqhWorkers == 0) { instanceNo = 0; } else { assert(instanceKey != 0); instanceNo = 1 + (instanceKey - 1) % lqhWorkers; } return instanceNo; } void SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo, const char* filename, int lineno) const { char objRef[255]; BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno); char probData[255]; BaseString::snprintf(probData, 255, "Signal (GSN: %d, Length: %d, Rec Block No: %d)", gsn, len, recBlockNo); ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO, probData, objRef); } extern class SectionSegmentPool g_sectionSegmentPool; #define check_sections(signal, cnt, cnt2) do { if (unlikely(cnt)) { handle_invalid_sections_in_send_signal(signal); } else if (unlikely(cnt2 == 0 && (signal->header.m_fragmentInfo != 0 && signal->header.m_fragmentInfo != 3))) { handle_invalid_fragmentInfo(signal); } } while(0) void SimulatedBlock::handle_invalid_sections_in_send_signal(Signal* signal) const { //Uint32 cnt = signal->header.m_noOfSections; #if defined VM_TRACE || defined ERROR_INSERT ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO, "Unhandled sections in sendSignal", ""); #else infoEvent("Unhandled sections in sendSignal!!"); #endif } void SimulatedBlock::handle_lingering_sections_after_execute(Signal* signal) const { //Uint32 cnt = signal->header.m_noOfSections; #if defined VM_TRACE || defined ERROR_INSERT ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO, "Unhandled sections after execute", ""); #else infoEvent("Unhandled sections after execute"); #endif } void SimulatedBlock::handle_lingering_sections_after_execute(SectionHandle* handle) const { //Uint32 cnt = signal->header.m_noOfSections; #if defined VM_TRACE || defined ERROR_INSERT ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO, "Unhandled sections(handle) after execute", ""); #else infoEvent("Unhandled sections(handle) after execute"); #endif } void SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const { #if defined VM_TRACE || defined ERROR_INSERT ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO, "Incorrect header->m_fragmentInfo in sendSignal()", ""); #else signal->header.m_fragmentInfo = 0; infoEvent("Incorrect header->m_fragmentInfo in sendSignal"); #endif } void SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const { ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY, "Out of LongMessageBuffer in sendSignal", ""); } void SimulatedBlock::handle_send_failed(SendStatus ss, Signal * signal) const { switch(ss){ case SEND_BUFFER_FULL: ErrorReporter::handleError(NDBD_EXIT_GENERIC, "Out of SendBufferMemory in sendSignal", ""); break; case SEND_MESSAGE_TOO_BIG: ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE, "Message to big in sendSignal", ""); break; case SEND_UNKNOWN_NODE: ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE, "Unknown node in sendSignal", ""); break; case SEND_OK: case SEND_BLOCKED: case SEND_DISCONNECTED: break; } ndbrequire(false); } static void linkSegments(Uint32 head, Uint32 tail){ Ptr<SectionSegment> headPtr; g_sectionSegmentPool.getPtr(headPtr, head); Ptr<SectionSegment> tailPtr; g_sectionSegmentPool.getPtr(tailPtr, tail); Ptr<SectionSegment> oldTailPtr; g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment); /* Can only efficiently link segments if linking to the end of a * multiple-of-segment-size sized chunk */ if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0) { #if defined VM_TRACE || defined ERROR_INSERT ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO, "Bad head segment size", ""); #else ndbout_c("linkSegments : Bad head segment size"); #endif } headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment; headPtr.p->m_sz += tailPtr.p->m_sz; oldTailPtr.p->m_nextSegment = tailPtr.i; } void getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){ Uint32 tSec0 = ptr[0].i; Uint32 tSec1 = ptr[1].i; Uint32 tSec2 = ptr[2].i; SectionSegment * p; switch(secCount){ case 3: p = g_sectionSegmentPool.getPtr(tSec2); ptr[2].p = p; ptr[2].sz = p->m_sz; case 2: p = g_sectionSegmentPool.getPtr(tSec1); ptr[1].p = p; ptr[1].sz = p->m_sz; case 1: p = g_sectionSegmentPool.getPtr(tSec0); ptr[0].p = p; ptr[0].sz = p->m_sz; case 0: return; } char msg[40]; sprintf(msg, "secCount=%d", secCount); ErrorReporter::handleAssert(msg, __FILE__, __LINE__); } void getSection(SegmentedSectionPtr & ptr, Uint32 i){ ptr.i = i; SectionSegment * p = g_sectionSegmentPool.getPtr(i); ptr.p = p; ptr.sz = p->m_sz; } Uint32 getSectionSz(Uint32 id) { return g_sectionSegmentPool.getPtr(id)->m_sz; } Uint32* getLastWordPtr(Uint32 id) { SectionSegment* first= g_sectionSegmentPool.getPtr(id); SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment); Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength; return &last->theData[offset]; } #ifdef NDBD_MULTITHREADED #define SB_SP_ARG *m_sectionPoolCache, #define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache, #else #define SB_SP_ARG #define SB_SP_REL_ARG #endif static void releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){ Uint32 tSec0 = ptr[0].i; Uint32 tSz0 = ptr[0].sz; Uint32 tSec1 = ptr[1].i; Uint32 tSz1 = ptr[1].sz; Uint32 tSec2 = ptr[2].i; Uint32 tSz2 = ptr[2].sz; switch(secCount){ case 3: g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG relSz(tSz2), tSec2, ptr[2].p->m_lastSegment); case 2: g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG relSz(tSz1), tSec1, ptr[1].p->m_lastSegment); case 1: g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG relSz(tSz0), tSec0, ptr[0].p->m_lastSegment); case 0: return; } char msg[40]; sprintf(msg, "secCount=%d", secCount); ErrorReporter::handleAssert(msg, __FILE__, __LINE__); } void SimulatedBlock::sendSignal(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer) const { BlockReference sendBRef = reference(); Uint32 noOfSections = signal->header.m_noOfSections; Uint32 recBlock = refToBlock(ref); Uint32 recNode = refToNode(ref); Uint32 ourProcessor = globalData.ownId; signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.m_noOfSections = 0; check_sections(signal, noOfSections, 0); Uint32 tSignalId = signal->header.theSignalId; if ((length == 0) || length > 25 || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if #ifdef VM_TRACE if(globalData.testOn){ Uint16 proc = (recNode == 0 ? globalData.ownId : recNode); signal->header.theSendersBlockRef = sendBRef; globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], proc); } #endif if(recNode == ourProcessor || recNode == 0) { signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = sendBRef; #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, NULL); else sendprioa(m_threadId, &signal->header, signal->theData, NULL); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif return; } else { // send distributed Signal SignalHeader sh; Uint32 tTrace = signal->getTrace(); sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(sendBRef); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = 0; sh.m_fragmentInfo = 0; #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, 0); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, (LinearSectionPtr*)0); #endif if (unlikely(! (ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED))) { handle_send_failed(ss, signal); } } return; } void SimulatedBlock::sendSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer) const { Uint32 noOfSections = signal->header.m_noOfSections; Uint32 tSignalId = signal->header.theSignalId; Uint32 tTrace = signal->getTrace(); Uint32 ourProcessor = globalData.ownId; Uint32 recBlock = rg.m_block; signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = reference(); signal->header.m_noOfSections = 0; check_sections(signal, noOfSections, 0); if ((length == 0) || (length > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if SignalHeader sh; sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(reference()); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = 0; sh.m_fragmentInfo = 0; /** * Check own node */ if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){ #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], ourProcessor); } #endif #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, NULL); else sendprioa(m_threadId, &signal->header, signal->theData, NULL); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif rg.m_nodes.clear((Uint32)0); rg.m_nodes.clear(ourProcessor); } /** * Do the big loop */ Uint32 recNode = 0; while(!rg.m_nodes.isclear()){ recNode = rg.m_nodes.find(recNode + 1); rg.m_nodes.clear(recNode); #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], recNode); } #endif #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, 0); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, (LinearSectionPtr*)0); #endif if (unlikely(! (ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED))) { handle_send_failed(ss, signal); } } return; } bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len); void SimulatedBlock::sendSignal(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer, LinearSectionPtr ptr[3], Uint32 noOfSections) const { BlockReference sendBRef = reference(); Uint32 recBlock = refToBlock(ref); Uint32 recNode = refToNode(ref); Uint32 ourProcessor = globalData.ownId; check_sections(signal, signal->header.m_noOfSections, noOfSections); signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.m_noOfSections = noOfSections; Uint32 tSignalId = signal->header.theSignalId; Uint32 tFragInfo = signal->header.m_fragmentInfo; if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if #ifdef VM_TRACE if(globalData.testOn){ Uint16 proc = (recNode == 0 ? globalData.ownId : recNode); signal->header.theSendersBlockRef = sendBRef; globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], proc, ptr, noOfSections); } #endif if(recNode == ourProcessor || recNode == 0) { signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = sendBRef; /** * We have to copy the data */ bool ok = true; Ptr<SectionSegment> segptr[3]; for(Uint32 i = 0; i<noOfSections; i++){ ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz); signal->theData[length+i] = segptr[i].i; } if (unlikely(! ok)) { handle_out_of_longsignal_memory(signal); } #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, signal->theData+length); else sendprioa(m_threadId, &signal->header, signal->theData, signal->theData+length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif signal->header.m_noOfSections = 0; return; } else { // send distributed Signal SignalHeader sh; Uint32 tTrace = signal->getTrace(); Uint32 noOfSections = signal->header.m_noOfSections; sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(sendBRef); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = noOfSections; sh.m_fragmentInfo = tFragInfo; #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, ptr); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, ptr); #endif if (unlikely(! (ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED))) { handle_send_failed(ss, signal); } } signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; return; } void SimulatedBlock::sendSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer, LinearSectionPtr ptr[3], Uint32 noOfSections) const { Uint32 tSignalId = signal->header.theSignalId; Uint32 tTrace = signal->getTrace(); Uint32 tFragInfo = signal->header.m_fragmentInfo; Uint32 ourProcessor = globalData.ownId; Uint32 recBlock = rg.m_block; check_sections(signal, signal->header.m_noOfSections, noOfSections); signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = reference(); signal->header.m_noOfSections = noOfSections; if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if SignalHeader sh; sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(reference()); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = noOfSections; sh.m_fragmentInfo = tFragInfo; /** * Check own node */ if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){ #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], ourProcessor, ptr, noOfSections); } #endif /** * We have to copy the data */ bool ok = true; Ptr<SectionSegment> segptr[3]; for(Uint32 i = 0; i<noOfSections; i++){ ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz); signal->theData[length+i] = segptr[i].i; } if (unlikely(! ok)) { handle_out_of_longsignal_memory(signal); } #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif rg.m_nodes.clear((Uint32)0); rg.m_nodes.clear(ourProcessor); } /** * Do the big loop */ Uint32 recNode = 0; while(!rg.m_nodes.isclear()){ recNode = rg.m_nodes.find(recNode + 1); rg.m_nodes.clear(recNode); #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], recNode, ptr, noOfSections); } #endif #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, ptr); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, ptr); #endif if (unlikely(! (ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED))) { handle_send_failed(ss, signal); } } signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; return; } void SimulatedBlock::sendSignal(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer, SectionHandle* sections) const { Uint32 noOfSections = sections->m_cnt; BlockReference sendBRef = reference(); Uint32 recBlock = refToBlock(ref); Uint32 recNode = refToNode(ref); Uint32 ourProcessor = globalData.ownId; check_sections(signal, signal->header.m_noOfSections, noOfSections); signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.m_noOfSections = noOfSections; Uint32 tSignalId = signal->header.theSignalId; Uint32 tFragInfo = signal->header.m_fragmentInfo; if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if #ifdef VM_TRACE if(globalData.testOn){ Uint16 proc = (recNode == 0 ? globalData.ownId : recNode); signal->header.theSendersBlockRef = sendBRef; globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], proc, sections->m_ptr, noOfSections); } #endif if(recNode == ourProcessor || recNode == 0) { signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = sendBRef; /** * We have to copy the data */ Uint32 * dst = signal->theData + length; * dst ++ = sections->m_ptr[0].i; * dst ++ = sections->m_ptr[1].i; * dst ++ = sections->m_ptr[2].i; #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif } else { // send distributed Signal SignalHeader sh; Uint32 tTrace = signal->getTrace(); sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(sendBRef); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = noOfSections; sh.m_fragmentInfo = tFragInfo; #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, &g_sectionSegmentPool, sections->m_ptr); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, g_sectionSegmentPool, sections->m_ptr); #endif if (unlikely(! (ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED))) { handle_send_failed(ss, signal); } ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr); } signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; sections->m_cnt = 0; return; } void SimulatedBlock::sendSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer, SectionHandle * sections) const { Uint32 noOfSections = sections->m_cnt; Uint32 tSignalId = signal->header.theSignalId; Uint32 tTrace = signal->getTrace(); Uint32 tFragInfo = signal->header.m_fragmentInfo; Uint32 ourProcessor = globalData.ownId; Uint32 recBlock = rg.m_block; check_sections(signal, signal->header.m_noOfSections, noOfSections); signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = reference(); signal->header.m_noOfSections = noOfSections; if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if SignalHeader sh; sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(reference()); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = noOfSections; sh.m_fragmentInfo = tFragInfo; /** * Check own node */ bool release = true; if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)) { release = false; #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], ourProcessor, sections->m_ptr, noOfSections); } #endif /** * We have to copy the data */ Uint32 * dst = signal->theData + length; * dst ++ = sections->m_ptr[0].i; * dst ++ = sections->m_ptr[1].i; * dst ++ = sections->m_ptr[2].i; #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif rg.m_nodes.clear((Uint32)0); rg.m_nodes.clear(ourProcessor); } /** * Do the big loop */ Uint32 recNode = 0; while(!rg.m_nodes.isclear()){ recNode = rg.m_nodes.find(recNode + 1); rg.m_nodes.clear(recNode); #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], recNode, sections->m_ptr, noOfSections); } #endif #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, &g_sectionSegmentPool, sections->m_ptr); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, g_sectionSegmentPool, sections->m_ptr); #endif if (unlikely(! (ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED))) { handle_send_failed(ss, signal); } } if (release) { ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr); } sections->m_cnt = 0; signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; return; } void SimulatedBlock::sendSignalNoRelease(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer, SectionHandle* sections) const { /** * Implementation the same as sendSignal(), except that * the sections are duplicated when sending locally, and * not released */ Uint32 noOfSections = sections->m_cnt; BlockReference sendBRef = reference(); Uint32 recBlock = refToBlock(ref); Uint32 recNode = refToNode(ref); Uint32 ourProcessor = globalData.ownId; check_sections(signal, signal->header.m_noOfSections, noOfSections); signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.m_noOfSections = noOfSections; Uint32 tSignalId = signal->header.theSignalId; Uint32 tFragInfo = signal->header.m_fragmentInfo; if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if #ifdef VM_TRACE if(globalData.testOn){ Uint16 proc = (recNode == 0 ? globalData.ownId : recNode); signal->header.theSendersBlockRef = sendBRef; globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], proc, sections->m_ptr, noOfSections); } #endif if(recNode == ourProcessor || recNode == 0) { signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = sendBRef; Uint32 * dst = signal->theData + length; /* We need to copy the segmented section data into separate * sections when sending locally and keeping a copy ourselves */ for (Uint32 sec=0; sec < noOfSections; sec++) { Uint32 secCopy; bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i); ndbrequire (ok); * dst ++ = secCopy; } #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif } else { // send distributed Signal SignalHeader sh; Uint32 tTrace = signal->getTrace(); sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(sendBRef); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = noOfSections; sh.m_fragmentInfo = tFragInfo; #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, &g_sectionSegmentPool, sections->m_ptr); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, g_sectionSegmentPool, sections->m_ptr); #endif ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED); } signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; return; } void SimulatedBlock::sendSignalNoRelease(NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jobBuffer, SectionHandle * sections) const { /** * Implementation the same as sendSignal(), except that * the sections are duplicated when sending locally, and * not released */ Uint32 noOfSections = sections->m_cnt; Uint32 tSignalId = signal->header.theSignalId; Uint32 tTrace = signal->getTrace(); Uint32 tFragInfo = signal->header.m_fragmentInfo; Uint32 ourProcessor = globalData.ownId; Uint32 recBlock = rg.m_block; check_sections(signal, signal->header.m_noOfSections, noOfSections); signal->header.theLength = length; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = recBlock; signal->header.theSendersSignalId = tSignalId; signal->header.theSendersBlockRef = reference(); signal->header.m_noOfSections = noOfSections; if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) { signal_error(gsn, length, recBlock, __FILE__, __LINE__); return; }//if SignalHeader sh; sh.theVerId_signalNumber = gsn; sh.theReceiversBlockNumber = recBlock; sh.theSendersBlockRef = refToBlock(reference()); sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = noOfSections; sh.m_fragmentInfo = tFragInfo; /** * Check own node */ if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)) { #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], ourProcessor, sections->m_ptr, noOfSections); } #endif Uint32 * dst = signal->theData + length; /* We need to copy the segmented section data into separate * sections when sending locally and keeping a copy ourselves */ for (Uint32 sec=0; sec < noOfSections; sec++) { Uint32 secCopy; bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i); ndbrequire (ok); * dst ++ = secCopy; } #ifdef NDBD_MULTITHREADED if (jobBuffer == JBB) sendlocal(m_threadId, &signal->header, signal->theData, signal->theData + length); else sendprioa(m_threadId, &signal->header, signal->theData, signal->theData + length); #else globalScheduler.execute(signal, jobBuffer, recBlock, gsn); #endif rg.m_nodes.clear((Uint32)0); rg.m_nodes.clear(ourProcessor); } /** * Do the big loop */ Uint32 recNode = 0; while(!rg.m_nodes.isclear()){ recNode = rg.m_nodes.find(recNode + 1); rg.m_nodes.clear(recNode); #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], recNode, sections->m_ptr, noOfSections); } #endif #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode); #endif SendStatus ss; #ifdef NDBD_MULTITHREADED ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0], recNode, &g_sectionSegmentPool, sections->m_ptr); #else ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, g_sectionSegmentPool, sections->m_ptr); #endif ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED); } signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; return; } void SimulatedBlock::sendSignalWithDelay(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 delayInMilliSeconds, Uint32 length) const { BlockNumber bnr = refToBlock(ref); check_sections(signal, signal->header.m_noOfSections, 0); signal->header.theLength = length; signal->header.theSendersSignalId = signal->header.theSignalId; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = bnr; signal->header.theSendersBlockRef = reference(); #ifdef VM_TRACE { if(globalData.testOn){ globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds, signal->header, 0, &signal->theData[0], globalData.ownId); } } #endif #ifdef NDBD_MULTITHREADED senddelay(m_threadId, &signal->header, delayInMilliSeconds); #else globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds); #endif // befor 2nd parameter to globalTimeQueue.insert // (Priority)theSendSig[sigIndex].jobBuffer } void SimulatedBlock::sendSignalWithDelay(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 delayInMilliSeconds, Uint32 length, SectionHandle * sections) const { Uint32 noOfSections = sections->m_cnt; BlockNumber bnr = refToBlock(ref); BlockReference sendBRef = reference(); if (bnr == 0) { bnr_error(); }//if check_sections(signal, signal->header.m_noOfSections, noOfSections); signal->header.theLength = length; signal->header.theSendersSignalId = signal->header.theSignalId; signal->header.theSendersBlockRef = sendBRef; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = bnr; signal->header.m_noOfSections = noOfSections; Uint32 * dst = signal->theData + length; * dst ++ = sections->m_ptr[0].i; * dst ++ = sections->m_ptr[1].i; * dst ++ = sections->m_ptr[2].i; #ifdef VM_TRACE { if(globalData.testOn){ globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds, signal->header, 0, &signal->theData[0], globalData.ownId); } } #endif #ifdef NDBD_MULTITHREADED senddelay(m_threadId, &signal->header, delayInMilliSeconds); #else globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds); #endif signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; sections->m_cnt = 0; } void SimulatedBlock::release(SegmentedSectionPtr & ptr) { ::release(SB_SP_ARG ptr); } void SimulatedBlock::releaseSection(Uint32 firstSegmentIVal) { ::releaseSection(SB_SP_ARG firstSegmentIVal); } void SimulatedBlock::releaseSections(SectionHandle& handle) { ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr); handle.m_cnt = 0; } bool SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len) { return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len); } bool SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len) { return ::import(SB_SP_ARG first, src, len); } bool SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len) { Ptr<SectionSegment> tmp; if (::import(SB_SP_ARG tmp, src, len)) { ptr.i = tmp.i; ptr.p = tmp.p; ptr.sz = len; return true; } return false; } bool SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal) { return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal); } bool SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset, const Uint32* src, Uint32 len) { return ::writeToSection(firstSegmentIVal, offset, src, len); } class SectionSegmentPool& SimulatedBlock::getSectionSegmentPool(){ return g_sectionSegmentPool; } NewVARIABLE * SimulatedBlock::allocateBat(int batSize){ NewVARIABLE* bat = NewVarRef; bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE)); NewVarRef = bat; theBATSize = batSize; return bat; } void SimulatedBlock::freeBat(){ if(NewVarRef != 0){ free(NewVarRef); NewVarRef = 0; } } const NewVARIABLE * SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){ assert(blockNo == blockToMain(blockNo)); SimulatedBlock * sb = globalData.getBlock(blockNo); if (sb != 0 && instanceNo != 0) sb = sb->getInstance(instanceNo); if(sb == 0) return 0; return sb->NewVarRef; } Uint16 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){ assert(blockNo == blockToMain(blockNo)); SimulatedBlock * sb = globalData.getBlock(blockNo); if (sb != 0 && instanceNo != 0) sb = sb->getInstance(instanceNo); if(sb == 0) return 0; return sb->theBATSize; } void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId) { return allocRecordAligned(type, s, n, 0, 0, clear, paramId); } void* SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId) { void * p = NULL; Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0; size_t size = n*s + over_alloc; Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc; refresh_watch_dog(9); if (real_size > 0){ #ifdef VM_TRACE_MEM ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes", getBlockName(number()), type, s, n, real_size); #endif if( real_size == (Uint64)size ) p = ndbd_malloc(size); if (p == NULL){ char buf1[255]; char buf2[255]; struct ndb_mgm_param_info param_info; size_t size = sizeof(ndb_mgm_param_info); if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, &param_info, &size)) { BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s", getBlockName(number()), param_info.m_name); } else { BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s", getBlockName(number()), type); } BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes", (Uint32)s, (Uint32)n, (Uint64)real_size); ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2); } if(clear){ char * ptr = (char*)p; const Uint32 chunk = 128 * 1024; while(size > chunk){ refresh_watch_dog(9); memset(ptr, 0, chunk); ptr += chunk; size -= chunk; } refresh_watch_dog(9); memset(ptr, 0, size); } if (unaligned_buffer) { *unaligned_buffer = p; p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc)); #ifdef VM_TRACE g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes", type, align, (Uint64)p, (Uint64)p+n*s, (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer)); #endif } } return p; } void SimulatedBlock::deallocRecord(void ** ptr, const char * type, size_t s, size_t n){ (void)type; if(* ptr != 0){ ndbd_free(* ptr, n*s); * ptr = 0; } } int SimulatedBlock::sortchunks(const void * _e0, const void * _e1) { const AllocChunk *p0 = (const AllocChunk*)_e0; const AllocChunk *p1 = (const AllocChunk*)_e1; if (p0->ptrI > p1->ptrI) return 1; if (p0->ptrI < p1->ptrI) return -1; return 0; } Uint32 SimulatedBlock::allocChunks(AllocChunk dst[], Uint32 arraysize, Uint32 rg, Uint32 pages, Uint32 paramId) { const Uint32 save = pages; // For fail Uint32 i = 0; for (; i<arraysize && pages > 0; i++) { Uint32 cnt = pages; m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1); if (unlikely(cnt == 0)) goto fail; pages -= cnt; dst[i].cnt = cnt; } if (unlikely(pages != 0)) goto fail; qsort(dst, i, sizeof(dst[0]), sortchunks); return i; fail: char buf1[255]; char buf2[255]; struct ndb_mgm_param_info param_info; size_t size = sizeof(ndb_mgm_param_info); if (ndb_mgm_get_db_parameter_info(paramId, &param_info, &size) != 0) { ndbassert(false); param_info.m_name = "<unknown>"; } BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s", getBlockName(number()), param_info.m_name); BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes", Uint64(save) * sizeof(GlobalPage)); ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2); return 0; } void SimulatedBlock::refresh_watch_dog(Uint32 place) { #ifdef NDBD_MULTITHREADED (*m_watchDogCounter) = place; #else globalData.incrementWatchDogCounter(place); #endif } void SimulatedBlock::update_watch_dog_timer(Uint32 interval) { extern EmulatorData globalEmulatorData; globalEmulatorData.theWatchDog->setCheckInterval(interval); } void SimulatedBlock::progError(int line, int err_code, const char* extra) const { jamLine(line); const char *aBlockName = getBlockName(number(), "VM Kernel"); // Pack status of interesting config variables // so that we can print them in error.log int magicStatus = (m_ctx.m_config.stopOnError()<<1) + (m_ctx.m_config.getInitialStart()<<2); /* Add line number to block name */ char buf[100]; BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x", aBlockName, line, magicStatus); ErrorReporter::handleError(err_code, extra, buf); } void SimulatedBlock::infoEvent(const char * msg, ...) const { if(msg == 0) return; SignalT<25> signalT; signalT.theData[0] = NDB_LE_InfoEvent; char * buf = (char *)(signalT.theData+1); va_list ap; va_start(ap, msg); BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4 va_end(ap); int len = strlen(buf) + 1; if(len > 96){ len = 96; buf[95] = 0; } /** * Init and put it into the job buffer */ memset(&signalT.header, 0, sizeof(SignalHeader)); const Signal * signal = globalScheduler.getVMSignals(); Uint32 tTrace = signal->header.theTrace; Uint32 tSignalId = signal->header.theSignalId; signalT.header.theVerId_signalNumber = GSN_EVENT_REP; signalT.header.theReceiversBlockNumber = CMVMI; signalT.header.theSendersBlockRef = reference(); signalT.header.theTrace = tTrace; signalT.header.theSignalId = tSignalId; signalT.header.theLength = ((len+3)/4)+1; #ifdef NDBD_MULTITHREADED sendlocal(m_threadId, &signalT.header, signalT.theData, signalT.m_sectionPtrI); #else globalScheduler.execute(&signalT.header, JBB, signalT.theData, signalT.m_sectionPtrI); #endif } void SimulatedBlock::warningEvent(const char * msg, ...) const { if(msg == 0) return; SignalT<25> signalT; signalT.theData[0] = NDB_LE_WarningEvent; char * buf = (char *)(signalT.theData+1); va_list ap; va_start(ap, msg); BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4 va_end(ap); int len = strlen(buf) + 1; if(len > 96){ len = 96; buf[95] = 0; } /** * Init and put it into the job buffer */ memset(&signalT.header, 0, sizeof(SignalHeader)); const Signal * signal = globalScheduler.getVMSignals(); Uint32 tTrace = signal->header.theTrace; Uint32 tSignalId = signal->header.theSignalId; signalT.header.theVerId_signalNumber = GSN_EVENT_REP; signalT.header.theReceiversBlockNumber = CMVMI; signalT.header.theSendersBlockRef = reference(); signalT.header.theTrace = tTrace; signalT.header.theSignalId = tSignalId; signalT.header.theLength = ((len+3)/4)+1; #ifdef NDBD_MULTITHREADED sendlocal(m_threadId, &signalT.header, signalT.theData, signalT.m_sectionPtrI); #else globalScheduler.execute(&signalT.header, JBB, signalT.theData, signalT.m_sectionPtrI); #endif } void SimulatedBlock::execNODE_STATE_REP(Signal* signal){ const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0]; this->theNodeState = rep->nodeState; } void SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){ const ChangeNodeStateReq * const req = (ChangeNodeStateReq *)&signal->theData[0]; this->theNodeState = req->nodeState; const Uint32 senderData = req->senderData; const BlockReference senderRef = req->senderRef; /** * Pack return signal */ ChangeNodeStateConf * const conf = (ChangeNodeStateConf *)&signal->theData[0]; conf->senderData = senderData; sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal, ChangeNodeStateConf::SignalLength, JBB); } void SimulatedBlock::execNDB_TAMPER(Signal * signal){ if (signal->getLength() == 1) { SET_ERROR_INSERT_VALUE(signal->theData[0]); } else { SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]); } } void SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){ /* Note no need for fragmented signal handling as we are * going to crash this node */ char msg[64]; const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0]; BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()), rep->originalGsn, rep->originalLength,rep->originalSectionCount); ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY, msg, __FILE__, NST_ErrorHandler); } void SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){ ljamEntry(); ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */ switch (sig->type) { case ContinueFragmented::CONTINUE_SENDING : { ljam(); Ptr<FragmentSendInfo> fragPtr; c_segmentedFragmentSendList.first(fragPtr); for(; !fragPtr.isNull();){ ljam(); Ptr<FragmentSendInfo> copyPtr = fragPtr; c_segmentedFragmentSendList.next(fragPtr); sendNextSegmentedFragment(signal, * copyPtr.p); if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){ ljam(); if(copyPtr.p->m_callback.m_callbackFunction != 0) { ljam(); execute(signal, copyPtr.p->m_callback, 0); }//if c_segmentedFragmentSendList.release(copyPtr); } } c_linearFragmentSendList.first(fragPtr); for(; !fragPtr.isNull();){ ljam(); Ptr<FragmentSendInfo> copyPtr = fragPtr; c_linearFragmentSendList.next(fragPtr); sendNextLinearFragment(signal, * copyPtr.p); if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){ ljam(); if(copyPtr.p->m_callback.m_callbackFunction != 0) { ljam(); execute(signal, copyPtr.p->m_callback, 0); }//if c_linearFragmentSendList.release(copyPtr); } } if(c_segmentedFragmentSendList.isEmpty() && c_linearFragmentSendList.isEmpty()){ ljam(); c_fragSenderRunning = false; return; } sig->type = ContinueFragmented::CONTINUE_SENDING; sig->line = __LINE__; sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB); break; } case ContinueFragmented::CONTINUE_CLEANUP: { ljam(); const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2; /* Check length of signal */ ndbassert(signal->getLength() == ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS + callbackWords); Callback cb; memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2); doNodeFailureCleanup(signal, sig->cleanup.failedNodeId, sig->cleanup.resource, sig->cleanup.cursor, sig->cleanup.elementsCleaned, cb); break; } default: ndbrequire(false); } } void SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal) { #ifdef NDBD_MULTITHREADED mt_execSTOP_FOR_CRASH(); #endif } void SimulatedBlock::execNODE_START_REP(Signal* signal) { } void SimulatedBlock::execAPI_START_REP(Signal* signal) { } void SimulatedBlock::execSEND_PACKED(Signal* signal) { } // MT LQH callback CONF via signal const SimulatedBlock::CallbackEntry& SimulatedBlock::getCallbackEntry(Uint32 ci) { ndbrequire(m_callbackTableAddr != 0); const CallbackTable& ct = *m_callbackTableAddr; ndbrequire(ci < ct.m_count); return ct.m_entry[ci]; } void SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo, CallbackPtr& cptr, Uint32 returnCode) { Uint32 blockNo = blockToMain(fullBlockNo); Uint32 instanceNo = blockToInstance(fullBlockNo); SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo); ndbrequire(b != 0); const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex); // wl4391_todo add as arg if this is not enough Uint32 senderData = returnCode; if (!isNdbMtLqh()) { Callback c; c.m_callbackFunction = ce.m_function; c.m_callbackData = cptr.m_callbackData; b->execute(signal, c, returnCode); if (ce.m_flags & CALLBACK_ACK) { jam(); CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend(); ack->senderData = senderData; EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK, signal, CallbackAck::SignalLength); } } else { CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend(); conf->senderData = senderData; conf->senderRef = reference(); conf->callbackIndex = cptr.m_callbackIndex; conf->callbackData = cptr.m_callbackData; conf->returnCode = returnCode; if (ce.m_flags & CALLBACK_DIRECT) { jam(); EXECUTE_DIRECT(blockNo, GSN_CALLBACK_CONF, signal, CallbackConf::SignalLength, instanceNo); } else { jam(); BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId()); sendSignal(ref, GSN_CALLBACK_CONF, signal, CallbackConf::SignalLength, JBB); } } cptr.m_callbackIndex = ZNIL; } void SimulatedBlock::execCALLBACK_CONF(Signal* signal) { const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr(); Uint32 senderData = conf->senderData; Uint32 senderRef = conf->senderRef; ndbrequire(m_callbackTableAddr != 0); const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex); CallbackFunction function = ce.m_function; Callback callback; callback.m_callbackFunction = function; callback.m_callbackData = conf->callbackData; execute(signal, callback, conf->returnCode); if (ce.m_flags & CALLBACK_ACK) { jam(); CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend(); ack->senderData = senderData; sendSignal(senderRef, GSN_CALLBACK_ACK, signal, CallbackAck::SignalLength, JBB); } } #ifdef VM_TRACE_TIME void SimulatedBlock::clearTimes() { for(Uint32 i = 0; i <= MAX_GSN; i++){ m_timeTrace[i].cnt = 0; m_timeTrace[i].sum = 0; m_timeTrace[i].sub = 0; } } void SimulatedBlock::printTimes(FILE * output){ fprintf(output, "-- %s --\n", getBlockName(number())); Uint64 sum = 0; for(Uint32 i = 0; i <= MAX_GSN; i++){ Uint32 n = m_timeTrace[i].cnt; if(n != 0){ double dn = n; double avg = m_timeTrace[i].sum; double avg2 = avg - m_timeTrace[i].sub; avg /= dn; avg2 /= dn; fprintf(output, //name ; cnt ; loc ; acc "%s ; #%d ; %dus ; %dus ; %dms\n", getSignalName(i), n, (Uint32)avg, (Uint32)avg2, (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000)); sum += (m_timeTrace[i].sum - m_timeTrace[i].sub); } } sum = (sum + 500)/ 1000; fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum); fprintf(output, "\n"); fflush(output); } #endif SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){ m_fragmentId = fragId; m_senderRef = sender; m_sectionPtrI[0] = RNIL; m_sectionPtrI[1] = RNIL; m_sectionPtrI[2] = RNIL; } SimulatedBlock::FragmentSendInfo::FragmentSendInfo() { } bool SimulatedBlock::assembleFragments(Signal * signal){ Uint32 sigLen = signal->length() - 1; Uint32 fragId = signal->theData[sigLen]; Uint32 fragInfo = signal->header.m_fragmentInfo; Uint32 senderRef = signal->getSendersBlockRef(); Uint32 *sectionPtr = signal->m_sectionPtrI; if(fragInfo == 0){ return true; } const Uint32 secs = signal->header.m_noOfSections; const Uint32 * const secNos = &signal->theData[sigLen - secs]; if(fragInfo == 1){ /** * First in train */ Ptr<FragmentInfo> fragPtr; if(!c_fragmentInfoHash.seize(fragPtr)){ ndbrequire(false); return false; } new (fragPtr.p)FragmentInfo(fragId, senderRef); c_fragmentInfoHash.add(fragPtr); for(Uint32 i = 0; i<secs; i++){ Uint32 sectionNo = secNos[i]; ndbassert(sectionNo < 3); fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i]; } ndbassert(! fragPtr.p->isDropped() ); /** * Don't release allocated segments */ signal->header.m_fragmentInfo = 0; signal->header.m_noOfSections = 0; return false; } FragmentInfo key(fragId, senderRef); Ptr<FragmentInfo> fragPtr; if(c_fragmentInfoHash.find(fragPtr, key)){ /** * FragInfo == 2 or 3 */ if ( likely(! fragPtr.p->isDropped()) ) { Uint32 i; for(i = 0; i<secs; i++){ Uint32 sectionNo = secNos[i]; ndbassert(sectionNo < 3); Uint32 sectionPtrI = sectionPtr[i]; if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){ linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI); } else { fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI; } } /** * fragInfo = 2 */ if(fragInfo == 2){ signal->header.m_fragmentInfo = 0; signal->header.m_noOfSections = 0; return false; } /** * fragInfo = 3 */ for(i = 0; i<3; i++){ Uint32 ptrI = fragPtr.p->m_sectionPtrI[i]; if(ptrI != RNIL){ signal->m_sectionPtrI[i] = ptrI; } else { break; } } signal->setLength(sigLen - secs); signal->header.m_noOfSections = i; signal->header.m_fragmentInfo = 0; c_fragmentInfoHash.release(fragPtr); return true; } else { /* This fragmented signal has already had at least 1 fragment * dropped. We must release the received segments. */ for (Uint32 i=0; i < secs; i++) releaseSection( sectionPtr[i] ); signal->header.m_fragmentInfo = 0; signal->header.m_noOfSections = 0; /* FragInfo == 2 * More fragments to come, keep waiting */ if (fragInfo == 2) return false; /* FragInfo == 3 * That was the last fragment. * We're now ready for handling the dropped signal. */ SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData; Uint32 gsn = signal->header.theVerId_signalNumber; Uint32 len = signal->header.theLength; Uint32 newLen= (len > 22 ? 22 : len); memmove(rep->originalData, signal->theData, (4 * newLen)); rep->originalGsn = gsn; rep->originalLength = len; rep->originalSectionCount = 0; signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP; signal->header.theLength = newLen + 3; signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 3; /** * NOTE: Don't use EXECUTE_DIRECT as it * sets sendersBlockRef to reference() */ /* Perform dropped signal handling, in this thread, now */ executeFunction(GSN_SIGNAL_DROPPED_REP, signal); /* return false to caller - they should not process the signal */ return false; } // else (isDropped()) } /** * Unable to find fragment */ ndbrequire(false); return false; } bool SimulatedBlock::assembleDroppedFragments(Signal* signal) { /* This method is called at the start of a SIGNAL_DROPPED_REP * handler when there is a chance that the dropped signal could * be part of a fragmented signal. * If the dropped signal was a fragmented signal, this * needs to be handled specially to ensure that fragments * of the signal are correctly dropped to avoid segment * leaks etc. * There are a number of cases : * 1) First fragment dropped (FragInfo=1) * All remaining fragments must be dropped when they * arrive. The Signal dropped report handler must be * executed when the last fragment has arrived. * 2) Middle fragment dropped (FragInfo=2) * Any existing stored segments must be released. * All remaining fragments must be dropped when they * arrive. * 3) Last fragment dropped (FragInfo=3) * Any existing stored segments must be released. * Signal Dropped handling can occur, so return true. * * To indicate that a fragment has been dropped for a signal, * all the section I Values in the fragment's hash entry are * set to RNIL. * Signal Dropped Report handling is performed when the last * fragment arrives. If the last fragment is not dropped * by the transporter layer then normal fragment assembly * arranges for dropped signal handling to occur. */ Uint32 sigLen = signal->length() - 1; Uint32 fragId = signal->theData[sigLen]; Uint32 fragInfo = signal->header.m_fragmentInfo; Uint32 senderRef = signal->getSendersBlockRef(); if(fragInfo == 0){ return true; } /* This method is for handling SIGNAL_DROPPED_REP only */ ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP); ndbrequire(signal->header.m_noOfSections == 0); if(fragInfo == 1){ /** * First in train */ Ptr<FragmentInfo> fragPtr; if(!c_fragmentInfoHash.seize(fragPtr)){ ndbrequire(false); return false; } new (fragPtr.p)FragmentInfo(fragId, senderRef); c_fragmentInfoHash.add(fragPtr); /* Mark entry in hash as belonging to dropped signal so subsequent * fragments can also be dropped */ fragPtr.p->m_sectionPtrI[0]= RNIL; fragPtr.p->m_sectionPtrI[1]= RNIL; fragPtr.p->m_sectionPtrI[2]= RNIL; /* Wait for last fragment before SignalDroppedRep handling */ signal->header.m_fragmentInfo = 0; return false; } FragmentInfo key(fragId, senderRef); Ptr<FragmentInfo> fragPtr; if(c_fragmentInfoHash.find(fragPtr, key)){ /** * FragInfo == 2 or 3 */ if (! fragPtr.p->isDropped() ) { /* Fragmented Signal not already marked as dropped * Need to free stored segments */ releaseSection(fragPtr.p->m_sectionPtrI[0]); releaseSection(fragPtr.p->m_sectionPtrI[1]); releaseSection(fragPtr.p->m_sectionPtrI[2]); /* Mark as dropped now */ fragPtr.p->m_sectionPtrI[0]= RNIL; fragPtr.p->m_sectionPtrI[1]= RNIL; fragPtr.p->m_sectionPtrI[2]= RNIL; ndbassert( fragPtr.p->isDropped() ); } /** * fragInfo = 2 * Still waiting for final fragments. * Return false to caller. */ if(fragInfo == 2){ signal->header.m_fragmentInfo = 0; return false; } /** * fragInfo = 3 * All fragments received, remove entry * from hash and return to caller for * dropped signal handling. */ signal->header.m_fragmentInfo = 0; c_fragmentInfoHash.release(fragPtr); return true; } /** * Unable to find fragment */ ndbrequire(false); return false; } /** * doCleanupFragInfo * Iterate over block's Fragment assembly hash, looking * for in-assembly fragments from the failed node * Release these * Returns after each scanned bucket to avoid consuming * too much time. * * Parameters * failedNodeId : Node id of failed node * cursor : Hash bucket to start iteration from * rtUnitsUsed : Total rt units used * elementsCleaned : Number of elements cleaned * * Updates * cursor : Hash bucket to continue iteration from * rtUnitsUsed : += units used * elementsCleaned : += elements cleaned * * Returns * true if all FragInfo structs cleaned up * false if more to do */ bool SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId, Uint32& cursor, Uint32& rtUnitsUsed, Uint32& elementsCleaned) { ljam(); DLHashTable<FragmentInfo>::Iterator iter; c_fragmentInfoHash.next(cursor, iter); const Uint32 startBucket = iter.bucket; while (!iter.isNull() && (iter.bucket == startBucket)) { ljam(); Ptr<FragmentInfo> curr = iter.curr; c_fragmentInfoHash.next(iter); FragmentInfo* fragInfo = curr.p; if (refToNode(fragInfo->m_senderRef) == failedNodeId) { ljam(); /* We were assembling a fragmented signal from the * failed node, discard the partially assembled * sections and free the FragmentInfo hash entry */ for(Uint32 s = 0; s<3; s++) { if (fragInfo->m_sectionPtrI[s] != RNIL) { ljam(); SegmentedSectionPtr ssptr; getSection(ssptr, fragInfo->m_sectionPtrI[s]); release(ssptr); } } /* Release FragmentInfo hash element */ c_fragmentInfoHash.release(curr); elementsCleaned++; rtUnitsUsed+=3; } rtUnitsUsed++; } // while cursor = iter.bucket; return iter.isNull(); } bool SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId, Uint32& cursor, Uint32& rtUnitsUsed, Uint32& elementsCleaned) { ljam(); Ptr<FragmentSendInfo> fragPtr; const Uint32 NumSendLists = 2; ndbrequire(cursor < NumSendLists); DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] = { &c_segmentedFragmentSendList, &c_linearFragmentSendList }; DLList<FragmentSendInfo>* list = fragSendLists[ cursor ]; list->first(fragPtr); for(; !fragPtr.isNull();){ ljam(); Ptr<FragmentSendInfo> copyPtr = fragPtr; list->next(fragPtr); rtUnitsUsed++; NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup; if (rg.m_nodes.get(failedNodeId)) { ljam(); /* Fragmented signal is being sent to node */ rg.m_nodes.clear(failedNodeId); if (rg.m_nodes.isclear()) { ljam(); /* No other nodes in receiver group - send * is cancelled * Will be cleaned up in the usual CONTINUE_FRAGMENTED * handling code. */ copyPtr.p->m_status = FragmentSendInfo::SendCancelled; } elementsCleaned++; } } /* Next time we'll do the next list */ cursor++; return (cursor == NumSendLists); } Uint32 SimulatedBlock::doNodeFailureCleanup(Signal* signal, Uint32 failedNodeId, Uint32 resource, Uint32 cursor, Uint32 elementsCleaned, Callback& cb) { ljam(); const bool userCallback = (cb.m_callbackFunction != 0); const Uint32 maxRtUnits = userCallback ? #ifdef VM_TRACE 2 : #else 16 : #endif ~0; /* Must complete all processing in this call */ Uint32 rtUnitsUsed = 0; /* Loop over resources, cleaning them up */ do { bool resourceDone= false; switch(resource) { case ContinueFragmented::RES_FRAGSEND: { ljam(); resourceDone = doCleanupFragSend(failedNodeId, cursor, rtUnitsUsed, elementsCleaned); break; } case ContinueFragmented::RES_FRAGINFO: { ljam(); resourceDone = doCleanupFragInfo(failedNodeId, cursor, rtUnitsUsed, elementsCleaned); break; } case ContinueFragmented::RES_LAST: { ljam(); /* Node failure processing complete, execute user callback if provided */ if (userCallback) execute(signal, cb, elementsCleaned); return elementsCleaned; } default: ndbrequire(false); } /* Did we complete cleaning up this resource? */ if (resourceDone) { resource++; cursor= 0; } } while (rtUnitsUsed <= maxRtUnits); ljam(); /* Not yet completed failure handling. * Must have exhausted RT units. * Update cursor and re-invoke */ ndbassert(userCallback); /* Send signal to continue processing */ ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); sig->type = ContinueFragmented::CONTINUE_CLEANUP; sig->cleanup.failedNodeId = failedNodeId; sig->cleanup.resource = resource; sig->cleanup.cursor = cursor; sig->cleanup.elementsCleaned= elementsCleaned; Uint32 callbackWords = (sizeof(Callback) + 3) >> 2; Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS + callbackWords; ndbassert(sigLen <= 25); // Should be STATIC_ASSERT memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2); sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB); return elementsCleaned; } Uint32 SimulatedBlock::simBlockNodeFailure(Signal* signal, Uint32 failedNodeId, Callback& cb) { ljam(); return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb); } Uint32 SimulatedBlock::debugPrintFragmentCounts() { const char* blockName = getBlockName(theNumber); DLHashTable<FragmentInfo>::Iterator iter; Uint32 fragmentInfoCount = 0; c_fragmentInfoHash.first(iter); while(!iter.isNull()) { fragmentInfoCount++; c_fragmentInfoHash.next(iter); } Ptr<FragmentSendInfo> ptr; Uint32 linSendInfoCount = 0; c_linearFragmentSendList.first(ptr); while (!ptr.isNull()) { linSendInfoCount++; c_linearFragmentSendList.next(ptr); } Uint32 segSendInfoCount = 0; c_segmentedFragmentSendList.first(ptr); while (!ptr.isNull()) { segSendInfoCount++; c_segmentedFragmentSendList.next(ptr); } ndbout_c("%s : Fragment assembly hash entry count : %d", blockName, fragmentInfoCount); ndbout_c("%s : Linear fragment send list size : %d", blockName, linSendInfoCount); ndbout_c("%s : Segmented fragment send list size : %d", blockName, segSendInfoCount); return fragmentInfoCount + linSendInfoCount + segSendInfoCount; } bool SimulatedBlock::sendFirstFragment(FragmentSendInfo & info, NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, SectionHandle* sections, bool noRelease, Uint32 messageSize) { Uint32 noSections = sections->m_cnt; SegmentedSectionPtr * ptr = sections->m_ptr; info.m_sectionPtr[0].m_segmented.i = RNIL; info.m_sectionPtr[1].m_segmented.i = RNIL; info.m_sectionPtr[2].m_segmented.i = RNIL; Uint32 totalSize = 0; switch(noSections){ case 3: info.m_sectionPtr[2].m_segmented.i = ptr[2].i; info.m_sectionPtr[2].m_segmented.p = ptr[2].p; totalSize += ptr[2].sz; case 2: info.m_sectionPtr[1].m_segmented.i = ptr[1].i; info.m_sectionPtr[1].m_segmented.p = ptr[1].p; totalSize += ptr[1].sz; case 1: info.m_sectionPtr[0].m_segmented.i = ptr[0].i; info.m_sectionPtr[0].m_segmented.p = ptr[0].p; totalSize += ptr[0].sz; } if(totalSize <= messageSize + SectionSegment::DataLength){ /** * Send signal directly */ if (noRelease) sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections); else sendSignal(rg, gsn, signal, length, jbuf, sections); info.m_status = FragmentSendInfo::SendComplete; return true; } /** * Setup info object */ info.m_status = FragmentSendInfo::SendNotComplete; info.m_prio = (Uint8)jbuf; info.m_gsn = gsn; info.m_fragInfo = 1; info.m_flags = 0; info.m_messageSize = messageSize; info.m_fragmentId = c_fragmentIdCounter++; info.m_nodeReceiverGroup = rg; info.m_callback.m_callbackFunction = 0; if (noRelease) { /* Record info that we are not releasing segments */ info.m_flags|= FragmentSendInfo::SendNoReleaseSeg; } else { /** * Clear sections in caller's handle. Actual send * will consume them */ sections->m_cnt = 0; } /* Store main signal data in a segment for sending later */ Ptr<SectionSegment> tmp; if(!import(tmp, &signal->theData[0], length)) { handle_out_of_longsignal_memory(0); return false; } info.m_theDataSection.p = &tmp.p->theData[0]; info.m_theDataSection.sz = length; tmp.p->theData[length] = tmp.i; sendNextSegmentedFragment(signal, info); if(c_fragmentIdCounter == 0){ /** * Fragment id 0 is invalid */ c_fragmentIdCounter = 1; } return true; } #if 0 #define lsout(x) x #else #define lsout(x) #endif void SimulatedBlock::sendNextSegmentedFragment(Signal* signal, FragmentSendInfo & info){ if (unlikely(info.m_status == FragmentSendInfo::SendCancelled)) { /* Send was cancelled - all dest. nodes have failed * since send was started */ if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)) { /* * Free any sections still to be sent */ SectionHandle handle(this); for (Uint32 s = 0; s < 3; s++) { Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i; if (sectionI != RNIL) { getSection(handle.m_ptr[handle.m_cnt], sectionI); info.m_sectionPtr[s].m_segmented.i = RNIL; info.m_sectionPtr[s].m_segmented.p = NULL; handle.m_cnt++; } } releaseSections(handle); } /* Free inline signal data storage section */ Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz]; g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI); info.m_status = FragmentSendInfo::SendComplete; return; } /** * Setup main signal data from stored copy */ const Uint32 sigLen = info.m_theDataSection.sz; memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen); Uint32 sz = 0; Uint32 maxSz = info.m_messageSize; Int32 secNo = 2; Uint32 secCount = 0; Uint32 * secNos = &signal->theData[sigLen]; SectionHandle sections(this); SegmentedSectionPtr *ptr = sections.m_ptr; bool split= false; Uint32 splitSectionStartI= RNIL; SectionSegment* splitSectionStartP= NULL; Uint32 splitSectionLastSegment= RNIL; Uint32 splitSectionSz= 0; enum { Unknown = 0, Full = 1 } loop = Unknown; for(; secNo >= 0 && secCount < 3; secNo--){ Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i; if(ptrI == RNIL) continue; info.m_sectionPtr[secNo].m_segmented.i = RNIL; SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p; const Uint32 size = ptrP->m_sz; ptr[secCount].i = ptrI; ptr[secCount].p = ptrP; ptr[secCount].sz = size; secNos[secCount] = secNo; secCount++; const Uint32 sizeLeft = maxSz - sz; if(size <= sizeLeft){ /** * The section fits */ sz += size; lsout(ndbout_c("section %d saved as %d", secNo, secCount-1)); continue; } const Uint32 overflow = size - sizeLeft; // > 0 if(overflow <= SectionSegment::DataLength){ /** * Only one segment left to send * send even if sizeLeft <= size */ lsout(ndbout_c("section %d saved as %d but full over: %d", secNo, secCount-1, overflow)); secNo--; break; } // size >= 61 if(sizeLeft < SectionSegment::DataLength){ /** * Less than one segment left (space) * dont bother sending */ secCount--; info.m_sectionPtr[secNo].m_segmented.i = ptrI; loop = Full; lsout(ndbout_c("section %d not saved", secNo)); break; } /** * Split list * 1) Find place to split * 2) Rewrite header (the part that will be sent) * 3) Write new header (for remaining part) * 4) Store new header on FragmentSendInfo - record */ // size >= 61 && sizeLeft >= 60 Uint32 sum = SectionSegment::DataLength; Uint32 prevPtrI = ptrI; ptrI = ptrP->m_nextSegment; const Uint32 fill = sizeLeft - SectionSegment::DataLength; while(sum < fill){ prevPtrI = ptrI; ptrP = g_sectionSegmentPool.getPtr(ptrI); ptrI = ptrP->m_nextSegment; sum += SectionSegment::DataLength; } Uint32 prev = secCount - 1; /** * Record details of the section pre-split * This allows the split to be 'healed' afterwards in the * no release case. */ split= true; splitSectionStartI= ptr[prev].i; splitSectionStartP= ptr[prev].p; splitSectionLastSegment= splitSectionStartP->m_lastSegment; splitSectionSz= splitSectionStartP->m_sz; /** * Rewrite header w.r.t size and last * This is what will be sent in this fragment. */ splitSectionStartP->m_lastSegment = prevPtrI; splitSectionStartP->m_sz = sum; ptr[prev].sz = sum; /** * Write "new" list header * This is what remains to be sent in this section */ ptrP = g_sectionSegmentPool.getPtr(ptrI); ptrP->m_lastSegment = splitSectionLastSegment; ptrP->m_sz = size - sum; /** * And store it on info-record */ info.m_sectionPtr[secNo].m_segmented.i = ptrI; info.m_sectionPtr[secNo].m_segmented.p = ptrP; loop = Full; lsout(ndbout_c("section %d split into %d", secNo, prev)); break; } lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d", loop, secNo, secCount, sz)); /** * Store fragment id */ secNos[secCount] = info.m_fragmentId; Uint32 fragInfo = info.m_fragInfo; info.m_fragInfo = 2; switch(loop){ case Unknown: if(secNo >= 0){ lsout(ndbout_c("Unknown - Full")); /** * Not finished */ break; } // Fall through lsout(ndbout_c("Unknown - Done")); info.m_status = FragmentSendInfo::SendComplete; ndbassert(fragInfo == 2); fragInfo = 3; case Full: break; } signal->header.m_fragmentInfo = fragInfo; signal->header.m_noOfSections = 0; sections.m_cnt = secCount; if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg) { sendSignalNoRelease(info.m_nodeReceiverGroup, info.m_gsn, signal, sigLen + secCount + 1, (JobBufferLevel)info.m_prio, &sections); /* NoRelease leaves SectionHandle populated, we'll * clear it here. The actual sections themselves * remain allocated. */ sections.m_cnt = 0; if (split) { /* There was a split section, which required us to modify the * segment list. * Now restore the split section's segment list back to * its previous state * (Only really required for first segment, but we do * it for all of them, to be a good citizen) */ ndbrequire( splitSectionStartI != RNIL ); ndbrequire( splitSectionStartP != NULL ); ndbrequire( splitSectionLastSegment != RNIL ); splitSectionStartP->m_lastSegment= splitSectionLastSegment; splitSectionStartP->m_sz= splitSectionSz; /* Check our handiwork */ assert(verifySection(splitSectionStartI)); } } else { /* Normal, release sections case */ sendSignal(info.m_nodeReceiverGroup, info.m_gsn, signal, sigLen + secCount + 1, (JobBufferLevel)info.m_prio, &sections); } if(fragInfo == 3){ /** * This is the last signal * Release saved 'main signal' words segment */ g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]); } } bool SimulatedBlock::sendFirstFragment(FragmentSendInfo & info, NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, LinearSectionPtr ptr[3], Uint32 noOfSections, Uint32 messageSize){ check_sections(signal, signal->header.m_noOfSections, noOfSections); info.m_sectionPtr[0].m_linear.p = NULL; info.m_sectionPtr[1].m_linear.p = NULL; info.m_sectionPtr[2].m_linear.p = NULL; Uint32 totalSize = 0; switch(noOfSections){ case 3: info.m_sectionPtr[2].m_linear = ptr[2]; totalSize += ptr[2].sz; case 2: info.m_sectionPtr[1].m_linear = ptr[1]; totalSize += ptr[1].sz; case 1: info.m_sectionPtr[0].m_linear = ptr[0]; totalSize += ptr[0].sz; } if(totalSize <= messageSize + SectionSegment::DataLength){ /** * Send signal directly */ sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections); info.m_status = FragmentSendInfo::SendComplete; /** * Indicate to sendLinearSignalFragment * that we'r already done */ return true; } /** * Setup info object */ info.m_status = FragmentSendInfo::SendNotComplete; info.m_prio = (Uint8)jbuf; info.m_gsn = gsn; info.m_messageSize = messageSize; info.m_fragInfo = 1; info.m_flags = 0; info.m_fragmentId = c_fragmentIdCounter++; info.m_nodeReceiverGroup = rg; info.m_callback.m_callbackFunction = 0; Ptr<SectionSegment> tmp; if(unlikely(!import(tmp, &signal->theData[0], length))) { handle_out_of_longsignal_memory(0); return false; } info.m_theDataSection.p = &tmp.p->theData[0]; info.m_theDataSection.sz = length; tmp.p->theData[length] = tmp.i; sendNextLinearFragment(signal, info); if(c_fragmentIdCounter == 0){ /** * Fragment id 0 is invalid */ c_fragmentIdCounter = 1; } return true; } void SimulatedBlock::sendNextLinearFragment(Signal* signal, FragmentSendInfo & info){ if (unlikely(info.m_status == FragmentSendInfo::SendCancelled)) { /* Send was cancelled - all dest. nodes have failed * since send was started */ /* Free inline signal data storage section */ Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz]; g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI); info.m_status = FragmentSendInfo::SendComplete; return; } /** * Store "theData" */ const Uint32 sigLen = info.m_theDataSection.sz; memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen); Uint32 sz = 0; Uint32 maxSz = info.m_messageSize; Int32 secNo = 2; Uint32 secCount = 0; Uint32 * secNos = &signal->theData[sigLen]; LinearSectionPtr signalPtr[3]; enum { Unknown = 0, Full = 2 } loop = Unknown; for(; secNo >= 0 && secCount < 3; secNo--){ Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p; if(ptrP == NULL) continue; info.m_sectionPtr[secNo].m_linear.p = NULL; const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz; signalPtr[secCount].p = ptrP; signalPtr[secCount].sz = size; secNos[secCount] = secNo; secCount++; const Uint32 sizeLeft = maxSz - sz; if(size <= sizeLeft){ /** * The section fits */ sz += size; lsout(ndbout_c("section %d saved as %d", secNo, secCount-1)); continue; } const Uint32 overflow = size - sizeLeft; // > 0 if(overflow <= SectionSegment::DataLength){ /** * Only one segment left to send * send even if sizeLeft <= size */ lsout(ndbout_c("section %d saved as %d but full over: %d", secNo, secCount-1, overflow)); secNo--; break; } // size >= 61 if(sizeLeft < SectionSegment::DataLength){ /** * Less than one segment left (space) * dont bother sending */ secCount--; info.m_sectionPtr[secNo].m_linear.p = ptrP; loop = Full; lsout(ndbout_c("section %d not saved", secNo)); break; } /** * Split list * 1) Find place to split * 2) Rewrite header (the part that will be sent) * 3) Write new header (for remaining part) * 4) Store new header on FragmentSendInfo - record */ Uint32 sum = sizeLeft; sum /= SectionSegment::DataLength; sum *= SectionSegment::DataLength; /** * Rewrite header w.r.t size */ Uint32 prev = secCount - 1; signalPtr[prev].sz = sum; /** * Write/store "new" header */ info.m_sectionPtr[secNo].m_linear.p = ptrP + sum; info.m_sectionPtr[secNo].m_linear.sz = size - sum; loop = Full; lsout(ndbout_c("section %d split into %d", secNo, prev)); break; } lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d", loop, secNo, secCount, sz)); /** * Store fragment id */ secNos[secCount] = info.m_fragmentId; Uint32 fragInfo = info.m_fragInfo; info.m_fragInfo = 2; switch(loop){ case Unknown: if(secNo >= 0){ lsout(ndbout_c("Unknown - Full")); /** * Not finished */ break; } // Fall through lsout(ndbout_c("Unknown - Done")); info.m_status = FragmentSendInfo::SendComplete; ndbassert(fragInfo == 2); fragInfo = 3; case Full: break; } signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = fragInfo; sendSignal(info.m_nodeReceiverGroup, info.m_gsn, signal, sigLen + secCount + 1, (JobBufferLevel)info.m_prio, signalPtr, secCount); if(fragInfo == 3){ /** * This is the last signal */ g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]); } } void SimulatedBlock::sendFragmentedSignal(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, SectionHandle* sections, Callback & c, Uint32 messageSize){ bool res = true; Ptr<FragmentSendInfo> tmp; res = c_segmentedFragmentSendList.seize(tmp); ndbrequire(res); res = sendFirstFragment(* tmp.p, NodeReceiverGroup(ref), gsn, signal, length, jbuf, sections, false, // Release sections on send messageSize); ndbrequire(res); if(tmp.p->m_status == FragmentSendInfo::SendComplete){ c_segmentedFragmentSendList.release(tmp); if(c.m_callbackFunction != 0) execute(signal, c, 0); return; } tmp.p->m_callback = c; if(!c_fragSenderRunning) { SaveSignal<2> save(signal); c_fragSenderRunning = true; ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); sig->type = ContinueFragmented::CONTINUE_SENDING; sig->line = __LINE__; sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB); } } void SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, SectionHandle * sections, Callback & c, Uint32 messageSize){ bool res = true; Ptr<FragmentSendInfo> tmp; res = c_segmentedFragmentSendList.seize(tmp); ndbrequire(res); res = sendFirstFragment(* tmp.p, rg, gsn, signal, length, jbuf, sections, false, // Release sections on send messageSize); ndbrequire(res); if(tmp.p->m_status == FragmentSendInfo::SendComplete){ c_segmentedFragmentSendList.release(tmp); if(c.m_callbackFunction != 0) execute(signal, c, 0); return; } tmp.p->m_callback = c; if(!c_fragSenderRunning) { SaveSignal<2> save(signal); c_fragSenderRunning = true; ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); sig->type = ContinueFragmented::CONTINUE_SENDING; sig->line = __LINE__; sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB); } } SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0}; void SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32) { abort(); /* should never be called */ } SimulatedBlock::Callback SimulatedBlock::TheNULLCallback = { &SimulatedBlock::TheNULLCallbackFunction, 0 }; void SimulatedBlock::sendFragmentedSignal(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, LinearSectionPtr ptr[3], Uint32 noOfSections, Callback & c, Uint32 messageSize){ bool res = true; Ptr<FragmentSendInfo> tmp; res = c_linearFragmentSendList.seize(tmp); ndbrequire(res); res = sendFirstFragment(* tmp.p, NodeReceiverGroup(ref), gsn, signal, length, jbuf, ptr, noOfSections, messageSize); ndbrequire(res); if(tmp.p->m_status == FragmentSendInfo::SendComplete){ c_linearFragmentSendList.release(tmp); if(c.m_callbackFunction != 0) execute(signal, c, 0); return; } tmp.p->m_callback = c; if(!c_fragSenderRunning) { SaveSignal<2> save(signal); c_fragSenderRunning = true; ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); sig->type = ContinueFragmented::CONTINUE_SENDING; sig->line = __LINE__; sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB); } } void SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, LinearSectionPtr ptr[3], Uint32 noOfSections, Callback & c, Uint32 messageSize){ bool res = true; Ptr<FragmentSendInfo> tmp; res = c_linearFragmentSendList.seize(tmp); ndbrequire(res); res = sendFirstFragment(* tmp.p, rg, gsn, signal, length, jbuf, ptr, noOfSections, messageSize); ndbrequire(res); if(tmp.p->m_status == FragmentSendInfo::SendComplete){ c_linearFragmentSendList.release(tmp); if(c.m_callbackFunction != 0) execute(signal, c, 0); return; } tmp.p->m_callback = c; if(!c_fragSenderRunning) { SaveSignal<2> save(signal); c_fragSenderRunning = true; ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); sig->type = ContinueFragmented::CONTINUE_SENDING; sig->line = __LINE__; sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB); } } NodeInfo & SimulatedBlock::setNodeInfo(NodeId nodeId) { ndbrequire(nodeId > 0 && nodeId < MAX_NODES); return globalData.m_nodeInfo[nodeId]; } bool SimulatedBlock::isMultiThreaded() { #ifdef NDBD_MULTITHREADED return true; #else return false; #endif } void SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal); } void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal); } void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal); } void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal); } void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_LOCK_REF(signal); } void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_LOCK_CONF(signal); } void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_UNLOCK_REF(signal); } void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){ ljamEntry(); c_mutexMgr.execUTIL_UNLOCK_CONF(signal); } void SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal, Uint32 ptrI, Uint32 retVal){ c_mutexMgr.release(ptrI); } void SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg) { const FsRef *fsRef = (FsRef*)signal->getDataPtr(); Uint32 errorCode = fsRef->errorCode; Uint32 osErrorCode = fsRef->osErrorCode; char msg2[100]; sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode); progError(line, errorCode, msg2); } void SimulatedBlock::execFSWRITEREF(Signal* signal) { fsRefError(signal, __LINE__, "File system write failed"); } void SimulatedBlock::execFSREADREF(Signal* signal) { fsRefError(signal, __LINE__, "File system read failed"); } void SimulatedBlock::execFSCLOSEREF(Signal* signal) { fsRefError(signal, __LINE__, "File system close failed"); } void SimulatedBlock::execFSOPENREF(Signal* signal) { fsRefError(signal, __LINE__, "File system open failed"); } void SimulatedBlock::execFSREMOVEREF(Signal* signal) { fsRefError(signal, __LINE__, "File system remove failed"); } void SimulatedBlock::execFSSYNCREF(Signal* signal) { fsRefError(signal, __LINE__, "File system sync failed"); } void SimulatedBlock::execFSAPPENDREF(Signal* signal) { fsRefError(signal, __LINE__, "File system append failed"); } #ifdef VM_TRACE static Ptr<void> * m_empty_global_variables[] = { 0 }; void SimulatedBlock::disable_global_variables() { m_global_variables_save = m_global_variables; m_global_variables = m_empty_global_variables; } void SimulatedBlock::enable_global_variables() { if (m_global_variables == m_empty_global_variables) { m_global_variables = m_global_variables_save; } } void SimulatedBlock::clear_global_variables(){ Ptr<void> ** tmp = m_global_variables; while(* tmp != 0){ (* tmp)->i = RNIL; (* tmp)->p = 0; tmp++; } } void SimulatedBlock::init_globals_list(void ** tmp, size_t cnt){ m_global_variables = new Ptr<void> * [cnt+1]; for(size_t i = 0; i<cnt; i++){ m_global_variables[i] = (Ptr<void>*)tmp[i]; } m_global_variables[cnt] = 0; } #endif #include "KeyDescriptor.hpp" Uint32 SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src, Uint32 *dst, Uint32 dstSize, Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const { const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab); const Uint32 noOfKeyAttr = desc->noOfKeyAttr; Uint32 i = 0; Uint32 srcPos = 0; Uint32 dstPos = 0; while (i < noOfKeyAttr) { const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i]; Uint32 dstWords = xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo, src, srcPos, dst, dstPos, dstSize); keyPartLen[i++] = dstWords; if (unlikely(dstWords == 0)) return 0; } if (0) { for(Uint32 i = 0; i<dstPos; i++) { printf("%.8x ", dst[i]); } printf("\n"); } return dstPos; } Uint32 SimulatedBlock::xfrm_attr(Uint32 attrDesc, CHARSET_INFO* cs, const Uint32* src, Uint32 & srcPos, Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const { Uint32 array = AttributeDescriptor::getArrayType(attrDesc); Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDesc); Uint32 srcWords = ~0; Uint32 dstWords = ~0; uchar* dstPtr = (uchar*)&dst[dstPos]; const uchar* srcPtr = (const uchar*)&src[srcPos]; if (cs == NULL) { jam(); Uint32 len; LINT_INIT(len); switch(array){ case NDB_ARRAYTYPE_SHORT_VAR: len = 1 + srcPtr[0]; break; case NDB_ARRAYTYPE_MEDIUM_VAR: len = 2 + srcPtr[0] + (srcPtr[1] << 8); break; #ifndef VM_TRACE default: abort(); #endif case NDB_ARRAYTYPE_FIXED: len = srcBytes; } srcWords = (len + 3) >> 2; dstWords = srcWords; memcpy(dstPtr, srcPtr, dstWords << 2); if (0) { ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d", srcPos, dstPos, len, srcWords, dstWords); for(Uint32 i = 0; i<srcWords; i++) printf("%.8x ", src[srcPos + i]); printf("\n"); } } else { jam(); Uint32 typeId = AttributeDescriptor::getType(attrDesc); Uint32 lb, len; bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); if (unlikely(!ok)) return 0; Uint32 xmul = cs->strxfrm_multiply; if (xmul == 0) xmul = 1; /* * Varchar end-spaces are ignored in comparisons. To get same hash * we blank-pad to maximum length via strnxfrm. */ Uint32 dstLen = xmul * (srcBytes - lb); ndbrequire(dstLen <= ((dstSize - dstPos) << 2)); int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); if (unlikely(n == -1)) return 0; while ((n & 3) != 0) { dstPtr[n++] = 0; } dstWords = (n >> 2); srcWords = (lb + len + 3) >> 2; } dstPos += dstWords; srcPos += srcWords; return dstWords; } Uint32 SimulatedBlock::create_distr_key(Uint32 tableId, const Uint32 *src, Uint32* dst, const Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const { const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId); const Uint32 noOfKeyAttr = desc->noOfKeyAttr; Uint32 noOfDistrKeys = desc->noOfDistrKeys; Uint32 i = 0; Uint32 dstPos = 0; /* --Note that src and dst may be the same location-- */ if(keyPartLen) { while (i < noOfKeyAttr && noOfDistrKeys) { Uint32 attr = desc->keyAttr[i].attributeDescriptor; Uint32 len = keyPartLen[i]; if(AttributeDescriptor::getDKey(attr)) { noOfDistrKeys--; memmove(dst+dstPos, src, len << 2); dstPos += len; } src += len; i++; } } else { while (i < noOfKeyAttr && noOfDistrKeys) { Uint32 attr = desc->keyAttr[i].attributeDescriptor; Uint32 len = AttributeDescriptor::getSizeInWords(attr); ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED); if(AttributeDescriptor::getDKey(attr)) { noOfDistrKeys--; memmove(dst+dstPos, src, len << 2); dstPos += len; } src += len; i++; } } return dstPos; } CArray<KeyDescriptor> g_key_descriptor_pool; void SimulatedBlock::sendRoutedSignal(RoutePath path[], Uint32 pathcnt, Uint32 dst[], Uint32 dstcnt, Uint32 gsn, Signal * signal, Uint32 sigLen, JobBufferLevel prio, SectionHandle * userhandle) { ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast pathcnt--; // first hop is made from here Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt; ndbrequire(len <= 25); SectionHandle handle(this, signal); if (userhandle) { ljam(); handle.m_cnt = userhandle->m_cnt; for (Uint32 i = 0; i<handle.m_cnt; i++) handle.m_ptr[i] = userhandle->m_ptr[i]; userhandle->m_cnt = 0; } if (len + sigLen > 25) { ljam(); /** * we need to store theData in a section */ ndbrequire(handle.m_cnt < 3); handle.m_ptr[2] = handle.m_ptr[1]; handle.m_ptr[1] = handle.m_ptr[0]; Ptr<SectionSegment> tmp; if (unlikely(! import(tmp, signal->theData, sigLen))) { handle_out_of_longsignal_memory(0); } handle.m_ptr[0].p = tmp.p; handle.m_ptr[0].i = tmp.i; handle.m_ptr[0].sz = sigLen; handle.m_cnt ++; } else { ljam(); memmove(signal->theData + len, signal->theData, 4 * sigLen); len += sigLen; } LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend(); ord->cnt = (pathcnt << 16) | (dstcnt); ord->gsn = gsn; ord->prio = Uint32(prio); Uint32 * dstptr = ord->path; for (Uint32 i = 1; i <= pathcnt; i++) { ndbrequire(refToNode(path[i].ref) == 0 || refToNode(path[i].ref) == getOwnNodeId()); * dstptr++ = path[i].ref; * dstptr++ = Uint32(path[i].prio); } for (Uint32 i = 0; i<dstcnt; i++) { ndbrequire(refToNode(dst[i]) == 0 || refToNode(dst[i]) == getOwnNodeId()); * dstptr++ = dst[i]; } sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len, path[0].prio, &handle); } void SimulatedBlock::execLOCAL_ROUTE_ORD(Signal* signal) { ljamEntry(); if (!assembleFragments(signal)) { ljam(); return; } if (ERROR_INSERTED(1001)) { /** * This NDBCNTR error code 1001 */ ljam(); SectionHandle handle(this, signal); sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200, signal->getLength(), &handle); return; } LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr(); Uint32 pathcnt = ord->cnt >> 16; Uint32 dstcnt = ord->cnt & 0xFFFF; Uint32 sigLen = signal->getLength(); if (pathcnt == 0) { /** * Send to final destination(s); */ ljam(); Uint32 gsn = ord->gsn; Uint32 prio = ord->prio; memcpy(signal->theData+25, ord->path, 4*dstcnt); SectionHandle handle(this, signal); if (sigLen > LocalRouteOrd::StaticLen + dstcnt) { ljam(); /** * Data is at end of this... */ memmove(signal->theData, signal->theData + LocalRouteOrd::StaticLen + dstcnt, 4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt))); sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt); } else { ljam(); /** * Put section 0 in signal->theData */ sigLen = handle.m_ptr[0].sz; ndbrequire(sigLen <= 25); copy(signal->theData, handle.m_ptr[0]); release(handle.m_ptr[0]); for (Uint32 i = 0; i < handle.m_cnt - 1; i++) handle.m_ptr[i] = handle.m_ptr[i+1]; handle.m_cnt--; } /* * The extra if-statement is as sendSignalNoRelease will copy sections * which is not necessary is only sending to one destination */ if (dstcnt > 1) { jam(); for (Uint32 i = 0; i<dstcnt; i++) { ljam(); sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen, JobBufferLevel(prio), &handle); } releaseSections(handle); } else { jam(); sendSignal(signal->theData[25+0], gsn, signal, sigLen, JobBufferLevel(prio), &handle); } } else { /** * Reroute */ ljam(); SectionHandle handle(this, signal); Uint32 ref = ord->path[0]; Uint32 prio = ord->path[1]; Uint32 len = sigLen - 2; ord->cnt = ((pathcnt - 1) << 16) | dstcnt; memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen)); sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len, JobBufferLevel(prio), &handle); } } #ifdef VM_TRACE bool SimulatedBlock::debugOutOn() { SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut; return globalData.testOn && globalSignalLoggers.logMatch(number(), mask); } const char* SimulatedBlock::debugOutTag(char *buf, int line) { char blockbuf[40]; char instancebuf[40]; char linebuf[40]; char timebuf[40]; sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN")); instancebuf[0] = 0; if (instance() != 0) sprintf(instancebuf, "/%u", instance()); sprintf(linebuf, " %d", line); timebuf[0] = 0; #ifdef VM_TRACE_TIME { NDB_TICKS t = NdbTick_CurrentMillisecond(); uint s = (t / 1000) % 3600; uint ms = t % 1000; sprintf(timebuf, " - %u.%03u -", s, ms); } #endif sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf); return buf; } #endif void SimulatedBlock::synchronize_threads_for_blocks(Signal * signal, const Uint32 blocks[], const Callback & cb, JobBufferLevel prio) { #ifndef NDBD_MULTITHREADED Callback copy = cb; execute(signal, copy, 0); #else ljam(); Uint32 ref[32]; // max threads Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(), ref, NDB_ARRAY_SIZE(ref)); if (cnt == 0) { ljam(); Callback copy = cb; execute(signal, copy, 0); return; } Ptr<SyncThreadRecord> ptr; ndbrequire(c_syncThreadPool.seize(ptr)); ptr.p->m_cnt = cnt; ptr.p->m_callback = cb; signal->theData[0] = reference(); signal->theData[1] = ptr.i; signal->theData[2] = Uint32(prio); for (Uint32 i = 0; i<cnt; i++) { sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio); } #endif } void SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal) { ljamEntry(); Uint32 ref = signal->theData[0]; Uint32 prio = signal->theData[2]; sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(), JobBufferLevel(prio)); } void SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal) { ljamEntry(); Ptr<SyncThreadRecord> ptr; c_syncThreadPool.getPtr(ptr, signal->theData[1]); if (ptr.p->m_cnt == 1) { ljam(); Callback copy = ptr.p->m_callback; c_syncThreadPool.release(ptr); execute(signal, copy, 0); return; } ptr.p->m_cnt --; } void SimulatedBlock::execSYNC_REQ(Signal* signal) { ljamEntry(); Uint32 ref = signal->theData[0]; Uint32 prio = signal->theData[2]; sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(), JobBufferLevel(prio)); } void SimulatedBlock::synchronize_path(Signal * signal, const Uint32 blocks[], const Callback & cb, JobBufferLevel prio) { ljam(); // reuse SyncThreadRecord Ptr<SyncThreadRecord> ptr; ndbrequire(c_syncThreadPool.seize(ptr)); ptr.p->m_cnt = 0; // with count of 0 ptr.p->m_callback = cb; SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend()); req->senderData = ptr.i; req->prio = Uint32(prio); req->count = 1; if (blocks[0] == 0) { ljam(); ndbrequire(false); // TODO } else { ljam(); Uint32 len = 0; for (; blocks[len+1] != 0; len++) { req->path[len] = blocks[len+1]; } req->pathlen = 1 + len; req->path[len] = reference(); sendSignal(numberToRef(blocks[0], getOwnNodeId()), GSN_SYNC_PATH_REQ, signal, SyncPathReq::SignalLength + (1 + len), prio); } } void SimulatedBlock::execSYNC_PATH_REQ(Signal* signal) { ljamEntry(); SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend()); if (req->pathlen == 1) { ljam(); SyncPathReq copy = *req; SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend()); conf->senderData = copy.senderData; conf->count = copy.count; sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal, SyncPathConf::SignalLength, JobBufferLevel(copy.prio)); } else { ljam(); Uint32 ref = numberToRef(req->path[0], getOwnNodeId()); req->pathlen--; memmove(req->path, req->path + 1, 4 * req->pathlen); sendSignal(ref, GSN_SYNC_PATH_REQ, signal, SyncPathReq::SignalLength + (1 + req->pathlen), JobBufferLevel(req->prio)); } } void SimulatedBlock::execSYNC_PATH_CONF(Signal* signal) { ljamEntry(); SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr()); Ptr<SyncThreadRecord> ptr; c_syncThreadPool.getPtr(ptr, conf.senderData); if (ptr.p->m_cnt == 0) { ljam(); ptr.p->m_cnt = conf.count; } if (ptr.p->m_cnt == 1) { ljam(); Callback copy = ptr.p->m_callback; c_syncThreadPool.release(ptr); execute(signal, copy, 0); return; } ptr.p->m_cnt --; } bool SimulatedBlock::checkNodeFailSequence(Signal* signal) { Uint32 ref = signal->getSendersBlockRef(); /** * Make sure that a signal being part of node-failure handling * from a remote node, does not get to us before we got the NODE_FAILREP * (this to avoid tricky state handling) * * To ensure this, we send the signal via QMGR (GSN_COMMIT_FAILREQ) * and NDBCNTR (which sends NODE_FAILREP) * * The extra time should be negilable * * Note, make an exception for signals sent by our self * as they are only sent as a consequence of NODE_FAILREP */ if (ref == reference() || (refToNode(ref) == getOwnNodeId() && refToMain(ref) == NDBCNTR)) { ljam(); return true; } RoutePath path[2]; path[0].ref = QMGR_REF; path[0].prio = JBB; path[1].ref = NDBCNTR_REF; path[1].prio = JBB; Uint32 dst[1]; dst[0] = reference(); SectionHandle handle(this, signal); Uint32 gsn = signal->header.theVerId_signalNumber; Uint32 len = signal->getLength(); sendRoutedSignal(path, 2, dst, 1, gsn, signal, len, JBB, &handle); return false; } void SimulatedBlock::setup_wakeup() { #ifdef NDBD_MULTITHREADED #else globalTransporterRegistry.setup_wakeup_socket(); #endif } void SimulatedBlock::wakeup() { #ifdef NDBD_MULTITHREADED mt_wakeup(this); #else globalTransporterRegistry.wakeup(); #endif } void SimulatedBlock::ndbinfo_send_row(Signal* signal, const DbinfoScanReq& req, const Ndbinfo::Row& row, Ndbinfo::Ratelimit& rl) const { // Check correct number of columns against table assert(row.columns() == Ndbinfo::getTable(req.tableId).columns()); TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend(); tidai->connectPtr= req.resultData; tidai->transId[0]= req.transId[0]; tidai->transId[1]= req.transId[1]; LinearSectionPtr ptr[3]; ptr[0].p = row.getDataPtr(); ptr[0].sz = row.getLength(); rl.rows++; rl.bytes += row.getLength(); sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI, signal, TransIdAI::HeaderLength, JBB, ptr, 1); } void SimulatedBlock::ndbinfo_send_scan_break(Signal* signal, DbinfoScanReq& req, const Ndbinfo::Ratelimit& rl, Uint32 data1, Uint32 data2, Uint32 data3, Uint32 data4) const { DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend(); const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz; MEMCOPY_NO_WORDS(conf, &req, signal_length); conf->returnedRows = rl.rows; // Update the cursor with current item number Ndbinfo::ScanCursor* cursor = (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf); cursor->data[0] = data1; cursor->data[1] = data2; cursor->data[2] = data3; cursor->data[3] = data4; // Increase number of rows and bytes sent to far cursor->totalRows += rl.rows; cursor->totalBytes += rl.bytes; Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true); sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB); } void SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal, DbinfoScanReq& req, const Ndbinfo::Ratelimit& rl) const { DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend(); const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz; Uint32 sender_ref = req.resultRef; MEMCOPY_NO_WORDS(conf, &req, signal_length); conf->returnedRows = rl.rows; if (req.cursor_sz) { jam(); // Update the cursor with current item number Ndbinfo::ScanCursor* cursor = (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf); // Reset all data holders memset(cursor->data, 0, sizeof(cursor->data)); // Increase number of rows and bytes sent to far cursor->totalRows += rl.rows; cursor->totalBytes += rl.bytes; Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false); sender_ref = cursor->senderRef; } sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal, signal_length, JBB); } #ifdef VM_TRACE void SimulatedBlock::assertOwnThread() { #ifdef NDBD_MULTITHREADED mt_assert_own_thread(this); #endif } #endif