storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp

/* Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#define DBSPJ_C
#include "Dbspj.hpp"

#include <SectionReader.hpp>
#include <signaldata/LqhKey.hpp>
#include <signaldata/QueryTree.hpp>
#include <signaldata/TcKeyRef.hpp>
#include <signaldata/RouteOrd.hpp>
#include <signaldata/TransIdAI.hpp>
#include <signaldata/DiGetNodes.hpp>
#include <signaldata/DihScanTab.hpp>
#include <signaldata/AttrInfo.hpp>
#include <Interpreter.hpp>
#include <AttributeHeader.hpp>
#include <AttributeDescriptor.hpp>
#include <KeyDescriptor.hpp>
#include <md5_hash.hpp>
#include <signaldata/TcKeyConf.hpp>

#include <signaldata/NodeFailRep.hpp>
#include <signaldata/ReadNodesConf.hpp>

// Use DEBUG to print messages that should be
// seen only when we debug the product
#ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBSPJ: " << x << endl;
#define DEBUG_LQHKEYREQ
#define DEBUG_SCAN_FRAGREQ
#else
#define DEBUG(x)
#endif

#if 1
#define DEBUG_CRASH() ndbrequire(false)
#else
#define DEBUG_CRASH()
#endif

#if 1
#undef DEBUG
#define DEBUG(x)
#undef DEBUG_LQHKEYREQ
#undef DEBUG_SCAN_FRAGREQ
#endif

const Ptr<Dbspj::TreeNode> Dbspj::NullTreeNodePtr = { 0, RNIL };
const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, { 0 } };

/** A noop for now.*/
void Dbspj::execREAD_CONFIG_REQ(Signal* signal)
{
  jamEntry();
  const ReadConfigReq req =
    *reinterpret_cast<const ReadConfigReq*>(signal->getDataPtr());

  Pool_context pc;
  pc.m_block = this;

  DEBUG("execREAD_CONFIG_REQ");
  DEBUG("sizeof(Request): " << sizeof(Request)
        << " sizeof(TreeNode): " << sizeof(TreeNode));

  m_arenaAllocator.init(1024, RT_SPJ_ARENA_BLOCK, pc);
  m_request_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_REQUEST, pc);
  m_treenode_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_TREENODE, pc);
  m_scanfraghandle_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_SCANFRAG, pc);
  m_lookup_request_hash.setSize(16);
  m_scan_request_hash.setSize(16);
  void* ptr = m_ctx.m_mm.get_memroot();
  m_page_pool.set((RowPage*)ptr, (Uint32)~0);

  Record_info ri;
  Dependency_map::createRecordInfo(ri, RT_SPJ_DATABUFFER);
  m_dependency_map_pool.init(&m_arenaAllocator, ri, pc);

  ReadConfigConf* const conf =
    reinterpret_cast<ReadConfigConf*>(signal->getDataPtrSend());
  conf->senderRef = reference();
  conf->senderData = req.senderData;

  sendSignal(req.senderRef, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}//Dbspj::execREAD_CONFIG_REQ()

static Uint32 f_STTOR_REF = 0;

void Dbspj::execSTTOR(Signal* signal)
{
//#define UNIT_TEST_DATABUFFER2
  jamEntry();
  /* START CASE */
  const Uint16 tphase = signal->theData[1];
  f_STTOR_REF = signal->getSendersBlockRef();

  ndbout << "Dbspj::execSTTOR() inst:" << instance()
         << " phase=" << tphase << endl;

  if (tphase == 1)
  {
    jam();
    signal->theData[0] = 0;
    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 1000, 1);
  }

  if (tphase == 4)
  {
    jam();
    signal->theData[0] = reference();
    sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
    return;
  }
  sendSTTORRY(signal);

#ifdef UNIT_TEST_DATABUFFER2
  if (tphase == 120)
  {
    ndbout_c("basic test of ArenaPool / DataBuffer2");

    for (Uint32 i = 0; i < 100; i++)
    {
      ArenaHead ah;
      if (!m_arenaAllocator.seize(ah))
      {
        ndbout_c("Failed to allocate arena");
        break;
      }

      ndbout_c("*** LOOP %u", i);
      Uint32 sum = 0;
      Dependency_map::Head head;
      LocalArenaPoolImpl pool(ah, m_dependency_map_pool);
      for (Uint32 j = 0; j < 100; j++)
      {
        Uint32 sz = rand() % 1000;
        if (0)
          ndbout_c("adding %u", sz);
        Local_dependency_map list(pool, head);
        for (Uint32 i = 0; i < sz; i++)
          signal->theData[i] = sum + i;
        list.append(signal->theData, sz);
        sum += sz;
      }

      {
        ndbrequire(head.getSize() == sum);
        Local_dependency_map list(pool, head);
        Dependency_map::ConstDataBufferIterator it;
        Uint32 cnt = 0;
        for (list.first(it); !it.isNull(); list.next(it))
        {
          ndbrequire(* it.data == cnt);
          cnt++;
        }

        ndbrequire(cnt == sum);
      }

      Resource_limit rl;
      if (m_ctx.m_mm.get_resource_limit(7, rl))
      {
        ndbout_c("Resource %d min: %d max: %d curr: %d",
                 7, rl.m_min, rl.m_max, rl.m_curr);
      }

      {
        ndbout_c("release map");
        Local_dependency_map list(pool, head);
        list.release();
      }

      ndbout_c("release all");
      m_arenaAllocator.release(ah);
      ndbout_c("*** LOOP %u sum: %u", i, sum);
    }
  }
#endif
}//Dbspj::execSTTOR()

void Dbspj::sendSTTORRY(Signal* signal)
{
  signal->theData[0] = 0;
  signal->theData[1] = 0;    /* BLOCK CATEGORY */
  signal->theData[2] = 0;    /* SIGNAL VERSION NUMBER */
  signal->theData[3] = 4;
#ifdef UNIT_TEST_DATABUFFER2
  signal->theData[4] = 120;  /* Start phase end*/
#else
  signal->theData[4] = 255;
#endif
  signal->theData[5] = 255;
  sendSignal(f_STTOR_REF, GSN_STTORRY, signal, 6, JBB);
}

void Dbspj::execREAD_NODESCONF(Signal* signal)
{
  jamEntry();

  ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();

  if (getNodeState().getNodeRestartInProgress())
  {
    jam();
    c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
    c_alive_nodes.set(getOwnNodeId());
  }
  else
  {
    jam();
    c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
    NdbNodeBitmask tmp;
    tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
    c_alive_nodes.bitOR(tmp);
  }

  sendSTTORRY(signal);
}

void Dbspj::execINCL_NODEREQ(Signal* signal)
{
  jamEntry();
  const Uint32 senderRef = signal->theData[0];
  const Uint32 nodeId = signal->theData[1];

  ndbrequire(!c_alive_nodes.get(nodeId));
  c_alive_nodes.set(nodeId);

  signal->theData[0] = nodeId;
  signal->theData[1] = reference();
  sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
}

void Dbspj::execNODE_FAILREP(Signal* signal)
{
  jamEntry();

  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
  NdbNodeBitmask failed;
  failed.assign(NdbNodeBitmask::Size, rep->theNodes);

  c_alive_nodes.bitANDC(failed);

  signal->theData[0] = 1;
  signal->theData[1] = 0;
  failed.copyto(NdbNodeBitmask::Size, signal->theData + 2);
  sendSignal(reference(), GSN_CONTINUEB, signal,
             2 + NdbNodeBitmask::Size, JBB);
}

void Dbspj::execAPI_FAILREQ(Signal* signal)
{
  jamEntry();
  Uint32 failedApiNode = signal->theData[0];
  ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR

  /**
   * We only need to care about lookups
   *   as SCAN's are aborted by DBTC
   */
  signal->theData[0] = failedApiNode;
  signal->theData[1] = reference();
  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
}

void Dbspj::execCONTINUEB(Signal* signal)
{
  jamEntry();
  switch(signal->theData[0]) {
  case 0:
    releaseGlobal(signal);
    return;
  case 1:
    nodeFail_checkRequests(signal);
    return;
  case 2:
    nodeFail_checkRequests(signal);
    return;
  }

  ndbrequire(false);
}
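/**
 * nodeFail_checkRequests()
 *
 * Walks one of the request hashes (theData[0] selects the lookup or the
 * scan hash) and applies the failed-node bitmask to each request via
 * nodeFail(). The walk is bounded by RT_BREAK and re-schedules itself
 * through CONTINUEB, so a large hash never monopolizes the signal
 * scheduler; when the lookup pass completes it restarts itself for the
 * scan pass.
 */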
void Dbspj::nodeFail_checkRequests(Signal* signal)
{
  jam();
  const Uint32 type = signal->theData[0];
  const Uint32 bucket = signal->theData[1];

  NdbNodeBitmask failed;
  failed.assign(NdbNodeBitmask::Size, signal->theData+2);

  Request_iterator iter;
  Request_hash * hash;
  switch(type){
  case 1:
    hash = &m_lookup_request_hash;
    break;
  case 2:
    hash = &m_scan_request_hash;
    break;
  }
  hash->next(bucket, iter);

  const Uint32 RT_BREAK = 64;
  for(Uint32 i = 0; (i<RT_BREAK || iter.bucket == bucket) &&
        !iter.curr.isNull(); i++)
  {
    jam();
    Ptr<Request> requestPtr = iter.curr;
    hash->next(iter);
    i += nodeFail(signal, requestPtr, failed);
  }

  if (!iter.curr.isNull())
  {
    jam();
    signal->theData[0] = type;
    signal->theData[1] = bucket;
    failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
    sendSignal(reference(), GSN_CONTINUEB, signal,
               2 + NdbNodeBitmask::Size, JBB);
  }
  else if (type == 1)
  {
    jam();
    signal->theData[0] = 2;
    signal->theData[1] = 0;
    failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
    sendSignal(reference(), GSN_CONTINUEB, signal,
               2 + NdbNodeBitmask::Size, JBB);
  }
  else if (type == 2)
  {
    jam();
    ndbout_c("Finished with handling node-failure");
  }
}

/**
 * MODULE LQHKEYREQ
 */
void Dbspj::execLQHKEYREQ(Signal* signal)
{
  jamEntry();
  c_Counters.incr_counter(CI_READS_RECEIVED, 1);

  const LqhKeyReq* req = reinterpret_cast<const LqhKeyReq*>(signal->getDataPtr());

  /**
   * #0 - KEYINFO contains key for first operation (used for hash in TC)
   * #1 - ATTRINFO contains tree + parameters
   *      (unless StoredProcId is set, when only parameters are sent,
   *       but this is not yet implemented)
   */
  SectionHandle handle = SectionHandle(this, signal);
  SegmentedSectionPtr ssPtr;
  handle.getSection(ssPtr, LqhKeyReq::AttrInfoSectionNum);

  Uint32 err;
  Ptr<Request> requestPtr = { 0, RNIL };
  do
  {
    ArenaHead ah;
    err = DbspjErr::OutOfQueryMemory;
    if (unlikely(!m_arenaAllocator.seize(ah)))
      break;

    m_request_pool.seize(ah, requestPtr);
    new (requestPtr.p) Request(ah);
    do_init(requestPtr.p, req, signal->getSendersBlockRef());

    Uint32 len_cnt;

    {
      SectionReader r0(ssPtr, getSectionSegmentPool());

      err = DbspjErr::ZeroLengthQueryTree;
      if (unlikely(!r0.getWord(&len_cnt)))
        break;
    }

    Uint32 len = QueryTree::getLength(len_cnt);
    Uint32 cnt = QueryTree::getNodeCnt(len_cnt);

    {
      SectionReader treeReader(ssPtr, getSectionSegmentPool());
      SectionReader paramReader(ssPtr, getSectionSegmentPool());
      paramReader.step(len); // skip over tree to parameters

      Build_context ctx;
      ctx.m_resultRef = req->variableData[0];
      ctx.m_savepointId = req->savePointId;
      ctx.m_scanPrio = 1;
      ctx.m_start_signal = signal;
      ctx.m_keyPtr.i = handle.m_ptr[LqhKeyReq::KeyInfoSectionNum].i;
      ctx.m_senderRef = signal->getSendersBlockRef();

      err = build(ctx, requestPtr, treeReader, paramReader);
      if (unlikely(err != 0))
        break;
    }

    /**
     * a query being shipped as a LQHKEYREQ may only return finite rows
     *   i.e. be a (multi-)lookup
     */
    ndbassert(requestPtr.p->isLookup());
    ndbassert(requestPtr.p->m_node_cnt == cnt);
    err = DbspjErr::InvalidRequest;
    if (unlikely(!requestPtr.p->isLookup() || requestPtr.p->m_node_cnt != cnt))
      break;

    /**
     * Store request in list(s)/hash(es)
     */
    store_lookup(requestPtr);

    release(ssPtr);
    handle.clear();

    start(signal, requestPtr);
    return;
  } while (0);

  /**
   * Error handling below,
   *  'err' may contain error code.
   */
  if (!requestPtr.isNull())
  {
    jam();
    m_request_pool.release(requestPtr);
  }
  releaseSections(handle);
  handle_early_lqhkey_ref(signal, req, err);
}

void Dbspj::do_init(Request* requestP, const LqhKeyReq* req, Uint32 senderRef)
{
  requestP->m_bits = 0;
  requestP->m_errCode = 0;
  requestP->m_state = Request::RS_BUILDING;
  requestP->m_node_cnt = 0;
  requestP->m_cnt_active = 0;
  requestP->m_rows = 0;
  requestP->m_active_nodes.clear();
  requestP->m_outstanding = 0;
  requestP->m_transId[0] = req->transId1;
  requestP->m_transId[1] = req->transId2;
  bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
#ifdef SPJ_TRACE_TIME
  requestP->m_cnt_batches = 0;
  requestP->m_sum_rows = 0;
  requestP->m_sum_running = 0;
  requestP->m_sum_waiting = 0;
  requestP->m_save_time = spj_now();
#endif
  const Uint32 reqInfo = req->requestInfo;
  Uint32 tmp = req->clientConnectPtr;
  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
      LqhKeyReq::getOperation(reqInfo) == ZREAD)
  {
    jam();
    ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
    //const Uint32 apiRef   = lqhKeyReq->variableData[0];
    //const Uint32 apiOpRec = lqhKeyReq->variableData[1];
    tmp = req->variableData[1];
    requestP->m_senderData = tmp;
    requestP->m_senderRef = senderRef;
  }
  else
  {
    if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
    {
      if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
        tmp = req->variableData[2];
      else
        tmp = req->variableData[0];
    }
    requestP->m_senderData = tmp;
    requestP->m_senderRef = senderRef;
  }
  requestP->m_rootResultData = tmp;
}

void Dbspj::store_lookup(Ptr<Request> requestPtr)
{
  ndbassert(requestPtr.p->isLookup());
  Ptr<Request> tmp;
  bool found = m_lookup_request_hash.find(tmp, *requestPtr.p);
  ndbrequire(found == false);
  m_lookup_request_hash.add(requestPtr);
}

void Dbspj::handle_early_lqhkey_ref(Signal* signal,
                                    const LqhKeyReq * lqhKeyReq,
                                    Uint32 err)
{
  /**
   * Error path...
   */
  ndbrequire(err);

  const Uint32 reqInfo = lqhKeyReq->requestInfo;
  const Uint32 transid[2] = { lqhKeyReq->transId1, lqhKeyReq->transId2 };

  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
      LqhKeyReq::getOperation(reqInfo) == ZREAD)
  {
    jam();
    /* Dirty read sends TCKEYREF direct to client, and nothing to TC */
    ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
    const Uint32 apiRef   = lqhKeyReq->variableData[0];
    const Uint32 apiOpRec = lqhKeyReq->variableData[1];

    TcKeyRef* const tcKeyRef =
      reinterpret_cast<TcKeyRef*>(signal->getDataPtrSend());

    tcKeyRef->connectPtr = apiOpRec;
    tcKeyRef->transId[0] = transid[0];
    tcKeyRef->transId[1] = transid[1];
    tcKeyRef->errorCode = err;
    sendTCKEYREF(signal, apiRef, signal->getSendersBlockRef());
  }
  else
  {
    jam();
    const Uint32 returnref = signal->getSendersBlockRef();
    const Uint32 clientPtr = lqhKeyReq->clientConnectPtr;

    Uint32 TcOprec = clientPtr;
    if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
    {
      if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
        TcOprec = lqhKeyReq->variableData[2];
      else
        TcOprec = lqhKeyReq->variableData[0];
    }

    LqhKeyRef* const ref = reinterpret_cast<LqhKeyRef*>(signal->getDataPtrSend());
    ref->userRef = clientPtr;
    ref->connectPtr = TcOprec;
    ref->errorCode = err;
    ref->transId1 = transid[0];
    ref->transId2 = transid[1];
    sendSignal(returnref, GSN_LQHKEYREF, signal,
               LqhKeyRef::SignalLength, JBB);
  }
}

void Dbspj::sendTCKEYREF(Signal* signal, Uint32 ref, Uint32 routeRef)
{
  const Uint32 nodeId = refToNode(ref);
  const bool connectedToNode = getNodeInfo(nodeId).m_connected;

  if (likely(connectedToNode))
  {
    jam();
    sendSignal(ref, GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
  }
  else
  {
    jam();
    memmove(signal->theData+25, signal->theData, 4*TcKeyRef::SignalLength);
    RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
    ord->dstRef = ref;
    ord->srcRef = reference();
    ord->gsn = GSN_TCKEYREF;
    ord->cnt = 0;
    LinearSectionPtr ptr[3];
    ptr[0].p = signal->theData+25;
    ptr[0].sz = TcKeyRef::SignalLength;
    sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
               ptr, 1);
  }
}

void Dbspj::sendTCKEYCONF(Signal* signal, Uint32 len, Uint32 ref, Uint32 routeRef)
{
  const Uint32 nodeId = refToNode(ref);
  const bool connectedToNode = getNodeInfo(nodeId).m_connected;

  if (likely(connectedToNode))
  {
    jam();
    sendSignal(ref, GSN_TCKEYCONF, signal, len, JBB);
  }
  else
  {
    jam();
    memmove(signal->theData+25, signal->theData, 4*len);
    RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
    ord->dstRef = ref;
    ord->srcRef = reference();
    ord->gsn = GSN_TCKEYCONF;
    ord->cnt = 0;
    LinearSectionPtr ptr[3];
    ptr[0].p = signal->theData+25;
    ptr[0].sz = len;
    sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
               ptr, 1);
  }
}

/**
 * END - MODULE LQHKEYREQ
 */

/**
 * MODULE SCAN_FRAGREQ
 */
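/**
 * execSCAN_FRAGREQ()
 *
 * Entry point for a pushed-join scan shipped from DBTC. The ATTRINFO
 * section carries the serialized query tree plus its parameters; these
 * are parsed by build() into a Request with one TreeNode per operation.
 * On any parse/allocation error we fall out of the do/while block and
 * reply with SCAN_FRAGREF via handle_early_scanfrag_ref().
 */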
void Dbspj::execSCAN_FRAGREQ(Signal* signal)
{
  jamEntry();

  /* Reassemble if the request was fragmented */
  if (!assembleFragments(signal))
  {
    jam();
    return;
  }

  const ScanFragReq * req = (ScanFragReq *)&signal->theData[0];

#ifdef DEBUG_SCAN_FRAGREQ
  ndbout_c("Incoming SCAN_FRAGREQ ");
  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
                    ScanFragReq::SignalLength + 2,
                    DBLQH);
#endif

  /**
   * #0 - ATTRINFO contains tree + parameters
   *      (unless StoredProcId is set, when only parameters are sent,
   *       but this is not yet implemented)
   * #1 - KEYINFO if first op is index scan - contains bounds for first scan
   *              if first op is lookup - contains keyinfo for lookup
   */
  SectionHandle handle = SectionHandle(this, signal);
  SegmentedSectionPtr ssPtr;
  handle.getSection(ssPtr, ScanFragReq::AttrInfoSectionNum);

  Uint32 err;
  Ptr<Request> requestPtr = { 0, RNIL };
  do
  {
    ArenaHead ah;
    err = DbspjErr::OutOfQueryMemory;
    if (unlikely(!m_arenaAllocator.seize(ah)))
      break;

    m_request_pool.seize(ah, requestPtr);
    new (requestPtr.p) Request(ah);
    do_init(requestPtr.p, req, signal->getSendersBlockRef());

    Uint32 len_cnt;

    {
      SectionReader r0(ssPtr, getSectionSegmentPool());
      err = DbspjErr::ZeroLengthQueryTree;
      if (unlikely(!r0.getWord(&len_cnt)))
        break;
    }

    Uint32 len = QueryTree::getLength(len_cnt);
    Uint32 cnt = QueryTree::getNodeCnt(len_cnt);

    {
      SectionReader treeReader(ssPtr, getSectionSegmentPool());
      SectionReader paramReader(ssPtr, getSectionSegmentPool());
      paramReader.step(len); // skip over tree to parameters

      Build_context ctx;
      ctx.m_resultRef = req->resultRef;
      ctx.m_scanPrio = ScanFragReq::getScanPrio(req->requestInfo);
      ctx.m_savepointId = req->savePointId;
      ctx.m_batch_size_rows = req->batch_size_rows;
      ctx.m_start_signal = signal;
      ctx.m_senderRef = signal->getSendersBlockRef();

      if (handle.m_cnt > 1)
      {
        jam();
        ctx.m_keyPtr.i = handle.m_ptr[ScanFragReq::KeyInfoSectionNum].i;
      }
      else
      {
        jam();
        ctx.m_keyPtr.i = RNIL;
      }

      err = build(ctx, requestPtr, treeReader, paramReader);
      if (unlikely(err != 0))
        break;
    }

    ndbassert(requestPtr.p->isScan());
    ndbassert(requestPtr.p->m_node_cnt == cnt);
    err = DbspjErr::InvalidRequest;
    if (unlikely(!requestPtr.p->isScan() || requestPtr.p->m_node_cnt != cnt))
      break;

    /**
     * Store request in list(s)/hash(es)
     */
    store_scan(requestPtr);

    release(ssPtr);
    handle.clear();

    start(signal, requestPtr);
    return;
  } while (0);

  if (!requestPtr.isNull())
  {
    jam();
    m_request_pool.release(requestPtr);
  }
  releaseSections(handle);
  handle_early_scanfrag_ref(signal, req, err);
}

void Dbspj::do_init(Request* requestP, const ScanFragReq* req, Uint32 senderRef)
{
  requestP->m_bits = 0;
  requestP->m_errCode = 0;
  requestP->m_state = Request::RS_BUILDING;
  requestP->m_node_cnt = 0;
  requestP->m_cnt_active = 0;
  requestP->m_rows = 0;
  requestP->m_active_nodes.clear();
  requestP->m_outstanding = 0;
  requestP->m_senderRef = senderRef;
  requestP->m_senderData = req->senderData;
  requestP->m_transId[0] = req->transId1;
  requestP->m_transId[1] = req->transId2;
  requestP->m_rootResultData = req->resultData;
  bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
#ifdef SPJ_TRACE_TIME
  requestP->m_cnt_batches = 0;
  requestP->m_sum_rows = 0;
  requestP->m_sum_running = 0;
  requestP->m_sum_waiting = 0;
  requestP->m_save_time = spj_now();
#endif
}

void Dbspj::store_scan(Ptr<Request> requestPtr)
{
  ndbassert(requestPtr.p->isScan());
  Ptr<Request> tmp;
  bool found = m_scan_request_hash.find(tmp, *requestPtr.p);
  ndbrequire(found == false);
  m_scan_request_hash.add(requestPtr);
}

void Dbspj::handle_early_scanfrag_ref(Signal* signal,
                                      const ScanFragReq * _req,
                                      Uint32 err)
{
  ScanFragReq req = *_req;
  Uint32 senderRef = signal->getSendersBlockRef();

  ScanFragRef * ref = (ScanFragRef*)&signal->theData[0];
  ref->senderData = req.senderData;
  ref->transId1 = req.transId1;
  ref->transId2 = req.transId2;
  ref->errorCode = err;
  sendSignal(senderRef, GSN_SCAN_FRAGREF, signal,
             ScanFragRef::SignalLength, JBB);
}

/**
 * END - MODULE SCAN_FRAGREQ
 */

/**
 * MODULE GENERIC
 */
Uint32 Dbspj::build(Build_context& ctx,
                    Ptr<Request> requestPtr,
                    SectionReader & tree,
                    SectionReader & param)
{
  Uint32 tmp0, tmp1;
  Uint32 err = DbspjErr::ZeroLengthQueryTree;
  ctx.m_cnt = 0;
  ctx.m_scan_cnt = 0;

  tree.getWord(&tmp0);
  Uint32 loop = QueryTree::getNodeCnt(tmp0);

  DEBUG("::build()");
  err = DbspjErr::InvalidTreeNodeCount;
  if (loop == 0 || loop > NDB_SPJ_MAX_TREE_NODES)
  {
    DEBUG_CRASH();
    goto error;
  }

  while (ctx.m_cnt < loop)
  {
    DEBUG(" - loop " << ctx.m_cnt << " pos: " << tree.getPos().currPos);
    tree.peekWord(&tmp0);
    param.peekWord(&tmp1);
    Uint32 node_op = QueryNode::getOpType(tmp0);
    Uint32 node_len = QueryNode::getLength(tmp0);
    Uint32 param_op = QueryNodeParameters::getOpType(tmp1);
    Uint32 param_len = QueryNodeParameters::getLength(tmp1);

    err = DbspjErr::QueryNodeTooBig;
    if (unlikely(node_len >= NDB_ARRAY_SIZE(m_buffer0)))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::QueryNodeParametersTooBig;
    if (unlikely(param_len >= NDB_ARRAY_SIZE(m_buffer1)))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::InvalidTreeNodeSpecification;
    if (unlikely(tree.getWords(m_buffer0, node_len) == false))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::InvalidTreeParametersSpecification;
    if (unlikely(param.getWords(m_buffer1, param_len) == false))
    {
      DEBUG_CRASH();
      goto error;
    }

#if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
    printf("node: ");
    for (Uint32 i = 0; i<node_len; i++)
      printf("0x%.8x ", m_buffer0[i]);
    printf("\n");

    printf("param: ");
    for (Uint32 i = 0; i<param_len; i++)
      printf("0x%.8x ", m_buffer1[i]);
    printf("\n");
#endif

    err = DbspjErr::UnknowQueryOperation;
    if (unlikely(node_op != param_op))
    {
      DEBUG_CRASH();
      goto error;
    }

    const OpInfo* info = getOpInfo(node_op);
    if (unlikely(info == 0))
    {
      DEBUG_CRASH();
      goto error;
    }

    QueryNode* qn = (QueryNode*)m_buffer0;
    QueryNodeParameters * qp = (QueryNodeParameters*)m_buffer1;
    qn->len = node_len;
    qp->len = param_len;
    err = (this->*(info->m_build))(ctx, requestPtr, qn, qp);
    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      goto error;
    }

    /**
     * only first node gets access to signal
     */
    ctx.m_start_signal = 0;

    /**
     * TODO handle error, by aborting request
     */
    ndbrequire(ctx.m_cnt < NDB_ARRAY_SIZE(ctx.m_node_list));
    ctx.m_cnt++;
  }
  requestPtr.p->m_node_cnt = ctx.m_cnt;

  /**
   * Init ROW_BUFFERS for those TreeNodes requiring either
   * T_ROW_BUFFER or T_ROW_BUFFER_MAP.
   */
  if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
  {
    Ptr<TreeNode> treeNodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
      {
        jam();
        treeNodePtr.p->m_row_map.init();
      }
      else if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        treeNodePtr.p->m_row_list.init();
      }
    }
  }

  if (ctx.m_scan_cnt > 1)
  {
    jam();
    requestPtr.p->m_bits |= Request::RT_MULTI_SCAN;

    /**
     * If the multi-scan is non-bushy (the normal case)
     *   we don't strictly need RT_VAR_ALLOC for RT_ROW_BUFFERS,
     *   but could instead pop the row stack frame;
     *   however this is not implemented...
     *
     * so, use RT_VAR_ALLOC
     */
    if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
    {
      jam();
      requestPtr.p->m_bits |= Request::RT_VAR_ALLOC;
    }
  }

  return 0;

error:
  jam();
  return err;
}

Uint32 Dbspj::createNode(Build_context& ctx, Ptr<Request> requestPtr,
                         Ptr<TreeNode> & treeNodePtr)
{
  /**
   * In the future, we can have different TreeNode-allocation strategies
   *   that can be setup using the Build_context
   */
  if (m_treenode_pool.seize(requestPtr.p->m_arena, treeNodePtr))
  {
    DEBUG("createNode - seize -> ptrI: " << treeNodePtr.i);
    new (treeNodePtr.p) TreeNode(requestPtr.i);
    ctx.m_node_list[ctx.m_cnt] = treeNodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    list.addLast(treeNodePtr);
    treeNodePtr.p->m_node_no = ctx.m_cnt;
    return 0;
  }
  return DbspjErr::OutOfOperations;
}
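/**
 * start()
 *
 * Kicks off an accepted Request. If any TreeNode set RT_NEED_PREPARE
 * during build, each node's m_prepare callback runs first and the
 * request stays in RS_PREPARING until the outstanding count drops to
 * zero; checkPrepareComplete() then moves it to RS_RUNNING and invokes
 * m_start on the root TreeNode (the first node in m_nodes).
 */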
void Dbspj::start(Signal* signal,
                  Ptr<Request> requestPtr)
{
  if (requestPtr.p->m_bits & Request::RT_NEED_PREPARE)
  {
    jam();
    requestPtr.p->m_outstanding = 0;
    requestPtr.p->m_state = Request::RS_PREPARING;

    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_prepare != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_prepare))(signal, requestPtr, nodePtr);
      }
    }

    /**
     * preferably RT_NEED_PREPARE should only be set if blocking
     * calls are used, in which case m_outstanding should have been increased
     */
    ndbassert(requestPtr.p->m_outstanding);
  }

  checkPrepareComplete(signal, requestPtr, 0);
}

void Dbspj::checkPrepareComplete(Signal * signal, Ptr<Request> requestPtr,
                                 Uint32 cnt)
{
  ndbrequire(requestPtr.p->m_outstanding >= cnt);
  requestPtr.p->m_outstanding -= cnt;

  if (requestPtr.p->m_outstanding == 0)
  {
    jam();

    if (unlikely((requestPtr.p->m_state & Request::RS_ABORTING) != 0))
    {
      jam();
      batchComplete(signal, requestPtr);
      return;
    }

    requestPtr.p->m_state = Request::RS_RUNNING;
    Ptr<TreeNode> nodePtr;
    {
      Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
      ndbrequire(list.first(nodePtr));
    }
    ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_start != 0);
    (this->*(nodePtr.p->m_info->m_start))(signal, requestPtr, nodePtr);
  }
}

void Dbspj::checkBatchComplete(Signal * signal, Ptr<Request> requestPtr,
                               Uint32 cnt)
{
  ndbrequire(requestPtr.p->m_outstanding >= cnt);
  requestPtr.p->m_outstanding -= cnt;

  if (requestPtr.p->m_outstanding == 0)
  {
    jam();
    batchComplete(signal, requestPtr);
  }
}

void Dbspj::batchComplete(Signal* signal, Ptr<Request> requestPtr)
{
  ndbrequire(requestPtr.p->m_outstanding == 0); // "definition" of batchComplete

  bool is_complete = requestPtr.p->m_cnt_active == 0;
  bool need_complete_phase = requestPtr.p->m_bits & Request::RT_NEED_COMPLETE;

  if (requestPtr.p->isLookup())
  {
    ndbassert(requestPtr.p->m_cnt_active == 0);
  }

  if (!is_complete || (is_complete && need_complete_phase == false))
  {
    /**
     * one batch complete, and either
     *   - request not complete
     *   - or no complete-phase needed
     */
    jam();

    if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
    {
      ndbassert(is_complete);
    }

    prepareNextBatch(signal, requestPtr);
    sendConf(signal, requestPtr, is_complete);
  }
  else if (is_complete && need_complete_phase)
  {
    jam();
    /**
     * run complete-phase
     */
    complete(signal, requestPtr);
    return;
  }

  if (requestPtr.p->m_cnt_active == 0)
  {
    jam();
    /**
     * request completed
     */
    cleanup(requestPtr);
  }
  else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
  {
    jam();
    /**
     * release unneeded buffers and position cursor for SCAN_NEXTREQ
     */
    releaseScanBuffers(requestPtr);
  }
  else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
  {
    jam();
    /**
     * if not multiple scans in request, simply release all pages allocated
     * for row buffers (all rows will be released anyway)
     */
    releaseRequestBuffers(requestPtr, true);
  }
}
/**
 * Locate next TreeNode(s) to retrieve more rows from.
 *
 * Calculate set of the 'm_active_nodes' we will receive from in NEXTREQ.
 * Add these TreeNodes to the cursor list to be iterated.
 */
void Dbspj::prepareNextBatch(Signal* signal, Ptr<Request> requestPtr)
{
  requestPtr.p->m_cursor_nodes.init();
  requestPtr.p->m_active_nodes.clear();

  if (requestPtr.p->m_cnt_active == 0)
  {
    jam();
    return;
  }

  if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT)
  {
    /**
     * If REPEAT_SCAN_RESULT we handle bushy scans by returning more *new*
     * rows from only one of the active child scans. If there are multiple
     * bushy scans not being able to return their current result set in
     * a single batch, result sets from the other child scans are repeated
     * until all rows have been returned to the API client.
     *
     * Hence, the cross joined results from the bushy scans are partly
     * produced within the SPJ block on a 'batchsize granularity',
     * and partly the responsibility of the API client, which iterates
     * the result rows within the current result batches.
     * (As opposed to non-REPEAT_SCAN_RESULT, where the client only has to
     *  care about the current batched rows - no buffering is required)
     */
    jam();
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);

    /**
     * Locate last 'TN_ACTIVE' TreeNode which is the only one chosen
     * to return more *new* rows.
     */
    for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
    {
      if (nodePtr.p->m_state == TreeNode::TN_ACTIVE)
      {
        jam();
        DEBUG("Will fetch more from 'active' m_node_no: " << nodePtr.p->m_node_no);
        /**
         * A later NEXTREQ will request a *new* batch of rows from this
         * TreeNode.
         */
        registerActiveCursor(requestPtr, nodePtr);
        break;
      }
    }

    /**
     * Restart/repeat other (index scan) child batches which:
     *  - Being 'after' nodePtr located above.
     *  - Not being an ancestor of (depends on) any 'active' TreeNode.
     *    (As these scans are started when rows from these parent nodes
     *     arrive.)
     */
    if (!nodePtr.isNull())
    {
      jam();
      DEBUG("Calculate 'active', w/ cursor on m_node_no: " << nodePtr.p->m_node_no);

      /* Restart any partial index-scans after this 'TN_ACTIVE' TreeNode */
      for (list.next(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
      {
        jam();
        if (!nodePtr.p->m_ancestors.overlaps(requestPtr.p->m_active_nodes))
        {
          jam();
          ndbrequire(nodePtr.p->m_state != TreeNode::TN_ACTIVE);
          ndbrequire(nodePtr.p->m_info != 0);
          if (nodePtr.p->m_info->m_parent_batch_repeat != 0)
          {
            jam();
            (this->*(nodePtr.p->m_info->m_parent_batch_repeat))(signal,
                                                                requestPtr,
                                                                nodePtr);
          }
        }
      }
    } // if (!nodePtr.isNull()
  }
  else // not 'RT_REPEAT_SCAN_RESULT'
  {
    /**
     * If not REPEAT_SCAN_RESULT multiple active TreeNodes may return their
     * remaining result simultaneously. In case of bushy-scans, these
     * concurrent result streams are cross joins of each other
     * in SQL terms. In order to produce the cross joined result, it is
     * the responsibility of the API-client to buffer these streams and
     * iterate them to produce the cross join.
     */
    jam();
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    TreeNodeBitMask ancestors_of_active;

    for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
    {
      /**
       * If we are active (i.e not consumed all rows originating
       *   from parent rows) and we are not in the set of parents
       *   for any active child:
       *
       * Then, this is a position that execSCAN_NEXTREQ should continue
       */
      if (nodePtr.p->m_state == TreeNode::TN_ACTIVE &&
          !ancestors_of_active.get(nodePtr.p->m_node_no))
      {
        jam();
        DEBUG("Add 'active' m_node_no: " << nodePtr.p->m_node_no);
        registerActiveCursor(requestPtr, nodePtr);
        ancestors_of_active.bitOR(nodePtr.p->m_ancestors);
      }
    }
  } // if (RT_REPEAT_SCAN_RESULT)

  DEBUG("Calculated 'm_active_nodes': " << requestPtr.p->m_active_nodes.rep.data[0]);
}

void Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
{
  if (requestPtr.p->isScan())
  {
    if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
    {
      jam();
      /**
       * We aborted the request ourselves (due to node-failure ?)
       *   but TC hasn't contacted us...so we can't reply yet...
       */
      ndbrequire(is_complete);
      ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
      return;
    }

    if (requestPtr.p->m_errCode == 0)
    {
      jam();
      ScanFragConf * conf =
        reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
      conf->senderData = requestPtr.p->m_senderData;
      conf->transId1 = requestPtr.p->m_transId[0];
      conf->transId2 = requestPtr.p->m_transId[1];
      conf->completedOps = requestPtr.p->m_rows;
      conf->fragmentCompleted = is_complete ? 1 : 0;
      conf->total_len = requestPtr.p->m_active_nodes.rep.data[0];

      c_Counters.incr_counter(CI_SCAN_BATCHES_RETURNED, 1);
      c_Counters.incr_counter(CI_SCAN_ROWS_RETURNED, requestPtr.p->m_rows);

#ifdef SPJ_TRACE_TIME
      Uint64 now = spj_now();
      Uint64 then = requestPtr.p->m_save_time;

      requestPtr.p->m_sum_rows += requestPtr.p->m_rows;
      requestPtr.p->m_sum_running += Uint32(now - then);
      requestPtr.p->m_cnt_batches++;
      requestPtr.p->m_save_time = now;

      if (is_complete)
      {
        Uint32 cnt = requestPtr.p->m_cnt_batches;
        ndbout_c("batches: %u avg_rows: %u avg_running: %u avg_wait: %u",
                 cnt,
                 (requestPtr.p->m_sum_rows / cnt),
                 (requestPtr.p->m_sum_running / cnt),
                 cnt == 1 ? 0 : requestPtr.p->m_sum_waiting / (cnt - 1));
      }
#endif

      /**
       * reset for next batch
       */
      requestPtr.p->m_rows = 0;
      if (!is_complete)
      {
        jam();
        requestPtr.p->m_state |= Request::RS_WAITING;
      }
#ifdef DEBUG_SCAN_FRAGREQ
      ndbout_c("Dbspj::sendConf() sending SCAN_FRAGCONF ");
      printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
                         conf->total_len,
                         DBLQH);
#endif
      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
                 ScanFragConf::SignalLength, JBB);
    }
    else
    {
      jam();
      ndbrequire(is_complete);
      ScanFragRef * ref =
        reinterpret_cast<ScanFragRef*>(signal->getDataPtrSend());
      ref->senderData = requestPtr.p->m_senderData;
      ref->transId1 = requestPtr.p->m_transId[0];
      ref->transId2 = requestPtr.p->m_transId[1];
      ref->errorCode = requestPtr.p->m_errCode;

      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
                 ScanFragRef::SignalLength, JBB);
    }
  }
  else
  {
    ndbassert(is_complete);
    if (requestPtr.p->m_errCode)
    {
      jam();
      Uint32 resultRef = getResultRef(requestPtr);
      TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr();
      ref->connectPtr = requestPtr.p->m_senderData;
      ref->transId[0] = requestPtr.p->m_transId[0];
      ref->transId[1] = requestPtr.p->m_transId[1];
      ref->errorCode = requestPtr.p->m_errCode;
      ref->errorData = 0;

      sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
    }
  }
}
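/**
 * getResultRef()
 *
 * Finds the API block reference to reply to by locating the first
 * lookup TreeNode in the request; only lookup nodes store the
 * m_api_resultRef, so a request without one cannot reach this path.
 */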
Uint32 Dbspj::getResultRef(Ptr<Request> requestPtr)
{
  Ptr<TreeNode> nodePtr;
  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
  for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
  {
    if (nodePtr.p->m_info == &g_LookupOpInfo)
    {
      jam();
      return nodePtr.p->m_lookup_data.m_api_resultRef;
    }
  }
  ndbrequire(false);
  return 0;
}

void Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
{
  Ptr<TreeNode> treeNodePtr;
  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
  TreeNodeBitMask ancestors_of_active;

  for (list.last(treeNodePtr); !treeNodePtr.isNull(); list.prev(treeNodePtr))
  {
    /**
     * If there are no active children,
     *   then we can clean up in our sub-branch
     */
    if (!ancestors_of_active.get(treeNodePtr.p->m_node_no))
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        releaseNodeRows(requestPtr, treeNodePtr);
      }

      /**
       * Clean up ACTIVE nodes fetching more rows in a NEXTREQ,
       * or nodes being in 'm_active_nodes' as they will 'repeat'
       * (and then become active)
       */
      if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
          requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
      {
        jam();
        cleanupChildBranch(requestPtr, treeNodePtr);
      }
    }

    /**
     * Collect ancestors of all nodes which are, or will
     * become active in NEXTREQ (possibly repeated)
     */
    if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
        requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
    {
      ancestors_of_active.bitOR(treeNodePtr.p->m_ancestors);
    }
  }
  /**
   * There needs to be at least 1 active, otherwise we should have
   * taken the cleanup "path" in batchComplete
   */
  ndbrequire(requestPtr.p->m_cnt_active >= 1);
}

void Dbspj::registerActiveCursor(Ptr<Request> requestPtr,
                                 Ptr<TreeNode> treeNodePtr)
{
  Uint32 bit = treeNodePtr.p->m_node_no;
  ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
  requestPtr.p->m_active_nodes.set(bit);

  Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
#ifdef VM_TRACE
  {
    Ptr<TreeNode> nodePtr;
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      ndbrequire(nodePtr.i != treeNodePtr.i);
    }
  }
#endif
  list.add(treeNodePtr);
}

void Dbspj::cleanupChildBranch(Ptr<Request> requestPtr,
                               Ptr<TreeNode> treeNodePtr)
{
  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
  Dependency_map::ConstDataBufferIterator it;
  for (list.first(it); !it.isNull(); list.next(it))
  {
    jam();
    Ptr<TreeNode> childPtr;
    m_treenode_pool.getPtr(childPtr, *it.data);
    if (childPtr.p->m_info->m_parent_batch_cleanup != 0)
    {
      jam();
      (this->*(childPtr.p->m_info->m_parent_batch_cleanup))(requestPtr,
                                                            childPtr);
    }
    cleanupChildBranch(requestPtr, childPtr);
  }
}

void Dbspj::releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  /**
   * Release all rows associated with tree node
   */

  // only when var-alloc, or else stack will be popped w/o consideration
  // to individual rows
  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER);

  /**
   * Two ways to iterate...
   */
  if ((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0)
  {
    jam();
    Uint32 cnt = 0;
    SLFifoRowListIterator iter;
    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
    {
      jam();
      RowRef pos = iter.m_ref;
      next(iter);
      releaseRow(requestPtr, pos);
      cnt++;
    }
    treeNodePtr.p->m_row_list.init();
    DEBUG("SLFifoRowListIterator: released " << cnt << " rows!");
  }
  else
  {
    jam();
    Uint32 cnt = 0;
    RowMapIterator iter;
    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
    {
      jam();
      RowRef pos = iter.m_ref;
      // this could be made more efficient by not actually setting up m_row_ptr
      next(iter);
      releaseRow(requestPtr, pos);
      cnt++;
    }
    treeNodePtr.p->m_row_map.init();
    DEBUG("RowMapIterator: released " << cnt << " rows!");
  }
}

void Dbspj::releaseRow(Ptr<Request> requestPtr, RowRef pos)
{
  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
  ndbassert(pos.m_allocator == 1);
  Ptr<RowPage> ptr;
  m_page_pool.getPtr(ptr, pos.m_page_id);
  ((Var_page*)ptr.p)->free_record(pos.m_page_pos, Var_page::CHAIN);
  Uint32 free_space = ((Var_page*)ptr.p)->free_space;
  if (free_space == 0)
  {
    jam();
    LocalDLFifoList<RowPage> list(m_page_pool,
                                  requestPtr.p->m_rowBuffer.m_page_list);
    list.remove(ptr);
    releasePage(ptr);
  }
  else if (free_space > requestPtr.p->m_rowBuffer.m_var.m_free)
  {
    LocalDLFifoList<RowPage> list(m_page_pool,
                                  requestPtr.p->m_rowBuffer.m_page_list);
    list.remove(ptr);
    list.addLast(ptr);
    requestPtr.p->m_rowBuffer.m_var.m_free = free_space;
  }
}

void Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr, bool reset)
{
  /**
   * Release all pages for request
   */
  {
    {
      LocalDLFifoList<RowPage> list(m_page_pool,
                                    requestPtr.p->m_rowBuffer.m_page_list);
      if (!list.isEmpty())
      {
        jam();
        Ptr<RowPage> first, last;
        list.first(first);
        list.last(last);
        releasePages(first.i, last);
        list.remove();
      }
    }
    requestPtr.p->m_rowBuffer.stack_init();
  }

  if (reset)
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
        {
          jam();
          nodePtr.p->m_row_map.init();
        }
        else
        {
          nodePtr.p->m_row_list.init();
        }
      }
    }
  }
}

void Dbspj::reportBatchComplete(Signal * signal, Ptr<Request> requestPtr,
                                Ptr<TreeNode> treeNodePtr)
{
  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
  Dependency_map::ConstDataBufferIterator it;
  for (list.first(it); !it.isNull(); list.next(it))
  {
    jam();
    Ptr<TreeNode> childPtr;
    m_treenode_pool.getPtr(childPtr, * it.data);
    if (childPtr.p->m_bits & TreeNode::T_NEED_REPORT_BATCH_COMPLETED)
    {
      jam();
      ndbrequire(childPtr.p->m_info != 0 &&
                 childPtr.p->m_info->m_parent_batch_complete != 0);
      (this->*(childPtr.p->m_info->m_parent_batch_complete))(signal,
                                                             requestPtr,
                                                             childPtr);
    }
  }
}
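/**
 * abort()
 *
 * Puts the request into RS_ABORTING (idempotently - a repeated call just
 * falls through to the completion check), records the error code,
 * clears T_REPORT_BATCH_COMPLETE on all TreeNodes so children are not
 * confused mid-abort, and gives every node's m_abort callback a chance
 * to cancel outstanding work before checkBatchComplete() decides
 * whether the request can be finished off.
 */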
void Dbspj::abort(Signal* signal, Ptr<Request> requestPtr, Uint32 errCode)
{
  jam();

  if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
  {
    jam();
    goto checkcomplete;
  }

  requestPtr.p->m_state |= Request::RS_ABORTING;
  requestPtr.p->m_errCode = errCode;

  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      /**
       * clear T_REPORT_BATCH_COMPLETE so that child nodes don't get confused
       * during abort
       */
      nodePtr.p->m_bits &= ~Uint32(TreeNode::T_REPORT_BATCH_COMPLETE);

      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_abort != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_abort))(signal, requestPtr, nodePtr);
      }
    }
  }

checkcomplete:
  checkBatchComplete(signal, requestPtr, 0);
}

Uint32 Dbspj::nodeFail(Signal* signal, Ptr<Request> requestPtr,
                       NdbNodeBitmask nodes)
{
  Uint32 cnt = 0;
  Uint32 iter = 0;

  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_execNODE_FAILREP != 0)
      {
        jam();
        iter++;
        cnt += (this->*(nodePtr.p->m_info->m_execNODE_FAILREP))(signal,
                                                                requestPtr,
                                                                nodePtr,
                                                                nodes);
      }
    }
  }

  if (cnt == 0)
  {
    jam();
    /**
     * None of the operations needed NodeFailRep "action"
     *   check if our TC has died...but...only needed in
     *   scan case...for lookup...not so...
     */
    if (requestPtr.p->isScan() &&
        nodes.get(refToNode(requestPtr.p->m_senderRef)))
    {
      jam();
      abort(signal, requestPtr, DbspjErr::NodeFailure);
    }
  }
  else
  {
    jam();
    abort(signal, requestPtr, DbspjErr::NodeFailure);
  }

  return cnt + iter;
}

void Dbspj::complete(Signal* signal, Ptr<Request> requestPtr)
{
  /**
   * we need to run complete-phase before sending last SCAN_FRAGCONF
   */
  Uint32 flags = requestPtr.p->m_state &
    (Request::RS_ABORTING | Request::RS_WAITING);

  requestPtr.p->m_state = Request::RS_COMPLETING | flags;

  // clear bit so that next batchComplete()
  // will continue to cleanup
  ndbassert((requestPtr.p->m_bits & Request::RT_NEED_COMPLETE) != 0);
  requestPtr.p->m_bits &= ~(Uint32)Request::RT_NEED_COMPLETE;
  requestPtr.p->m_outstanding = 0;

  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_complete != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_complete))(signal, requestPtr, nodePtr);
      }
    }

    /**
     * preferably RT_NEED_COMPLETE should only be set if blocking
     * calls are used, in which case m_outstanding should have been increased
     *
     * BUT: scanIndex does DIH_SCAN_TAB_COMPLETE_REP which does not send a
     *      reply, so it is not really "blocking"
     *      i.e. remove assert
     */
    //ndbassert(requestPtr.p->m_outstanding);
  }
  checkBatchComplete(signal, requestPtr, 0);
}

void Dbspj::cleanup(Ptr<Request> requestPtr)
{
  ndbrequire(requestPtr.p->m_cnt_active == 0);
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); )
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_cleanup != 0);
      (this->*(nodePtr.p->m_info->m_cleanup))(requestPtr, nodePtr);

      Ptr<TreeNode> tmp = nodePtr;
      list.next(nodePtr);
      m_treenode_pool.release(tmp);
    }
    list.remove();
  }
  if (requestPtr.p->isScan())
  {
    jam();

    if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
    {
      jam();
      requestPtr.p->m_state = Request::RS_ABORTED;
      return;
    }
#ifdef VM_TRACE
    {
      Request key;
      key.m_transId[0] = requestPtr.p->m_transId[0];
      key.m_transId[1] = requestPtr.p->m_transId[1];
      key.m_senderData = requestPtr.p->m_senderData;
      Ptr<Request> tmp;
      ndbrequire(m_scan_request_hash.find(tmp, key));
    }
#endif
    m_scan_request_hash.remove(requestPtr);
  }
  else
  {
    jam();
#ifdef VM_TRACE
    {
      Request key;
      key.m_transId[0] = requestPtr.p->m_transId[0];
      key.m_transId[1] = requestPtr.p->m_transId[1];
      key.m_senderData = requestPtr.p->m_senderData;
      Ptr<Request> tmp;
      ndbrequire(m_lookup_request_hash.find(tmp, key));
    }
#endif
    m_lookup_request_hash.remove(requestPtr);
  }
  releaseRequestBuffers(requestPtr, false);
  ArenaHead ah = requestPtr.p->m_arena;
  m_request_pool.release(requestPtr);
  m_arenaAllocator.release(ah);
}
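/**
 * cleanup_common()
 *
 * Per-TreeNode cleanup shared by all operation types: releases the
 * dependency map and the key/attr-param pattern stores from the
 * request's arena, and frees any KEYINFO/ATTRINFO sections still
 * attached to the node's send-descriptor.
 */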
void Dbspj::cleanup_common(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  jam();

  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  {
    Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
    list.release();
  }

  {
    Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
    pattern.release();
  }

  {
    Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
    pattern.release();
  }

  if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
  {
    jam();
    releaseSection(treeNodePtr.p->m_send.m_keyInfoPtrI);
  }

  if (treeNodePtr.p->m_send.m_attrInfoPtrI != RNIL)
  {
    jam();
    releaseSection(treeNodePtr.p->m_send.m_attrInfoPtrI);
  }
}

/**
 * Processing of signals from LQH
 */
void Dbspj::execLQHKEYREF(Signal* signal)
{
  jamEntry();

  const LqhKeyRef* ref = reinterpret_cast<const LqhKeyRef*>(signal->getDataPtr());

  DEBUG("execLQHKEYREF, errorCode:" << ref->errorCode);

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, ref->connectPtr);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYREF);
  (this->*(treeNodePtr.p->m_info->m_execLQHKEYREF))(signal,
                                                    requestPtr,
                                                    treeNodePtr);
}

void Dbspj::execLQHKEYCONF(Signal* signal)
{
  jamEntry();

  DEBUG("execLQHKEYCONF");

  const LqhKeyConf* conf = reinterpret_cast<const LqhKeyConf*>(signal->getDataPtr());

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, conf->opPtr);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYCONF);
  (this->*(treeNodePtr.p->m_info->m_execLQHKEYCONF))(signal,
                                                     requestPtr,
                                                     treeNodePtr);
}

void Dbspj::execSCAN_FRAGREF(Signal* signal)
{
  jamEntry();
  const ScanFragRef* ref = reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());

  DEBUG("execSCAN_FRAGREF, errorCode:" << ref->errorCode);

  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, ref->senderData);

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execSCAN_FRAGREF);
  (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGREF))(signal,
                                                       requestPtr,
                                                       treeNodePtr,
                                                       scanFragHandlePtr);
}

void Dbspj::execSCAN_HBREP(Signal* signal)
{
  jamEntry();

  Uint32 senderData = signal->theData[0];
  //Uint32 transId[2] = { signal->theData[1], signal->theData[2] };

  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, senderData);

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  Uint32 ref = requestPtr.p->m_senderRef;
  signal->theData[0] = requestPtr.p->m_senderData;
  sendSignal(ref, GSN_SCAN_HBREP, signal, 3, JBB);
}

void Dbspj::execSCAN_FRAGCONF(Signal* signal)
{
  jamEntry();

  DEBUG("execSCAN_FRAGCONF");

  const ScanFragConf* conf = reinterpret_cast<const ScanFragConf*>(signal->getDataPtr());

#ifdef DEBUG_SCAN_FRAGREQ
  ndbout_c("Dbspj::execSCAN_FRAGCONF() receiving SCAN_FRAGCONF ");
  printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
                     conf->total_len,
                     DBLQH);
#endif

  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, conf->senderData);

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execSCAN_FRAGCONF);
  (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGCONF))(signal,
                                                        requestPtr,
                                                        treeNodePtr,
                                                        scanFragHandlePtr);
}
void Dbspj::execSCAN_NEXTREQ(Signal* signal)
{
  jamEntry();
  const ScanFragNextReq * req = (ScanFragNextReq*)&signal->theData[0];

  DEBUG("Incoming SCAN_NEXTREQ");
#ifdef DEBUG_SCAN_FRAGREQ
  printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
                       ScanFragNextReq::SignalLength, DBLQH);
#endif

  Request key;
  key.m_transId[0] = req->transId1;
  key.m_transId[1] = req->transId2;
  key.m_senderData = req->senderData;

  Ptr<Request> requestPtr;
  if (unlikely(!m_scan_request_hash.find(requestPtr, key)))
  {
    jam();
    ndbrequire(req->requestInfo == ScanFragNextReq::ZCLOSE);
    return;
  }

#ifdef SPJ_TRACE_TIME
  Uint64 now = spj_now();
  Uint64 then = requestPtr.p->m_save_time;
  requestPtr.p->m_sum_waiting += Uint32(now - then);
  requestPtr.p->m_save_time = now;
#endif

  Uint32 state = requestPtr.p->m_state;
  requestPtr.p->m_state = state & ~Uint32(Request::RS_WAITING);

  if (unlikely(state == Request::RS_ABORTED))
  {
    jam();
    batchComplete(signal, requestPtr);
    return;
  }

  if (unlikely((state & Request::RS_ABORTING) != 0))
  {
    jam();
    /**
     * abort is already in progress...
     *   since RS_WAITING is cleared...it will end this request
     */
    return;
  }

  if (req->requestInfo == ScanFragNextReq::ZCLOSE)  // Requested close scan
  {
    jam();
    abort(signal, requestPtr, 0);
    return;
  }

  ndbrequire((state & Request::RS_WAITING) != 0);
  ndbrequire(requestPtr.p->m_outstanding == 0);

  {
    /**
     * Scroll all relevant cursors...
     */
    Ptr<TreeNode> treeNodePtr;
    Local_TreeNodeCursor_list list(m_treenode_pool,
                                   requestPtr.p->m_cursor_nodes);
    Uint32 cnt_active = 0;

    for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
    {
      if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
      {
        jam();
        DEBUG("SCAN_NEXTREQ on TreeNode: " << treeNodePtr.i
           << ", m_node_no: " << treeNodePtr.p->m_node_no
           << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);

        ndbrequire(treeNodePtr.p->m_info != 0 &&
                   treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
        (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
                                                             requestPtr,
                                                             treeNodePtr);
        cnt_active++;
      }
      else
      {
        /**
         * Restart any other scans not being 'TN_ACTIVE'
         * (Only effective if 'RT_REPEAT_SCAN_RESULT')
         */
        jam();
        ndbrequire(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT);
        DEBUG(" Restart TreeNode: " << treeNodePtr.i
           << ", m_node_no: " << treeNodePtr.p->m_node_no
           << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);

        ndbrequire(treeNodePtr.p->m_info != 0 &&
                   treeNodePtr.p->m_info->m_parent_batch_complete != 0);
        (this->*(treeNodePtr.p->m_info->m_parent_batch_complete))(signal,
                                                                  requestPtr,
                                                                  treeNodePtr);
      }
    }
    /* Expected only a single ACTIVE TreeNode among the cursors */
    ndbrequire(cnt_active == 1 ||
               !(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT));
  }
}
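/**
 * execTRANSID_AI()
 *
 * A row has arrived from LQH for some TreeNode. The attached section
 * is wrapped in a RowPtr together with a freshly built row header and
 * the correlation data, optionally copied into the request's row
 * buffer (T_ROW_BUFFER), and then handed to the node type's
 * m_execTRANSID_AI callback for join processing.
 */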
void Dbspj::execTRANSID_AI(Signal* signal)
{
  jamEntry();

  DEBUG("execTRANSID_AI");

  TransIdAI * req = (TransIdAI *)signal->getDataPtr();
  Uint32 ptrI = req->connectPtr;
  //Uint32 transId[2] = { req->transId[0], req->transId[1] };

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, ptrI);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(signal->getNoOfSections() != 0); // TODO check if this can happen

  SegmentedSectionPtr dataPtr;
  {
    SectionHandle handle(this, signal);
    handle.getSection(dataPtr, 0);
    handle.clear();
  }

#if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
  printf("execTRANSID_AI: ");
  print(dataPtr, stdout);
#endif

  /**
   * build easy-access-array for row
   */
  Uint32 tmp[2+MAX_ATTRIBUTES_IN_TABLE];
  RowPtr::Header* header = CAST_PTR(RowPtr::Header, &tmp[0]);

  Uint32 cnt = buildRowHeader(header, dataPtr);
  ndbassert(header->m_len < NDB_ARRAY_SIZE(tmp));

  struct RowPtr row;
  row.m_type = RowPtr::RT_SECTION;
  row.m_src_node_ptrI = treeNodePtr.i;
  row.m_row_data.m_section.m_header = header;
  row.m_row_data.m_section.m_dataPtr.assign(dataPtr);

  getCorrelationData(row.m_row_data.m_section,
                     cnt - 1,
                     row.m_src_correlation);

  if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
  {
    jam();
    Uint32 err = storeRow(requestPtr, treeNodePtr, row);
    ndbrequire(err == 0);
  }

  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execTRANSID_AI);
  (this->*(treeNodePtr.p->m_info->m_execTRANSID_AI))(signal,
                                                     requestPtr,
                                                     treeNodePtr,
                                                     row);
  release(dataPtr);
}

Uint32 Dbspj::storeRow(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
                       RowPtr &row)
{
  ndbassert(row.m_type == RowPtr::RT_SECTION);
  SegmentedSectionPtr dataPtr = row.m_row_data.m_section.m_dataPtr;
  Uint32 * headptr = (Uint32*)row.m_row_data.m_section.m_header;
  Uint32 headlen = 1 + row.m_row_data.m_section.m_header->m_len;

  /**
   * If rows are not in map, then they are kept in linked list
   */
  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) ?
    0 : 2;

  Uint32 totlen = 0;
  totlen += dataPtr.sz;
  totlen += headlen;
  totlen += linklen;

  RowRef ref;
  Uint32 * dstptr = 0;
  if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
  {
    jam();
    dstptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
  }
  else
  {
    jam();
    dstptr = varAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
  }

  if (unlikely(dstptr == 0))
  {
    jam();
    return DbspjErr::OutOfRowMemory;
  }

  row.m_type = RowPtr::RT_LINEAR;
  row.m_row_data.m_linear.m_row_ref = ref;
  row.m_row_data.m_linear.m_header = (RowPtr::Header*)(dstptr + linklen);
  row.m_row_data.m_linear.m_data = dstptr + linklen + headlen;

  memcpy(dstptr + linklen, headptr, 4 * headlen);
  copy(dstptr + linklen + headlen, dataPtr);

  if (linklen)
  {
    jam();
    NullRowRef.copyto_link(dstptr); // Null terminate list...
    add_to_list(treeNodePtr.p->m_row_list, ref);
  }
  else
  {
    jam();
    return add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
  }

  return 0;
}
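/**
 * Layout of a buffered row, as written by storeRow() above:
 *
 *   [ link (2 words, list-mode only) ][ header ][ row data ]
 *
 * List-mode rows (no T_ROW_BUFFER_MAP) carry a RowRef link to the next
 * row; map-mode rows omit the link since the row map holds the refs.
 * setupRowPtr() below re-creates a linear RowPtr from such a buffer.
 */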
void Dbspj::setupRowPtr(Ptr<TreeNode> treeNodePtr,
                        RowPtr& row, RowRef ref, const Uint32 * src)
{
  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) ?
    0 : 2;
  const RowPtr::Header * headptr = (RowPtr::Header*)(src + linklen);
  Uint32 headlen = 1 + headptr->m_len;

  row.m_type = RowPtr::RT_LINEAR;
  row.m_row_data.m_linear.m_row_ref = ref;
  row.m_row_data.m_linear.m_header = headptr;
  row.m_row_data.m_linear.m_data = (Uint32*)headptr + headlen;
}

void Dbspj::add_to_list(SLFifoRowList & list, RowRef rowref)
{
  if (list.isNull())
  {
    jam();
    list.m_first_row_page_id = rowref.m_page_id;
    list.m_first_row_page_pos = rowref.m_page_pos;
  }
  else
  {
    jam();
    /**
     * add last to list
     */
    RowRef last;
    last.m_allocator = rowref.m_allocator;
    last.m_page_id = list.m_last_row_page_id;
    last.m_page_pos = list.m_last_row_page_pos;
    Uint32 * rowptr;
    if (rowref.m_allocator == 0)
    {
      jam();
      rowptr = get_row_ptr_stack(last);
    }
    else
    {
      jam();
      rowptr = get_row_ptr_var(last);
    }
    rowref.copyto_link(rowptr);
  }

  list.m_last_row_page_id = rowref.m_page_id;
  list.m_last_row_page_pos = rowref.m_page_pos;
}

Uint32 * Dbspj::get_row_ptr_stack(RowRef pos)
{
  ndbassert(pos.m_allocator == 0);
  Ptr<RowPage> ptr;
  m_page_pool.getPtr(ptr, pos.m_page_id);
  return ptr.p->m_data + pos.m_page_pos;
}

Uint32 * Dbspj::get_row_ptr_var(RowRef pos)
{
  ndbassert(pos.m_allocator == 1);
  Ptr<RowPage> ptr;
  m_page_pool.getPtr(ptr, pos.m_page_id);
  return ((Var_page*)ptr.p)->get_ptr(pos.m_page_pos);
}

bool Dbspj::first(Ptr<Request> requestPtr,
                  Ptr<TreeNode> treeNodePtr,
                  SLFifoRowListIterator& iter)
{
  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
  SLFifoRowList & list = treeNodePtr.p->m_row_list;
  if (list.isNull())
  {
    jam();
    iter.setNull();
    return false;
  }

  iter.m_ref.m_allocator = var;
  iter.m_ref.m_page_id = list.m_first_row_page_id;
  iter.m_ref.m_page_pos = list.m_first_row_page_pos;
  if (var == 0)
  {
    jam();
    iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
  }
  else
  {
    jam();
    iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
  }

  return true;
}

bool Dbspj::next(SLFifoRowListIterator& iter)
{
  iter.m_ref.assign_from_link(iter.m_row_ptr);
  if (iter.m_ref.isNull())
  {
    jam();
    return false;
  }

  if (iter.m_ref.m_allocator == 0)
  {
    jam();
    iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
  }
  else
  {
    jam();
    iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
  }
  return true;
}

bool Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
                 SLFifoRowListIterator& iter, SLFifoRowListIteratorPtr start)
{
  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
  (void)var;
  ndbassert(var == iter.m_ref.m_allocator);
  if (iter.m_ref.m_allocator == 0)
  {
    jam();
    iter.m_row_ptr = get_row_ptr_stack(start.m_ref);
  }
  else
  {
    jam();
    iter.m_row_ptr = get_row_ptr_var(start.m_ref);
  }
  return next(iter);
}
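/**
 * add_to_map()
 *
 * Map-mode row bookkeeping: the map itself is lazily allocated on the
 * first row (sized MAP_SIZE_PER_REF_16 16-bit words per row of the
 * node's batch size) and each row's RowRef is stored at the slot given
 * by the low 16 bits of its correlation value, after verifying the
 * slot is still empty.
 */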
Uint32 Dbspj::add_to_map(Ptr<Request> requestPtr,
                         Ptr<TreeNode> treeNodePtr,
                         Uint32 corrVal, RowRef rowref)
{
  Uint32 * mapptr;
  RowMap& map = treeNodePtr.p->m_row_map;
  if (map.isNull())
  {
    jam();
    Uint16 batchsize = treeNodePtr.p->m_batch_size;
    Uint32 sz16 = RowMap::MAP_SIZE_PER_REF_16 * batchsize;
    Uint32 sz32 = (sz16 + 1) / 2;
    RowRef ref;
    if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
    {
      jam();
      mapptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
    }
    else
    {
      jam();
      mapptr = varAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
    }
    if (unlikely(mapptr == 0))
    {
      jam();
      return DbspjErr::OutOfRowMemory;
    }
    map.assign(ref);
    map.m_elements = 0;
    map.m_size = batchsize;
    map.clear(mapptr);
  }
  else
  {
    jam();
    RowRef ref;
    map.copyto(ref);
    if (ref.m_allocator == 0)
    {
      jam();
      mapptr = get_row_ptr_stack(ref);
    }
    else
    {
      jam();
      mapptr = get_row_ptr_var(ref);
    }
  }

  Uint32 pos = corrVal & 0xFFFF;
  ndbrequire(pos < map.m_size);
  ndbrequire(map.m_elements < map.m_size);

  if (1)
  {
    /**
     * Check that *pos* is empty
     */
    RowRef check;
    map.load(mapptr, pos, check);
    ndbrequire(check.m_page_pos == 0xFFFF);
  }

  map.store(mapptr, pos, rowref);

  return 0;
}

bool Dbspj::first(Ptr<Request> requestPtr,
                  Ptr<TreeNode> treeNodePtr,
                  RowMapIterator & iter)
{
  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
  RowMap& map = treeNodePtr.p->m_row_map;
  if (map.isNull())
  {
    jam();
    iter.setNull();
    return false;
  }

  if (var == 0)
  {
    jam();
    iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
  }
  else
  {
    jam();
    iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
  }
  iter.m_size = map.m_size;
  iter.m_ref.m_allocator = var;

  Uint32 pos = 0;
  while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
    pos++;

  if (pos == iter.m_size)
  {
    jam();
    iter.setNull();
    return false;
  }
  else
  {
    jam();
    RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
    iter.m_element_no = pos;
    if (var == 0)
    {
      jam();
      iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
    }
    else
    {
      jam();
      iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
    }
    return true;
  }
}

bool Dbspj::next(RowMapIterator & iter)
{
  Uint32 pos = iter.m_element_no + 1;
  while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
    pos++;

  if (pos == iter.m_size)
  {
    jam();
    iter.setNull();
    return false;
  }
  else
  {
    jam();
    RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
    iter.m_element_no = pos;
    if (iter.m_ref.m_allocator == 0)
    {
      jam();
      iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
    }
    else
    {
      jam();
      iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
    }
    return true;
  }
}

bool Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
                 RowMapIterator & iter, RowMapIteratorPtr start)
{
  Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
  RowMap& map = treeNodePtr.p->m_row_map;
  ndbrequire(!map.isNull());

  if (var == 0)
  {
    jam();
    iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
  }
  else
  {
    jam();
    iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
  }
  iter.m_size = map.m_size;

  RowMap::load(iter.m_map_ptr, start.m_element_no, iter.m_ref);
  iter.m_element_no = start.m_element_no;
  return next(iter);
}

Uint32 * Dbspj::stackAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
{
  Ptr<RowPage> ptr;
  LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);

  Uint32 pos = buffer.m_stack.m_pos;
  const Uint32 SIZE = RowPage::SIZE;
  if (list.isEmpty() || (pos + sz) > SIZE)
  {
    jam();
    bool ret = allocPage(ptr);
    if (unlikely(ret == false))
    {
      jam();
      return 0;
    }

    pos = 0;
    list.addLast(ptr);
  }
  else
  {
    list.last(ptr);
  }

  dst.m_page_id = ptr.i;
  dst.m_page_pos = pos;
  dst.m_allocator = 0;
  buffer.m_stack.m_pos = pos + sz;
  return ptr.p->m_data + pos;
}

Uint32 * Dbspj::varAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
{
  Ptr<RowPage> ptr;
  LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);

  Uint32 free_space = buffer.m_var.m_free;
  if (list.isEmpty() || free_space < (sz + 1))
  {
    jam();
    bool ret = allocPage(ptr);
    if (unlikely(ret == false))
    {
      jam();
      return 0;
    }

    list.addLast(ptr);
    ((Var_page*)ptr.p)->init();
  }
  else
  {
    jam();
    list.last(ptr);
  }

  Var_page * vp = (Var_page*)ptr.p;
  Uint32 pos = vp->alloc_record(sz, (Var_page*)m_buffer0, Var_page::CHAIN);

  dst.m_page_id = ptr.i;
  dst.m_page_pos = pos;
  dst.m_allocator = 1;
  buffer.m_var.m_free = vp->free_space;
  return vp->get_ptr(pos);
}
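/**
 * allocPage() / releasePage() / releaseGlobal()
 *
 * Row-buffer pages are recycled through a block-local free-list:
 * allocPage() prefers the free-list and only falls back to the global
 * memory manager when it is empty, while releaseGlobal() periodically
 * (via CONTINUEB) trickles unused pages back to the memory manager,
 * one page per tick.
 */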
ndbrequire(ret); return ret; } } void Dbspj::releasePage(Ptr<RowPage> ptr) { LocalSLList<RowPage> list(m_page_pool, m_free_page_list); list.add(ptr); } void Dbspj::releasePages(Uint32 first, Ptr<RowPage> last) { LocalSLList<RowPage> list(m_page_pool, m_free_page_list); list.add(first, last); } void Dbspj::releaseGlobal(Signal * signal) { Uint32 delay = 100; LocalSLList<RowPage> list(m_page_pool, m_free_page_list); if (list.empty()) { jam(); delay = 300; } else { Ptr<RowPage> ptr; list.remove_front(ptr); m_ctx.m_mm.release_page(RT_SPJ_DATABUFFER, ptr.i); } signal->theData[0] = 0; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay, 1); } /** * END - MODULE GENERIC */ /** * MODULE LOOKUP */ const Dbspj::OpInfo Dbspj::g_LookupOpInfo = { &Dbspj::lookup_build, 0, // prepare &Dbspj::lookup_start, &Dbspj::lookup_execTRANSID_AI, &Dbspj::lookup_execLQHKEYREF, &Dbspj::lookup_execLQHKEYCONF, 0, // execSCAN_FRAGREF 0, // execSCAN_FRAGCONF &Dbspj::lookup_parent_row, &Dbspj::lookup_parent_batch_complete, 0, // Dbspj::lookup_parent_batch_repeat, 0, // Dbspj::lookup_parent_batch_cleanup, 0, // Dbspj::lookup_execSCAN_NEXTREQ 0, // Dbspj::lookup_complete &Dbspj::lookup_abort, &Dbspj::lookup_execNODE_FAILREP, &Dbspj::lookup_cleanup }; Uint32 Dbspj::lookup_build(Build_context& ctx, Ptr<Request> requestPtr, const QueryNode* qn, const QueryNodeParameters* qp) { Uint32 err = 0; Ptr<TreeNode> treeNodePtr; const QN_LookupNode * node = (const QN_LookupNode*)qn; const QN_LookupParameters * param = (const QN_LookupParameters*)qp; do { err = createNode(ctx, requestPtr, treeNodePtr); if (unlikely(err != 0)) { DEBUG_CRASH(); break; } treeNodePtr.p->m_info = &g_LookupOpInfo; Uint32 transId1 = requestPtr.p->m_transId[0]; Uint32 transId2 = requestPtr.p->m_transId[1]; Uint32 savePointId = ctx.m_savepointId; Uint32 treeBits = node->requestInfo; Uint32 paramBits = param->requestInfo; //ndbout_c("Dbspj::lookup_build() treeBits=%.8x paramBits=%.8x", // treeBits, paramBits); LqhKeyReq* dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq; { /** * static variables */ dst->tcBlockref = reference(); dst->clientConnectPtr = treeNodePtr.i; /** * TODO reference()+treeNodePtr.i is passed twice * this can likely be optimized using the requestInfo-bits * UPDATE: This can be accomplished by *not* setApplicationAddressFlag * and patching LQH to then instead use tcBlockref/clientConnectPtr */ dst->transId1 = transId1; dst->transId2 = transId2; dst->savePointId = savePointId; dst->scanInfo = 0; dst->attrLen = 0; /** Initially set reply ref to client, do_send will set SPJ refs if non-LEAF */ dst->variableData[0] = ctx.m_resultRef; dst->variableData[1] = param->resultData; Uint32 requestInfo = 0; LqhKeyReq::setOperation(requestInfo, ZREAD); LqhKeyReq::setApplicationAddressFlag(requestInfo, 1); LqhKeyReq::setDirtyFlag(requestInfo, 1); LqhKeyReq::setSimpleFlag(requestInfo, 1); LqhKeyReq::setNormalProtocolFlag(requestInfo, 0); // Assume T_LEAF LqhKeyReq::setCorrFactorFlag(requestInfo, 1); LqhKeyReq::setNoDiskFlag(requestInfo, (treeBits & DABits::NI_LINKED_DISK) == 0 && (paramBits & DABits::PI_DISK_ATTR) == 0); dst->requestInfo = requestInfo; } err = DbspjErr::InvalidTreeNodeSpecification; if (unlikely(node->len < QN_LookupNode::NodeSize)) { DEBUG_CRASH(); break; } if (treeBits & QN_LookupNode::L_UNIQUE_INDEX) { jam(); treeNodePtr.p->m_bits |= TreeNode::T_UNIQUE_INDEX_LOOKUP; } Uint32 tableId = node->tableId; Uint32 schemaVersion = node->tableVersion; Uint32 tableSchemaVersion = tableId + ((schemaVersion << 16) & 0xFFFF0000);
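/**
 * tableSchemaVersion packs the table id into the low 16 bits and the
 * schema version into the high 16 bits; LqhKeyReq::getTableId() in
 * lookup_parent_row() below extracts the id from this packed word again.
 */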
dst->tableSchemaVersion = tableSchemaVersion; err = DbspjErr::InvalidTreeParametersSpecification; DEBUG("param len: " << param->len); if (unlikely(param->len < QN_LookupParameters::NodeSize)) { DEBUG_CRASH(); break; } ctx.m_resultData = param->resultData; treeNodePtr.p->m_lookup_data.m_api_resultRef = ctx.m_resultRef; treeNodePtr.p->m_lookup_data.m_api_resultData = param->resultData; treeNodePtr.p->m_lookup_data.m_outstanding = 0; treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false; /** * Parse stuff common lookup/scan-frag */ struct DABuffer nodeDA, paramDA; nodeDA.ptr = node->optional; nodeDA.end = nodeDA.ptr + (node->len - QN_LookupNode::NodeSize); paramDA.ptr = param->optional; paramDA.end = paramDA.ptr + (param->len - QN_LookupParameters::NodeSize); err = parseDA(ctx, requestPtr, treeNodePtr, nodeDA, treeBits, paramDA, paramBits); if (unlikely(err != 0)) { DEBUG_CRASH(); break; } if (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED) { jam(); LqhKeyReq::setInterpretedFlag(dst->requestInfo, 1); } /** * Inherit batch size from parent */ treeNodePtr.p->m_batch_size = 1; if (treeNodePtr.p->m_parentPtrI != RNIL) { jam(); Ptr<TreeNode> parentPtr; m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI); treeNodePtr.p->m_batch_size = parentPtr.p->m_batch_size; } if (ctx.m_start_signal) { jam(); Signal * signal = ctx.m_start_signal; const LqhKeyReq* src = (const LqhKeyReq*)signal->getDataPtr(); #if NOT_YET Uint32 instanceNo = blockToInstance(signal->header.theReceiversBlockNumber); treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH, instanceNo, getOwnNodeId()); #else treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH, getInstanceKey(src->tableSchemaVersion & 0xFFFF, src->fragmentData & 0xFFFF), getOwnNodeId()); #endif Uint32 hashValue = src->hashValue; Uint32 fragId = src->fragmentData; Uint32 requestInfo = src->requestInfo; Uint32 attrLen = src->attrLen; // fragdist-key is in here /** * assertions */ ndbassert(LqhKeyReq::getAttrLen(attrLen) == 0); // Only long ndbassert(LqhKeyReq::getScanTakeOverFlag(attrLen) == 0);// Not supported ndbassert(LqhKeyReq::getReorgFlag(attrLen) == 0); // Not supported ndbassert(LqhKeyReq::getOperation(requestInfo) == ZREAD); ndbassert(LqhKeyReq::getKeyLen(requestInfo) == 0); // Only long ndbassert(LqhKeyReq::getMarkerFlag(requestInfo) == 0); // Only read ndbassert(LqhKeyReq::getAIInLqhKeyReq(requestInfo) == 0); ndbassert(LqhKeyReq::getSeqNoReplica(requestInfo) == 0); ndbassert(LqhKeyReq::getLastReplicaNo(requestInfo) == 0); ndbassert(LqhKeyReq::getApplicationAddressFlag(requestInfo) != 0); ndbassert(LqhKeyReq::getSameClientAndTcFlag(requestInfo) == 0); #if TODO /** * Handle various lock-modes */ static Uint8 getDirtyFlag(const UintR & requestInfo); static Uint8 getSimpleFlag(const UintR & requestInfo); #endif Uint32 dst_requestInfo = dst->requestInfo; ndbassert(LqhKeyReq::getInterpretedFlag(requestInfo) == LqhKeyReq::getInterpretedFlag(dst_requestInfo)); ndbassert(LqhKeyReq::getNoDiskFlag(requestInfo) == LqhKeyReq::getNoDiskFlag(dst_requestInfo)); dst->hashValue = hashValue; dst->fragmentData = fragId; dst->attrLen = attrLen; // fragdist is in here treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i; treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT; } return 0; } while (0); return err; } void Dbspj::lookup_start(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { lookup_send(signal, requestPtr, treeNodePtr); } void Dbspj::lookup_send(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); Uint32 cnt = 2; if 
(treeNodePtr.p->isLeaf()) { jam(); if (requestPtr.p->isLookup()) { jam(); cnt = 0; } else { jam(); cnt = 1; } } LqhKeyReq* req = reinterpret_cast<LqhKeyReq*>(signal->getDataPtrSend()); memcpy(req, treeNodePtr.p->m_lookup_data.m_lqhKeyReq, sizeof(treeNodePtr.p->m_lookup_data.m_lqhKeyReq)); req->variableData[2] = treeNodePtr.p->m_send.m_correlation; req->variableData[3] = requestPtr.p->m_rootResultData; if (!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf())) { // Non-LEAF wants reply to SPJ instead of ApiClient. LqhKeyReq::setNormalProtocolFlag(req->requestInfo, 1); req->variableData[0] = reference(); req->variableData[1] = treeNodePtr.i; } else { jam(); /** * Fake that TC sent this request, * so that it can route a possible TCKEYREF */ req->tcBlockref = requestPtr.p->m_senderRef; } SectionHandle handle(this); Uint32 ref = treeNodePtr.p->m_send.m_ref; Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI; Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT) { jam(); /** * Pass sections to send */ treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL; treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL; } else { if ((treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) == 0) { jam(); Uint32 tmp = RNIL; ndbrequire(dupSection(tmp, keyInfoPtrI)); // TODO handle error keyInfoPtrI = tmp; } else { jam(); treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL; } if ((treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) == 0) { jam(); Uint32 tmp = RNIL; ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error attrInfoPtrI = tmp; } else { jam(); treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL; } } getSection(handle.m_ptr[0], keyInfoPtrI); getSection(handle.m_ptr[1], attrInfoPtrI); handle.m_cnt = 2; #if defined DEBUG_LQHKEYREQ ndbout_c("LQHKEYREQ to %x", ref); printLQHKEYREQ(stdout, signal->getDataPtrSend(), NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq), DBLQH); printf("KEYINFO: "); print(handle.m_ptr[0], stdout); printf("ATTRINFO: "); print(handle.m_ptr[1], stdout); #endif Uint32 Tnode = refToNode(ref); if (Tnode == getOwnNodeId()) { c_Counters.incr_counter(CI_LOCAL_READS_SENT, 1); } else { c_Counters.incr_counter(CI_REMOTE_READS_SENT, 1); } if (unlikely(!c_alive_nodes.get(Tnode))) { jam(); releaseSections(handle); abort(signal, requestPtr, DbspjErr::NodeFailure); return; } else if (! (treeNodePtr.p->isLeaf() && requestPtr.p->isLookup())) { jam(); ndbassert(Tnode < NDB_ARRAY_SIZE(requestPtr.p->m_lookup_node_data)); requestPtr.p->m_outstanding += cnt; requestPtr.p->m_lookup_node_data[Tnode] += cnt; /* detect if the per-node counter wrapped around to 0 */ ndbrequire(! (requestPtr.p->m_lookup_node_data[Tnode] == 0)); } sendSignal(ref, GSN_LQHKEYREQ, signal, NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq), JBB, &handle); treeNodePtr.p->m_lookup_data.m_outstanding += cnt; if (requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()) { jam(); /** * Send TCKEYCONF with DirtyReadBit + Tnode, * so that the API can discover if Tnode dies while waiting for the result */ Uint32 resultRef = req->variableData[0]; Uint32 resultData = req->variableData[1]; TcKeyConf* conf = (TcKeyConf*)signal->getDataPtrSend(); conf->apiConnectPtr = RNIL; // lookup transaction from operations...
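/**
 * A single-operation TCKEYCONF is built below; attrInfoLen is overloaded
 * to carry DirtyReadBit | Tnode so that the API can detect a failure of
 * Tnode while it waits for the TRANSID_AI of this dirty read.
 */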
conf->confInfo = 0; TcKeyConf::setNoOfOperations(conf->confInfo, 1); conf->transId1 = requestPtr.p->m_transId[0]; conf->transId2 = requestPtr.p->m_transId[1]; conf->operations[0].apiOperationPtr = resultData; conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit | Tnode; Uint32 sigLen = TcKeyConf::StaticLength + TcKeyConf::OperationLength; sendTCKEYCONF(signal, sigLen, resultRef, requestPtr.p->m_senderRef); } } void Dbspj::lookup_execTRANSID_AI(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, const RowPtr & rowRef) { jam(); Uint32 Tnode = refToNode(signal->getSendersBlockRef()); { LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes); Dependency_map::ConstDataBufferIterator it; for (list.first(it); !it.isNull(); list.next(it)) { jam(); Ptr<TreeNode> childPtr; m_treenode_pool.getPtr(childPtr, * it.data); ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0); (this->*(childPtr.p->m_info->m_parent_row))(signal, requestPtr, childPtr,rowRef); } } ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf())); ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1); requestPtr.p->m_lookup_node_data[Tnode] -= 1; treeNodePtr.p->m_lookup_data.m_outstanding--; if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { jam(); // We have received all rows for this operation in this batch. reportBatchComplete(signal, requestPtr, treeNodePtr); // Prepare for next batch. treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false; treeNodePtr.p->m_lookup_data.m_outstanding = 0; } checkBatchComplete(signal, requestPtr, 1); } void Dbspj::lookup_execLQHKEYREF(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { const LqhKeyRef * rep = (LqhKeyRef*)signal->getDataPtr(); Uint32 errCode = rep->errorCode; Uint32 Tnode = refToNode(signal->getSendersBlockRef()); c_Counters.incr_counter(CI_READS_NOT_FOUND, 1); if (requestPtr.p->isLookup()) { jam(); /* CONF/REF not requested for lookup-Leaf: */ ndbrequire(!treeNodePtr.p->isLeaf()); /** * Scan-request does not need to * send TCKEYREF... */ /** * Return back to api... 
* NOTE: assume that signal is tampered with */ Uint32 resultRef = treeNodePtr.p->m_lookup_data.m_api_resultRef; Uint32 resultData = treeNodePtr.p->m_lookup_data.m_api_resultData; TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr(); ref->connectPtr = resultData; ref->transId[0] = requestPtr.p->m_transId[0]; ref->transId[1] = requestPtr.p->m_transId[1]; ref->errorCode = errCode; ref->errorData = 0; DEBUG("lookup_execLQHKEYREF, errorCode:" << errCode); sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef); if (treeNodePtr.p->m_bits & TreeNode::T_UNIQUE_INDEX_LOOKUP) { /** * If this is a "leaf" unique index lookup * emit extra TCKEYCONF as would have been done with ordinary * operation */ LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes); Dependency_map::ConstDataBufferIterator it; ndbrequire(list.first(it)); ndbrequire(list.getSize() == 1); // should only be 1 child Ptr<TreeNode> childPtr; m_treenode_pool.getPtr(childPtr, * it.data); if (childPtr.p->m_bits & TreeNode::T_LEAF) { jam(); Uint32 resultRef = childPtr.p->m_lookup_data.m_api_resultRef; Uint32 resultData = childPtr.p->m_lookup_data.m_api_resultData; TcKeyConf* conf = (TcKeyConf*)signal->getDataPtr(); conf->apiConnectPtr = RNIL; conf->confInfo = 0; conf->gci_hi = 0; TcKeyConf::setNoOfOperations(conf->confInfo, 1); conf->transId1 = requestPtr.p->m_transId[0]; conf->transId2 = requestPtr.p->m_transId[1]; conf->operations[0].apiOperationPtr = resultData; conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit |getOwnNodeId(); sendTCKEYCONF(signal, TcKeyConf::StaticLength + 2, resultRef, requestPtr.p->m_senderRef); } } } else { jam(); switch(errCode){ case 626: // Row not found case 899: // Interpreter_exit_nok jam(); break; default: jam(); abort(signal, requestPtr, errCode); } } Uint32 cnt = 2; if (treeNodePtr.p->isLeaf()) // Can't be a lookup-Leaf, asserted above cnt = 1; ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= cnt); requestPtr.p->m_lookup_node_data[Tnode] -= cnt; treeNodePtr.p->m_lookup_data.m_outstanding -= cnt; if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { jam(); // We have received all rows for this operation in this batch. reportBatchComplete(signal, requestPtr, treeNodePtr); // Prepare for next batch. treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false; treeNodePtr.p->m_lookup_data.m_outstanding = 0; } checkBatchComplete(signal, requestPtr, cnt); } void Dbspj::lookup_execLQHKEYCONF(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf())); Uint32 Tnode = refToNode(signal->getSendersBlockRef()); if (treeNodePtr.p->m_bits & TreeNode::T_USER_PROJECTION) { jam(); requestPtr.p->m_rows++; } ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1); requestPtr.p->m_lookup_node_data[Tnode] -= 1; treeNodePtr.p->m_lookup_data.m_outstanding--; if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { jam(); // We have received all rows for this operation in this batch. reportBatchComplete(signal, requestPtr, treeNodePtr); // Prepare for next batch. 
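// (m_parent_batch_complete is only set true again by
//  lookup_parent_batch_complete() when the next parent batch finishes.)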
treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false; treeNodePtr.p->m_lookup_data.m_outstanding = 0; } checkBatchComplete(signal, requestPtr, 1); } void Dbspj::lookup_parent_row(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, const RowPtr & rowRef) { /** * Here we need to... * 1) construct a key * 2) compute hash (normally TC) * 3) get node for row (normally TC) */ Uint32 err; const LqhKeyReq* src = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq; const Uint32 tableId = LqhKeyReq::getTableId(src->tableSchemaVersion); const Uint32 corrVal = rowRef.m_src_correlation; DEBUG("::lookup_parent_row"); do { Uint32 ptrI = RNIL; if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) { jam(); DEBUG("parent_row w/ T_KEYINFO_CONSTRUCTED"); /** * Get key-pattern */ LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern); bool keyIsNull; err = expand(ptrI, pattern, rowRef, keyIsNull); if (unlikely(err != 0)) break; if (keyIsNull) { jam(); DEBUG("Key contains NULL values"); /** * When the key contains NULL values, an EQ-match is impossible! * The entire lookup request can therefore be eliminated, as it is known * to be REFused with errorCode = 626 (Row not found). * Different handling is required depending on whether the request is a * scan or a lookup: */ if (requestPtr.p->isScan()) { /** * Scan request: We can simply ignore the lookup operation: * As rowCount in SCANCONF will not include this KEYREQ, * we don't have to send a KEYREF either. */ jam(); DEBUG("..Ignore impossible KEYREQ"); if (ptrI != RNIL) { releaseSection(ptrI); } return; // Bailout, KEYREQ would have returned KEYREF(626) anyway } else // isLookup() { /** * An ignored lookup request needs a faked KEYREF for the lookup operation. * Furthermore, if this is a leaf treeNode, a KEYCONF is also * expected by the API. * * TODO: Not implemented yet, as we believe * elimination of NULL key access for scan requests * will have the most performance impact. */ jam(); } } // keyIsNull /** * NOTE: * The logic below contradicts the 'keyIsNull' logic above and should * be removed. * However, it's likely that scanIndex should have similar * logic, as 'Null as wildcard' may make sense for a range bound. * NOTE2: * Until 'keyIsNull' also causes a bailout for request->isLookup(), * createEmptySection *is* required to avoid a crash due to empty keys. */ if (ptrI == RNIL) // TODO: remove when keyIsNull is completely handled { jam(); /** * We constructed a null-key...construct a zero-length key (even if we don't support it *now*) * * (we actually did prior to joining mysql, where null was treated as any other * value in a key).
But mysql treats NULL in a unique key as a *wildcard*, * which we don't support so well...and does nasty tricks in the handler * * NOTE: should be *after* check for error */ err = createEmptySection(ptrI); if (unlikely(err != 0)) break; } treeNodePtr.p->m_send.m_keyInfoPtrI = ptrI; } BuildKeyReq tmp; err = computeHash(signal, tmp, tableId, treeNodePtr.p->m_send.m_keyInfoPtrI); if (unlikely(err != 0)) break; err = getNodes(signal, tmp, tableId); if (unlikely(err != 0)) break; Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) { jam(); Uint32 tmp = RNIL; ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error Uint32 org_size; { SegmentedSectionPtr ptr; getSection(ptr, tmp); org_size = ptr.sz; } bool hasNull; LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern); err = expand(tmp, pattern, rowRef, hasNull); if (unlikely(err != 0)) break; // ndbrequire(!hasNull); /** * Update the size of the subroutine section, which contains the arguments */ SegmentedSectionPtr ptr; getSection(ptr, tmp); Uint32 new_size = ptr.sz; Uint32 * sectionptrs = ptr.p->theData; sectionptrs[4] = new_size - org_size; treeNodePtr.p->m_send.m_attrInfoPtrI = tmp; } /** * Now send... */ /** * TODO merge better with lookup_start (refactor) */ { /* We set the upper half word of m_correlation to the tuple ID * of the parent, such that the API can match this tuple with its * parent. * Then we re-use the tuple ID of the parent as the * tuple ID for this tuple also. Since the tuple ID * is unique within this batch and SPJ block for the parent operation, * it must also be unique for this operation. * This ensures that lookup operations with no user projection will * work, since such operations will have the same tuple ID as their * parents. The API will then be able to match a tuple with its * grandparent, even if it gets no tuple for the parent operation.*/ treeNodePtr.p->m_send.m_correlation = (corrVal << 16) + (corrVal & 0xffff); treeNodePtr.p->m_send.m_ref = tmp.receiverRef; LqhKeyReq * dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq; dst->hashValue = tmp.hashInfo[0]; dst->fragmentData = tmp.fragId; Uint32 attrLen = 0; LqhKeyReq::setDistributionKey(attrLen, tmp.fragDistKey); dst->attrLen = attrLen; lookup_send(signal, requestPtr, treeNodePtr); if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) { jam(); // restore treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI; } } return; } while (0); ndbrequire(false); } void Dbspj::lookup_parent_batch_complete(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); /** * lookups are performed directly...so we're not really interested in * parent_batch_complete...we only pass through */ /** * but this method should only be called if we have T_REPORT_BATCH_COMPLETE */ ndbassert(treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE); ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete); treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true; if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { jam(); // We have received all rows for this operation in this batch. reportBatchComplete(signal, requestPtr, treeNodePtr); // Prepare for next batch.
treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false; treeNodePtr.p->m_lookup_data.m_outstanding = 0; } } void Dbspj::lookup_abort(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); } Uint32 Dbspj::lookup_execNODE_FAILREP(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, NdbNodeBitmask mask) { jam(); Uint32 node = 0; Uint32 sum = 0; while (requestPtr.p->m_outstanding && ((node = mask.find(node + 1)) != NdbNodeBitmask::NotFound)) { Uint32 cnt = requestPtr.p->m_lookup_node_data[node]; sum += cnt; requestPtr.p->m_lookup_node_data[node] = 0; } if (sum) { jam(); ndbrequire(requestPtr.p->m_outstanding >= sum); requestPtr.p->m_outstanding -= sum; } return sum; } void Dbspj::lookup_cleanup(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { cleanup_common(requestPtr, treeNodePtr); } Uint32 Dbspj::handle_special_hash(Uint32 tableId, Uint32 dstHash[4], const Uint64* src, Uint32 srcLen, // Len in #32bit words const KeyDescriptor* desc) { const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS= (MAX_KEY_SIZE_IN_WORDS + 1) / 2; Uint64 alignedWorkspace[MAX_KEY_SIZE_IN_LONG_WORDS * MAX_XFRM_MULTIPLY]; const bool hasVarKeys = desc->noOfVarKeys > 0; const bool hasCharAttr = desc->hasCharAttr; const bool compute_distkey = desc->noOfDistrKeys > 0; const Uint64 *hashInput = 0; Uint32 inputLen = 0; Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]; Uint32 * keyPartLenPtr; /* Normalise KeyInfo into workspace if necessary */ if (hasCharAttr || (compute_distkey && hasVarKeys)) { hashInput = alignedWorkspace; keyPartLenPtr = keyPartLen; inputLen = xfrm_key(tableId, (Uint32*)src, (Uint32*)alignedWorkspace, sizeof(alignedWorkspace) >> 2, keyPartLenPtr); if (unlikely(inputLen == 0)) { return 290; // 'Corrupt key in TC, unable to xfrm' } } else { /* Keyinfo already suitable for hash */ hashInput = src; inputLen = srcLen; keyPartLenPtr = 0; } /* Calculate primary key hash */ md5_hash(dstHash, hashInput, inputLen); /* If the distribution key != primary key then we have to * form a distribution key from the primary key and calculate * a separate distribution hash based on this */ if (compute_distkey) { jam(); Uint32 distrKeyHash[4]; /* Reshuffle primary key columns to get just distribution key */ Uint32 len = create_distr_key(tableId, (Uint32*)hashInput, (Uint32*)alignedWorkspace, keyPartLenPtr); /* Calculate distribution key hash */ md5_hash(distrKeyHash, alignedWorkspace, len); /* Just one word used for distribution */ dstHash[1] = distrKeyHash[1]; } return 0; } Uint32 Dbspj::computeHash(Signal* signal, BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI) { /** * Essentially the same code as in Dbtc::hash(). * The code for user defined partitioning has been removed though. 
*/ SegmentedSectionPtr ptr; getSection(ptr, ptrI); /* NOTE: md5_hash below requires 64-bit alignment */ const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS= (MAX_KEY_SIZE_IN_WORDS + 1) / 2; Uint64 tmp64[MAX_KEY_SIZE_IN_LONG_WORDS]; Uint32 *tmp32 = (Uint32*)tmp64; copy(tmp32, ptr); const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId); ndbrequire(desc != NULL); bool need_special_hash = desc->hasCharAttr | (desc->noOfDistrKeys > 0); if (need_special_hash) { jam(); return handle_special_hash(tableId, dst.hashInfo, tmp64, ptr.sz, desc); } else { jam(); md5_hash(dst.hashInfo, tmp64, ptr.sz); return 0; } } /** * This function differs from computeHash in that *ptrI* * only contains the (packed) partition key and not the full primary key */ Uint32 Dbspj::computePartitionHash(Signal* signal, BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI) { SegmentedSectionPtr ptr; getSection(ptr, ptrI); /* NOTE: md5_hash below requires 64-bit alignment */ const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS= (MAX_KEY_SIZE_IN_WORDS + 1) / 2; Uint64 _space[MAX_KEY_SIZE_IN_LONG_WORDS]; Uint64 *tmp64 = _space; Uint32 *tmp32 = (Uint32*)tmp64; Uint32 sz = ptr.sz; copy(tmp32, ptr); const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId); ndbrequire(desc != NULL); bool need_xfrm = desc->hasCharAttr || desc->noOfVarKeys; if (need_xfrm) { jam(); /** * xfrm distribution key */ Uint32 srcPos = 0; Uint32 dstPos = 0; Uint32 * src = tmp32; Uint32 * dst = signal->theData+24; for (Uint32 i = 0; i < desc->noOfKeyAttr; i++) { const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i]; if (AttributeDescriptor::getDKey(keyAttr.attributeDescriptor)) { xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo, src, srcPos, dst, dstPos, NDB_ARRAY_SIZE(signal->theData) - 24); } } tmp64 = (Uint64*)dst; sz = dstPos; } md5_hash(dst.hashInfo, tmp64, sz); return 0; } Uint32 Dbspj::getNodes(Signal* signal, BuildKeyReq& dst, Uint32 tableId) { Uint32 err; DiGetNodesReq * req = (DiGetNodesReq *)&signal->theData[0]; req->tableId = tableId; req->hashValue = dst.hashInfo[1]; req->distr_key_indicator = 0; // userDefinedPartitioning not supported!
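/**
 * DIGETNODESREQ is executed directly into DBDIH below (same thread),
 * so this thread's jam buffer pointer is stored in the request to let
 * DIH's jam() entries land in the correct trace buffer.
 */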
* (EmulatedJamBuffer**)req->jamBuffer = jamBuffer(); #if 1 EXECUTE_DIRECT(DBDIH, GSN_DIGETNODESREQ, signal, DiGetNodesReq::SignalLength, 0); #else sendSignal(DBDIH_REF, GSN_DIGETNODESREQ, signal, DiGetNodesReq::SignalLength, JBB); jamEntry(); #endif DiGetNodesConf * conf = (DiGetNodesConf *)&signal->theData[0]; err = signal->theData[0]; Uint32 Tdata2 = conf->reqinfo; Uint32 nodeId = conf->nodes[0]; Uint32 instanceKey = (Tdata2 >> 24) & 127; DEBUG("HASH to nodeId:" << nodeId << ", instanceKey:" << instanceKey); jamEntry(); if (unlikely(err != 0)) goto error; dst.fragId = conf->fragId; dst.fragDistKey = (Tdata2 >> 16) & 255; dst.receiverRef = numberToRef(DBLQH, instanceKey, nodeId); return 0; error: /** * TODO handle error */ ndbrequire(false); return err; } /** * END - MODULE LOOKUP */ /** * MODULE SCAN FRAG * * NOTE: This may only be root node */ const Dbspj::OpInfo Dbspj::g_ScanFragOpInfo = { &Dbspj::scanFrag_build, 0, // prepare &Dbspj::scanFrag_start, &Dbspj::scanFrag_execTRANSID_AI, 0, // execLQHKEYREF 0, // execLQHKEYCONF &Dbspj::scanFrag_execSCAN_FRAGREF, &Dbspj::scanFrag_execSCAN_FRAGCONF, 0, // parent row 0, // parent batch complete 0, // parent batch repeat 0, // Dbspj::scanFrag_parent_batch_cleanup, &Dbspj::scanFrag_execSCAN_NEXTREQ, 0, // Dbspj::scanFrag_complete &Dbspj::scanFrag_abort, 0, // execNODE_FAILREP, &Dbspj::scanFrag_cleanup }; Uint32 Dbspj::scanFrag_build(Build_context& ctx, Ptr<Request> requestPtr, const QueryNode* qn, const QueryNodeParameters* qp) { Uint32 err = 0; Ptr<TreeNode> treeNodePtr; const QN_ScanFragNode * node = (const QN_ScanFragNode*)qn; const QN_ScanFragParameters * param = (const QN_ScanFragParameters*)qp; do { err = createNode(ctx, requestPtr, treeNodePtr); if (unlikely(err != 0)) break; treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = RNIL; Ptr<ScanFragHandle> scanFragHandlePtr; if (unlikely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, scanFragHandlePtr) != true)) { err = DbspjErr::OutOfQueryMemory; break; } scanFragHandlePtr.p->m_treeNodePtrI = treeNodePtr.i; scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED; treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = scanFragHandlePtr.i; requestPtr.p->m_bits |= Request::RT_SCAN; treeNodePtr.p->m_info = &g_ScanFragOpInfo; treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED; treeNodePtr.p->m_batch_size = ctx.m_batch_size_rows; ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq; dst->senderData = scanFragHandlePtr.i; dst->resultRef = reference(); dst->resultData = treeNodePtr.i; dst->savePointId = ctx.m_savepointId; Uint32 transId1 = requestPtr.p->m_transId[0]; Uint32 transId2 = requestPtr.p->m_transId[1]; dst->transId1 = transId1; dst->transId2 = transId2; Uint32 treeBits = node->requestInfo; Uint32 paramBits = param->requestInfo; //ndbout_c("Dbspj::scanFrag_build() treeBits=%.8x paramBits=%.8x", // treeBits, paramBits); Uint32 requestInfo = 0; ScanFragReq::setReadCommittedFlag(requestInfo, 1); ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio); ScanFragReq::setCorrFactorFlag(requestInfo, 1); ScanFragReq::setNoDiskFlag(requestInfo, (treeBits & DABits::NI_LINKED_DISK) == 0 && (paramBits & DABits::PI_DISK_ATTR) == 0); dst->requestInfo = requestInfo; err = DbspjErr::InvalidTreeNodeSpecification; DEBUG("scanFrag_build: len=" << node->len); if (unlikely(node->len < QN_ScanFragNode::NodeSize)) break; dst->tableId = node->tableId; dst->schemaVersion = node->tableVersion; err = DbspjErr::InvalidTreeParametersSpecification; DEBUG("param len: " << param->len); if 
(unlikely(param->len < QN_ScanFragParameters::NodeSize)) { jam(); DEBUG_CRASH(); break; } ctx.m_resultData = param->resultData; /** * Parse stuff common lookup/scan-frag */ struct DABuffer nodeDA, paramDA; nodeDA.ptr = node->optional; nodeDA.end = nodeDA.ptr + (node->len - QN_ScanFragNode::NodeSize); paramDA.ptr = param->optional; paramDA.end = paramDA.ptr + (param->len - QN_ScanFragParameters::NodeSize); err = parseDA(ctx, requestPtr, treeNodePtr, nodeDA, treeBits, paramDA, paramBits); if (unlikely(err != 0)) { jam(); DEBUG_CRASH(); break; } ctx.m_scan_cnt++; ctx.m_scans.set(treeNodePtr.p->m_node_no); if (ctx.m_start_signal) { jam(); Signal* signal = ctx.m_start_signal; const ScanFragReq* src = (const ScanFragReq*)(signal->getDataPtr()); #if NOT_YET Uint32 instanceNo = blockToInstance(signal->header.theReceiversBlockNumber); treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH, instanceNo, getOwnNodeId()); #else treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH, getInstanceKey(src->tableId, src->fragmentNoKeyLen), getOwnNodeId()); #endif Uint32 fragId = src->fragmentNoKeyLen; Uint32 requestInfo = src->requestInfo; Uint32 batch_size_bytes = src->batch_size_bytes; Uint32 batch_size_rows = src->batch_size_rows; #ifdef VM_TRACE Uint32 savePointId = src->savePointId; Uint32 tableId = src->tableId; Uint32 schemaVersion = src->schemaVersion; Uint32 transId1 = src->transId1; Uint32 transId2 = src->transId2; #endif ndbassert(ScanFragReq::getLockMode(requestInfo) == 0); ndbassert(ScanFragReq::getHoldLockFlag(requestInfo) == 0); ndbassert(ScanFragReq::getKeyinfoFlag(requestInfo) == 0); ndbassert(ScanFragReq::getReadCommittedFlag(requestInfo) == 1); ndbassert(ScanFragReq::getLcpScanFlag(requestInfo) == 0); //ScanFragReq::getAttrLen(requestInfo); // ignore ndbassert(ScanFragReq::getReorgFlag(requestInfo) == 0); Uint32 tupScanFlag = ScanFragReq::getTupScanFlag(requestInfo); Uint32 rangeScanFlag = ScanFragReq::getRangeScanFlag(requestInfo); Uint32 descendingFlag = ScanFragReq::getDescendingFlag(requestInfo); Uint32 scanPrio = ScanFragReq::getScanPrio(requestInfo); Uint32 dst_requestInfo = dst->requestInfo; ScanFragReq::setTupScanFlag(dst_requestInfo,tupScanFlag); ScanFragReq::setRangeScanFlag(dst_requestInfo,rangeScanFlag); ScanFragReq::setDescendingFlag(dst_requestInfo,descendingFlag); ScanFragReq::setScanPrio(dst_requestInfo,scanPrio); /** * 'NoDiskFlag' should agree with information in treeNode */ ndbassert(ScanFragReq::getNoDiskFlag(requestInfo) == ScanFragReq::getNoDiskFlag(dst_requestInfo)); dst->fragmentNoKeyLen = fragId; dst->requestInfo = dst_requestInfo; dst->batch_size_bytes = batch_size_bytes; dst->batch_size_rows = batch_size_rows; #ifdef VM_TRACE ndbassert(dst->savePointId == savePointId); ndbassert(dst->tableId == tableId); ndbassert(dst->schemaVersion == schemaVersion); ndbassert(dst->transId1 == transId1); ndbassert(dst->transId2 == transId2); #endif treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i; treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT; if (rangeScanFlag) { c_Counters.incr_counter(CI_RANGE_SCANS_RECEIVED, 1); } else { c_Counters.incr_counter(CI_TABLE_SCANS_RECEIVED, 1); } } else { ndbrequire(false); } return 0; } while (0); return err; } void Dbspj::scanFrag_start(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { scanFrag_send(signal, requestPtr, treeNodePtr); } void Dbspj::scanFrag_send(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); requestPtr.p->m_outstanding++; requestPtr.p->m_cnt_active++; treeNodePtr.p->m_state = 
TreeNode::TN_ACTIVE; Ptr<ScanFragHandle> scanFragHandlePtr; m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p-> m_scanfrag_data.m_scanFragHandlePtrI); ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend()); memcpy(req, treeNodePtr.p->m_scanfrag_data.m_scanFragReq, sizeof(treeNodePtr.p->m_scanfrag_data.m_scanFragReq)); req->variableData[0] = treeNodePtr.p->m_send.m_correlation; req->variableData[1] = requestPtr.p->m_rootResultData; SectionHandle handle(this); Uint32 ref = treeNodePtr.p->m_send.m_ref; Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI; Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; /** * ScanFrag may only be used as root-node, i.e T_ONE_SHOT */ ndbrequire(treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT); /** * Pass sections to send */ treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL; treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL; getSection(handle.m_ptr[0], attrInfoPtrI); handle.m_cnt = 1; if (keyInfoPtrI != RNIL) { jam(); getSection(handle.m_ptr[1], keyInfoPtrI); handle.m_cnt = 2; } #ifdef DEBUG_SCAN_FRAGREQ ndbout_c("SCAN_FRAGREQ to %x", ref); printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), DBLQH); printf("ATTRINFO: "); print(handle.m_ptr[0], stdout); if (handle.m_cnt > 1) { printf("KEYINFO: "); print(handle.m_ptr[1], stdout); } #endif if (ScanFragReq::getRangeScanFlag(req->requestInfo)) { c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1); } else { c_Counters.incr_counter(CI_LOCAL_TABLE_SCANS_SENT, 1); } ndbrequire(refToNode(ref) == getOwnNodeId()); sendSignal(ref, GSN_SCAN_FRAGREQ, signal, NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), JBB, &handle); scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING; treeNodePtr.p->m_scanfrag_data.m_rows_received = 0; treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0); } void Dbspj::scanFrag_execTRANSID_AI(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, const RowPtr & rowRef) { jam(); treeNodePtr.p->m_scanfrag_data.m_rows_received++; LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes); Dependency_map::ConstDataBufferIterator it; { for (list.first(it); !it.isNull(); list.next(it)) { jam(); Ptr<TreeNode> childPtr; m_treenode_pool.getPtr(childPtr, * it.data); ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0); (this->*(childPtr.p->m_info->m_parent_row))(signal, requestPtr, childPtr,rowRef); } } if (treeNodePtr.p->m_scanfrag_data.m_rows_received == treeNodePtr.p->m_scanfrag_data.m_rows_expecting) { jam(); if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE) { jam(); reportBatchComplete(signal, requestPtr, treeNodePtr); } checkBatchComplete(signal, requestPtr, 1); return; } } void Dbspj::scanFrag_execSCAN_FRAGREF(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, Ptr<ScanFragHandle> scanFragHandlePtr) { const ScanFragRef* rep = reinterpret_cast<const ScanFragRef*>(signal->getDataPtr()); Uint32 errCode = rep->errorCode; DEBUG("scanFrag_execSCAN_FRAGREF, rep->senderData:" << rep->senderData << ", requestPtr.p->m_senderData:" << requestPtr.p->m_senderData); scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE; ndbrequire(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE); ndbrequire(requestPtr.p->m_cnt_active); requestPtr.p->m_cnt_active--; ndbrequire(requestPtr.p->m_outstanding); requestPtr.p->m_outstanding--; treeNodePtr.p->m_state = 
TreeNode::TN_INACTIVE; abort(signal, requestPtr, errCode); } void Dbspj::scanFrag_execSCAN_FRAGCONF(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, Ptr<ScanFragHandle> scanFragHandlePtr) { const ScanFragConf * conf = reinterpret_cast<const ScanFragConf*>(signal->getDataPtr()); Uint32 rows = conf->completedOps; Uint32 done = conf->fragmentCompleted; Uint32 state = scanFragHandlePtr.p->m_state; if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0) { jam(); /** * We sent an explicit close request...ignore this...a close will come later */ return; } ndbrequire(done <= 2); // 0, 1, 2 (=ZSCAN_FRAG_CLOSED) ndbassert(treeNodePtr.p->m_scanfrag_data.m_rows_expecting == ~Uint32(0)); treeNodePtr.p->m_scanfrag_data.m_rows_expecting = rows; if (treeNodePtr.p->isLeaf()) { /** * If this is a leaf node, then no rows will be sent to the SPJ block, * as there are no child operations to instantiate. */ treeNodePtr.p->m_scanfrag_data.m_rows_received = rows; } requestPtr.p->m_rows += rows; if (done) { jam(); ndbrequire(requestPtr.p->m_cnt_active); requestPtr.p->m_cnt_active--; treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE; } else { jam(); scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ; } if (treeNodePtr.p->m_scanfrag_data.m_rows_expecting == treeNodePtr.p->m_scanfrag_data.m_rows_received || (state == ScanFragHandle::SFH_WAIT_CLOSE)) { jam(); if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE) { jam(); reportBatchComplete(signal, requestPtr, treeNodePtr); } checkBatchComplete(signal, requestPtr, 1); return; } } void Dbspj::scanFrag_execSCAN_NEXTREQ(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jamEntry(); Ptr<ScanFragHandle> scanFragHandlePtr; m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p-> m_scanfrag_data.m_scanFragHandlePtrI); const ScanFragReq * org = (ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq; ScanFragNextReq* req = reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend()); req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI; req->requestInfo = 0; req->transId1 = requestPtr.p->m_transId[0]; req->transId2 = requestPtr.p->m_transId[1]; req->batch_size_rows = org->batch_size_rows; req->batch_size_bytes = org->batch_size_bytes; DEBUG("scanFrag_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref << ", senderData: " << req->senderData); #ifdef DEBUG_SCAN_FRAGREQ printSCANFRAGNEXTREQ(stdout, &signal->theData[0], ScanFragNextReq::SignalLength, DBLQH); #endif sendSignal(treeNodePtr.p->m_send.m_ref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); treeNodePtr.p->m_scanfrag_data.m_rows_received = 0; treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0); requestPtr.p->m_outstanding++; scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING; }//Dbspj::scanFrag_execSCAN_NEXTREQ() void Dbspj::scanFrag_abort(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); Ptr<ScanFragHandle> scanFragHandlePtr; m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p-> m_scanfrag_data.m_scanFragHandlePtrI); if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE) { jam(); switch(scanFragHandlePtr.p->m_state){ case ScanFragHandle::SFH_NOT_STARTED: case ScanFragHandle::SFH_COMPLETE: ndbrequire(false); // we shouldn't be TN_ACTIVE then...
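/**
 * For the states handled below: SFH_WAIT_CLOSE means a close was already
 * sent (nothing more to do); SFH_WAIT_NEXTREQ means the fragment was idle,
 * so the close sent below must be added to m_outstanding; SFH_SCANNING
 * already has an outstanding request for the close to be counted against.
 */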
case ScanFragHandle::SFH_WAIT_CLOSE: jam(); // close already sent return; case ScanFragHandle::SFH_WAIT_NEXTREQ: jam(); // we were idle requestPtr.p->m_outstanding++; break; case ScanFragHandle::SFH_SCANNING: jam(); break; } treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0); scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE; ScanFragNextReq* req = reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend()); req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI; req->requestInfo = ScanFragNextReq::ZCLOSE; req->transId1 = requestPtr.p->m_transId[0]; req->transId2 = requestPtr.p->m_transId[1]; req->batch_size_rows = 0; req->batch_size_bytes = 0; sendSignal(treeNodePtr.p->m_send.m_ref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); } } void Dbspj::scanFrag_cleanup(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { Uint32 ptrI = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI; if (ptrI != RNIL) { m_scanfraghandle_pool.release(ptrI); } cleanup_common(requestPtr, treeNodePtr); } /** * END - MODULE SCAN FRAG */ /** * MODULE SCAN INDEX * * NOTE: This may not be root-node */ const Dbspj::OpInfo Dbspj::g_ScanIndexOpInfo = { &Dbspj::scanIndex_build, &Dbspj::scanIndex_prepare, 0, // start &Dbspj::scanIndex_execTRANSID_AI, 0, // execLQHKEYREF 0, // execLQHKEYCONF &Dbspj::scanIndex_execSCAN_FRAGREF, &Dbspj::scanIndex_execSCAN_FRAGCONF, &Dbspj::scanIndex_parent_row, &Dbspj::scanIndex_parent_batch_complete, &Dbspj::scanIndex_parent_batch_repeat, &Dbspj::scanIndex_parent_batch_cleanup, &Dbspj::scanIndex_execSCAN_NEXTREQ, &Dbspj::scanIndex_complete, &Dbspj::scanIndex_abort, &Dbspj::scanIndex_execNODE_FAILREP, &Dbspj::scanIndex_cleanup }; Uint32 Dbspj::scanIndex_build(Build_context& ctx, Ptr<Request> requestPtr, const QueryNode* qn, const QueryNodeParameters* qp) { Uint32 err = 0; Ptr<TreeNode> treeNodePtr; const QN_ScanIndexNode * node = (const QN_ScanIndexNode*)qn; const QN_ScanIndexParameters * param = (const QN_ScanIndexParameters*)qp; do { err = createNode(ctx, requestPtr, treeNodePtr); if (unlikely(err != 0)) break; Uint32 batchSize = param->batchSize; requestPtr.p->m_bits |= Request::RT_SCAN; requestPtr.p->m_bits |= Request::RT_NEED_PREPARE; requestPtr.p->m_bits |= Request::RT_NEED_COMPLETE; treeNodePtr.p->m_info = &g_ScanIndexOpInfo; treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED; treeNodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED; treeNodePtr.p->m_batch_size = batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits); ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq; dst->senderData = treeNodePtr.i; dst->resultRef = reference(); dst->resultData = treeNodePtr.i; dst->savePointId = ctx.m_savepointId; dst->batch_size_rows = batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits); dst->batch_size_bytes = batchSize >> QN_ScanIndexParameters::BatchRowBits; Uint32 transId1 = requestPtr.p->m_transId[0]; Uint32 transId2 = requestPtr.p->m_transId[1]; dst->transId1 = transId1; dst->transId2 = transId2; Uint32 treeBits = node->requestInfo; Uint32 paramBits = param->requestInfo; Uint32 requestInfo = 0; ScanFragReq::setRangeScanFlag(requestInfo, 1); ScanFragReq::setReadCommittedFlag(requestInfo, 1); ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio); ScanFragReq::setNoDiskFlag(requestInfo, (treeBits & DABits::NI_LINKED_DISK) == 0 && (paramBits & DABits::PI_DISK_ATTR) == 0); ScanFragReq::setCorrFactorFlag(requestInfo, 1); dst->requestInfo = requestInfo; err = 
DbspjErr::InvalidTreeNodeSpecification; DEBUG("scanIndex_build: len=" << node->len); if (unlikely(node->len < QN_ScanIndexNode::NodeSize)) break; dst->tableId = node->tableId; dst->schemaVersion = node->tableVersion; err = DbspjErr::InvalidTreeParametersSpecification; DEBUG("param len: " << param->len); if (unlikely(param->len < QN_ScanIndexParameters::NodeSize)) { jam(); DEBUG_CRASH(); break; } ctx.m_resultData = param->resultData; /** * Parse stuff */ struct DABuffer nodeDA, paramDA; nodeDA.ptr = node->optional; nodeDA.end = nodeDA.ptr + (node->len - QN_ScanIndexNode::NodeSize); paramDA.ptr = param->optional; paramDA.end = paramDA.ptr + (param->len - QN_ScanIndexParameters::NodeSize); err = parseScanIndex(ctx, requestPtr, treeNodePtr, nodeDA, treeBits, paramDA, paramBits); if (unlikely(err != 0)) { jam(); DEBUG_CRASH(); break; } /** * Since we have T_NEED_REPORT_BATCH_COMPLETED, we set * this on all our parents... */ Ptr<TreeNode> nodePtr; nodePtr.i = treeNodePtr.p->m_parentPtrI; while (nodePtr.i != RNIL) { jam(); m_treenode_pool.getPtr(nodePtr); nodePtr.p->m_bits |= TreeNode::T_REPORT_BATCH_COMPLETE; nodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED; nodePtr.i = nodePtr.p->m_parentPtrI; } /** * If there exist other scan TreeNodes that are not among * my ancestors, results from this scanIndex may be repeated * as part of an X-scan. * * NOTE: The scan nodes along the left deep ancestor chain * are not 'repeatable', as they are driving the * repeated X-scan and are thus not repeated themselves. */ if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT && !treeNodePtr.p->m_ancestors.contains(ctx.m_scans)) { treeNodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE; } ctx.m_scan_cnt++; ctx.m_scans.set(treeNodePtr.p->m_node_no); return 0; } while (0); return err; } Uint32 Dbspj::parseScanIndex(Build_context& ctx, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, DABuffer tree, Uint32 treeBits, DABuffer param, Uint32 paramBits) { Uint32 err = 0; typedef QN_ScanIndexNode Node; typedef QN_ScanIndexParameters Params; do { jam(); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; data.m_fragments.init(); data.m_frags_outstanding = 0; data.m_frags_complete = 0; data.m_frags_not_started = 0; data.m_parallelismStat.init(); data.m_firstExecution = true; data.m_batch_chunks = 0; err = parseDA(ctx, requestPtr, treeNodePtr, tree, treeBits, param, paramBits); if (unlikely(err != 0)) break; if (treeBits & Node::SI_PRUNE_PATTERN) { Uint32 len_cnt = * tree.ptr ++; Uint32 len = len_cnt & 0xFFFF; // length of pattern in words Uint32 cnt = len_cnt >> 16; // no of parameters LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); ndbrequire((cnt==0) == ((treeBits & Node::SI_PRUNE_PARAMS) ==0)); ndbrequire((cnt==0) == ((paramBits & Params::SIP_PRUNE_PARAMS)==0)); if (treeBits & Node::SI_PRUNE_LINKED) { jam(); DEBUG("LINKED-PRUNE PATTERN w/ " << cnt << " PARAM values"); data.m_prunePattern.init(); Local_pattern_store pattern(pool, data.m_prunePattern); /** * Expand pattern into a new pattern (with linked values) */ err = expand(pattern, treeNodePtr, tree, len, param, cnt); if (unlikely(err != 0)) break; treeNodePtr.p->m_bits |= TreeNode::T_PRUNE_PATTERN; c_Counters.incr_counter(CI_PRUNED_RANGE_SCANS_RECEIVED, 1); } else { jam(); DEBUG("FIXED-PRUNE w/ " << cnt << " PARAM values"); /** * Expand the pattern directly into a key section. * This means "fixed" pruning from here on, * i.e. a guaranteed single partition */ Uint32 prunePtrI = RNIL; bool hasNull; err = expand(prunePtrI, tree, len, param, cnt, hasNull);
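/**
 * On success, prunePtrI now refers to a fully materialized prune key
 * built from the fixed pattern words and the supplied parameter values;
 * no parent row is involved, hence the "const" pruning.
 */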
if (unlikely(err != 0)) break; if (unlikely(hasNull)) { /* API should have eliminated requests w/ const-NULL keys */ jam(); DEBUG("BEWARE: T_CONST_PRUNE-key contains NULL values"); // treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE; // break; ndbrequire(false); } ndbrequire(prunePtrI != RNIL); /* todo: can we allow / take advantage of NULLs in range scan? */ data.m_constPrunePtrI = prunePtrI; /** * We may not compute the partition for the hash-key here * as we have not yet opened a read-view */ treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE; c_Counters.incr_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED, 1); } } //SI_PRUNE_PATTERN if ((treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE) == 0 && ((treeBits & Node::SI_PARALLEL) || ((paramBits & Params::SIP_PARALLEL)))) { jam(); treeNodePtr.p->m_bits |= TreeNode::T_SCAN_PARALLEL; } return 0; } while(0); DEBUG_CRASH(); return err; } void Dbspj::scanIndex_prepare(Signal * signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); treeNodePtr.p->m_state = TreeNode::TN_PREPARING; ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq; DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = treeNodePtr.i; req->tableId = dst->tableId; req->schemaTransId = 0; sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal, DihScanTabReq::SignalLength, JBB); requestPtr.p->m_outstanding++; } void Dbspj::execDIH_SCAN_TAB_REF(Signal* signal) { jamEntry(); ndbrequire(false); } void Dbspj::execDIH_SCAN_TAB_CONF(Signal* signal) { jamEntry(); DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr(); Ptr<TreeNode> treeNodePtr; m_treenode_pool.getPtr(treeNodePtr, conf->senderData); ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; Uint32 cookie = conf->scanCookie; Uint32 fragCount = conf->fragmentCount; ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq; if (conf->reorgFlag) { jam(); ScanFragReq::setReorgFlag(dst->requestInfo, 1); } data.m_fragCount = fragCount; data.m_scanCookie = cookie; const Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE; bool pruned = (treeNodePtr.p->m_bits & prunemask) != 0; Ptr<Request> requestPtr; m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI); Ptr<ScanFragHandle> fragPtr; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr))) { jam(); fragPtr.p->init(0); fragPtr.p->m_treeNodePtrI = treeNodePtr.i; list.addLast(fragPtr); } else { jam(); goto error1; } if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE) { jam(); // TODO we need a different variant of computeHash here, // since m_constPrunePtrI does not contain full primary key // but only parts in distribution key BuildKeyReq tmp; Uint32 indexId = dst->tableId; Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId; Uint32 err = computePartitionHash(signal, tmp, tableId, data.m_constPrunePtrI); if (unlikely(err != 0)) goto error; releaseSection(data.m_constPrunePtrI); data.m_constPrunePtrI = RNIL; err = getNodes(signal, tmp, tableId); if (unlikely(err != 0)) goto error; fragPtr.p->m_fragId = tmp.fragId; fragPtr.p->m_ref = tmp.receiverRef; data.m_fragCount = 1; } else if (fragCount == 1) { jam(); /** * This is roughly equivalent to T_CONST_PRUNE; * pretend that it is const-pruned */ if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) { jam(); LocalArenaPoolImpl pool(requestPtr.p->m_arena,
m_dependency_map_pool); Local_pattern_store pattern(pool, data.m_prunePattern); pattern.release(); } data.m_constPrunePtrI = RNIL; Uint32 clear = TreeNode::T_PRUNE_PATTERN | TreeNode::T_SCAN_PARALLEL; treeNodePtr.p->m_bits &= ~clear; treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE; /** * We must get fragPtr.p->m_ref...so set pruned=false */ pruned = false; } else { for (Uint32 i = 1; i<fragCount; i++) { jam(); Ptr<ScanFragHandle> fragPtr; if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr))) { jam(); fragPtr.p->init(i); fragPtr.p->m_treeNodePtrI = treeNodePtr.i; list.addLast(fragPtr); } else { goto error1; } } } data.m_frags_complete = data.m_fragCount; if (!pruned) { jam(); Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId; DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->tableId = tableId; req->scanCookie = cookie; Uint32 cnt = 0; for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr)) { jam(); req->senderData = fragPtr.i; req->fragId = fragPtr.p->m_fragId; sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, DihScanGetNodesReq::SignalLength, JBB); cnt++; } data.m_frags_outstanding = cnt; requestPtr.p->m_outstanding++; } else { jam(); treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; } checkPrepareComplete(signal, requestPtr, 1); return; error1: error: ndbrequire(false); } void Dbspj::execDIH_SCAN_GET_NODES_REF(Signal* signal) { jamEntry(); ndbrequire(false); } void Dbspj::execDIH_SCAN_GET_NODES_CONF(Signal* signal) { jamEntry(); DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr(); Uint32 senderData = conf->senderData; Uint32 node = conf->nodes[0]; Uint32 instanceKey = conf->instanceKey; Ptr<ScanFragHandle> fragPtr; m_scanfraghandle_pool.getPtr(fragPtr, senderData); Ptr<TreeNode> treeNodePtr; m_treenode_pool.getPtr(treeNodePtr, fragPtr.p->m_treeNodePtrI); ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; ndbrequire(data.m_frags_outstanding > 0); data.m_frags_outstanding--; fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node); if (data.m_frags_outstanding == 0) { jam(); treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; Ptr<Request> requestPtr; m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI); checkPrepareComplete(signal, requestPtr, 1); } } Uint32 Dbspj::scanIndex_findFrag(Local_ScanFragHandle_list & list, Ptr<ScanFragHandle> & fragPtr, Uint32 fragId) { for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr)) { jam(); if (fragPtr.p->m_fragId == fragId) { jam(); return 0; } } return 99; // TODO } void Dbspj::scanIndex_parent_row(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, const RowPtr & rowRef) { jam(); Uint32 err; ScanIndexData& data = treeNodePtr.p->m_scanindex_data; /** * Construct range definition, * and if prune pattern enabled * stuff it onto correct scanindexFrag */ do { Ptr<ScanFragHandle> fragPtr; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) { jam(); /** * TODO: Expand into linear memory instead * of expanding into sections, and then copy * section into linear */ Local_pattern_store pattern(pool, data.m_prunePattern); Uint32 pruneKeyPtrI = RNIL; bool hasNull; err = expand(pruneKeyPtrI, pattern, rowRef, hasNull); if (unlikely(err != 0)) { DEBUG_CRASH(); break; } if (unlikely(hasNull)) { jam(); 
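// A NULL in the prune key makes the pruning condition unsatisfiable,
// so the row is dropped below rather than being turned into a range.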
DEBUG("T_PRUNE_PATTERN-key contain NULL values"); // Ignore this request as 'NULL == <column>' will never give a match if (pruneKeyPtrI != RNIL) { releaseSection(pruneKeyPtrI); } return; // Bailout, SCANREQ would have returned 0 rows anyway } // TODO we need a different variant of computeHash here, // since pruneKeyPtrI does not contain full primary key // but only parts in distribution key BuildKeyReq tmp; ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq; Uint32 indexId = dst->tableId; Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId; err = computePartitionHash(signal, tmp, tableId, pruneKeyPtrI); releaseSection(pruneKeyPtrI); // see ^ TODO if (unlikely(err != 0)) { DEBUG_CRASH(); break; } err = getNodes(signal, tmp, tableId); if (unlikely(err != 0)) { DEBUG_CRASH(); break; } err = scanIndex_findFrag(list, fragPtr, tmp.fragId); if (unlikely(err != 0)) { DEBUG_CRASH(); break; } /** * NOTE: We can get different receiverRef's here * for different keys. E.g during node-recovery where * primary-fragment is switched. * * Use latest that we receive * * TODO: Also double check table-reorg */ fragPtr.p->m_ref = tmp.receiverRef; } else { jam(); /** * If const prune, or no-prune, store on first fragment, * and send to 1 or all resp. */ list.first(fragPtr); } Uint32 ptrI = fragPtr.p->m_rangePtrI; bool hasNull; if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) { jam(); Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern); err = expand(ptrI, pattern, rowRef, hasNull); if (unlikely(err != 0)) { DEBUG_CRASH(); break; } } else { jam(); // Fixed key...fix later... ndbrequire(false); } // ndbrequire(!hasNull); // FIXME, can't ignore request as we already added it to keyPattern fragPtr.p->m_rangePtrI = ptrI; scanIndex_fixupBound(fragPtr, ptrI, rowRef.m_src_correlation); if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT) { jam(); /** * We being a T_ONE_SHOT means that we're only be called * with parent_row once, i.e batch is complete */ scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr); } return; } while (0); ndbrequire(false); } void Dbspj::scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32 corrVal) { /** * Index bounds...need special tender and care... * * 1) Set #bound no, bound-size, and renumber attributes */ SectionReader r0(ptrI, getSectionSegmentPool()); ndbrequire(r0.step(fragPtr.p->m_range_builder.m_range_size)); Uint32 boundsz = r0.getSize() - fragPtr.p->m_range_builder.m_range_size; Uint32 boundno = fragPtr.p->m_range_builder.m_range_cnt + 1; Uint32 tmp; ndbrequire(r0.peekWord(&tmp)); tmp |= (boundsz << 16) | ((corrVal & 0xFFF) << 4); ndbrequire(r0.updateWord(tmp)); ndbrequire(r0.step(1)); // Skip first BoundType // TODO: Renumbering below assume there are only EQ-bounds !! 
Uint32 id = 0; Uint32 len32; do { ndbrequire(r0.peekWord(&tmp)); AttributeHeader ah(tmp); Uint32 len = ah.getByteSize(); AttributeHeader::init(&tmp, id++, len); ndbrequire(r0.updateWord(tmp)); len32 = (len + 3) >> 2; } while (r0.step(2 + len32)); // Skip AttributeHeader(1) + Attribute(len32) + next BoundType(1) fragPtr.p->m_range_builder.m_range_cnt = boundno; fragPtr.p->m_range_builder.m_range_size = r0.getSize(); } void Dbspj::scanIndex_parent_batch_complete(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; data.m_rows_received = 0; data.m_rows_expecting = 0; ndbassert(data.m_frags_outstanding == 0); ndbassert(data.m_frags_complete == data.m_fragCount); data.m_frags_complete = 0; Ptr<ScanFragHandle> fragPtr; { Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); list.first(fragPtr); if ((treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) == 0) { if (fragPtr.p->m_rangePtrI == RNIL) { // No keys found jam(); data.m_frags_complete = data.m_fragCount; } } else { while(!fragPtr.isNull()) { if (fragPtr.p->m_rangePtrI == RNIL) { jam(); /** * This is a pruned scan, so we must scan those fragments that * some distribution key hashed to. */ fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; data.m_frags_complete++; } list.next(fragPtr); } } } data.m_frags_not_started = data.m_fragCount - data.m_frags_complete; if (data.m_frags_complete == data.m_fragCount) { jam(); /** * No keys was produced... */ return; } /** * When parent's batch is complete, we send our batch */ const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; ndbrequire(org->batch_size_rows > 0); if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) { jam(); data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, org->batch_size_rows); } else if (data.m_firstExecution) { /** * Having a high parallelism would allow us to fetch data from many * fragments in parallel and thus reduce the number of round trips. * On the other hand, we should set parallelism so low that we can fetch * all data from a fragment in one batch if possible. * Since this is the first execution, we do not know how many rows or bytes * this operation is likely to return. Therefore we set parallelism to 1, * since this gives the lowest penalty if our guess is wrong. */ jam(); data.m_parallelism = 1; } else { jam(); /** * Use statistics from earlier runs of this operation to estimate the * initial parallelism. We use the mean minus two times the standard * deviation to have a low risk of setting parallelism to high (as erring * in the other direction is more costly). */ Int32 parallelism = static_cast<Int32>(MIN(data.m_parallelismStat.getMean() - 2 * data.m_parallelismStat.getStdDev(), org->batch_size_rows)); if (parallelism < 1) { jam(); parallelism = 1; } else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0) { jam(); /** * Set parallelism such that we can expect to have similar * parallelism in each batch. For example if there are 8 remaining * fragments, then we should fecth 2 times 4 fragments rather than * 7+1. 
*/ const Int32 roundTrips = 1 + (data.m_fragCount - data.m_frags_complete) / parallelism; parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips; } data.m_parallelism = static_cast<Uint32>(parallelism); #ifdef DEBUG_SCAN_FRAGREQ DEBUG("::scanIndex_send() starting index scan with parallelism=" << data.m_parallelism); #endif } ndbrequire(data.m_parallelism > 0); const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism; const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism; ndbassert(bs_rows > 0); ndbassert(bs_bytes > 0); data.m_largestBatchRows = 0; data.m_largestBatchBytes = 0; data.m_totalRows = 0; data.m_totalBytes = 0; { Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); Ptr<ScanFragHandle> fragPtr; list.first(fragPtr); while(!fragPtr.isNull()) { ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED || fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE); fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED; list.next(fragPtr); } } Uint32 batchRange = 0; scanIndex_send(signal, requestPtr, treeNodePtr, data.m_parallelism, bs_bytes, bs_rows, batchRange); data.m_firstExecution = false; ndbrequire(static_cast<Uint32>(data.m_frags_outstanding + data.m_frags_complete) <= data.m_fragCount); data.m_batch_chunks = 1; requestPtr.p->m_cnt_active++; requestPtr.p->m_outstanding++; treeNodePtr.p->m_state = TreeNode::TN_ACTIVE; } void Dbspj::scanIndex_parent_batch_repeat(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; DEBUG("scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no << ", m_batch_chunks: " << data.m_batch_chunks); ndbassert(treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE); /** * Register index-scans to be restarted if we didn't get all * previously fetched parent related child rows in a single batch. */ if (data.m_batch_chunks > 1) { jam(); DEBUG("Register TreeNode for restart, m_node_no: " << treeNodePtr.p->m_node_no); ndbrequire(treeNodePtr.p->m_state != TreeNode::TN_ACTIVE); registerActiveCursor(requestPtr, treeNodePtr); data.m_batch_chunks = 0; } } /** * Ask for the first batch for a number of fragments. */ void Dbspj::scanIndex_send(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, Uint32 noOfFrags, Uint32 bs_bytes, Uint32 bs_rows, Uint32& batchRange) { /** * if (m_bits & prunemask): * - Range keys sliced out to each ScanFragHandle * - Else, range keys kept on first (and only) ScanFragHandle */ const bool prune = treeNodePtr.p->m_bits & (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE); /** * If scan is repeatable, we must make sure not to release range keys so * that we canuse them again in the next repetition. 
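* (Whether the range/attrInfo sections are released or retained is decided when sending below: a plain sendSignal() releases the attached sections, while sendSignalNoRelease() keeps them so they can be reused.)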
*/ const bool repeatable = (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0; ScanIndexData& data = treeNodePtr.p->m_scanindex_data; ndbassert(noOfFrags > 0); ndbassert(data.m_frags_not_started >= noOfFrags); ScanFragReq* const req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend()); const ScanFragReq * const org = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq); memcpy(req, org, sizeof(data.m_scanFragReq)); // req->variableData[0] // set below req->variableData[1] = requestPtr.p->m_rootResultData; req->batch_size_bytes = bs_bytes; req->batch_size_rows = bs_rows; Uint32 requestsSent = 0; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); Ptr<ScanFragHandle> fragPtr; list.first(fragPtr); Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI; ndbrequire(prune || keyInfoPtrI != RNIL); /** * Iterate over the list of fragments until we have sent as many * SCAN_FRAGREQs as we should. */ while (requestsSent < noOfFrags) { jam(); ndbassert(!fragPtr.isNull()); if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED) { // Skip forward to the frags that we should send. jam(); list.next(fragPtr); continue; } const Uint32 ref = fragPtr.p->m_ref; if (noOfFrags==1 && !prune && data.m_frags_not_started == data.m_fragCount && refToNode(ref) != getOwnNodeId() && list.hasNext(fragPtr)) { /** * If we are doing a scan with adaptive parallelism and start with * parallelism=1 then it makes sense to fetch a batch from a fragment on * the local data node. The reason for this is that if that fragment * contains few rows, we may be able to read from several fragments in * parallel. Then we minimize the total number of round trips (to remote * data nodes) if we fetch the first fragment batch locally. */ jam(); list.next(fragPtr); continue; } SectionHandle handle(this); Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI; /** * Set data specific for this fragment */ req->senderData = fragPtr.i; req->fragmentNoKeyLen = fragPtr.p->m_fragId; if (prune) { jam(); keyInfoPtrI = fragPtr.p->m_rangePtrI; if (keyInfoPtrI == RNIL) { /** * Since we use pruning, we can see that no parent rows would hash * to this fragment. */ jam(); fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; list.next(fragPtr); continue; } if (!repeatable) { /** * If we use sendSignal() and need to send the attrInfo several * times, we must copy it. (For repeatable or unpruned scans * we use sendSignalNoRelease(), so then we do not need to copy.) */ jam(); Uint32 tmp = RNIL; ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error attrInfoPtrI = tmp; } } req->variableData[0] = batchRange; getSection(handle.m_ptr[0], attrInfoPtrI); getSection(handle.m_ptr[1], keyInfoPtrI); handle.m_cnt = 2; #if defined DEBUG_SCAN_FRAGREQ ndbout_c("SCAN_FRAGREQ to %x", ref); printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(), NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq), DBLQH); printf("ATTRINFO: "); print(handle.m_ptr[0], stdout); printf("KEYINFO: "); print(handle.m_ptr[1], stdout); #endif if (refToNode(ref) == getOwnNodeId()) { c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1); } else { c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1); } if (prune && !repeatable) { /** * For a non-repeatable pruned scan, key info is unique for each * fragment and therefore cannot be reused, so we release key info * right away.
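* (The attrInfo was duplicated with dupSection() above for the same reason: the sendSignal() below releases the sections attached to the handle.)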
*/ jam(); sendSignal(ref, GSN_SCAN_FRAGREQ, signal, NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); fragPtr.p->m_rangePtrI = RNIL; fragPtr.p->reset_ranges(); } else { /** * Reuse key info for multiple fragments and/or multiple repetitions * of the scan. */ jam(); sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal, NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle); } handle.clear(); fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running data.m_frags_outstanding++; batchRange += bs_rows; requestsSent++; list.next(fragPtr); } // while (requestsSent < noOfFrags) data.m_frags_not_started -= requestsSent; } void Dbspj::scanIndex_execTRANSID_AI(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, const RowPtr & rowRef) { jam(); LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes); Dependency_map::ConstDataBufferIterator it; { for (list.first(it); !it.isNull(); list.next(it)) { jam(); Ptr<TreeNode> childPtr; m_treenode_pool.getPtr(childPtr, * it.data); ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0); (this->*(childPtr.p->m_info->m_parent_row))(signal, requestPtr, childPtr,rowRef); } } ScanIndexData& data = treeNodePtr.p->m_scanindex_data; data.m_rows_received++; if (data.m_frags_outstanding == 0 && data.m_rows_received == data.m_rows_expecting) { jam(); /** * Finished... */ if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE) { jam(); reportBatchComplete(signal, requestPtr, treeNodePtr); } checkBatchComplete(signal, requestPtr, 1); return; } } void Dbspj::scanIndex_execSCAN_FRAGCONF(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, Ptr<ScanFragHandle> fragPtr) { jam(); const ScanFragConf * conf = (const ScanFragConf*)(signal->getDataPtr()); Uint32 rows = conf->completedOps; Uint32 done = conf->fragmentCompleted; Uint32 state = fragPtr.p->m_state; ScanIndexData& data = treeNodePtr.p->m_scanindex_data; if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0) { jam(); /** * We sent an explicit close request...ignore this...a close will come later */ return; } requestPtr.p->m_rows += rows; data.m_totalRows += rows; data.m_totalBytes += conf->total_len; data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows); data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len); if (!treeNodePtr.p->isLeaf()) { jam(); data.m_rows_expecting += rows; } ndbrequire(data.m_frags_outstanding); ndbrequire(state == ScanFragHandle::SFH_SCANNING || state == ScanFragHandle::SFH_WAIT_CLOSE); data.m_frags_outstanding--; fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ; if (done) { jam(); fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; if (data.m_frags_complete == data.m_fragCount || ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 && data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))) { jam(); ndbrequire(requestPtr.p->m_cnt_active); requestPtr.p->m_cnt_active--; treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; } } if (data.m_frags_outstanding == 0) { const ScanFragReq * const org = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq); if (data.m_frags_complete == data.m_fragCount) { jam(); /** * Calculate what would have been the optimal parallelism for the * scan instance that we have just completed, and update * 'parallelismStat' with this value. 
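* (Illustrative example, ignoring the byte limit: if a batch can hold 1000 rows and the completed scan returned 250 rows in total, then up to 1000/250 = 4 fragments could have been scanned in parallel with all rows still fitting in one batch.)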
We then use these statistics to set * the initial parallelism for the next instance of this operation. */ double parallelism = data.m_fragCount; if (data.m_totalRows > 0) { parallelism = MIN(parallelism, double(org->batch_size_rows) / data.m_totalRows); } if (data.m_totalBytes > 0) { parallelism = MIN(parallelism, double(org->batch_size_bytes) / data.m_totalBytes); } data.m_parallelismStat.update(parallelism); } /** * Don't reportBatchComplete to children if we're aborting... */ if (state == ScanFragHandle::SFH_WAIT_CLOSE) { jam(); ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0); } else if (! (data.m_rows_received == data.m_rows_expecting)) { jam(); return; } else { if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE) { jam(); reportBatchComplete(signal, requestPtr, treeNodePtr); } } checkBatchComplete(signal, requestPtr, 1); return; } } void Dbspj::scanIndex_execSCAN_FRAGREF(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, Ptr<ScanFragHandle> fragPtr) { jam(); const ScanFragRef * rep = CAST_CONSTPTR(ScanFragRef, signal->getDataPtr()); const Uint32 errCode = rep->errorCode; Uint32 state = fragPtr.p->m_state; ndbrequire(state == ScanFragHandle::SFH_SCANNING || state == ScanFragHandle::SFH_WAIT_CLOSE); fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; ScanIndexData& data = treeNodePtr.p->m_scanindex_data; ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; ndbrequire(data.m_frags_outstanding > 0); data.m_frags_outstanding--; if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)) { jam(); ndbrequire(requestPtr.p->m_cnt_active); requestPtr.p->m_cnt_active--; treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; } if (data.m_frags_outstanding == 0) { jam(); ndbrequire(requestPtr.p->m_outstanding); requestPtr.p->m_outstanding--; } abort(signal, requestPtr, errCode); } void Dbspj::scanIndex_execSCAN_NEXTREQ(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq; data.m_rows_received = 0; data.m_rows_expecting = 0; ndbassert(data.m_frags_outstanding == 0); ndbrequire(data.m_frags_complete < data.m_fragCount); if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0) { jam(); /** * Since fetching few but large batches is more efficient, we * set parallelism to the lowest value where we can still expect each * batch to be full. */ if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism && data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism) { jam(); data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, org->batch_size_rows); if (data.m_largestBatchRows > 0) { jam(); data.m_parallelism = MIN(org->batch_size_rows / data.m_largestBatchRows, data.m_parallelism); } if (data.m_largestBatchBytes > 0) { jam(); data.m_parallelism = MIN(data.m_parallelism, org->batch_size_bytes/data.m_largestBatchBytes); } if (data.m_frags_complete == 0 && data.m_frags_not_started % data.m_parallelism != 0) { jam(); /** * Set parallelism such that we can expect to have similar * parallelism in each batch. For example if there are 8 remaining * fragments, then we should fetch 2 times 4 fragments rather than * 7+1. */ const Uint32 roundTrips = 1 + data.m_frags_not_started / data.m_parallelism; data.m_parallelism = data.m_frags_not_started / roundTrips; } } else { jam(); // We get full batches, so we should lower parallelism.
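/**
 * Illustrative example of the halving below: with a previous parallelism
 * of 8 and full batches, the next round asks MAX(1, 8/2) = 4 fragments
 * for new batches, capped by the number of fragments not yet complete.
 */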
data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, MAX(1, data.m_parallelism/2)); } ndbassert(data.m_parallelism > 0); #ifdef DEBUG_SCAN_FRAGREQ DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " << data.m_parallelism << " fragments with " << org->batch_size_rows/data.m_parallelism << " rows and " << org->batch_size_bytes/data.m_parallelism << " bytes."); #endif } else { jam(); data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete, org->batch_size_rows); } const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism; ndbassert(bs_rows > 0); ScanFragNextReq* req = reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend()); req->requestInfo = 0; ScanFragNextReq::setCorrFactorFlag(req->requestInfo); req->transId1 = requestPtr.p->m_transId[0]; req->transId2 = requestPtr.p->m_transId[1]; req->batch_size_rows = bs_rows; req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism; Uint32 batchRange = 0; Ptr<ScanFragHandle> fragPtr; Uint32 sentFragCount = 0; { /** * First, ask for more data from fragments that are already started. */ Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); list.first(fragPtr); while (sentFragCount < data.m_parallelism && !fragPtr.isNull()) { jam(); ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ || fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE || fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED); if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ) { jam(); data.m_frags_outstanding++; req->variableData[0] = batchRange; fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; batchRange += bs_rows; DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref << ", m_node_no=" << treeNodePtr.p->m_node_no << ", senderData: " << req->senderData); #ifdef DEBUG_SCAN_FRAGREQ printSCANFRAGNEXTREQ(stdout, &signal->theData[0], ScanFragNextReq:: SignalLength + 1, DBLQH); #endif req->senderData = fragPtr.i; sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength + 1, JBB); sentFragCount++; } list.next(fragPtr); } } if (sentFragCount < data.m_parallelism) { /** * Then start new fragments until we reach data.m_parallelism. */ jam(); ndbassert(data.m_frags_not_started != 0); scanIndex_send(signal, requestPtr, treeNodePtr, data.m_parallelism - sentFragCount, org->batch_size_bytes/data.m_parallelism, bs_rows, batchRange); } /** * cursor should not have been positioned here... * unless we actually had something more to send. 
* so require that we did actually send something */ ndbrequire(data.m_frags_outstanding > 0); ndbrequire(data.m_batch_chunks > 0); data.m_batch_chunks++; requestPtr.p->m_outstanding++; ndbassert(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE); } void Dbspj::scanIndex_complete(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq; if (!data.m_fragments.isEmpty()) { jam(); DihScanTabCompleteRep* rep=(DihScanTabCompleteRep*)signal->getDataPtrSend(); rep->tableId = dst->tableId; rep->scanCookie = data.m_scanCookie; sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP, signal, DihScanTabCompleteRep::SignalLength, JBB); } } void Dbspj::scanIndex_abort(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); switch(treeNodePtr.p->m_state){ case TreeNode::TN_BUILDING: case TreeNode::TN_PREPARING: case TreeNode::TN_INACTIVE: case TreeNode::TN_COMPLETING: case TreeNode::TN_END: ndbout_c("H'%.8x H'%.8x scanIndex_abort state: %u", requestPtr.p->m_transId[0], requestPtr.p->m_transId[1], treeNodePtr.p->m_state); return; case TreeNode::TN_ACTIVE: jam(); break; } ScanFragNextReq* req = CAST_PTR(ScanFragNextReq, signal->getDataPtrSend()); req->requestInfo = ScanFragNextReq::ZCLOSE; req->transId1 = requestPtr.p->m_transId[0]; req->transId2 = requestPtr.p->m_transId[1]; req->batch_size_rows = 0; req->batch_size_bytes = 0; ScanIndexData& data = treeNodePtr.p->m_scanindex_data; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); Ptr<ScanFragHandle> fragPtr; Uint32 cnt_waiting = 0; Uint32 cnt_scanning = 0; for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr)) { switch(fragPtr.p->m_state){ case ScanFragHandle::SFH_NOT_STARTED: case ScanFragHandle::SFH_COMPLETE: case ScanFragHandle::SFH_WAIT_CLOSE: jam(); break; case ScanFragHandle::SFH_WAIT_NEXTREQ: jam(); cnt_waiting++; // was idle... data.m_frags_outstanding++; // is closing goto do_abort; case ScanFragHandle::SFH_SCANNING: jam(); cnt_scanning++; goto do_abort; do_abort: req->senderData = fragPtr.i; sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE; break; } } if (cnt_scanning == 0) { if (cnt_waiting > 0) { /** * If all were waiting...this should increase m_outstanding */ jam(); requestPtr.p->m_outstanding++; } else { /** * All fragments are either complete or not yet started, so there is * nothing to abort. 
*/ jam(); ndbassert(data.m_frags_not_started > 0); ndbrequire(requestPtr.p->m_cnt_active); requestPtr.p->m_cnt_active--; treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; } } } Uint32 Dbspj::scanIndex_execNODE_FAILREP(Signal* signal, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, NdbNodeBitmask nodes) { jam(); switch(treeNodePtr.p->m_state){ case TreeNode::TN_PREPARING: case TreeNode::TN_INACTIVE: return 1; case TreeNode::TN_BUILDING: case TreeNode::TN_COMPLETING: case TreeNode::TN_END: return 0; case TreeNode::TN_ACTIVE: jam(); break; } Uint32 sum = 0; ScanIndexData& data = treeNodePtr.p->m_scanindex_data; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); Ptr<ScanFragHandle> fragPtr; Uint32 save0 = data.m_frags_outstanding; Uint32 save1 = data.m_frags_complete; for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr)) { if (nodes.get(refToNode(fragPtr.p->m_ref)) == false) { jam(); /** * No action needed */ continue; } switch(fragPtr.p->m_state){ case ScanFragHandle::SFH_NOT_STARTED: jam(); ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; ndbrequire(data.m_frags_not_started > 0); data.m_frags_not_started--; // fall through case ScanFragHandle::SFH_COMPLETE: jam(); sum++; // indicate that we should abort /** * we could keep a list of all fragments... * or execute DIGETNODES again... * but for now, we don't */ break; case ScanFragHandle::SFH_WAIT_CLOSE: case ScanFragHandle::SFH_SCANNING: jam(); ndbrequire(data.m_frags_outstanding > 0); data.m_frags_outstanding--; // fall through case ScanFragHandle::SFH_WAIT_NEXTREQ: jam(); sum++; ndbrequire(data.m_frags_complete < data.m_fragCount); data.m_frags_complete++; break; } fragPtr.p->m_ref = 0; fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE; } if (save0 != 0 && data.m_frags_outstanding == 0) { jam(); ndbrequire(requestPtr.p->m_outstanding); requestPtr.p->m_outstanding--; } if (save1 != 0 && data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)) { jam(); ndbrequire(requestPtr.p->m_cnt_active); requestPtr.p->m_cnt_active--; treeNodePtr.p->m_state = TreeNode::TN_INACTIVE; } return sum; } void Dbspj::scanIndex_release_rangekeys(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { jam(); DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i << " m_node_no: " << treeNodePtr.p->m_node_no); ScanIndexData& data = treeNodePtr.p->m_scanindex_data; Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); Ptr<ScanFragHandle> fragPtr; if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) { jam(); for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr)) { if (fragPtr.p->m_rangePtrI != RNIL) { releaseSection(fragPtr.p->m_rangePtrI); fragPtr.p->m_rangePtrI = RNIL; } fragPtr.p->reset_ranges(); } } else { jam(); list.first(fragPtr); if (fragPtr.p->m_rangePtrI != RNIL) { releaseSection(fragPtr.p->m_rangePtrI); fragPtr.p->m_rangePtrI = RNIL; } fragPtr.p->reset_ranges(); } } /** * Parent batch has completed, and will not refetch (X-joined) results * from its children. Release & reset range keys which are unsent or which we * have kept for possible resubmits.
*/ void Dbspj::scanIndex_parent_batch_cleanup(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { DEBUG("scanIndex_parent_batch_cleanup"); scanIndex_release_rangekeys(requestPtr,treeNodePtr); } void Dbspj::scanIndex_cleanup(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr) { ScanIndexData& data = treeNodePtr.p->m_scanindex_data; DEBUG("scanIndex_cleanup"); /** * Range keys have been collected wherever there are uncompleted * parent batches...release them to avoid a memleak. */ scanIndex_release_rangekeys(requestPtr,treeNodePtr); { Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments); list.remove(); } if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) { jam(); LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_pattern_store pattern(pool, data.m_prunePattern); pattern.release(); } else if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE) { jam(); if (data.m_constPrunePtrI != RNIL) { jam(); releaseSection(data.m_constPrunePtrI); data.m_constPrunePtrI = RNIL; } } cleanup_common(requestPtr, treeNodePtr); } /** * END - MODULE SCAN INDEX */ /** * Static OpInfo handling */ const Dbspj::OpInfo* Dbspj::getOpInfo(Uint32 op) { DEBUG("getOpInfo(" << op << ")"); switch(op){ case QueryNode::QN_LOOKUP: return &Dbspj::g_LookupOpInfo; case QueryNode::QN_SCAN_FRAG: return &Dbspj::g_ScanFragOpInfo; case QueryNode::QN_SCAN_INDEX: return &Dbspj::g_ScanIndexOpInfo; default: return 0; } } /** * MODULE COMMON PARSE/UNPACK */ /** * @returns dstLen + 1 on error */ static Uint32 unpackList(Uint32 dstLen, Uint32 * dst, Dbspj::DABuffer & buffer) { const Uint32 * ptr = buffer.ptr; if (likely(ptr != buffer.end)) { Uint32 tmp = * ptr++; Uint32 cnt = tmp & 0xFFFF; * dst ++ = (tmp >> 16); // Store first DEBUG("cnt: " << cnt << " first: " << (tmp >> 16)); if (cnt > 1) { Uint32 len = cnt / 2; if (unlikely(cnt >= dstLen || (ptr + len > buffer.end))) goto error; cnt --; // subtract item stored in header for (Uint32 i = 0; i < cnt/2; i++) { * dst++ = (* ptr) & 0xFFFF; * dst++ = (* ptr) >> 16; ptr++; } if (cnt & 1) { * dst ++ = * ptr & 0xFFFF; ptr++; } cnt ++; // re-add item stored in header } buffer.ptr = ptr; return cnt; } return 0; error: return dstLen + 1; } /** * This function takes an array of attrinfo, and builds a "header" * which can be used to do random access inside the row */ Uint32 Dbspj::buildRowHeader(RowPtr::Header * header, SegmentedSectionPtr ptr) { Uint32 tmp, len; Uint32 * dst = header->m_offset; const Uint32 * const save = dst; SectionReader r0(ptr, getSectionSegmentPool()); Uint32 offset = 0; do { * dst++ = offset; r0.getWord(&tmp); len = AttributeHeader::getDataSize(tmp); offset += 1 + len; } while (r0.step(len)); return header->m_len = static_cast<Uint32>(dst - save); } /** * This function takes an array of attrinfo, and builds a "header" * which can be used to do random access inside the row */ Uint32 Dbspj::buildRowHeader(RowPtr::Header * header, const Uint32 *& src, Uint32 len) { Uint32 * dst = header->m_offset; const Uint32 * save = dst; Uint32 offset = 0; for (Uint32 i = 0; i<len; i++) { * dst ++ = offset; Uint32 tmp = * src++; Uint32 tmp_len = AttributeHeader::getDataSize(tmp); offset += 1 + tmp_len; src += tmp_len; } return header->m_len = static_cast<Uint32>(dst - save); } Uint32 Dbspj::appendToPattern(Local_pattern_store & pattern, DABuffer & tree, Uint32 len) { if (unlikely(tree.ptr + len > tree.end)) return DbspjErr::InvalidTreeNodeSpecification; if (unlikely(pattern.append(tree.ptr, len)==0)) return DbspjErr::OutOfQueryMemory; tree.ptr += len; return
0; } Uint32 Dbspj::appendParamToPattern(Local_pattern_store& dst, const RowPtr::Linear & row, Uint32 col) { /** * TODO handle errors */ Uint32 offset = row.m_header->m_offset[col]; const Uint32 * ptr = row.m_data + offset; Uint32 len = AttributeHeader::getDataSize(* ptr ++); /* Param COL's converted to DATA when appended to pattern */ Uint32 info = QueryPattern::data(len); return dst.append(&info,1) && dst.append(ptr,len) ? 0 : DbspjErr::OutOfQueryMemory; } Uint32 Dbspj::appendParamHeadToPattern(Local_pattern_store& dst, const RowPtr::Linear & row, Uint32 col) { /** * TODO handle errors */ Uint32 offset = row.m_header->m_offset[col]; const Uint32 * ptr = row.m_data + offset; Uint32 len = AttributeHeader::getDataSize(*ptr); /* Param COL's converted to DATA when appended to pattern */ Uint32 info = QueryPattern::data(len+1); return dst.append(&info,1) && dst.append(ptr,len+1) ? 0 : DbspjErr::OutOfQueryMemory; } Uint32 Dbspj::appendTreeToSection(Uint32 & ptrI, SectionReader & tree, Uint32 len) { /** * TODO handle errors */ Uint32 SZ = 16; Uint32 tmp[16]; while (len > SZ) { jam(); tree.getWords(tmp, SZ); ndbrequire(appendToSection(ptrI, tmp, SZ)); len -= SZ; } tree.getWords(tmp, len); return appendToSection(ptrI, tmp, len) ? 0 : /** todo error code */ 1; #if TODO err: return 1; #endif } void Dbspj::getCorrelationData(const RowPtr::Section & row, Uint32 col, Uint32& correlationNumber) { /** * TODO handle errors */ SegmentedSectionPtr ptr(row.m_dataPtr); SectionReader reader(ptr, getSectionSegmentPool()); Uint32 offset = row.m_header->m_offset[col]; ndbrequire(reader.step(offset)); Uint32 tmp; ndbrequire(reader.getWord(&tmp)); Uint32 len = AttributeHeader::getDataSize(tmp); ndbrequire(len == 1); ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32); ndbrequire(reader.getWord(&correlationNumber)); } void Dbspj::getCorrelationData(const RowPtr::Linear & row, Uint32 col, Uint32& correlationNumber) { /** * TODO handle errors */ Uint32 offset = row.m_header->m_offset[col]; Uint32 tmp = row.m_data[offset]; Uint32 len = AttributeHeader::getDataSize(tmp); ndbrequire(len == 1); ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32); correlationNumber = row.m_data[offset+1]; } Uint32 Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Section & row, Uint32 col, bool& hasNull) { /** * TODO handle errors */ SegmentedSectionPtr ptr(row.m_dataPtr); SectionReader reader(ptr, getSectionSegmentPool()); Uint32 offset = row.m_header->m_offset[col]; ndbrequire(reader.step(offset)); Uint32 tmp; ndbrequire(reader.getWord(&tmp)); Uint32 len = AttributeHeader::getDataSize(tmp); if (unlikely(len==0)) { jam(); hasNull = true; // NULL-value in key return 0; } return appendTreeToSection(dst, reader, len); } Uint32 Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Linear & row, Uint32 col, bool& hasNull) { /** * TODO handle errors */ Uint32 offset = row.m_header->m_offset[col]; const Uint32 * ptr = row.m_data + offset; Uint32 len = AttributeHeader::getDataSize(* ptr ++); if (unlikely(len==0)) { jam(); hasNull = true; // NULL-value in key return 0; } return appendToSection(dst, ptr, len) ? 
0 : DbspjErr::InvalidPattern; } Uint32 Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Linear & row, Uint32 col, bool& hasNull) { /** * TODO handle errors */ Uint32 offset = row.m_header->m_offset[col]; const Uint32 * ptr = row.m_data + offset; Uint32 len = AttributeHeader::getDataSize(* ptr); if (unlikely(len==0)) { jam(); hasNull = true; // NULL-value in key } return appendToSection(dst, ptr, 1 + len) ? 0 : DbspjErr::InvalidPattern; } Uint32 Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Section & row, Uint32 col, bool& hasNull) { /** * TODO handle errors */ SegmentedSectionPtr ptr(row.m_dataPtr); SectionReader reader(ptr, getSectionSegmentPool()); Uint32 offset = row.m_header->m_offset[col]; ndbrequire(reader.step(offset)); Uint32 tmp; ndbrequire(reader.peekWord(&tmp)); Uint32 len = AttributeHeader::getDataSize(tmp); if (unlikely(len==0)) { jam(); hasNull = true; // NULL-value in key } return appendTreeToSection(dst, reader, 1 + len); } /** * 'PkCol' is the composite NDB$PK column in a unique index consisting of * a fragment id and the composite PK value (all PK columns concatenated) */ Uint32 Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Section & row, Uint32 col) { /** * TODO handle errors */ SegmentedSectionPtr ptr(row.m_dataPtr); SectionReader reader(ptr, getSectionSegmentPool()); Uint32 offset = row.m_header->m_offset[col]; ndbrequire(reader.step(offset)); Uint32 tmp; ndbrequire(reader.getWord(&tmp)); Uint32 len = AttributeHeader::getDataSize(tmp); ndbrequire(len>1); // NULL-value in PkKey is an error ndbrequire(reader.step(1)); // Skip fragid return appendTreeToSection(dst, reader, len-1); } /** * 'PkCol' is the composite NDB$PK column in a unique index consisting of * a fragment id and the composite PK value (all PK columns concatenated) */ Uint32 Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Linear & row, Uint32 col) { Uint32 offset = row.m_header->m_offset[col]; Uint32 tmp = row.m_data[offset]; Uint32 len = AttributeHeader::getDataSize(tmp); ndbrequire(len>1); // NULL-value in PkKey is an error return appendToSection(dst, row.m_data+offset+2, len - 1) ? 0 : /** todo error code */ 1; } Uint32 Dbspj::appendFromParent(Uint32 & dst, Local_pattern_store& pattern, Local_pattern_store::ConstDataBufferIterator& it, Uint32 levels, const RowPtr & rowptr, bool& hasNull) { Ptr<TreeNode> treeNodePtr; m_treenode_pool.getPtr(treeNodePtr, rowptr.m_src_node_ptrI); Uint32 corrVal = rowptr.m_src_correlation; RowPtr targetRow; while (levels--) { jam(); if (unlikely(treeNodePtr.p->m_parentPtrI == RNIL)) { DEBUG_CRASH(); return DbspjErr::InvalidPattern; } m_treenode_pool.getPtr(treeNodePtr, treeNodePtr.p->m_parentPtrI); if (unlikely((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0)) { DEBUG_CRASH(); return DbspjErr::InvalidPattern; } RowRef ref; treeNodePtr.p->m_row_map.copyto(ref); Uint32 allocator = ref.m_allocator; const Uint32 * mapptr; if (allocator == 0) { jam(); mapptr = get_row_ptr_stack(ref); } else { jam(); mapptr = get_row_ptr_var(ref); } Uint32 pos = corrVal >> 16; // parent corr-val if (unlikely(!
(pos < treeNodePtr.p->m_row_map.m_size))) { DEBUG_CRASH(); return DbspjErr::InvalidPattern; } // load ref to parent row treeNodePtr.p->m_row_map.load(mapptr, pos, ref); const Uint32 * rowptr; if (allocator == 0) { jam(); rowptr = get_row_ptr_stack(ref); } else { jam(); rowptr = get_row_ptr_var(ref); } setupRowPtr(treeNodePtr, targetRow, ref, rowptr); if (levels) { jam(); getCorrelationData(targetRow.m_row_data.m_linear, targetRow.m_row_data.m_linear.m_header->m_len - 1, corrVal); } } if (unlikely(it.isNull())) { DEBUG_CRASH(); return DbspjErr::InvalidPattern; } Uint32 info = *it.data; Uint32 type = QueryPattern::getType(info); Uint32 val = QueryPattern::getLength(info); pattern.next(it); switch(type){ case QueryPattern::P_COL: jam(); return appendColToSection(dst, targetRow.m_row_data.m_linear, val, hasNull); break; case QueryPattern::P_UNQ_PK: jam(); return appendPkColToSection(dst, targetRow.m_row_data.m_linear, val); break; case QueryPattern::P_ATTRINFO: jam(); return appendAttrinfoToSection(dst, targetRow.m_row_data.m_linear, val, hasNull); break; case QueryPattern::P_DATA: jam(); // retrieving DATA from parent...is...an error break; case QueryPattern::P_PARENT: jam(); // no point in nesting P_PARENT...an error break; case QueryPattern::P_PARAM: case QueryPattern::P_PARAM_HEADER: jam(); // should have been expanded during build break; } DEBUG_CRASH(); return DbspjErr::InvalidPattern; } Uint32 Dbspj::appendDataToSection(Uint32 & ptrI, Local_pattern_store& pattern, Local_pattern_store::ConstDataBufferIterator& it, Uint32 len, bool& hasNull) { if (unlikely(len==0)) { jam(); hasNull = true; return 0; } #if 0 /** * TODO handle errors */ Uint32 tmp[NDB_SECTION_SEGMENT_SZ]; while (len > NDB_SECTION_SEGMENT_SZ) { pattern.copyout(tmp, NDB_SECTION_SEGMENT_SZ, it); appendToSection(ptrI, tmp, NDB_SECTION_SEGMENT_SZ); len -= NDB_SECTION_SEGMENT_SZ; } pattern.copyout(tmp, len, it); appendToSection(ptrI, tmp, len); return 0; #else Uint32 remaining = len; Uint32 dstIdx = 0; Uint32 tmp[NDB_SECTION_SEGMENT_SZ]; while (remaining > 0 && !it.isNull()) { tmp[dstIdx] = *it.data; remaining--; dstIdx++; pattern.next(it); if (dstIdx == NDB_SECTION_SEGMENT_SZ || remaining == 0) { if (!appendToSection(ptrI, tmp, dstIdx)) { DEBUG_CRASH(); return DbspjErr::InvalidPattern; } dstIdx = 0; } } if (remaining > 0) { DEBUG_CRASH(); return DbspjErr::InvalidPattern; } else { return 0; } #endif } Uint32 Dbspj::createEmptySection(Uint32 & dst) { Uint32 tmp; SegmentedSectionPtr ptr; if (likely(import(ptr, &tmp, 0))) { jam(); dst = ptr.i; return 0; } jam(); return DbspjErr::OutOfSectionMemory; } /** * This function takes a pattern and a row and expands it into a section */ Uint32 Dbspj::expandS(Uint32 & _dst, Local_pattern_store& pattern, const RowPtr & row, bool& hasNull) { Uint32 err; Uint32 dst = _dst; hasNull = false; Local_pattern_store::ConstDataBufferIterator it; pattern.first(it); while (!it.isNull()) { Uint32 info = *it.data; Uint32 type = QueryPattern::getType(info); Uint32 val = QueryPattern::getLength(info); pattern.next(it); switch(type){ case QueryPattern::P_COL: jam(); err = appendColToSection(dst, row.m_row_data.m_section, val, hasNull); break; case QueryPattern::P_UNQ_PK: jam(); err = appendPkColToSection(dst, row.m_row_data.m_section, val); break; case QueryPattern::P_ATTRINFO: jam(); err = appendAttrinfoToSection(dst, row.m_row_data.m_section, val, hasNull); break; case QueryPattern::P_DATA: jam(); err = appendDataToSection(dst, pattern, it, val, hasNull); break; case QueryPattern::P_PARENT: jam(); //
P_PARENT is a prefix to another pattern token // that permits code to access rows from earlier than immediate parent. // val is no of levels to move up the tree err = appendFromParent(dst, pattern, it, val, row, hasNull); break; // PARAMs were converted to DATA by ::expand(pattern...) case QueryPattern::P_PARAM: case QueryPattern::P_PARAM_HEADER: default: jam(); err = DbspjErr::InvalidPattern; DEBUG_CRASH(); } if (unlikely(err != 0)) { jam(); DEBUG_CRASH(); goto error; } } _dst = dst; return 0; error: jam(); return err; } /** * This function takes a pattern and a row and expands it into a section */ Uint32 Dbspj::expandL(Uint32 & _dst, Local_pattern_store& pattern, const RowPtr & row, bool& hasNull) { Uint32 err; Uint32 dst = _dst; hasNull = false; Local_pattern_store::ConstDataBufferIterator it; pattern.first(it); while (!it.isNull()) { Uint32 info = *it.data; Uint32 type = QueryPattern::getType(info); Uint32 val = QueryPattern::getLength(info); pattern.next(it); switch(type){ case QueryPattern::P_COL: jam(); err = appendColToSection(dst, row.m_row_data.m_linear, val, hasNull); break; case QueryPattern::P_UNQ_PK: jam(); err = appendPkColToSection(dst, row.m_row_data.m_linear, val); break; case QueryPattern::P_ATTRINFO: jam(); err = appendAttrinfoToSection(dst, row.m_row_data.m_linear, val, hasNull); break; case QueryPattern::P_DATA: jam(); err = appendDataToSection(dst, pattern, it, val, hasNull); break; case QueryPattern::P_PARENT: jam(); // P_PARENT is a prefix to another pattern token // that permits code to access rows from earlier than immediate parent // val is no of levels to move up the tree err = appendFromParent(dst, pattern, it, val, row, hasNull); break; // PARAMs were converted to DATA by ::expand(pattern...) case QueryPattern::P_PARAM: case QueryPattern::P_PARAM_HEADER: default: jam(); err = DbspjErr::InvalidPattern; DEBUG_CRASH(); } if (unlikely(err != 0)) { jam(); DEBUG_CRASH(); goto error; } } _dst = dst; return 0; error: jam(); return err; } Uint32 Dbspj::expand(Uint32 & ptrI, DABuffer& pattern, Uint32 len, DABuffer& param, Uint32 paramCnt, bool& hasNull) { /** * TODO handle error */ Uint32 err; Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE]; struct RowPtr::Linear row; row.m_data = param.ptr; row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]); buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt); Uint32 dst = ptrI; const Uint32 * ptr = pattern.ptr; const Uint32 * end = ptr + len; hasNull = false; for (; ptr < end; ) { Uint32 info = * ptr++; Uint32 type = QueryPattern::getType(info); Uint32 val = QueryPattern::getLength(info); switch(type){ case QueryPattern::P_PARAM: jam(); ndbassert(val < paramCnt); err = appendColToSection(dst, row, val, hasNull); break; case QueryPattern::P_PARAM_HEADER: jam(); ndbassert(val < paramCnt); err = appendAttrinfoToSection(dst, row, val, hasNull); break; case QueryPattern::P_DATA: if (unlikely(val==0)) { jam(); hasNull = true; err = 0; } else if (likely(appendToSection(dst, ptr, val))) { jam(); err = 0; } else { jam(); err = DbspjErr::InvalidPattern; } ptr += val; break; case QueryPattern::P_COL: // (linked) COL's not expected here case QueryPattern::P_PARENT: // Prefix to P_COL case QueryPattern::P_ATTRINFO: case QueryPattern::P_UNQ_PK: default: jam(); jamLine(type); err = DbspjErr::InvalidPattern; DEBUG_CRASH(); } if (unlikely(err != 0)) { jam(); DEBUG_CRASH(); goto error; } } /** * Iterate forward */ pattern.ptr = end; error: jam(); ptrI = dst; return err; } Uint32 Dbspj::expand(Local_pattern_store& dst, Ptr<TreeNode> treeNodePtr,
DABuffer& pattern, Uint32 len, DABuffer& param, Uint32 paramCnt) { /** * TODO handle error */ Uint32 err; Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE]; struct RowPtr::Linear row; row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]); row.m_data = param.ptr; buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt); const Uint32 * end = pattern.ptr + len; for (; pattern.ptr < end; ) { Uint32 info = *pattern.ptr; Uint32 type = QueryPattern::getType(info); Uint32 val = QueryPattern::getLength(info); switch(type){ case QueryPattern::P_COL: case QueryPattern::P_UNQ_PK: case QueryPattern::P_ATTRINFO: jam(); err = appendToPattern(dst, pattern, 1); break; case QueryPattern::P_DATA: jam(); err = appendToPattern(dst, pattern, val+1); break; case QueryPattern::P_PARAM: jam(); // NOTE: Converted to P_DATA by appendParamToPattern ndbassert(val < paramCnt); err = appendParamToPattern(dst, row, val); pattern.ptr++; break; case QueryPattern::P_PARAM_HEADER: jam(); // NOTE: Converted to P_DATA by appendParamHeadToPattern ndbassert(val < paramCnt); err = appendParamHeadToPattern(dst, row, val); pattern.ptr++; break; case QueryPattern::P_PARENT: // Prefix to P_COL { jam(); err = appendToPattern(dst, pattern, 1); // Locate requested grandparent and request it to // T_ROW_BUFFER its result rows Ptr<TreeNode> parentPtr; m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI); while (val--) { jam(); ndbassert(parentPtr.p->m_parentPtrI != RNIL); m_treenode_pool.getPtr(parentPtr, parentPtr.p->m_parentPtrI); parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER; parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER_MAP; } Ptr<Request> requestPtr; m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI); requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS; break; } default: jam(); err = DbspjErr::InvalidPattern; DEBUG_CRASH(); } if (unlikely(err != 0)) { DEBUG_CRASH(); goto error; } } return 0; error: jam(); return err; } Uint32 Dbspj::parseDA(Build_context& ctx, Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, DABuffer& tree, Uint32 treeBits, DABuffer& param, Uint32 paramBits) { Uint32 err; Uint32 attrInfoPtrI = RNIL; Uint32 attrParamPtrI = RNIL; do { if (treeBits & DABits::NI_REPEAT_SCAN_RESULT) { jam(); DEBUG("use REPEAT_SCAN_RESULT when returning results"); requestPtr.p->m_bits |= Request::RT_REPEAT_SCAN_RESULT; } // DABits::NI_HAS_PARENT if (treeBits & DABits::NI_HAS_PARENT) { jam(); DEBUG("NI_HAS_PARENT"); /** * OPTIONAL PART 1: * * Parent nodes are stored first in optional part * this is a list of 16-bit numbers referring to * *earlier* nodes in tree * the list stores the length of the list as the first 16-bit value */ err = DbspjErr::InvalidTreeNodeSpecification; Uint32 dst[63]; Uint32 cnt = unpackList(NDB_ARRAY_SIZE(dst), dst, tree); if (unlikely(cnt > NDB_ARRAY_SIZE(dst))) { DEBUG_CRASH(); break; } err = 0; if (unlikely(cnt!=1)) { /** * Only a single parent supported for now, i.e. only trees */ DEBUG_CRASH(); } for (Uint32 i = 0; i<cnt; i++) { DEBUG("adding " << dst[i] << " as parent"); Ptr<TreeNode> parentPtr = ctx.m_node_list[dst[i]]; LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_dependency_map map(pool, parentPtr.p->m_dependent_nodes); if (unlikely(!map.append(&treeNodePtr.i, 1))) { err = DbspjErr::OutOfQueryMemory; DEBUG_CRASH(); break; } parentPtr.p->m_bits &= ~(Uint32)TreeNode::T_LEAF; treeNodePtr.p->m_parentPtrI = parentPtr.i; // Build Bitmask of all ancestors to treeNode treeNodePtr.p->m_ancestors = parentPtr.p->m_ancestors; treeNodePtr.p->m_ancestors.set(parentPtr.p->m_node_no); } if
(unlikely(err != 0)) break; } // DABits::NI_HAS_PARENT err = DbspjErr::InvalidTreeParametersSpecificationKeyParamBitsMissmatch; if (unlikely( ((treeBits & DABits::NI_KEY_PARAMS)==0) != ((paramBits & DABits::PI_KEY_PARAMS)==0))) { DEBUG_CRASH(); break; } if (treeBits & (DABits::NI_KEY_PARAMS | DABits::NI_KEY_LINKED | DABits::NI_KEY_CONSTS)) { jam(); DEBUG("NI_KEY_PARAMS | NI_KEY_LINKED | NI_KEY_CONSTS"); /** * OPTIONAL PART 2: * * If keys are parametrized or linked * DATA0[LO/HI] - Length of key pattern/#parameters to key */ Uint32 len_cnt = * tree.ptr ++; Uint32 len = len_cnt & 0xFFFF; // length of pattern in words Uint32 cnt = len_cnt >> 16; // no of parameters LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern); err = DbspjErr::InvalidTreeParametersSpecificationIncorrectKeyParamCount; if (unlikely( ((cnt==0) != ((treeBits & DABits::NI_KEY_PARAMS) == 0)) || ((cnt==0) != ((paramBits & DABits::PI_KEY_PARAMS) == 0)))) { DEBUG_CRASH(); break; } if (treeBits & DABits::NI_KEY_LINKED) { jam(); DEBUG("LINKED-KEY PATTERN w/ " << cnt << " PARAM values"); /** * Expand pattern into a new pattern (with linked values) */ err = expand(pattern, treeNodePtr, tree, len, param, cnt); /** * This node constructs a new key for each send */ treeNodePtr.p->m_bits |= TreeNode::T_KEYINFO_CONSTRUCTED; } else { jam(); DEBUG("FIXED-KEY w/ " << cnt << " PARAM values"); /** * Expand pattern directly into keyinfo * This means a "fixed" key from here on */ bool hasNull; Uint32 keyInfoPtrI = RNIL; err = expand(keyInfoPtrI, tree, len, param, cnt, hasNull); if (unlikely(hasNull)) { /* API should have eliminated requests w/ const-NULL keys */ jam(); DEBUG("BEWARE: FIXED-key contains NULL values"); // treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE; // break; ndbrequire(false); } treeNodePtr.p->m_send.m_keyInfoPtrI = keyInfoPtrI; } if (unlikely(err != 0)) { DEBUG_CRASH(); break; } } // DABits::NI_KEY_...
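    /**
     * Sketch of the interpreted-program ATTRINFO assembled below: five
     * length words (sectionptrs[0..4]) are reserved up front and patched
     * as the pieces are appended. The entries patched in this function are:
     *   sectionptrs[1] = size of the interpreted program
     *   sectionptrs[3] = size of the "final read" section (attribute reads)
     *   sectionptrs[4] = size of the interpreted subroutine section
     */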
const Uint32 mask = DABits::NI_LINKED_ATTR | DABits::NI_ATTR_INTERPRET | DABits::NI_ATTR_LINKED | DABits::NI_ATTR_PARAMS; if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0) { jam(); /** * OPTIONAL PART 3: attrinfo handling * - NI_LINKED_ATTR - these are attributes to be passed to children * - PI_ATTR_LIST - this is "user-columns" (passed as parameters) * - NI_ATTR_INTERPRET - tree contains interpreted program * - NI_ATTR_LINKED - means that the attr-info contains linked-values * - NI_ATTR_PARAMS - means that the attr-info is parameterized * PI_ATTR_PARAMS - means that the parameters contains attr parameters * * IF NI_ATTR_INTERPRET * DATA0[LO/HI] = Length of program / total #arguments to program * DATA1..N = Program * * IF NI_ATTR_PARAMS * DATA0[LO/HI] = Length / #param * DATA1..N = PARAM-0...PARAM-M * * IF PI_ATTR_INTERPRET * DATA0[LO/HI] = Length of program / Length of subroutine-part * DATA1..N = Program (scan filter) * * IF NI_ATTR_LINKED * DATA0[LO/HI] = Length / # * * */ Uint32 sections[5] = { 0, 0, 0, 0, 0 }; Uint32 * sectionptrs = 0; bool interpreted = (treeBits & DABits::NI_ATTR_INTERPRET) || (paramBits & DABits::PI_ATTR_INTERPRET) || (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED); if (interpreted) { /** * Add section headers for interpreted execution * and create pointer so that they can be updated later */ jam(); err = DbspjErr::OutOfSectionMemory; if (unlikely(!appendToSection(attrInfoPtrI, sections, 5))) { DEBUG_CRASH(); break; } SegmentedSectionPtr ptr; getSection(ptr, attrInfoPtrI); sectionptrs = ptr.p->theData; if (treeBits & DABits::NI_ATTR_INTERPRET) { jam(); /** * Having two interpreter programs is an error. */ err = DbspjErr::BothTreeAndParametersContainInterpretedProgram; if (unlikely(paramBits & DABits::PI_ATTR_INTERPRET)) { DEBUG_CRASH(); break; } treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED; Uint32 len2 = * tree.ptr++; Uint32 len_prg = len2 & 0xFFFF; // Length of interpret program Uint32 len_pattern = len2 >> 16;// Length of attr param pattern err = DbspjErr::OutOfSectionMemory; if (unlikely(!appendToSection(attrInfoPtrI, tree.ptr, len_prg))) { DEBUG_CRASH(); break; } tree.ptr += len_prg; sectionptrs[1] = len_prg; // size of interpret program Uint32 tmp = * tree.ptr ++; // attr-pattern header Uint32 cnt = tmp & 0xFFFF; if (treeBits & DABits::NI_ATTR_LINKED) { jam(); /** * Expand pattern into a new pattern (with linked values) */ LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); Local_pattern_store pattern(pool,treeNodePtr.p->m_attrParamPattern); err = expand(pattern, treeNodePtr, tree, len_pattern, param, cnt); if (unlikely(err)) { DEBUG_CRASH(); break; } /** * This node constructs a new attr-info for each send */ treeNodePtr.p->m_bits |= TreeNode::T_ATTRINFO_CONSTRUCTED; } else { jam(); /** * Expand pattern directly into attr-info param * This means a "fixed" attr-info param from here on */ bool hasNull; err = expand(attrParamPtrI, tree, len_pattern, param, cnt, hasNull); if (unlikely(err)) { DEBUG_CRASH(); break; } // ndbrequire(!hasNull); } } else // if (treeBits & DABits::NI_ATTR_INTERPRET) { jam(); /** * Only relevant for interpreted stuff */ ndbrequire((treeBits & DABits::NI_ATTR_PARAMS) == 0); ndbrequire((paramBits & DABits::PI_ATTR_PARAMS) == 0); ndbrequire((treeBits & DABits::NI_ATTR_LINKED) == 0); treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED; if (! 
(paramBits & DABits::PI_ATTR_INTERPRET)) { jam(); /** * Tree node has interpreted execution, * but no interpreted program specified * auto-add Exit_ok (i.e. return each row) */ Uint32 tmp = Interpreter::ExitOK(); err = DbspjErr::OutOfSectionMemory; if (unlikely(!appendToSection(attrInfoPtrI, &tmp, 1))) { DEBUG_CRASH(); break; } sectionptrs[1] = 1; } } // if (treeBits & DABits::NI_ATTR_INTERPRET) } // if (interpreted) if (paramBits & DABits::PI_ATTR_INTERPRET) { jam(); /** * Add the interpreted code that represents the scan filter. */ const Uint32 len2 = * param.ptr++; Uint32 program_len = len2 & 0xFFFF; Uint32 subroutine_len = len2 >> 16; err = DbspjErr::OutOfSectionMemory; if (unlikely(!appendToSection(attrInfoPtrI, param.ptr, program_len))) { DEBUG_CRASH(); break; } /** * The interpreted code is added in the "Interpreted execute region" * of the attrinfo (see Dbtup::interpreterStartLab() for details). * It will thus execute before reading the attributes that constitute * the projections. */ sectionptrs[1] = program_len; param.ptr += program_len; if (subroutine_len) { if (unlikely(!appendToSection(attrParamPtrI, param.ptr, subroutine_len))) { DEBUG_CRASH(); break; } sectionptrs[4] = subroutine_len; param.ptr += subroutine_len; } treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED; } Uint32 sum_read = 0; Uint32 dst[MAX_ATTRIBUTES_IN_TABLE + 2]; if (paramBits & DABits::PI_ATTR_LIST) { jam(); Uint32 len = * param.ptr++; DEBUG("PI_ATTR_LIST"); treeNodePtr.p->m_bits |= TreeNode::T_USER_PROJECTION; err = DbspjErr::OutOfSectionMemory; if (!appendToSection(attrInfoPtrI, param.ptr, len)) { DEBUG_CRASH(); break; } param.ptr += len; /** * Insert a flush of this partial result set */ Uint32 flush[4]; flush[0] = AttributeHeader::FLUSH_AI << 16; flush[1] = ctx.m_resultRef; flush[2] = ctx.m_resultData; flush[3] = ctx.m_senderRef; // RouteRef if (!appendToSection(attrInfoPtrI, flush, 4)) { DEBUG_CRASH(); break; } sum_read += len + 4; } if (treeBits & DABits::NI_LINKED_ATTR) { jam(); DEBUG("NI_LINKED_ATTR"); err = DbspjErr::InvalidTreeNodeSpecification; Uint32 cnt = unpackList(MAX_ATTRIBUTES_IN_TABLE, dst, tree); if (unlikely(cnt > MAX_ATTRIBUTES_IN_TABLE)) { DEBUG_CRASH(); break; } /** * AttributeHeader contains attrId in the 16 higher bits */ for (Uint32 i = 0; i<cnt; i++) dst[i] <<= 16; /** * Read correlation factor */ dst[cnt++] = AttributeHeader::CORR_FACTOR32 << 16; err = DbspjErr::OutOfSectionMemory; if (!appendToSection(attrInfoPtrI, dst, cnt)) { DEBUG_CRASH(); break; } sum_read += cnt; } if (interpreted) { jam(); /** * Let reads be performed *after* interpreted program * i.e. in the "final read" section */ sectionptrs[3] = sum_read; if (attrParamPtrI != RNIL) { jam(); ndbrequire(!(treeNodePtr.p->m_bits&TreeNode::T_ATTRINFO_CONSTRUCTED)); SegmentedSectionPtr ptr; getSection(ptr, attrParamPtrI); { SectionReader r0(ptr, getSectionSegmentPool()); err = appendTreeToSection(attrInfoPtrI, r0, ptr.sz); sectionptrs[4] = ptr.sz; if (unlikely(err != 0)) { DEBUG_CRASH(); break; } } releaseSection(attrParamPtrI); } } treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI; } // if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0) return 0; } while (0); return err; } /** * END - MODULE COMMON PARSE/UNPACK */ /** * Process a scan request for an ndb$info table. (These are used for monitoring * purposes and do not contain application data.)
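* Rows are streamed back under a rate limit (Ndbinfo::Ratelimit); when the limit is hit, a scan break is sent and the cursor position is saved so that the scan can resume where it left off.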
*/ void Dbspj::execDBINFO_SCANREQ(Signal *signal) { DbinfoScanReq req= * CAST_PTR(DbinfoScanReq, &signal->theData[0]); const Ndbinfo::ScanCursor* cursor = CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req)); Ndbinfo::Ratelimit rl; jamEntry(); switch(req.tableId){ // The SPJ block only implements the ndbinfo.counters table. case Ndbinfo::COUNTERS_TABLEID: { Ndbinfo::counter_entry counters[] = { { Ndbinfo::SPJ_READS_RECEIVED_COUNTER, c_Counters.get_counter(CI_READS_RECEIVED) }, { Ndbinfo::SPJ_LOCAL_READS_SENT_COUNTER, c_Counters.get_counter(CI_LOCAL_READS_SENT) }, { Ndbinfo::SPJ_REMOTE_READS_SENT_COUNTER, c_Counters.get_counter(CI_REMOTE_READS_SENT) }, { Ndbinfo::SPJ_READS_NOT_FOUND_COUNTER, c_Counters.get_counter(CI_READS_NOT_FOUND) }, { Ndbinfo::SPJ_TABLE_SCANS_RECEIVED_COUNTER, c_Counters.get_counter(CI_TABLE_SCANS_RECEIVED) }, { Ndbinfo::SPJ_LOCAL_TABLE_SCANS_SENT_COUNTER, c_Counters.get_counter(CI_LOCAL_TABLE_SCANS_SENT) }, { Ndbinfo::SPJ_RANGE_SCANS_RECEIVED_COUNTER, c_Counters.get_counter(CI_RANGE_SCANS_RECEIVED) }, { Ndbinfo::SPJ_LOCAL_RANGE_SCANS_SENT_COUNTER, c_Counters.get_counter(CI_LOCAL_RANGE_SCANS_SENT) }, { Ndbinfo::SPJ_REMOTE_RANGE_SCANS_SENT_COUNTER, c_Counters.get_counter(CI_REMOTE_RANGE_SCANS_SENT) }, { Ndbinfo::SPJ_SCAN_BATCHES_RETURNED_COUNTER, c_Counters.get_counter(CI_SCAN_BATCHES_RETURNED) }, { Ndbinfo::SPJ_SCAN_ROWS_RETURNED_COUNTER, c_Counters.get_counter(CI_SCAN_ROWS_RETURNED) }, { Ndbinfo::SPJ_PRUNED_RANGE_SCANS_RECEIVED_COUNTER, c_Counters.get_counter(CI_PRUNED_RANGE_SCANS_RECEIVED) }, { Ndbinfo::SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER, c_Counters.get_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED) } }; const size_t num_counters = sizeof(counters) / sizeof(counters[0]); Uint32 i = cursor->data[0]; const BlockNumber bn = blockToMain(number()); while(i < num_counters) { jam(); Ndbinfo::Row row(signal, req); row.write_uint32(getOwnNodeId()); row.write_uint32(bn); // block number row.write_uint32(instance()); // block instance row.write_uint32(counters[i].id); row.write_uint64(counters[i].val); ndbinfo_send_row(signal, req, row, rl); i++; if (rl.need_break(req)) { jam(); ndbinfo_send_scan_break(signal, req, rl, i); return; } } break; } default: break; } ndbinfo_send_scan_conf(signal, req, rl); } // Dbspj::execDBINFO_SCANREQ(Signal *signal) void Dbspj::IncrementalStatistics::update(double sample) { // Prevent wrap-around if(m_noOfSamples < 0xffffffff) { m_noOfSamples++; const double delta = sample - m_mean; m_mean += delta/m_noOfSamples; m_sumSquare += delta * (sample - m_mean); } }
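/**
 * IncrementalStatistics::update() above is Welford's online algorithm: for
 * sample x_n it maintains
 *   mean_n = mean_{n-1} + (x_n - mean_{n-1}) / n
 *   M2_n   = M2_{n-1} + (x_n - mean_{n-1}) * (x_n - mean_n)
 * so mean and variance are available without storing the samples themselves.
 * A minimal standalone sketch of the same recurrence (hypothetical names,
 * for illustration only, not part of this block):
 *
 *   struct OnlineStat {
 *     Uint32 n; double mean, m2;
 *     OnlineStat() : n(0), mean(0.0), m2(0.0) {}
 *     void add(double x) {
 *       n++;
 *       const double d = x - mean;
 *       mean += d / n;
 *       m2 += d * (x - mean);  // uses the already-updated mean
 *     }
 *     double variance() const { return n > 0 ? m2 / n : 0.0; }
 *   };
 */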