storage/ndb/src/kernel/blocks/suma/Suma.cpp (5,503 lines of code) (raw):

/* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include <my_global.h> #include "Suma.hpp" #include <ndb_version.h> #include <NdbTCP.h> #include <Bitmask.hpp> #include <SimpleProperties.hpp> #include <signaldata/NodeFailRep.hpp> #include <signaldata/ReadNodesConf.hpp> #include <signaldata/ListTables.hpp> #include <signaldata/GetTabInfo.hpp> #include <signaldata/GetTableId.hpp> #include <signaldata/DictTabInfo.hpp> #include <signaldata/SumaImpl.hpp> #include <signaldata/ScanFrag.hpp> #include <signaldata/TransIdAI.hpp> #include <signaldata/CreateTrigImpl.hpp> #include <signaldata/DropTrigImpl.hpp> #include <signaldata/FireTrigOrd.hpp> #include <signaldata/TrigAttrInfo.hpp> #include <signaldata/CheckNodeGroups.hpp> #include <signaldata/CreateTab.hpp> #include <signaldata/DropTab.hpp> #include <signaldata/AlterTable.hpp> #include <signaldata/AlterTab.hpp> #include <signaldata/DihScanTab.hpp> #include <signaldata/SystemError.hpp> #include <signaldata/GCP.hpp> #include <signaldata/StopMe.hpp> #include <signaldata/DictLock.hpp> #include <ndbapi/NdbDictionary.hpp> #include <DebuggerNames.hpp> #include "../dbtup/Dbtup.hpp" #include "../dbdih/Dbdih.hpp" #include <signaldata/CreateNodegroup.hpp> #include <signaldata/CreateNodegroupImpl.hpp> #include <signaldata/DropNodegroup.hpp> #include <signaldata/DropNodegroupImpl.hpp> #include <signaldata/DbinfoScan.hpp> #include <signaldata/TransIdAI.hpp> #include <EventLogger.hpp> extern EventLogger * g_eventLogger; //#define HANDOVER_DEBUG //#define NODEFAIL_DEBUG //#define NODEFAIL_DEBUG2 //#define DEBUG_SUMA_SEQUENCE //#define EVENT_DEBUG //#define EVENT_PH3_DEBUG //#define EVENT_DEBUG2 #if 1 #undef DBUG_ENTER #undef DBUG_PRINT #undef DBUG_RETURN #undef DBUG_VOID_RETURN #if 0 #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);} #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;} #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); } #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; } #else #define DBUG_ENTER(a) #define DBUG_PRINT(a,b) #define DBUG_RETURN(a) return a #define DBUG_VOID_RETURN return #endif #endif #define DBG_3R 0 /** * @todo: * SUMA crashes if an index is created at the same time as * global replication. Very easy to reproduce using testIndex. * Note: This only happens occasionally, but is quite easy to reprod. */ Uint32 g_subPtrI = RNIL; static const Uint32 SUMA_SEQUENCE = 0xBABEBABE; static const Uint32 MAX_CONCURRENT_GCP = 2; /************************************************************** * * Start of suma * */ #define PRINT_ONLY 0 void Suma::execREAD_CONFIG_REQ(Signal* signal) { jamEntry(); const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); Uint32 ref = req->senderRef; Uint32 senderData = req->senderData; const ndb_mgm_configuration_iterator * p = m_ctx.m_config.getOwnConfigIterator(); ndbrequire(p != 0); // SumaParticipant Uint32 noTables, noAttrs, maxBufferedEpochs; ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &noTables); ndb_mgm_get_int_parameter(p, CFG_DICT_ATTRIBUTE, &noAttrs); ndb_mgm_get_int_parameter(p, CFG_DB_MAX_BUFFERED_EPOCHS, &maxBufferedEpochs); c_tablePool.setSize(noTables); c_tables.setSize(noTables); c_subscriptions.setSize(noTables); Uint32 cnt = 0; cnt = 0; ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIPTIONS, &cnt); if (cnt == 0) { jam(); cnt = noTables; } c_subscriptionPool.setSize(cnt); cnt *= 2; { Uint32 val = 0; ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIBERS, &val); if (val) { jam(); cnt = val; } } c_subscriberPool.setSize(cnt); cnt = 0; ndb_mgm_get_int_parameter(p, CFG_DB_SUB_OPERATIONS, &cnt); if (cnt) c_subOpPool.setSize(cnt); else c_subOpPool.setSize(256); c_syncPool.setSize(2); // Trix: max 5 concurrent index stats ops with max 9 words bounds Uint32 noOfBoundWords = 5 * 9; // XXX multiplies number of words by 15 ??? c_dataBufferPool.setSize(noAttrs + noOfBoundWords); c_maxBufferedEpochs = maxBufferedEpochs; // Calculate needed gcp pool as 10 records + the ones needed // during a possible api timeout Uint32 dbApiHbInterval, gcpInterval, microGcpInterval = 0; ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL, &dbApiHbInterval); ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL, &gcpInterval); ndb_mgm_get_int_parameter(p, CFG_DB_MICRO_GCP_INTERVAL, &microGcpInterval); if (microGcpInterval) { gcpInterval = microGcpInterval; } c_gcp_pool.setSize(10 + (4*dbApiHbInterval+gcpInterval-1)/gcpInterval); c_page_chunk_pool.setSize(50); { SLList<SyncRecord> tmp(c_syncPool); Ptr<SyncRecord> ptr; while(tmp.seize(ptr)) new (ptr.p) SyncRecord(* this, c_dataBufferPool); tmp.release(); } // Suma c_masterNodeId = getOwnNodeId(); c_nodeGroup = c_noNodesInGroup = 0; for (int i = 0; i < MAX_REPLICAS; i++) { c_nodesInGroup[i] = 0; } m_first_free_page= RNIL; c_no_of_buckets = 0; memset(c_buckets, 0, sizeof(c_buckets)); for(Uint32 i = 0; i<NO_OF_BUCKETS; i++) { Bucket* bucket= c_buckets+i; bucket->m_buffer_tail = RNIL; bucket->m_buffer_head.m_page_id = RNIL; bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS; } m_max_seen_gci = 0; // FIRE_TRIG_ORD m_max_sent_gci = 0; // FIRE_TRIG_ORD -> send m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP m_gcp_complete_rep_count = 0; m_out_of_buffer_gci = 0; m_missing_data = false; c_startup.m_wait_handover= false; c_failedApiNodes.clear(); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; sendSignal(ref, GSN_READ_CONFIG_CONF, signal, ReadConfigConf::SignalLength, JBB); } void Suma::execSTTOR(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSTTOR"); m_startphase = signal->theData[1]; m_typeOfStart = signal->theData[7]; DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", m_startphase, m_typeOfStart)); if(m_startphase == 3) { jam(); void* ptr = m_ctx.m_mm.get_memroot(); c_page_pool.set((Buffer_page*)ptr, (Uint32)~0); } if(m_startphase == 5) { jam(); if (ERROR_INSERTED(13029)) /* Hold startphase 5 */ { sendSignalWithDelay(SUMA_REF, GSN_STTOR, signal, 30, signal->getLength()); DBUG_VOID_RETURN; } signal->theData[0] = reference(); sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB); DBUG_VOID_RETURN; } if(m_startphase == 7) { if (m_typeOfStart != NodeState::ST_NODE_RESTART && m_typeOfStart != NodeState::ST_INITIAL_NODE_RESTART) { for( Uint32 i = 0; i < c_no_of_buckets; i++) { if (get_responsible_node(i) == getOwnNodeId()) { // I'm running this bucket DBUG_PRINT("info",("bucket %u set to true", i)); m_active_buckets.set(i); ndbout_c("m_active_buckets.set(%d)", i); } } } if(!m_active_buckets.isclear()) { NdbNodeBitmask tmp; Uint32 bucket = 0; while ((bucket = m_active_buckets.find(bucket)) != Bucket_mask::NotFound) { tmp.set(get_responsible_node(bucket, c_nodes_in_nodegroup_mask)); bucket++; } ndbassert(tmp.get(getOwnNodeId())); m_gcp_complete_rep_count = m_active_buckets.count(); } else m_gcp_complete_rep_count = 0; // I contribute 1 gcp complete rep if(m_typeOfStart == NodeState::ST_INITIAL_START && c_masterNodeId == getOwnNodeId()) { jam(); createSequence(signal); DBUG_VOID_RETURN; }//if if (ERROR_INSERTED(13030)) { ndbout_c("Dont start handover"); DBUG_VOID_RETURN; } }//if if(m_startphase == 100) { /** * Allow API's to connect */ sendSTTORRY(signal); DBUG_VOID_RETURN; } if(m_startphase == 101) { if (m_typeOfStart == NodeState::ST_NODE_RESTART || m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART) { /** * Handover code here */ c_startup.m_wait_handover= true; check_start_handover(signal); DBUG_VOID_RETURN; } } sendSTTORRY(signal); DBUG_VOID_RETURN; } #include <ndb_version.h> void Suma::send_dict_lock_req(Signal* signal, Uint32 state) { if (state == DictLockReq::SumaStartMe && !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version)) { jam(); goto notsupported; } else if (state == DictLockReq::SumaHandOver && !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version)) { jam(); goto notsupported; } { jam(); DictLockReq* req = (DictLockReq*)signal->getDataPtrSend(); req->lockType = state; req->userPtr = state; req->userRef = reference(); sendSignal(calcDictBlockRef(c_masterNodeId), GSN_DICT_LOCK_REQ, signal, DictLockReq::SignalLength, JBB); } return; notsupported: DictLockConf* conf = (DictLockConf*)signal->getDataPtrSend(); conf->userPtr = state; execDICT_LOCK_CONF(signal); } void Suma::execDICT_LOCK_CONF(Signal* signal) { jamEntry(); DictLockConf* conf = (DictLockConf*)signal->getDataPtr(); Uint32 state = conf->userPtr; switch(state){ case DictLockReq::SumaStartMe: jam(); c_startup.m_restart_server_node_id = 0; CRASH_INSERTION(13039); send_start_me_req(signal); return; case DictLockReq::SumaHandOver: jam(); send_handover_req(signal, SumaHandoverReq::RT_START_NODE); return; default: jam(); jamLine(state); ndbrequire(false); } } void Suma::execDICT_LOCK_REF(Signal* signal) { jamEntry(); DictLockRef* ref = (DictLockRef*)signal->getDataPtr(); Uint32 state = ref->userPtr; ndbrequire(ref->errorCode == DictLockRef::TooManyRequests); signal->theData[0] = SumaContinueB::RETRY_DICT_LOCK; signal->theData[1] = state; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300, 2); } void Suma::send_dict_unlock_ord(Signal* signal, Uint32 state) { if (state == DictLockReq::SumaStartMe && !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version)) { jam(); return; } else if (state == DictLockReq::SumaHandOver && !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version)) { jam(); return; } jam(); DictUnlockOrd* ord = (DictUnlockOrd*)signal->getDataPtrSend(); ord->lockPtr = 0; ord->lockType = state; ord->senderData = state; ord->senderRef = reference(); sendSignal(calcDictBlockRef(c_masterNodeId), GSN_DICT_UNLOCK_ORD, signal, DictUnlockOrd::SignalLength, JBB); } void Suma::send_start_me_req(Signal* signal) { Uint32 nodeId= c_startup.m_restart_server_node_id; do { nodeId = c_alive_nodes.find(nodeId + 1); if(nodeId == getOwnNodeId()) continue; if(nodeId == NdbNodeBitmask::NotFound) { nodeId = 0; continue; } break; } while(true); infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId); c_startup.m_restart_server_node_id= nodeId; sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_START_ME_REQ, signal, 1, JBB); } void Suma::execSUMA_START_ME_REF(Signal* signal) { const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr(); Uint32 error = ref->errorCode; if (error != SumaStartMeRef::Busy && error != SumaStartMeRef::NotStarted) { jam(); // for some reason we did not manage to create a subscription // on the starting node SystemError * const sysErr = (SystemError*)&signal->theData[0]; sysErr->errorCode = SystemError::CopySubscriptionRef; sysErr->errorRef = reference(); sysErr->data[0] = error; sysErr->data[1] = 0; sendSignal(NDBCNTR_REF, GSN_SYSTEM_ERROR, signal, SystemError::SignalLength, JBB); return; } infoEvent("Suma: node %d refused %d", c_startup.m_restart_server_node_id, ref->errorCode); send_start_me_req(signal); } void Suma::execSUMA_START_ME_CONF(Signal* signal) { infoEvent("Suma: node %d has completed restoring me", c_startup.m_restart_server_node_id); sendSTTORRY(signal); send_dict_unlock_ord(signal, DictLockReq::SumaStartMe); c_startup.m_restart_server_node_id= 0; } void Suma::createSequence(Signal* signal) { jam(); DBUG_ENTER("Suma::createSequence"); UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend(); req->senderData = RNIL; req->sequenceId = SUMA_SEQUENCE; req->requestType = UtilSequenceReq::Create; sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB); // execUTIL_SEQUENCE_CONF will call createSequenceReply() DBUG_VOID_RETURN; } void Suma::createSequenceReply(Signal* signal, UtilSequenceConf * conf, UtilSequenceRef * ref) { jam(); if (ref != NULL) { switch ((UtilSequenceRef::ErrorCode)ref->errorCode) { case UtilSequenceRef::NoSuchSequence: ndbrequire(false); case UtilSequenceRef::TCError: { char buf[128]; BaseString::snprintf(buf, sizeof(buf), "Startup failed during sequence creation. TC error %d", ref->TCErrorCode); progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf); } } ndbrequire(false); } sendSTTORRY(signal); } void Suma::execREAD_NODESCONF(Signal* signal){ jamEntry(); ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr(); if(getNodeState().getNodeRestartInProgress()) { c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes); c_alive_nodes.set(getOwnNodeId()); } else { c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes); NdbNodeBitmask tmp; tmp.assign(NdbNodeBitmask::Size, conf->startedNodes); ndbrequire(tmp.isclear()); // No nodes can be started during SR } if (DBG_3R) { for (Uint32 i = 0; i<MAX_NDB_NODES; i++) { if (c_alive_nodes.get(i)) ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, i); } } c_masterNodeId = conf->masterNodeId; getNodeGroupMembers(signal); } void Suma::getNodeGroupMembers(Signal* signal) { jam(); DBUG_ENTER("Suma::getNodeGroupMembers"); /** * Ask DIH for nodeGroupMembers */ CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend(); sd->blockRef = reference(); sd->requestType = CheckNodeGroups::GetNodeGroupMembers; sd->nodeId = getOwnNodeId(); sd->senderData = RNIL; sendSignal(DBDIH_REF, GSN_CHECKNODEGROUPSREQ, signal, CheckNodeGroups::SignalLength, JBB); DBUG_VOID_RETURN; } static bool valid_seq(Uint32 n, Uint32 r, Uint16 dst[]) { Uint16 tmp[MAX_REPLICAS]; for (Uint32 i = 0; i<r; i++) { tmp[i] = n % r; for (Uint32 j = 0; j<i; j++) if (tmp[j] == tmp[i]) return false; n /= r; } /** * reverse order for backward compatibility (with 2 replica) */ for (Uint32 i = 0; i<r; i++) dst[i] = tmp[r-i-1]; return true; } void Suma::fix_nodegroup() { Uint32 i, pos= 0; for (i = 0; i < MAX_NDB_NODES; i++) { if (c_nodes_in_nodegroup_mask.get(i)) { c_nodesInGroup[pos++] = i; } } const Uint32 replicas= c_noNodesInGroup = pos; if (replicas) { Uint32 buckets= 1; for(i = 1; i <= replicas; i++) buckets *= i; Uint32 tot = 0; switch(replicas){ case 1: tot = 1; break; case 2: tot = 4; // 2^2 break; case 3: tot = 27; // 3^3 break; case 4: tot = 256; // 4^4 break; ndbrequire(false); } Uint32 cnt = 0; for (i = 0; i<tot; i++) { Bucket* ptr= c_buckets + cnt; if (valid_seq(i, replicas, ptr->m_nodes)) { jam(); if (DBG_3R) printf("bucket %u : ", cnt); for (Uint32 j = 0; j<replicas; j++) { ptr->m_nodes[j] = c_nodesInGroup[ptr->m_nodes[j]]; if (DBG_3R) printf("%u ", ptr->m_nodes[j]); } if (DBG_3R) printf("\n"); cnt++; } } ndbrequire(cnt == buckets); c_no_of_buckets= buckets; } else { jam(); c_no_of_buckets = 0; } } void Suma::execCHECKNODEGROUPSCONF(Signal *signal) { const CheckNodeGroups *sd = (const CheckNodeGroups *)signal->getDataPtrSend(); DBUG_ENTER("Suma::execCHECKNODEGROUPSCONF"); jamEntry(); c_nodeGroup = sd->output; c_nodes_in_nodegroup_mask.assign(sd->mask); c_noNodesInGroup = c_nodes_in_nodegroup_mask.count(); fix_nodegroup(); #ifndef DBUG_OFF for (Uint32 i = 0; i < c_noNodesInGroup; i++) { DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, " "member[%u] %u", c_nodeGroup, getOwnNodeId(), i, c_nodesInGroup[i])); } #endif c_startup.m_restart_server_node_id = 0; if (m_typeOfStart == NodeState::ST_NODE_RESTART || m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART) { jam(); send_dict_lock_req(signal, DictLockReq::SumaStartMe); return; } c_startup.m_restart_server_node_id = 0; sendSTTORRY(signal); DBUG_VOID_RETURN; } void Suma::execAPI_START_REP(Signal* signal) { Uint32 nodeId = signal->theData[0]; c_connected_nodes.set(nodeId); check_start_handover(signal); } void Suma::check_start_handover(Signal* signal) { if(c_startup.m_wait_handover) { NodeBitmask tmp; tmp.assign(c_connected_nodes); tmp.bitAND(c_subscriber_nodes); if(!c_subscriber_nodes.equal(tmp)) { return; } c_startup.m_wait_handover= false; if (c_no_of_buckets) { jam(); send_dict_lock_req(signal, DictLockReq::SumaHandOver); } else { jam(); sendSTTORRY(signal); } } } void Suma::send_handover_req(Signal* signal, Uint32 type) { jam(); c_startup.m_handover_nodes.assign(c_alive_nodes); c_startup.m_handover_nodes.bitAND(c_nodes_in_nodegroup_mask); c_startup.m_handover_nodes.clear(getOwnNodeId()); Uint32 gci= Uint32(m_last_complete_gci >> 32) + 3; SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend(); char buf[255]; c_startup.m_handover_nodes.getText(buf); infoEvent("Suma: initiate handover for %s with nodes %s GCI: %u", (type == SumaHandoverReq::RT_START_NODE ? "startup" : "shutdown"), buf, gci); req->gci = gci; req->nodeId = getOwnNodeId(); req->requestType = type; NodeReceiverGroup rg(SUMA, c_startup.m_handover_nodes); sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal, SumaHandoverReq::SignalLength, JBB); } void Suma::sendSTTORRY(Signal* signal){ signal->theData[0] = 0; signal->theData[3] = 1; signal->theData[4] = 3; signal->theData[5] = 5; signal->theData[6] = 7; signal->theData[7] = 100; signal->theData[8] = 101; signal->theData[9] = 255; // No more start phases from missra sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB); } void Suma::execNDB_STTOR(Signal* signal) { jamEntry(); } void Suma::execCONTINUEB(Signal* signal){ jamEntry(); Uint32 type= signal->theData[0]; switch(type){ case SumaContinueB::RELEASE_GCI: { Uint32 gci_hi = signal->theData[2]; Uint32 gci_lo = signal->theData[3]; Uint64 gci = gci_lo | (Uint64(gci_hi) << 32); release_gci(signal, signal->theData[1], gci); return; } case SumaContinueB::RESEND_BUCKET: { Uint32 min_gci_hi = signal->theData[2]; Uint32 min_gci_lo = signal->theData[5]; Uint32 last_gci_hi = signal->theData[4]; Uint32 last_gci_lo = signal->theData[6]; Uint64 min_gci = min_gci_lo | (Uint64(min_gci_hi) << 32); Uint64 last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32); resend_bucket(signal, signal->theData[1], min_gci, signal->theData[3], last_gci); return; } case SumaContinueB::OUT_OF_BUFFER_RELEASE: out_of_buffer_release(signal, signal->theData[1]); return; case SumaContinueB::API_FAIL_GCI_LIST: api_fail_gci_list(signal, signal->theData[1]); return; case SumaContinueB::API_FAIL_SUBSCRIBER_LIST: api_fail_subscriber_list(signal, signal->theData[1]); return; case SumaContinueB::API_FAIL_SUBSCRIPTION: api_fail_subscription(signal); return; case SumaContinueB::SUB_STOP_REQ: sub_stop_req(signal); return; case SumaContinueB::RETRY_DICT_LOCK: jam(); send_dict_lock_req(signal, signal->theData[1]); return; } } /***************************************************************************** * * Node state handling * *****************************************************************************/ void Suma::execAPI_FAILREQ(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execAPI_FAILREQ"); Uint32 failedApiNode = signal->theData[0]; ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR c_connected_nodes.clear(failedApiNode); if (c_failedApiNodes.get(failedApiNode)) { jam(); /* Being handled already, just conf */ goto CONF; } if (!c_subscriber_nodes.get(failedApiNode)) { jam(); /* No Subscribers on that node, no SUMA * specific work to do */ goto BLOCK_CLEANUP; } c_failedApiNodes.set(failedApiNode); c_subscriber_nodes.clear(failedApiNode); c_subscriber_per_node[failedApiNode] = 0; c_failedApiNodesState[failedApiNode] = __LINE__; check_start_handover(signal); signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST; signal->theData[1] = failedApiNode; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB); return; BLOCK_CLEANUP: jam(); api_fail_block_cleanup(signal, failedApiNode); DBUG_VOID_RETURN; CONF: jam(); signal->theData[0] = failedApiNode; signal->theData[1] = reference(); sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB); c_failedApiNodesState[failedApiNode] = 0; DBUG_VOID_RETURN; }//execAPI_FAILREQ() void Suma::api_fail_block_cleanup_callback(Signal* signal, Uint32 failedNodeId, Uint32 elementsCleaned) { jamEntry(); /* Suma should not have any block level elements * to be cleaned (Fragmented send/receive structures etc.) * As it only uses Fragmented send/receive locally */ ndbassert(elementsCleaned == 0); /* Node failure handling is complete */ signal->theData[0] = failedNodeId; signal->theData[1] = reference(); sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB); c_failedApiNodes.clear(failedNodeId); c_failedApiNodesState[failedNodeId] = 0; } void Suma::api_fail_block_cleanup(Signal* signal, Uint32 failedNode) { jam(); c_failedApiNodesState[failedNode] = __LINE__; Callback cb = {safe_cast(&Suma::api_fail_block_cleanup_callback), failedNode}; simBlockNodeFailure(signal, failedNode, cb); } void Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId) { jam(); Ptr<Gcp_record> gcp; if (c_gcp_list.first(gcp)) { jam(); gcp.p->m_subscribers.bitAND(c_subscriber_nodes); if (gcp.p->m_subscribers.isclear()) { jam(); SubGcpCompleteAck* ack = (SubGcpCompleteAck*)signal->getDataPtrSend(); ack->rep.gci_hi = Uint32(gcp.p->m_gci >> 32); ack->rep.gci_lo = Uint32(gcp.p->m_gci); ack->rep.senderRef = reference(); NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask); sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal, SubGcpCompleteAck::SignalLength, JBB); c_gcp_list.release(gcp); c_failedApiNodesState[nodeId] = __LINE__; signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST; signal->theData[1] = nodeId; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB); return; } } if (ERROR_INSERTED(13023)) { CLEAR_ERROR_INSERT_VALUE; } signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST; signal->theData[1] = nodeId; signal->theData[2] = RNIL; // SubOpPtr signal->theData[3] = RNIL; // c_subscribers bucket signal->theData[4] = RNIL; // subscriptionId signal->theData[5] = RNIL; // SubscriptionKey Ptr<SubOpRecord> subOpPtr; if (c_subOpPool.seize(subOpPtr)) { c_failedApiNodesState[nodeId] = __LINE__; signal->theData[2] = subOpPtr.i; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB); } else { c_failedApiNodesState[nodeId] = __LINE__; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); } return; } void Suma::api_fail_subscriber_list(Signal* signal, Uint32 nodeId) { jam(); Ptr<SubOpRecord> subOpPtr; if (c_outstanding_drop_trig_req > 9) { jam(); /** * Make sure not to overflow DbtupProxy with too many GSN_DROP_TRIG_IMPL_REQ * 9 is arbitrary number... */ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, signal->getLength()); return; } subOpPtr.i = signal->theData[2]; if (subOpPtr.i == RNIL) { if (c_subOpPool.seize(subOpPtr)) { signal->theData[3] = RNIL; } else { jam(); sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); c_failedApiNodesState[nodeId] = __LINE__; return; } } else { jam(); c_subOpPool.getPtr(subOpPtr); } Uint32 bucket = signal->theData[3]; Uint32 subscriptionId = signal->theData[4]; Uint32 subscriptionKey = signal->theData[5]; DLHashTable<Subscription>::Iterator iter; if (bucket == RNIL) { jam(); c_subscriptions.first(iter); c_failedApiNodesState[nodeId] = __LINE__; } else { jam(); Subscription key; key.m_subscriptionId = subscriptionId; key.m_subscriptionKey = subscriptionKey; if (c_subscriptions.find(iter.curr, key) == false) { jam(); /** * We restart from this bucket :-( */ c_subscriptions.next(bucket, iter); c_failedApiNodesState[nodeId] = __LINE__; } else { iter.bucket = bucket; } } if (iter.curr.isNull()) { jam(); api_fail_block_cleanup(signal, nodeId); c_failedApiNodesState[nodeId] = __LINE__; return; } subOpPtr.p->m_opType = SubOpRecord::R_API_FAIL_REQ; subOpPtr.p->m_subPtrI = iter.curr.i; subOpPtr.p->m_senderRef = nodeId; subOpPtr.p->m_senderData = iter.bucket; LocalDLFifoList<SubOpRecord> list(c_subOpPool, iter.curr.p->m_stop_req); bool empty = list.isEmpty(); list.add(subOpPtr); if (empty) { jam(); c_failedApiNodesState[nodeId] = __LINE__; signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION; signal->theData[1] = subOpPtr.i; signal->theData[2] = RNIL; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); } else { jam(); c_failedApiNodesState[nodeId] = __LINE__; } } void Suma::api_fail_subscription(Signal* signal) { jam(); Ptr<SubOpRecord> subOpPtr; c_subOpPool.getPtr(subOpPtr, signal->theData[1]); Uint32 nodeId = subOpPtr.p->m_senderRef; Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI); Ptr<Subscriber> ptr; { LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); if (signal->theData[2] == RNIL) { jam(); list.first(ptr); } else { jam(); list.getPtr(ptr, signal->theData[2]); } for (Uint32 i = 0; i<32 && !ptr.isNull(); i++) { jam(); if (refToNode(ptr.p->m_senderRef) == nodeId) { jam(); Ptr<Subscriber> tmp = ptr; list.next(ptr); list.remove(tmp); /** * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck) */ bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE; send_sub_start_stop_event(signal, tmp, NdbDictionary::Event::_TE_STOP, report, list); c_subscriberPool.release(tmp); } else { jam(); list.next(ptr); } } } if (!ptr.isNull()) { jam(); c_failedApiNodesState[nodeId] = __LINE__; signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION; signal->theData[1] = subOpPtr.i; signal->theData[2] = ptr.i; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); return; } // Start potential waiter(s) check_remove_queue(signal, subPtr, subOpPtr, true, false); check_release_subscription(signal, subPtr); // Continue iterating through subscriptions DLHashTable<Subscription>::Iterator iter; iter.bucket = subOpPtr.p->m_senderData; iter.curr = subPtr; if (c_subscriptions.next(iter)) { jam(); c_failedApiNodesState[nodeId] = __LINE__; signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST; signal->theData[1] = nodeId; signal->theData[2] = subOpPtr.i; signal->theData[3] = iter.bucket; signal->theData[4] = iter.curr.p->m_subscriptionId; // subscriptionId signal->theData[5] = iter.curr.p->m_subscriptionKey; // SubscriptionKey sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB); return; } c_subOpPool.release(subOpPtr); /* Now do block level cleanup */ api_fail_block_cleanup(signal, nodeId); } void Suma::execNODE_FAILREP(Signal* signal){ jamEntry(); DBUG_ENTER("Suma::execNODE_FAILREP"); ndbassert(signal->getNoOfSections() == 0); const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr(); NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes); if(c_restart.m_ref && failed.get(refToNode(c_restart.m_ref))) { jam(); if (c_restart.m_waiting_on_self) { jam(); c_restart.m_abort = 1; } else { jam(); Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI); abort_start_me(signal, subPtr, false); } } if (ERROR_INSERTED(13032)) { Uint32 node = c_subscriber_nodes.find(0); if (node != NodeBitmask::NotFound) { ndbout_c("Inserting API_FAILREQ node: %u", node); signal->theData[0] = node; sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA); } } NdbNodeBitmask tmp; tmp.assign(c_alive_nodes); tmp.bitANDC(failed); NdbNodeBitmask takeover_nodes; if(c_nodes_in_nodegroup_mask.overlaps(failed)) { for( Uint32 i = 0; i < c_no_of_buckets; i++) { if(m_active_buckets.get(i)) continue; else if(m_switchover_buckets.get(i)) { Uint32 state= c_buckets[i].m_state; if((state & Bucket::BUCKET_HANDOVER) && failed.get(get_responsible_node(i))) { m_active_buckets.set(i); m_switchover_buckets.clear(i); ndbout_c("aborting handover"); } else if(state & Bucket::BUCKET_STARTING) { progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, "Nodefailure during SUMA takeover"); } else if (state & Bucket::BUCKET_SHUTDOWN_TO) { jam(); c_buckets[i].m_state &= ~Uint32(Bucket::BUCKET_SHUTDOWN_TO); m_switchover_buckets.clear(i); ndbrequire(get_responsible_node(i, tmp) == getOwnNodeId()); start_resend(signal, i); } } else if(get_responsible_node(i, tmp) == getOwnNodeId()) { start_resend(signal, i); } } } /* Block level cleanup */ for(unsigned i = 1; i < MAX_NDB_NODES; i++) { jam(); if(failed.get(i)) { jam(); Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback ndbassert(elementsCleaned == 0); // As Suma has no remote fragmented signals (void) elementsCleaned; // Avoid compiler error }//if }//for c_alive_nodes.assign(tmp); DBUG_VOID_RETURN; } void Suma::execINCL_NODEREQ(Signal* signal){ jamEntry(); const Uint32 senderRef = signal->theData[0]; const Uint32 nodeId = signal->theData[1]; ndbrequire(!c_alive_nodes.get(nodeId)); if (c_nodes_in_nodegroup_mask.get(nodeId)) { /** * * XXX TODO: This should be removed * But, other nodes are (incorrectly) reported as started * even if they're not "started", but only INCL_NODEREQ'ed */ c_alive_nodes.set(nodeId); /** * * Nodes in nodegroup will be "alive" when * sending SUMA_HANDOVER_REQ */ } else { jam(); c_alive_nodes.set(nodeId); } signal->theData[0] = nodeId; signal->theData[1] = reference(); sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB); } void Suma::execSIGNAL_DROPPED_REP(Signal* signal){ jamEntry(); ndbrequire(false); } /******************************************************************** * * Dump state * */ static const char* cstr(Suma::Subscription::State s) { switch(s){ case Suma::Subscription::UNDEFINED: return "undefined"; case Suma::Subscription::DEFINED: return "defined"; case Suma::Subscription::DEFINING: return "defining"; } return "<unknown>"; } static const char* cstr(Suma::Subscription::TriggerState s) { switch(s){ case Suma::Subscription::T_UNDEFINED: return "undefined"; case Suma::Subscription::T_CREATING: return "creating"; case Suma::Subscription::T_DEFINED: return "defined"; case Suma::Subscription::T_DROPPING: return "dropping"; case Suma::Subscription::T_ERROR: return "error"; } return "<uknown>"; } static const char* cstr(Suma::Subscription::Options s) { static char buf[256]; buf[0] = 0; strcat(buf, "["); if (s & Suma::Subscription::REPORT_ALL) strcat(buf, " reportall"); if (s & Suma::Subscription::REPORT_SUBSCRIBE) strcat(buf, " reportsubscribe"); if (s & Suma::Subscription::MARKED_DROPPED) strcat(buf, " dropped"); if (s & Suma::Subscription::NO_REPORT_DDL) strcat(buf, " noreportddl"); strcat(buf, " ]"); return buf; } static const char* cstr(Suma::Table::State s) { switch(s){ case Suma::Table::UNDEFINED: return "undefined"; case Suma::Table::DEFINING: return "defining"; case Suma::Table::DEFINED: return "defined"; case Suma::Table::DROPPED: return "dropped"; } return "<unknown>"; } void Suma::execDUMP_STATE_ORD(Signal* signal){ jamEntry(); Uint32 tCase = signal->theData[0]; #if 0 if(tCase >= 8000 && tCase <= 8003){ SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, g_subPtrI); Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); if(tCase == 8000){ syncPtr.p->startMeta(signal); } if(tCase == 8001){ syncPtr.p->startScan(signal); } if(tCase == 8002){ syncPtr.p->startTrigger(signal); } if(tCase == 8003){ subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan; LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList); Uint32 tab = 0; Uint32 att[] = { 0, 1, 1 }; syncPtr.p->m_tableList.append(&tab, 1); attrs.append(att, 3); } } #endif if(tCase == 8004){ infoEvent("Suma: c_subscriberPool size: %d free: %d", c_subscriberPool.getSize(), c_subscriberPool.getNoOfFree()); infoEvent("Suma: c_tablePool size: %d free: %d", c_tablePool.getSize(), c_tablePool.getNoOfFree()); infoEvent("Suma: c_subscriptionPool size: %d free: %d", c_subscriptionPool.getSize(), c_subscriptionPool.getNoOfFree()); infoEvent("Suma: c_syncPool size: %d free: %d", c_syncPool.getSize(), c_syncPool.getNoOfFree()); infoEvent("Suma: c_dataBufferPool size: %d free: %d", c_dataBufferPool.getSize(), c_dataBufferPool.getNoOfFree()); infoEvent("Suma: c_subOpPool size: %d free: %d", c_subOpPool.getSize(), c_subOpPool.getNoOfFree()); #if 0 infoEvent("Suma: c_dataSubscribers count: %d", count_subscribers(c_dataSubscribers)); infoEvent("Suma: c_prepDataSubscribers count: %d", count_subscribers(c_prepDataSubscribers)); #endif } if(tCase == 8005) { for(Uint32 i = 0; i<c_no_of_buckets; i++) { Bucket* ptr= c_buckets + i; infoEvent("Bucket %d %d%d-%x switch gci: %llu max_acked_gci: %llu max_gci: %llu tail: %d head: %d", i, m_active_buckets.get(i), m_switchover_buckets.get(i), ptr->m_state, ptr->m_switchover_gci, ptr->m_max_acked_gci, ptr->m_buffer_head.m_max_gci, ptr->m_buffer_tail, ptr->m_buffer_head.m_page_id); } } if (tCase == 8006) { SET_ERROR_INSERT_VALUE(13029); } if (tCase == 8007) { c_startup.m_restart_server_node_id = MAX_NDB_NODES + 1; SET_ERROR_INSERT_VALUE(13029); } if (tCase == 8008) { CLEAR_ERROR_INSERT_VALUE; } if (tCase == 8010) { char buf1[255], buf2[255]; c_subscriber_nodes.getText(buf1); c_connected_nodes.getText(buf2); infoEvent("c_subscriber_nodes: %s", buf1); infoEvent("c_connected_nodes: %s", buf2); } if (tCase == 8009) { if (ERROR_INSERTED(13030)) { CLEAR_ERROR_INSERT_VALUE; sendSTTORRY(signal); } else { SET_ERROR_INSERT_VALUE(13030); } return; } if (tCase == 8011) { jam(); Uint32 bucket = signal->theData[1]; KeyTable<Table>::Iterator it; if (signal->getLength() == 1) { jam(); bucket = 0; infoEvent("-- Starting dump of subscribers --"); } c_tables.next(bucket, it); const Uint32 RT_BREAK = 16; for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++) { jam(); if(it.curr.i == RNIL) { jam(); infoEvent("-- Ending dump of subscribers --"); return; } infoEvent("Table %u ver %u", it.curr.p->m_tableId, it.curr.p->m_schemaVersion); Uint32 cnt = 0; Ptr<Subscription> subPtr; LocalDLList<Subscription> subList(c_subscriptionPool, it.curr.p->m_subscriptions); for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr)) { infoEvent(" Subcription %u", subPtr.i); { Ptr<Subscriber> ptr; LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); cnt++; infoEvent(" Subscriber [ %x %u %u ]", ptr.p->m_senderRef, ptr.p->m_senderData, subPtr.i); } } { Ptr<SubOpRecord> ptr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); infoEvent(" create [ %x %u ]", ptr.p->m_senderRef, ptr.p->m_senderData); } } { Ptr<SubOpRecord> ptr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_start_req); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); infoEvent(" start [ %x %u ]", ptr.p->m_senderRef, ptr.p->m_senderData); } } { Ptr<SubOpRecord> ptr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); infoEvent(" stop [ %u %x %u ]", ptr.p->m_opType, ptr.p->m_senderRef, ptr.p->m_senderData); } } } infoEvent("Table %u #subscribers %u", it.curr.p->m_tableId, cnt); c_tables.next(it); } signal->theData[0] = tCase; signal->theData[1] = it.bucket; sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2); return; } if (tCase == 8012) { jam(); Uint32 bucket = signal->theData[1]; KeyTable<Subscription>::Iterator it; if (signal->getLength() == 1) { jam(); bucket = 0; infoEvent("-- Starting dump of subscribers --"); } c_subscriptions.next(bucket, it); const Uint32 RT_BREAK = 16; for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++) { jam(); if(it.curr.i == RNIL) { jam(); infoEvent("-- Ending dump of subscribers --"); return; } Ptr<Subscription> subPtr = it.curr; Ptr<Table> tabPtr; c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); infoEvent("Subcription %u id: 0x%.8x key: 0x%.8x state: %s", subPtr.i, subPtr.p->m_subscriptionId, subPtr.p->m_subscriptionKey, cstr(subPtr.p->m_state)); infoEvent(" trigger state: %s options: %s", cstr(subPtr.p->m_trigger_state), cstr((Suma::Subscription::Options)subPtr.p->m_options)); infoEvent(" tablePtr: %u tableId: %u schemaVersion: 0x%.8x state: %s", tabPtr.i, subPtr.p->m_tableId, tabPtr.p->m_schemaVersion, cstr(tabPtr.p->m_state)); { Ptr<Subscriber> ptr; LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); infoEvent(" Subscriber [ %x %u %u ]", ptr.p->m_senderRef, ptr.p->m_senderData, subPtr.i); } } { Ptr<SubOpRecord> ptr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); infoEvent(" create [ %x %u ]", ptr.p->m_senderRef, ptr.p->m_senderData); } } { Ptr<SubOpRecord> ptr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_start_req); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); infoEvent(" start [ %x %u ]", ptr.p->m_senderRef, ptr.p->m_senderData); } } { Ptr<SubOpRecord> ptr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req); for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++) { jam(); infoEvent(" stop [ %u %x %u ]", ptr.p->m_opType, ptr.p->m_senderRef, ptr.p->m_senderData); } } c_subscriptions.next(it); } signal->theData[0] = tCase; signal->theData[1] = it.bucket; sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2); return; } if (tCase == 7019 && signal->getLength() == 2) { jam(); Uint32 nodeId = signal->theData[1]; if (nodeId < MAX_NODES) { warningEvent(" Suma 7019 %u line: %u", nodeId, c_failedApiNodesState[nodeId]); warningEvent(" c_connected_nodes.get(): %u", c_connected_nodes.get(nodeId)); warningEvent(" c_failedApiNodes.get(): %u", c_failedApiNodes.get(nodeId)); warningEvent(" c_subscriber_nodes.get(): %u", c_subscriber_nodes.get(nodeId)); warningEvent(" c_subscriber_per_node[%u]: %u", nodeId, c_subscriber_per_node[nodeId]); } else { warningEvent(" SUMP: dump-7019 to unknown node: %u", nodeId); } } } void Suma::execDBINFO_SCANREQ(Signal *signal) { DbinfoScanReq req= *(DbinfoScanReq*)signal->theData; const Ndbinfo::ScanCursor* cursor = CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req)); Ndbinfo::Ratelimit rl; jamEntry(); switch(req.tableId){ case Ndbinfo::POOLS_TABLEID: { Ndbinfo::pool_entry pools[] = { { "Subscriber", c_subscriberPool.getUsed(), c_subscriberPool.getSize(), c_subscriberPool.getEntrySize(), c_subscriberPool.getUsedHi(), { CFG_DB_SUBSCRIBERS, CFG_DB_SUBSCRIPTIONS, CFG_DB_NO_TABLES,0 }}, { "Table", c_tablePool.getUsed(), c_tablePool.getSize(), c_tablePool.getEntrySize(), c_tablePool.getUsedHi(), { CFG_DB_NO_TABLES,0,0,0 }}, { "Subscription", c_subscriptionPool.getUsed(), c_subscriptionPool.getSize(), c_subscriptionPool.getEntrySize(), c_subscriptionPool.getUsedHi(), { CFG_DB_SUBSCRIPTIONS, CFG_DB_NO_TABLES,0,0 }}, { "Sync", c_syncPool.getUsed(), c_syncPool.getSize(), c_syncPool.getEntrySize(), c_syncPool.getUsedHi(), { 0,0,0,0 }}, { "Data Buffer", c_dataBufferPool.getUsed(), c_dataBufferPool.getSize(), c_dataBufferPool.getEntrySize(), c_dataBufferPool.getUsedHi(), { CFG_DB_NO_ATTRIBUTES,0,0,0 }}, { "SubOp", c_subOpPool.getUsed(), c_subOpPool.getSize(), c_subOpPool.getEntrySize(), c_subOpPool.getUsedHi(), { CFG_DB_SUB_OPERATIONS,0,0,0 }}, { "Page Chunk", c_page_chunk_pool.getUsed(), c_page_chunk_pool.getSize(), c_page_chunk_pool.getEntrySize(), c_page_chunk_pool.getUsedHi(), { 0,0,0,0 }}, { "GCP", c_gcp_pool.getUsed(), c_gcp_pool.getSize(), c_gcp_pool.getEntrySize(), c_gcp_pool.getUsedHi(), { CFG_DB_API_HEARTBEAT_INTERVAL, CFG_DB_GCP_INTERVAL,0,0 }}, { NULL, 0,0,0,0, { 0,0,0,0 }} }; const size_t num_config_params = sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]); Uint32 pool = cursor->data[0]; BlockNumber bn = blockToMain(number()); while(pools[pool].poolname) { jam(); Ndbinfo::Row row(signal, req); row.write_uint32(getOwnNodeId()); row.write_uint32(bn); // block number row.write_uint32(instance()); // block instance row.write_string(pools[pool].poolname); row.write_uint64(pools[pool].used); row.write_uint64(pools[pool].total); row.write_uint64(pools[pool].used_hi); row.write_uint64(pools[pool].entry_size); for (size_t i = 0; i < num_config_params; i++) row.write_uint32(pools[pool].config_params[i]); ndbinfo_send_row(signal, req, row, rl); pool++; if (rl.need_break(req)) { jam(); ndbinfo_send_scan_break(signal, req, rl, pool); return; } } break; } default: break; } ndbinfo_send_scan_conf(signal, req, rl); } /************************************************************* * * Creation of subscription id's * ************************************************************/ void Suma::execCREATE_SUBID_REQ(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execCREATE_SUBID_REQ"); ndbassert(signal->getNoOfSections() == 0); CRASH_INSERTION(13001); CreateSubscriptionIdReq const * req = (CreateSubscriptionIdReq*)signal->getDataPtr(); SubscriberPtr subbPtr; if(!c_subscriberPool.seize(subbPtr)){ jam(); sendSubIdRef(signal, req->senderRef, req->senderData, 1412); DBUG_VOID_RETURN; } DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", c_subscriberPool.getSize(), c_subscriberPool.getNoOfFree())); subbPtr.p->m_senderRef = req->senderRef; subbPtr.p->m_senderData = req->senderData; UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend(); utilReq->senderData = subbPtr.i; utilReq->sequenceId = SUMA_SEQUENCE; utilReq->requestType = UtilSequenceReq::NextVal; sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB); DBUG_VOID_RETURN; } void Suma::execUTIL_SEQUENCE_CONF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF"); ndbassert(signal->getNoOfSections() == 0); CRASH_INSERTION(13002); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); if(conf->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, conf, NULL); DBUG_VOID_RETURN; } Uint64 subId; memcpy(&subId,conf->sequenceValue,8); SubscriberPtr subbPtr; c_subscriberPool.getPtr(subbPtr,conf->senderData); CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf; subconf->senderRef = reference(); subconf->senderData = subbPtr.p->m_senderData; subconf->subscriptionId = (Uint32)subId; subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF); sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal, CreateSubscriptionIdConf::SignalLength, JBB); c_subscriberPool.release(subbPtr); DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", c_subscriberPool.getSize(), c_subscriberPool.getNoOfFree())); DBUG_VOID_RETURN; } void Suma::execUTIL_SEQUENCE_REF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF"); ndbassert(signal->getNoOfSections() == 0); UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); Uint32 err= ref->errorCode; if(ref->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, NULL, ref); DBUG_VOID_RETURN; } Uint32 subData = ref->senderData; SubscriberPtr subbPtr; c_subscriberPool.getPtr(subbPtr,subData); if (err == UtilSequenceRef::TCError) { jam(); err = ref->TCErrorCode; } sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err); c_subscriberPool.release(subbPtr); DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", c_subscriberPool.getSize(), c_subscriberPool.getNoOfFree())); DBUG_VOID_RETURN; }//execUTIL_SEQUENCE_REF() void Suma::sendSubIdRef(Signal* signal, Uint32 senderRef, Uint32 senderData, Uint32 errCode) { jam(); DBUG_ENTER("Suma::sendSubIdRef"); CreateSubscriptionIdRef * ref = (CreateSubscriptionIdRef *)signal->getDataPtrSend(); ref->senderRef = reference(); ref->senderData = senderData; ref->errorCode = errCode; sendSignal(senderRef, GSN_CREATE_SUBID_REF, signal, CreateSubscriptionIdRef::SignalLength, JBB); DBUG_VOID_RETURN; } /********************************************************** * Suma participant interface * * Creation of subscriptions */ void Suma::execSUB_CREATE_REQ(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSUB_CREATE_REQ"); ndbassert(signal->getNoOfSections() == 0); CRASH_INSERTION(13003); const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr(); const Uint32 senderRef = req.senderRef; const Uint32 senderData = req.senderData; const Uint32 subId = req.subscriptionId; const Uint32 subKey = req.subscriptionKey; const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags; const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags; const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ? Subscription::REPORT_ALL : 0; const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ? Subscription::REPORT_SUBSCRIBE : 0; const Uint32 noReportDDL = (flags & SubCreateReq::NoReportDDL) ? Subscription::NO_REPORT_DDL : 0; const Uint32 tableId = req.tableId; const Uint32 schemaTransId = req.schemaTransId; bool subDropped = req.subscriptionType & SubCreateReq::NR_Sub_Dropped; /** * This 2 options are only allowed during NR */ if (subDropped) { ndbrequire(refToNode(senderRef) == c_startup.m_restart_server_node_id); } Subscription key; key.m_subscriptionId = subId; key.m_subscriptionKey = subKey; DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u", key.m_subscriptionId, key.m_subscriptionKey)); SubscriptionPtr subPtr; bool found = c_subscriptions.find(subPtr, key); if (c_startup.m_restart_server_node_id == RNIL) { jam(); /** * We havent started syncing yet */ sendSubCreateRef(signal, senderRef, senderData, SubCreateRef::NotStarted); return; } CRASH_INSERTION2(13040, c_startup.m_restart_server_node_id != RNIL); CRASH_INSERTION(13041); bool allowDup = true; //c_startup.m_restart_server_node_id; if (found && !allowDup) { jam(); sendSubCreateRef(signal, senderRef, senderData, SubCreateRef::SubscriptionAlreadyExist); return; } if (found == false) { jam(); if(!c_subscriptions.seize(subPtr)) { jam(); sendSubCreateRef(signal, senderRef, senderData, SubCreateRef::OutOfSubscriptionRecords); return; } new (subPtr.p) Subscription(); subPtr.p->m_seq_no = c_current_seq; subPtr.p->m_subscriptionId = subId; subPtr.p->m_subscriptionKey = subKey; subPtr.p->m_subscriptionType = type; subPtr.p->m_tableId = tableId; subPtr.p->m_table_ptrI = RNIL; subPtr.p->m_state = Subscription::UNDEFINED; subPtr.p->m_trigger_state = Subscription::T_UNDEFINED; subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID; subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID; subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID; subPtr.p->m_errorCode = 0; subPtr.p->m_options = reportSubscribe | reportAll | noReportDDL; subPtr.p->m_schemaTransId = schemaTransId; } Ptr<SubOpRecord> subOpPtr; LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_create_req); if ((ERROR_INSERTED(13044) && found == false) || subOpList.seize(subOpPtr) == false) { jam(); if (found == false) { jam(); if (ERROR_INSERTED(13044)) { CLEAR_ERROR_INSERT_VALUE; } c_subscriptionPool.release(subPtr); // not yet in hash } sendSubCreateRef(signal, senderRef, senderData, SubCreateRef::OutOfTableRecords); return; } subOpPtr.p->m_senderRef = senderRef; subOpPtr.p->m_senderData = senderData; if (subDropped) { jam(); subPtr.p->m_options |= Subscription::MARKED_DROPPED; } TablePtr tabPtr; if (found) { jam(); c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); } else if (c_tables.find(tabPtr, tableId)) { jam(); } else { jam(); if (ERROR_INSERTED(13045) || c_tablePool.seize(tabPtr) == false) { jam(); if (ERROR_INSERTED(13045)) { CLEAR_ERROR_INSERT_VALUE; } subOpList.release(subOpPtr); c_subscriptionPool.release(subPtr); // not yet in hash sendSubCreateRef(signal, senderRef, senderData, SubCreateRef::OutOfTableRecords); return; } new (tabPtr.p) Table; tabPtr.p->m_tableId= tableId; tabPtr.p->m_ptrI= tabPtr.i; tabPtr.p->m_error = 0; tabPtr.p->m_schemaVersion = RNIL; tabPtr.p->m_state = Table::UNDEFINED; tabPtr.p->m_schemaTransId = schemaTransId; c_tables.add(tabPtr); } if (found == false) { jam(); c_subscriptions.add(subPtr); LocalDLList<Subscription> list(c_subscriptionPool, tabPtr.p->m_subscriptions); list.add(subPtr); subPtr.p->m_table_ptrI = tabPtr.i; } switch(tabPtr.p->m_state){ case Table::DEFINED:{ jam(); // Send conf subOpList.release(subOpPtr); subPtr.p->m_state = Subscription::DEFINED; SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; sendSignal(senderRef, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB); return; } case Table::UNDEFINED:{ jam(); tabPtr.p->m_state = Table::DEFINING; subPtr.p->m_state = Subscription::DEFINING; if (ERROR_INSERTED(13031)) { jam(); CLEAR_ERROR_INSERT_VALUE; GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend(); ref->tableId = tableId; ref->senderData = tabPtr.i; ref->errorCode = GetTabInfoRef::TableNotDefined; sendSignal(reference(), GSN_GET_TABINFOREF, signal, GetTabInfoRef::SignalLength, JBB); return; } GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = tabPtr.i; req->requestType = GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf; req->tableId = tableId; req->schemaTransId = schemaTransId; sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, GetTabInfoReq::SignalLength, JBB); return; } case Table::DEFINING: { jam(); /** * just wait for completion */ subPtr.p->m_state = Subscription::DEFINING; return; } case Table::DROPPED: { subOpList.release(subOpPtr); { LocalDLList<Subscription> list(c_subscriptionPool, tabPtr.p->m_subscriptions); list.remove(subPtr); } c_subscriptions.release(subPtr); sendSubCreateRef(signal, senderRef, senderData, SubCreateRef::TableDropped); return; } } ndbrequire(false); } void Suma::sendSubCreateRef(Signal* signal, Uint32 retRef, Uint32 data, Uint32 errCode) { jam(); SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend(); ref->errorCode = errCode; ref->senderData = data; sendSignal(retRef, GSN_SUB_CREATE_REF, signal, SubCreateRef::SignalLength, JBB); return; } /********************************************************** * * Setting upp trigger for subscription * */ void Suma::execSUB_SYNC_REQ(Signal* signal) { jamEntry(); CRASH_INSERTION(13004); SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = req->subscriptionId; key.m_subscriptionKey = req->subscriptionKey; SectionHandle handle(this, signal); if(!c_subscriptions.find(subPtr, key)) { jam(); releaseSections(handle); sendSubSyncRef(signal, 1407); return; } Ptr<SyncRecord> syncPtr; LocalDLList<SyncRecord> list(c_syncPool, subPtr.p->m_syncRecords); if(!list.seize(syncPtr)) { jam(); releaseSections(handle); sendSubSyncRef(signal, 1416); return; } new (syncPtr.p) Ptr<SyncRecord>; syncPtr.p->m_senderRef = req->senderRef; syncPtr.p->m_senderData = req->senderData; syncPtr.p->m_subscriptionPtrI = subPtr.i; syncPtr.p->ptrI = syncPtr.i; syncPtr.p->m_error = 0; syncPtr.p->m_requestInfo = req->requestInfo; syncPtr.p->m_frag_cnt = req->fragCount; syncPtr.p->m_frag_id = req->fragId; syncPtr.p->m_tableId = subPtr.p->m_tableId; { jam(); if(handle.m_cnt > 0) { SegmentedSectionPtr ptr; handle.getSection(ptr, SubSyncReq::ATTRIBUTE_LIST); LocalDataBuffer<15> attrBuf(c_dataBufferPool, syncPtr.p->m_attributeList); append(attrBuf, ptr, getSectionSegmentPool()); } if (req->requestInfo & SubSyncReq::RangeScan) { jam(); ndbrequire(handle.m_cnt > 1) SegmentedSectionPtr ptr; handle.getSection(ptr, SubSyncReq::TUX_BOUND_INFO); LocalDataBuffer<15> boundBuf(c_dataBufferPool, syncPtr.p->m_boundInfo); append(boundBuf, ptr, getSectionSegmentPool()); } releaseSections(handle); } /** * We need to gather fragment info */ { jam(); DihScanTabReq* req = (DihScanTabReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = syncPtr.i; req->tableId = subPtr.p->m_tableId; req->schemaTransId = subPtr.p->m_schemaTransId; sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal, DihScanTabReq::SignalLength, JBB); } } void Suma::sendSubSyncRef(Signal* signal, Uint32 errCode){ jam(); SubSyncRef * ref= (SubSyncRef *)signal->getDataPtrSend(); ref->errorCode = errCode; sendSignal(signal->getSendersBlockRef(), GSN_SUB_SYNC_REF, signal, SubSyncRef::SignalLength, JBB); return; } void Suma::execDIH_SCAN_TAB_REF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execDI_FCOUNTREF"); DihScanTabRef * ref = (DihScanTabRef*)signal->getDataPtr(); switch ((DihScanTabRef::ErrorCode) ref->error) { case DihScanTabRef::ErroneousTableState: jam(); if (ref->tableStatus == Dbdih::TabRecord::TS_CREATING) { const Uint32 tableId = ref->tableId; const Uint32 synPtrI = ref->senderData; const Uint32 schemaTransId = ref->schemaTransId; DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend(); req->senderData = synPtrI; req->senderRef = reference(); req->tableId = tableId; req->schemaTransId = schemaTransId; sendSignalWithDelay(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal, DihScanTabReq::SignalLength, DihScanTabReq::RetryInterval); DBUG_VOID_RETURN; } ndbrequire(false); default: ndbrequire(false); } DBUG_VOID_RETURN; } void Suma::execDIH_SCAN_TAB_CONF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execDI_FCOUNTCONF"); ndbassert(signal->getNoOfSections() == 0); DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr(); const Uint32 tableId = conf->tableId; const Uint32 fragCount = conf->fragmentCount; const Uint32 scanCookie = conf->scanCookie; Ptr<SyncRecord> ptr; c_syncPool.getPtr(ptr, conf->senderData); LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments); ndbrequire(fragBuf.getSize() == 0); ndbassert(fragCount >= ptr.p->m_frag_cnt); if (ptr.p->m_frag_cnt == 0) { jam(); ptr.p->m_frag_cnt = fragCount; } ptr.p->m_scan_cookie = scanCookie; DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = ptr.i; req->tableId = tableId; req->fragId = 0; req->scanCookie = scanCookie; sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, DihScanGetNodesReq::SignalLength, JBB); DBUG_VOID_RETURN; } void Suma::execDIH_SCAN_GET_NODES_CONF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execDIGETPRIMCONF"); ndbassert(signal->getNoOfSections() == 0); DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtr(); const Uint32 nodeCount = conf->count; const Uint32 tableId = conf->tableId; const Uint32 fragNo = conf->fragId; ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS); Ptr<SyncRecord> ptr; c_syncPool.getPtr(ptr, conf->senderData); { LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments); /** * Add primary node for fragment to list */ FragmentDescriptor fd; fd.m_fragDesc.m_nodeId = conf->nodes[0]; fd.m_fragDesc.m_fragmentNo = fragNo; fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey; if (ptr.p->m_frag_id == ZNIL) { signal->theData[2] = fd.m_dummy; fragBuf.append(&signal->theData[2], 1); } else if (ptr.p->m_frag_id == fragNo) { /* * Given fragment must have a replica on this node. */ const Uint32 ownNodeId = getOwnNodeId(); Uint32 i = 0; for (i = 0; i < nodeCount; i++) if (conf->nodes[i] == ownNodeId) break; if (i == nodeCount) { sendSubSyncRef(signal, 1428); return; } fd.m_fragDesc.m_nodeId = ownNodeId; signal->theData[2] = fd.m_dummy; fragBuf.append(&signal->theData[2], 1); } } const Uint32 nextFrag = fragNo + 1; if(nextFrag == ptr.p->m_frag_cnt) { jam(); ptr.p->startScan(signal); return; } DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = ptr.i; req->tableId = tableId; req->fragId = nextFrag; req->scanCookie = ptr.p->m_scan_cookie; sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal, DihScanGetNodesReq::SignalLength, JBB); DBUG_VOID_RETURN; } /********************************************************** * Dict interface */ /************************************************************************* * * */ void Suma::execGET_TABINFOREF(Signal* signal){ jamEntry(); GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr(); Uint32 tableId = ref->tableId; Uint32 senderData = ref->senderData; Uint32 schemaTransId = ref->schemaTransId; GetTabInfoRef::ErrorCode errorCode = (GetTabInfoRef::ErrorCode) ref->errorCode; int do_resend_request = 0; TablePtr tabPtr; c_tablePool.getPtr(tabPtr, senderData); switch (errorCode) { case GetTabInfoRef::TableNotDefined: // wrong state break; case GetTabInfoRef::InvalidTableId: // no such table break; case GetTabInfoRef::Busy: do_resend_request = 1; break; case GetTabInfoRef::NoFetchByName: jam(); case GetTabInfoRef::TableNameTooLong: jam(); ndbrequire(false); } if (tabPtr.p->m_state == Table::DROPPED) { jam(); do_resend_request = 0; } if (do_resend_request) { GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = senderData; req->requestType = GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf; req->tableId = tableId; req->schemaTransId = schemaTransId; sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal, 30, GetTabInfoReq::SignalLength); return; } get_tabinfo_ref_release(signal, tabPtr); } void Suma::get_tabinfo_ref_release(Signal* signal, Ptr<Table> tabPtr) { LocalDLList<Subscription> subList(c_subscriptionPool, tabPtr.p->m_subscriptions); Ptr<Subscription> subPtr; bool empty = subList.isEmpty(); for(subList.first(subPtr); !subPtr.isNull();) { jam(); Ptr<SubOpRecord> ptr; ndbassert(subPtr.p->m_start_req.isEmpty()); ndbassert(subPtr.p->m_stop_req.isEmpty()); LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req); for (list.first(ptr); !ptr.isNull(); ) { jam(); sendSubCreateRef(signal, ptr.p->m_senderRef, ptr.p->m_senderData, SubCreateRef::TableDropped); Ptr<SubOpRecord> tmp0 = ptr; list.next(ptr); list.release(tmp0); } Ptr<Subscription> tmp1 = subPtr; subList.next(subPtr); c_subscriptions.remove(tmp1); subList.release(tmp1); } c_tables.release(tabPtr); ndbassert(!empty); } void Suma::execGET_TABINFO_CONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13006); if(!assembleFragments(signal)){ return; } SectionHandle handle(this, signal); GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr(); TablePtr tabPtr; c_tablePool.getPtr(tabPtr, conf->senderData); SegmentedSectionPtr ptr; handle.getSection(ptr, GetTabInfoConf::DICT_TAB_INFO); ndbrequire(tabPtr.p->parseTable(ptr, *this)); releaseSections(handle); if (tabPtr.p->m_state == Table::DROPPED) { jam(); get_tabinfo_ref_release(signal, tabPtr); return; } tabPtr.p->m_state = Table::DEFINED; LocalDLList<Subscription> subList(c_subscriptionPool, tabPtr.p->m_subscriptions); Ptr<Subscription> subPtr; bool empty = subList.isEmpty(); for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr)) { jam(); subPtr.p->m_state = Subscription::DEFINED; Ptr<SubOpRecord> ptr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req); for (list.first(ptr); !ptr.isNull();) { jam(); SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = ptr.p->m_senderData; sendSignal(ptr.p->m_senderRef, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB); Ptr<SubOpRecord> tmp = ptr; list.next(ptr); list.release(tmp); } } ndbassert(!empty); } bool Suma::Table::parseTable(SegmentedSectionPtr ptr, Suma &suma) { DBUG_ENTER("Suma::Table::parseTable"); SimplePropertiesSectionReader it(ptr, suma.getSectionSegmentPool()); SimpleProperties::UnpackStatus s; DictTabInfo::Table tableDesc; tableDesc.init(); s = SimpleProperties::unpack(it, &tableDesc, DictTabInfo::TableMapping, DictTabInfo::TableMappingSize, true, true); jamBlock(&suma); suma.suma_ndbrequire(s == SimpleProperties::Break); /** * Initialize table object */ m_noOfAttributes = tableDesc.NoOfAttributes; m_schemaVersion = tableDesc.TableVersion; DBUG_RETURN(true); } /********************************************************** * * Scan interface * */ void Suma::SyncRecord::startScan(Signal* signal) { jam(); DBUG_ENTER("Suma::SyncRecord::startScan"); /** * Get fraginfo */ m_currentFragment = 0; nextScan(signal); DBUG_VOID_RETURN; } bool Suma::SyncRecord::getNextFragment(TablePtr * tab, FragmentDescriptor * fd) { jam(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); DataBuffer<15>::DataBufferIterator fragIt; TablePtr tabPtr; suma.c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments); fragBuf.position(fragIt, m_currentFragment); for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++) { FragmentDescriptor tmp; tmp.m_dummy = * fragIt.data; if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){ * fd = tmp; * tab = tabPtr; return true; } } m_currentFragment = 0; return false; } void Suma::SyncRecord::nextScan(Signal* signal) { jam(); DBUG_ENTER("Suma::SyncRecord::nextScan"); TablePtr tabPtr; FragmentDescriptor fd; SubscriptionPtr subPtr; if(!getNextFragment(&tabPtr, &fd)){ jam(); completeScan(signal); DBUG_VOID_RETURN; } suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); DataBuffer<15>::Head head = m_attributeList; LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head); Uint32 instanceKey = fd.m_fragDesc.m_lqhInstanceKey; BlockReference lqhRef = numberToRef(DBLQH, instanceKey, suma.getOwnNodeId()); ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend(); const Uint32 parallelism = 16; //const Uint32 attrLen = 5 + attrBuf.getSize(); req->senderData = ptrI; req->resultRef = suma.reference(); req->tableId = tabPtr.p->m_tableId; req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); if (m_requestInfo & SubSyncReq::NoDisk) { ScanFragReq::setNoDiskFlag(req->requestInfo, 1); } if (m_requestInfo & SubSyncReq::LM_Exclusive) { ScanFragReq::setLockMode(req->requestInfo, 1); ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 1); } if (m_requestInfo & SubSyncReq::Reorg) { ScanFragReq::setReorgFlag(req->requestInfo, ScanFragReq::REORG_MOVED); } if (m_requestInfo & SubSyncReq::TupOrder) { ScanFragReq::setTupScanFlag(req->requestInfo, 1); } if (m_requestInfo & SubSyncReq::LM_CommittedRead) { ScanFragReq::setReadCommittedFlag(req->requestInfo, 1); } if (m_requestInfo & SubSyncReq::RangeScan) { ScanFragReq::setRangeScanFlag(req->requestInfo, 1); } if (m_requestInfo & SubSyncReq::StatScan) { ScanFragReq::setStatScanFlag(req->requestInfo, 1); } req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; req->schemaVersion = tabPtr.p->m_schemaVersion; req->transId1 = 0; req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); req->clientOpPtr = (ptrI << 16); req->batch_size_rows= parallelism; req->batch_size_bytes= 0; Uint32 * attrInfo = signal->theData + 25; attrInfo[0] = attrBuf.getSize(); attrInfo[1] = 0; attrInfo[2] = 0; attrInfo[3] = 0; attrInfo[4] = 0; Uint32 pos = 5; DataBuffer<15>::DataBufferIterator it; for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)) { AttributeHeader::init(&attrInfo[pos++], * it.data, 0); } LinearSectionPtr ptr[3]; Uint32 noOfSections; ptr[0].p = attrInfo; ptr[0].sz = pos; noOfSections = 1; if (m_requestInfo & SubSyncReq::RangeScan) { jam(); Uint32 oldpos = pos; // after attrInfo LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo); for (boundBuf.first(it); !it.curr.isNull(); boundBuf.next(it)) { attrInfo[pos++] = *it.data; } ptr[1].p = &attrInfo[oldpos]; ptr[1].sz = pos - oldpos; noOfSections = 2; } suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB, ptr, noOfSections); m_currentNoOfAttributes = attrBuf.getSize(); DBUG_VOID_RETURN; } void Suma::execSCAN_FRAGREF(Signal* signal){ jamEntry(); // ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr(); ndbrequire(false); } void Suma::execSCAN_FRAGCONF(Signal* signal){ jamEntry(); DBUG_ENTER("Suma::execSCAN_FRAGCONF"); ndbassert(signal->getNoOfSections() == 0); CRASH_INSERTION(13011); ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr(); const Uint32 completed = conf->fragmentCompleted; const Uint32 senderData = conf->senderData; const Uint32 completedOps = conf->completedOps; Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, senderData); if(completed != 2){ // 2==ZSCAN_FRAG_CLOSED jam(); #if PRINT_ONLY SubSyncContinueConf * const conf = (SubSyncContinueConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; execSUB_SYNC_CONTINUE_CONF(signal); #else SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend(); req->subscriberData = syncPtr.p->m_senderData; req->noOfRowsSent = completedOps; req->senderData = senderData; sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal, SubSyncContinueReq::SignalLength, JBB); #endif DBUG_VOID_RETURN; } ndbrequire(completedOps == 0); syncPtr.p->m_currentFragment++; syncPtr.p->nextScan(signal); DBUG_VOID_RETURN; } void Suma::execSUB_SYNC_CONTINUE_CONF(Signal* signal){ jamEntry(); ndbassert(signal->getNoOfSections() == 0); CRASH_INSERTION(13012); SubSyncContinueConf * const conf = (SubSyncContinueConf*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = conf->subscriptionId; key.m_subscriptionKey = conf->subscriptionKey; Uint32 syncPtrI = conf->senderData; ndbrequire(c_subscriptions.find(subPtr, key)); Uint32 instanceKey; { Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, syncPtrI); LocalDataBuffer<15> fragBuf(c_dataBufferPool, syncPtr.p->m_fragments); DataBuffer<15>::DataBufferIterator fragIt; bool ok = fragBuf.position(fragIt, syncPtr.p->m_currentFragment); ndbrequire(ok); FragmentDescriptor tmp; tmp.m_dummy = * fragIt.data; instanceKey = tmp.m_fragDesc.m_lqhInstanceKey; } BlockReference lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId()); ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend(); req->senderData = syncPtrI; req->requestInfo = 0; req->transId1 = 0; req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8); req->batch_size_rows = 16; req->batch_size_bytes = 0; sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); } void Suma::SyncRecord::completeScan(Signal* signal, int error) { jam(); DBUG_ENTER("Suma::SyncRecord::completeScan"); SubscriptionPtr subPtr; suma.c_subscriptionPool.getPtr(subPtr, m_subscriptionPtrI); DihScanTabCompleteRep* rep = (DihScanTabCompleteRep*)signal->getDataPtr(); rep->tableId = subPtr.p->m_tableId; rep->scanCookie = m_scan_cookie; suma.sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP, signal, DihScanTabCompleteRep::SignalLength, JBB); #if PRINT_ONLY ndbout_c("GSN_SUB_SYNC_CONF (data)"); #else if (error == 0) { SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend(); conf->senderRef = suma.reference(); conf->senderData = m_senderData; suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal, SubSyncConf::SignalLength, JBB); } else { SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend(); ref->senderRef = suma.reference(); ref->senderData = m_senderData; suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal, SubSyncRef::SignalLength, JBB); } #endif release(); LocalDLList<SyncRecord> list(suma.c_syncPool, subPtr.p->m_syncRecords); Ptr<SyncRecord> tmp; tmp.i = ptrI; tmp.p = this; list.release(tmp); DBUG_VOID_RETURN; } void Suma::execSCAN_HBREP(Signal* signal){ jamEntry(); #if 0 ndbout << "execSCAN_HBREP" << endl << hex; for(int i = 0; i<signal->length(); i++){ ndbout << signal->theData[i] << " "; if(((i + 1) % 8) == 0) ndbout << endl << hex; } ndbout << endl; #endif } /********************************************************** * * Suma participant interface * * Creation of subscriber * */ void Suma::execSUB_START_REQ(Signal* signal){ jamEntry(); ndbassert(signal->getNoOfSections() == 0); DBUG_ENTER("Suma::execSUB_START_REQ"); SubStartReq * const req = (SubStartReq*)signal->getDataPtr(); CRASH_INSERTION(13013); Uint32 senderRef = req->senderRef; Uint32 senderData = req->senderData; Uint32 subscriberData = req->subscriberData; Uint32 subscriberRef = req->subscriberRef; SubscriptionData::Part part = (SubscriptionData::Part)req->part; (void)part; // TODO validate part Subscription key; key.m_subscriptionId = req->subscriptionId; key.m_subscriptionKey = req->subscriptionKey; SubscriptionPtr subPtr; CRASH_INSERTION2(13042, getNodeState().startLevel == NodeState::SL_STARTING); if (c_startup.m_restart_server_node_id == RNIL) { jam(); /** * We havent started syncing yet */ sendSubStartRef(signal, senderRef, senderData, SubStartRef::NotStarted); return; } bool found = c_subscriptions.find(subPtr, key); if (!found) { jam(); sendSubStartRef(signal, senderRef, senderData, SubStartRef::NoSuchSubscription); return; } if (ERROR_INSERTED(13046)) { jam(); CLEAR_ERROR_INSERT_VALUE; sendSubStartRef(signal, senderRef, senderData, SubStartRef::NoSuchSubscription); return; } switch(subPtr.p->m_state){ case Subscription::UNDEFINED: jam(); ndbrequire(false); case Subscription::DEFINING: jam(); sendSubStartRef(signal, senderRef, senderData, SubStartRef::Defining); return; case Subscription::DEFINED: break; } if (subPtr.p->m_options & Subscription::MARKED_DROPPED) { jam(); if (c_startup.m_restart_server_node_id == 0) { sendSubStartRef(signal, senderRef, senderData, SubStartRef::Dropped); return; } else { /** * Allow SUB_START_REQ from peer node */ } } if (subPtr.p->m_trigger_state == Subscription::T_ERROR) { jam(); sendSubStartRef(signal, senderRef, senderData, subPtr.p->m_errorCode); return; } SubscriberPtr subbPtr; if(!c_subscriberPool.seize(subbPtr)) { jam(); sendSubStartRef(signal, senderRef, senderData, SubStartRef::OutOfSubscriberRecords); return; } Ptr<SubOpRecord> subOpPtr; if (!c_subOpPool.seize(subOpPtr)) { jam(); c_subscriberPool.release(subbPtr); sendSubStartRef(signal, senderRef, senderData, SubStartRef::OutOfSubOpRecords); return; } if (! check_sub_start(subscriberRef)) { jam(); c_subscriberPool.release(subbPtr); c_subOpPool.release(subOpPtr); sendSubStartRef(signal, senderRef, senderData, SubStartRef::NodeDied); return; } // setup subscriber record subbPtr.p->m_senderRef = subscriberRef; subbPtr.p->m_senderData = subscriberData; subOpPtr.p->m_opType = SubOpRecord::R_SUB_START_REQ; subOpPtr.p->m_subPtrI = subPtr.i; subOpPtr.p->m_senderRef = senderRef; subOpPtr.p->m_senderData = senderData; subOpPtr.p->m_subscriberRef = subbPtr.i; { LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req); subOpList.add(subOpPtr); } /** * Check triggers */ switch(subPtr.p->m_trigger_state){ case Subscription::T_UNDEFINED: jam(); /** * create triggers */ create_triggers(signal, subPtr); break; case Subscription::T_CREATING: jam(); /** * Triggers are already being created...wait for completion */ return; case Subscription::T_DROPPING: jam(); /** * Trigger(s) are being dropped...wait for completion * (and recreate them when done) */ break; case Subscription::T_DEFINED:{ jam(); report_sub_start_conf(signal, subPtr); return; } case Subscription::T_ERROR: jam(); ndbrequire(false); // Checked above break; } } void Suma::sendSubStartRef(Signal* signal, Uint32 dstref, Uint32 data, Uint32 err) { jam(); SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend(); ref->senderRef = reference(); ref->senderData = data; ref->errorCode = err; sendSignal(dstref, GSN_SUB_START_REF, signal, SubStartRef::SignalLength, JBB); } void Suma::create_triggers(Signal* signal, SubscriptionPtr subPtr) { jam(); ndbrequire(subPtr.p->m_trigger_state == Subscription::T_UNDEFINED); subPtr.p->m_trigger_state = Subscription::T_CREATING; TablePtr tabPtr; c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); AttributeMask attrMask; tabPtr.p->createAttributeMask(attrMask, *this); subPtr.p->m_outstanding_trigger = 3; for(Uint32 j = 0; j<3; j++) { Uint32 triggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) | subPtr.i; ndbrequire(subPtr.p->m_triggers[j] == ILLEGAL_TRIGGER_ID); CreateTrigImplReq * const req = (CreateTrigImplReq*)signal->getDataPtrSend(); req->senderRef = SUMA_REF; req->senderData = subPtr.i; req->requestType = 0; Uint32 ti = 0; TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE); TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED); TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j); TriggerInfo::setMonitorReplicas(ti, true); //TriggerInfo::setMonitorAllAttributes(ti, j == TriggerEvent::TE_DELETE); TriggerInfo::setMonitorAllAttributes(ti, true); TriggerInfo::setReportAllMonitoredAttributes(ti, subPtr.p->m_options & Subscription::REPORT_ALL); req->triggerInfo = ti; req->receiverRef = SUMA_REF; req->triggerId = triggerId; req->tableId = subPtr.p->m_tableId; req->tableVersion = 0; // not used req->indexId = ~(Uint32)0; req->indexVersion = 0; LinearSectionPtr ptr[3]; ptr[0].p = attrMask.rep.data; ptr[0].sz = attrMask.getSizeInWords(); sendSignal(DBTUP_REF, GSN_CREATE_TRIG_IMPL_REQ, signal, CreateTrigImplReq::SignalLength, JBB, ptr, 1); } } void Suma::execCREATE_TRIG_IMPL_CONF(Signal* signal) { jamEntry(); CreateTrigImplConf * conf = (CreateTrigImplConf*)signal->getDataPtr(); const Uint32 triggerId = conf->triggerId; Uint32 type = (triggerId >> 16) & 0x3; Uint32 tableId = conf->tableId; TablePtr tabPtr; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, conf->senderData); c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI); ndbrequire(tabPtr.p->m_tableId == tableId); ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING); ndbrequire(type < 3); ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID); subPtr.p->m_triggers[type] = triggerId; ndbrequire(subPtr.p->m_outstanding_trigger); subPtr.p->m_outstanding_trigger--; if (subPtr.p->m_outstanding_trigger) { jam(); /** * Wait for more */ return; } if (subPtr.p->m_errorCode == 0) { jam(); subPtr.p->m_trigger_state = Subscription::T_DEFINED; report_sub_start_conf(signal, subPtr); } else { jam(); subPtr.p->m_trigger_state = Subscription::T_ERROR; drop_triggers(signal, subPtr); } } void Suma::execCREATE_TRIG_IMPL_REF(Signal* signal) { jamEntry(); CreateTrigImplRef * const ref = (CreateTrigImplRef*)signal->getDataPtr(); const Uint32 triggerId = ref->triggerId; Uint32 type = (triggerId >> 16) & 0x3; Uint32 tableId = ref->tableId; TablePtr tabPtr; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, ref->senderData); c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI); ndbrequire(tabPtr.p->m_tableId == tableId); ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING); ndbrequire(type < 3); ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID); subPtr.p->m_errorCode = ref->errorCode; ndbrequire(subPtr.p->m_outstanding_trigger); subPtr.p->m_outstanding_trigger--; if (subPtr.p->m_outstanding_trigger) { jam(); /** * Wait for more */ return; } subPtr.p->m_trigger_state = Subscription::T_ERROR; drop_triggers(signal, subPtr); } bool Suma::check_sub_start(Uint32 subscriberRef) { Uint32 nodeId = refToNode(subscriberRef); bool startme = c_startup.m_restart_server_node_id; bool handover = c_startup.m_wait_handover; bool connected = c_failedApiNodes.get(nodeId) == false && c_connected_nodes.get(nodeId); return (startme || handover || connected); } void Suma::report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr) { const Uint64 gci = get_current_gci(signal); { LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req); Ptr<Subscriber> ptr; Ptr<SubOpRecord> subOpPtr; for (subOpList.first(subOpPtr); !subOpPtr.isNull(); ) { jam(); Uint32 senderRef = subOpPtr.p->m_senderRef; Uint32 senderData = subOpPtr.p->m_senderData; c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef); if (check_sub_start(ptr.p->m_senderRef)) { SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->firstGCI = Uint32(gci >> 32); conf->part = SubscriptionData::TableData; conf->bucketCount = c_no_of_buckets; conf->nodegroup = c_nodeGroup; sendSignal(senderRef, GSN_SUB_START_CONF, signal, SubStartConf::SignalLength, JBB); /** * Call before adding to list... * cause method will (maybe) iterate thought list */ bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE; send_sub_start_stop_event(signal, ptr,NdbDictionary::Event::_TE_ACTIVE, report, list); list.add(ptr); c_subscriber_nodes.set(refToNode(ptr.p->m_senderRef)); c_subscriber_per_node[refToNode(ptr.p->m_senderRef)]++; } else { jam(); sendSubStartRef(signal, senderRef, senderData, SubStartRef::NodeDied); c_subscriberPool.release(ptr); } Ptr<SubOpRecord> tmp = subOpPtr; subOpList.next(subOpPtr); subOpList.release(tmp); } } check_release_subscription(signal, subPtr); } void Suma::report_sub_start_ref(Signal* signal, Ptr<Subscription> subPtr, Uint32 errCode) { LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req); Ptr<Subscriber> ptr; Ptr<SubOpRecord> subOpPtr; for (subOpList.first(subOpPtr); !subOpPtr.isNull(); ) { jam(); Uint32 senderRef = subOpPtr.p->m_senderRef; Uint32 senderData = subOpPtr.p->m_senderData; c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef); SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend(); ref->senderRef = reference(); ref->senderData = senderData; ref->errorCode = errCode; sendSignal(senderRef, GSN_SUB_START_REF, signal, SubStartConf::SignalLength, JBB); Ptr<SubOpRecord> tmp = subOpPtr; subOpList.next(subOpPtr); subOpList.release(tmp); c_subscriberPool.release(ptr); } } void Suma::drop_triggers(Signal* signal, SubscriptionPtr subPtr) { jam(); subPtr.p->m_outstanding_trigger = 0; Ptr<Table> tabPtr; c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); if (tabPtr.p->m_state == Table::DROPPED) { jam(); subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID; subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID; subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID; } else { for(Uint32 j = 0; j<3; j++) { jam(); Uint32 triggerId = subPtr.p->m_triggers[j]; if (triggerId != ILLEGAL_TRIGGER_ID) { subPtr.p->m_outstanding_trigger++; DropTrigImplReq * const req = (DropTrigImplReq*)signal->getDataPtrSend(); req->senderRef = SUMA_REF; // Sending to myself req->senderData = subPtr.i; req->requestType = 0; // TUP needs some triggerInfo to find right list Uint32 ti = 0; TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE); TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED); TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j); TriggerInfo::setMonitorReplicas(ti, true); //TriggerInfo::setMonitorAllAttributes(ti, j ==TriggerEvent::TE_DELETE); TriggerInfo::setMonitorAllAttributes(ti, true); TriggerInfo::setReportAllMonitoredAttributes(ti, subPtr.p->m_options & Subscription::REPORT_ALL); req->triggerInfo = ti; req->tableId = subPtr.p->m_tableId; req->tableVersion = 0; // not used req->indexId = RNIL; req->indexVersion = 0; req->triggerId = triggerId; req->receiverRef = SUMA_REF; c_outstanding_drop_trig_req++; sendSignal(DBTUP_REF, GSN_DROP_TRIG_IMPL_REQ, signal, DropTrigImplReq::SignalLength, JBB); } } } if (subPtr.p->m_outstanding_trigger == 0) { jam(); drop_triggers_complete(signal, subPtr); } } void Suma::execDROP_TRIG_IMPL_REF(Signal* signal) { jamEntry(); DropTrigImplRef * const ref = (DropTrigImplRef*)signal->getDataPtr(); Ptr<Table> tabPtr; Ptr<Subscription> subPtr; const Uint32 triggerId = ref->triggerId; const Uint32 type = (triggerId >> 16) & 0x3; c_subscriptionPool.getPtr(subPtr, ref->senderData); c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI); ndbrequire(tabPtr.p->m_tableId == ref->tableId); ndbrequire(type < 3); ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID); subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID; ndbrequire(subPtr.p->m_outstanding_trigger); subPtr.p->m_outstanding_trigger--; ndbrequire(c_outstanding_drop_trig_req); c_outstanding_drop_trig_req--; if (subPtr.p->m_outstanding_trigger) { jam(); /** * Wait for more */ return; } drop_triggers_complete(signal, subPtr); } void Suma::execDROP_TRIG_IMPL_CONF(Signal* signal) { jamEntry(); DropTrigImplConf * const conf = (DropTrigImplConf*)signal->getDataPtr(); Ptr<Table> tabPtr; Ptr<Subscription> subPtr; const Uint32 triggerId = conf->triggerId; const Uint32 type = (triggerId >> 16) & 0x3; c_subscriptionPool.getPtr(subPtr, conf->senderData); c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI); ndbrequire(tabPtr.p->m_tableId == conf->tableId); ndbrequire(type < 3); ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID); subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID; ndbrequire(subPtr.p->m_outstanding_trigger); subPtr.p->m_outstanding_trigger--; ndbrequire(c_outstanding_drop_trig_req); c_outstanding_drop_trig_req--; if (subPtr.p->m_outstanding_trigger) { jam(); /** * Wait for more */ return; } drop_triggers_complete(signal, subPtr); } void Suma::drop_triggers_complete(Signal* signal, Ptr<Subscription> subPtr) { switch(subPtr.p->m_trigger_state){ case Subscription::T_UNDEFINED: case Subscription::T_CREATING: case Subscription::T_DEFINED: jam(); ndbrequire(false); break; case Subscription::T_DROPPING: jam(); /** */ subPtr.p->m_trigger_state = Subscription::T_UNDEFINED; if (!subPtr.p->m_start_req.isEmpty()) { jam(); create_triggers(signal, subPtr); return; } break; case Subscription::T_ERROR: jam(); Uint32 err = subPtr.p->m_errorCode; subPtr.p->m_trigger_state = Subscription::T_UNDEFINED; subPtr.p->m_errorCode = 0; report_sub_start_ref(signal, subPtr, err); break; } check_release_subscription(signal, subPtr); } /********************************************************** * Suma participant interface * * Stopping and removing of subscriber * */ void Suma::execSUB_STOP_REQ(Signal* signal){ jamEntry(); ndbassert(signal->getNoOfSections() == 0); DBUG_ENTER("Suma::execSUB_STOP_REQ"); CRASH_INSERTION(13019); SubStopReq * const req = (SubStopReq*)signal->getDataPtr(); Uint32 senderRef = req->senderRef; Uint32 senderData = req->senderData; Uint32 subscriberRef = req->subscriberRef; Uint32 subscriberData = req->subscriberData; SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = req->subscriptionId; key.m_subscriptionKey = req->subscriptionKey; bool abortStart = (req->requestInfo & SubStopReq::RI_ABORT_START); if (c_startup.m_restart_server_node_id == RNIL) { jam(); /** * We havent started syncing yet */ sendSubStopRef(signal, senderRef, senderData, SubStopRef::NotStarted); return; } bool found = c_subscriptions.find(subPtr, key); if (!found) { jam(); sendSubStopRef(signal, senderRef, senderData, SubStopRef::NoSuchSubscription); return; } switch(subPtr.p->m_state){ case Subscription::UNDEFINED: jam(); ndbrequire(false); case Subscription::DEFINING: jam(); sendSubStopRef(signal, senderRef, senderData, SubStopRef::Defining); return; case Subscription::DEFINED: jam(); break; } Ptr<SubOpRecord> subOpPtr; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req); bool empty = list.isEmpty(); if (list.seize(subOpPtr) == false) { jam(); sendSubStopRef(signal, senderRef, senderData, SubStopRef::OutOfSubOpRecords); return; } if (abortStart) { jam(); subOpPtr.p->m_opType = SubOpRecord::R_SUB_ABORT_START_REQ; } else { jam(); subOpPtr.p->m_opType = SubOpRecord::R_SUB_STOP_REQ; } subOpPtr.p->m_subPtrI = subPtr.i; subOpPtr.p->m_senderRef = senderRef; subOpPtr.p->m_senderData = senderData; subOpPtr.p->m_subscriberRef = subscriberRef; subOpPtr.p->m_subscriberData = subscriberData; if (empty) { jam(); signal->theData[0] = SumaContinueB::SUB_STOP_REQ; signal->theData[1] = subOpPtr.i; signal->theData[2] = RNIL; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); } } void Suma::sub_stop_req(Signal* signal) { jam(); Ptr<SubOpRecord> subOpPtr; c_subOpPool.getPtr(subOpPtr, signal->theData[1]); Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI); Ptr<Subscriber> ptr; { LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); if (signal->theData[2] == RNIL) { jam(); list.first(ptr); } else { jam(); list.getPtr(ptr, signal->theData[2]); } for (Uint32 i = 0; i<32 && !ptr.isNull(); i++, list.next(ptr)) { if (ptr.p->m_senderRef == subOpPtr.p->m_subscriberRef && ptr.p->m_senderData == subOpPtr.p->m_subscriberData) { jam(); goto found; } } } if (ptr.isNull()) { jam(); sendSubStopRef(signal, subOpPtr.p->m_senderRef, subOpPtr.p->m_senderData, SubStopRef::NoSuchSubscriber); check_remove_queue(signal, subPtr, subOpPtr, true, true); return; } signal->theData[0] = SumaContinueB::SUB_STOP_REQ; signal->theData[1] = subOpPtr.i; signal->theData[2] = ptr.i; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); return; found: { LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); list.remove(ptr); /** * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck) */ bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE; report_sub_stop_conf(signal, subOpPtr, ptr, report, list); c_subscriberPool.release(ptr); } check_remove_queue(signal, subPtr, subOpPtr, true, true); check_release_subscription(signal, subPtr); } void Suma::check_remove_queue(Signal* signal, Ptr<Subscription> subPtr, Ptr<SubOpRecord> subOpPtr, bool ishead, bool dorelease) { LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req); { Ptr<SubOpRecord> tmp; list.first(tmp); if (ishead) { jam(); ndbrequire(tmp.i == subOpPtr.i); } else { jam(); ishead = (tmp.i == subOpPtr.i); } } if (dorelease) { jam(); list.release(subOpPtr); } else { jam(); list.remove(subOpPtr); } if (ishead) { jam(); if (list.first(subOpPtr) == false) { jam(); c_restart.m_waiting_on_self = 1; return; } // Fall through } else { jam(); return; } switch(subOpPtr.p->m_opType){ case SubOpRecord::R_SUB_ABORT_START_REQ: case SubOpRecord::R_SUB_STOP_REQ: jam(); signal->theData[0] = SumaContinueB::SUB_STOP_REQ; signal->theData[1] = subOpPtr.i; signal->theData[2] = RNIL; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); return; case SubOpRecord::R_API_FAIL_REQ: jam(); signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION; signal->theData[1] = subOpPtr.i; signal->theData[2] = RNIL; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); return; case SubOpRecord::R_START_ME_REQ: jam(); sendSubCreateReq(signal, subPtr); return; } } void Suma::report_sub_stop_conf(Signal* signal, Ptr<SubOpRecord> subOpPtr, Ptr<Subscriber> ptr, bool report, LocalDLList<Subscriber>& list) { jam(); CRASH_INSERTION(13020); Uint32 senderRef = subOpPtr.p->m_senderRef; Uint32 senderData = subOpPtr.p->m_senderData; bool abortStart = subOpPtr.p->m_opType == SubOpRecord::R_SUB_ABORT_START_REQ; // let subscriber know that subscrber is stopped if (!abortStart) { jam(); send_sub_start_stop_event(signal, ptr, NdbDictionary::Event::_TE_STOP, report, list); } SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend(); const Uint64 gci = m_max_seen_gci; conf->senderRef= reference(); conf->senderData= senderData; conf->gci_hi= Uint32(gci>>32); conf->gci_lo= Uint32(gci); sendSignal(senderRef, GSN_SUB_STOP_CONF, signal, SubStopConf::SignalLength, JBB); Uint32 nodeId = refToNode(ptr.p->m_senderRef); if (c_subscriber_per_node[nodeId]) { c_subscriber_per_node[nodeId]--; if (c_subscriber_per_node[nodeId] == 0) { jam(); c_subscriber_nodes.clear(nodeId); } } } void Suma::sendSubStopRef(Signal* signal, Uint32 retref, Uint32 data, Uint32 errCode) { jam(); SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend(); ref->senderRef = reference(); ref->errorCode = errCode; ref->senderData = data; sendSignal(retref, GSN_SUB_STOP_REF, signal, SubStopRef::SignalLength, JBB); } // report new started subscriber to all other subscribers void Suma::send_sub_start_stop_event(Signal *signal, Ptr<Subscriber> ptr, NdbDictionary::Event::_TableEvent event, bool report, LocalDLList<Subscriber>& list) { const Uint64 gci = get_current_gci(signal); SubTableData * data = (SubTableData*)signal->getDataPtrSend(); Uint32 nodeId = refToNode(ptr.p->m_senderRef); NdbDictionary::Event::_TableEvent other; if (event == NdbDictionary::Event::_TE_STOP) { other = NdbDictionary::Event::_TE_UNSUBSCRIBE; } else if (event == NdbDictionary::Event::_TE_ACTIVE) { other = NdbDictionary::Event::_TE_SUBSCRIBE; } else { jamLine(event); ndbrequire(false); } data->gci_hi = Uint32(gci >> 32); data->gci_lo = Uint32(gci); data->tableId = 0; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, event); SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId()); SubTableData::setReqNodeId(data->requestInfo, nodeId); data->changeMask = 0; data->totalLen = 0; data->senderData = ptr.p->m_senderData; sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB); if (report == false) { return; } data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, other); SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId()); Ptr<Subscriber> tmp; for(list.first(tmp); !tmp.isNull(); list.next(tmp)) { jam(); SubTableData::setReqNodeId(data->requestInfo, nodeId); data->senderData = tmp.p->m_senderData; sendSignal(tmp.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB); ndbassert(tmp.i != ptr.i); // ptr should *NOT* be in list now if (other != NdbDictionary::Event::_TE_UNSUBSCRIBE) { jam(); SubTableData::setReqNodeId(data->requestInfo, refToNode(tmp.p->m_senderRef)); data->senderData = ptr.p->m_senderData; sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB); } } } void Suma::Table::createAttributeMask(AttributeMask& mask, Suma &suma) { mask.clear(); for(Uint32 i = 0; i<m_noOfAttributes; i++) mask.set(i); } void Suma::suma_ndbrequire(bool v) { ndbrequire(v); } /********************************************************** * Scan data interface * * Assumption: one execTRANSID_AI contains all attr info * */ #define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS #define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1 static Uint32 f_bufferLock = 0; static Uint32 f_buffer[SUMA_BUF_SZ]; static Uint32 f_trigBufferSize = 0; static Uint32 b_bufferLock = 0; static Uint32 b_buffer[SUMA_BUF_SZ]; static Uint32 b_trigBufferSize = 0; void Suma::execTRANSID_AI(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execTRANSID_AI"); CRASH_INSERTION(13015); TransIdAI * const data = (TransIdAI*)signal->getDataPtr(); const Uint32 opPtrI = data->connectPtr; Uint32 length = signal->length() - 3; if(f_bufferLock == 0){ f_bufferLock = opPtrI; } else { ndbrequire(f_bufferLock == opPtrI); } if (signal->getNoOfSections()) { SectionHandle handle(this, signal); SegmentedSectionPtr dataPtr; handle.getSection(dataPtr, 0); length = dataPtr.sz; copy(data->attrData, dataPtr); releaseSections(handle); } Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, (opPtrI >> 16)); Uint32 sum = 0; Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE; Uint32 * headers = f_buffer; const Uint32 * src = &data->attrData[0]; const Uint32 * const end = &src[length]; const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes; for(Uint32 i = 0; i<attribs; i++){ Uint32 tmp = * src++; * headers++ = tmp; Uint32 len = AttributeHeader::getDataSize(tmp); memcpy(dst, src, 4 * len); dst += len; src += len; sum += len; } f_trigBufferSize = sum; ndbrequire(src == end); if ((syncPtr.p->m_requestInfo & SubSyncReq::LM_Exclusive) == 0) { sendScanSubTableData(signal, syncPtr, 0); } DBUG_VOID_RETURN; } void Suma::execKEYINFO20(Signal* signal) { jamEntry(); KeyInfo20* data = (KeyInfo20*)signal->getDataPtr(); const Uint32 opPtrI = data->clientOpPtr; const Uint32 takeOver = data->scanInfo_Node; ndbrequire(f_bufferLock == opPtrI); Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, (opPtrI >> 16)); sendScanSubTableData(signal, syncPtr, takeOver); } void Suma::sendScanSubTableData(Signal* signal, Ptr<SyncRecord> syncPtr, Uint32 takeOver) { const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes; const Uint32 sum = f_trigBufferSize; /** * Send data to subscriber */ LinearSectionPtr ptr[3]; ptr[0].p = f_buffer; ptr[0].sz = attribs; ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE; ptr[1].sz = sum; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI); /** * Initialize signal */ SubTableData * sdata = (SubTableData*)signal->getDataPtrSend(); Uint32 ref = syncPtr.p->m_senderRef; sdata->tableId = syncPtr.p->m_tableId; sdata->senderData = syncPtr.p->m_senderData; sdata->requestInfo = 0; SubTableData::setOperation(sdata->requestInfo, NdbDictionary::Event::_TE_SCAN); // Scan sdata->gci_hi = 0; // Undefined sdata->gci_lo = 0; sdata->takeOver = takeOver; #if PRINT_ONLY ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum); #else sendSignal(ref, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB, ptr, 2); #endif /** * Reset f_bufferLock */ f_bufferLock = 0; } /********************************************************** * * Trigger data interface * */ void Suma::execTRIG_ATTRINFO(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execTRIG_ATTRINFO"); CRASH_INSERTION(13016); TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr(); const Uint32 trigId = trg->getTriggerId(); const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength; if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){ jam(); ndbrequire(b_bufferLock == trigId); memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen); b_trigBufferSize += dataLen; // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize); } else { jam(); if(f_bufferLock == 0){ f_bufferLock = trigId; f_trigBufferSize = 0; b_bufferLock = trigId; b_trigBufferSize = 0; } else { ndbrequire(f_bufferLock == trigId); } memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen); f_trigBufferSize += dataLen; } DBUG_VOID_RETURN; } #ifdef NODEFAIL_DEBUG2 static int theCounts[64] = {0}; #endif Uint32 Suma::get_responsible_node(Uint32 bucket) const { // id will contain id to responsible suma or // RNIL if we don't have nodegroup info yet jam(); Uint32 node; const Bucket* ptr= c_buckets + bucket; for(Uint32 i = 0; i<MAX_REPLICAS; i++) { node= ptr->m_nodes[i]; if(c_alive_nodes.get(node)) { #ifdef NODEFAIL_DEBUG2 theCounts[node]++; ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u", n,D, id, theCounts[node]); #endif return node; } } return 0; } Uint32 Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const { jam(); Uint32 node; const Bucket* ptr= c_buckets + bucket; for(Uint32 i = 0; i<MAX_REPLICAS; i++) { node= ptr->m_nodes[i]; if(mask.get(node)) { return node; } } return 0; } bool Suma::check_switchover(Uint32 bucket, Uint64 gci) { const Uint32 send_mask = Bucket::BUCKET_STARTING | Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_SHUTDOWN_TO; bool send = c_buckets[bucket].m_state & send_mask; ndbassert(m_switchover_buckets.get(bucket)); if(unlikely(gci > c_buckets[bucket].m_switchover_gci)) { return send; } return !send; } static Uint32 reformat(Signal* signal, LinearSectionPtr ptr[3], Uint32 * src_1, Uint32 sz_1, Uint32 * src_2, Uint32 sz_2) { Uint32 noOfAttrs = 0, dataLen = 0; Uint32 * headers = signal->theData + 25; Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE; ptr[0].p = headers; ptr[1].p = dst; while(sz_1 > 0){ Uint32 tmp = * src_1 ++; * headers ++ = tmp; Uint32 len = AttributeHeader::getDataSize(tmp); memcpy(dst, src_1, 4 * len); dst += len; src_1 += len; noOfAttrs++; dataLen += len; sz_1 -= (1 + len); } assert(sz_1 == 0); ptr[0].sz = noOfAttrs; ptr[1].sz = dataLen; ptr[2].p = src_2; ptr[2].sz = sz_2; return sz_2 > 0 ? 3 : 2; } /** * Pass entire pages with SUMA-trigger-data from * TUP to SUMA to avoid extensive LongSignalMessage buffer contention */ void Suma::execFIRE_TRIG_ORD_L(Signal* signal) { jamEntry(); ndbassert(signal->getNoOfSections() == 0); Uint32 pageId = signal->theData[0]; Uint32 len = signal->theData[1]; if (pageId == RNIL && len == 0) { jam(); /** * Out of memory */ out_of_buffer(signal); return; } Uint32 * ptr = reinterpret_cast<Uint32*>(c_page_pool.getPtr(pageId)); while (len) { Uint32 * save = ptr; Uint32 msglen = * ptr++; Uint32 siglen = * ptr++; Uint32 sec0len = * ptr++; Uint32 sec1len = * ptr++; Uint32 sec2len = * ptr++; /** * Copy value directly into local buffers */ Uint32 trigId = ((FireTrigOrd*)ptr)->getTriggerId(); memcpy(signal->theData, ptr, 4 * siglen); // signal ptr += siglen; memcpy(f_buffer, ptr, 4*sec0len); ptr += sec0len; memcpy(b_buffer, ptr, 4*sec1len); ptr += sec1len; memcpy(f_buffer + sec0len, ptr, 4*sec2len); ptr += sec2len; f_trigBufferSize = sec0len + sec2len; b_trigBufferSize = sec1len; f_bufferLock = trigId; b_bufferLock = trigId; execFIRE_TRIG_ORD(signal); ndbrequire(ptr == save + msglen); ndbrequire(len >= msglen); len -= msglen; } m_ctx.m_mm.release_page(RT_DBTUP_PAGE, pageId); } void Suma::execFIRE_TRIG_ORD(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execFIRE_TRIG_ORD"); CRASH_INSERTION(13016); FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr(); const Uint32 trigId = trg->getTriggerId(); const Uint32 hashValue = trg->getHashValue(); const Uint32 gci_hi = trg->getGCI(); const Uint32 gci_lo = trg->m_gci_lo; const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32); const Uint32 event = trg->getTriggerEvent(); const Uint32 any_value = trg->getAnyValue(); const Uint32 transId1 = trg->m_transId1; const Uint32 transId2 = trg->m_transId2; Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF); ndbassert(gci > m_last_complete_gci); if (signal->getNoOfSections()) { jam(); ndbassert(isNdbMtLqh()); SectionHandle handle(this, signal); ndbrequire(b_bufferLock == 0); ndbrequire(f_bufferLock == 0); f_bufferLock = trigId; b_bufferLock = trigId; SegmentedSectionPtr ptr; handle.getSection(ptr, 0); // Keys Uint32 sz = ptr.sz; copy(f_buffer, ptr); handle.getSection(ptr, 2); // After values copy(f_buffer + sz, ptr); f_trigBufferSize = sz + ptr.sz; handle.getSection(ptr, 1); // Before values copy(b_buffer, ptr); b_trigBufferSize = ptr.sz; releaseSections(handle); } jam(); ndbrequire(f_bufferLock == trigId); /** * Reset f_bufferLock */ f_bufferLock = 0; b_bufferLock = 0; Uint32 tableId = subPtr.p->m_tableId; Uint32 schemaVersion = c_tablePool.getPtr(subPtr.p->m_table_ptrI)->m_schemaVersion; Uint32 bucket= hashValue % c_no_of_buckets; m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); if(m_active_buckets.get(bucket) || (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci)))) { m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci); Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords(); ndbrequire(sz == f_trigBufferSize); LinearSectionPtr ptr[3]; const Uint32 nptr= reformat(signal, ptr, f_buffer, f_trigBufferSize, b_buffer, b_trigBufferSize); Uint32 ptrLen= 0; for(Uint32 i =0; i < nptr; i++) ptrLen+= ptr[i].sz; /** * Signal to subscriber(s) */ SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; data->gci_hi = gci_hi; data->gci_lo = gci_lo; data->tableId = tableId; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, event); data->flags = 0; data->anyValue = any_value; data->totalLen = ptrLen; data->transId1 = transId1; data->transId2 = transId2; { LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); SubscriberPtr subbPtr; for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr)) { data->senderData = subbPtr.p->m_senderData; sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLengthWithTransId, JBB, ptr, nptr); } } } else { const uint buffer_header_sz = 6; Uint32* dst; Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz; if((dst = get_buffer_ptr(signal, bucket, gci, sz))) { * dst++ = subPtr.i; * dst++ = schemaVersion; * dst++ = (event << 16) | f_trigBufferSize; * dst++ = any_value; * dst++ = transId1; * dst++ = transId2; memcpy(dst, f_buffer, f_trigBufferSize << 2); dst += f_trigBufferSize; memcpy(dst, b_buffer, b_trigBufferSize << 2); } } DBUG_VOID_RETURN; } void Suma::checkMaxBufferedEpochs(Signal *signal) { /* * Check if any subscribers are exceeding the MaxBufferedEpochs */ Ptr<Gcp_record> gcp; jamEntry(); if (c_gcp_list.isEmpty()) { jam(); return; } c_gcp_list.first(gcp); if (ERROR_INSERTED(13037)) { jam(); CLEAR_ERROR_INSERT_VALUE; ndbout_c("Simulating exceeding the MaxBufferedEpochs %u(%llu,%llu,%llu)", c_maxBufferedEpochs, m_max_seen_gci, m_last_complete_gci, gcp.p->m_gci); } else if (c_gcp_list.count() < c_maxBufferedEpochs) { return; } NodeBitmask subs = gcp.p->m_subscribers; jam(); // Disconnect lagging subscribers waiting for oldest epoch ndbout_c("Found lagging epoch %llu", gcp.p->m_gci); for(Uint32 nodeId = 0; nodeId < MAX_NODES; nodeId++) { if (subs.get(nodeId)) { jam(); subs.clear(nodeId); // Disconnecting node signal->theData[0] = NDB_LE_SubscriptionStatus; signal->theData[1] = 1; // DISCONNECTED; signal->theData[2] = nodeId; signal->theData[3] = (Uint32) gcp.p->m_gci; signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32); signal->theData[5] = (Uint32) c_gcp_list.count(); signal->theData[6] = c_maxBufferedEpochs; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 8, JBB); /** * Force API_FAILREQ */ signal->theData[0] = nodeId; sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA); } } } void Suma::execSUB_GCP_COMPLETE_REP(Signal* signal) { jamEntry(); ndbassert(signal->getNoOfSections() == 0); SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend(); Uint32 gci_hi = rep->gci_hi; Uint32 gci_lo = rep->gci_lo; Uint64 gci = gci_lo | (Uint64(gci_hi) << 32); if (isNdbMtLqh() && m_gcp_rep_cnt > 1) { #define SSPP 0 if (SSPP) printf("execSUB_GCP_COMPLETE_REP(%u/%u)", gci_hi, gci_lo); jam(); Uint32 min = m_min_gcp_rep_counter_index; Uint32 sz = NDB_ARRAY_SIZE(m_gcp_rep_counter); for (Uint32 i = min; i != m_max_gcp_rep_counter_index; i = (i + 1) % sz) { jam(); if (m_gcp_rep_counter[i].m_gci == gci) { jam(); m_gcp_rep_counter[i].m_cnt ++; if (m_gcp_rep_counter[i].m_cnt == m_gcp_rep_cnt) { jam(); /** * Release this entry... */ if (i != min) { jam(); m_gcp_rep_counter[i] = m_gcp_rep_counter[min]; } m_min_gcp_rep_counter_index = (min + 1) % sz; if (SSPP) ndbout_c(" found - complete after: (min: %u max: %u)", m_min_gcp_rep_counter_index, m_max_gcp_rep_counter_index); goto found; } else { jam(); if (SSPP) ndbout_c(" found - wait unchanged: (min: %u max: %u)", m_min_gcp_rep_counter_index, m_max_gcp_rep_counter_index); return; // Wait for more... } } } /** * Not found... */ Uint32 next = (m_max_gcp_rep_counter_index + 1) % sz; ndbrequire(next != min); // ring buffer full m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_gci = gci; m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_cnt = 1; m_max_gcp_rep_counter_index = next; if (SSPP) ndbout_c(" new - after: (min: %u max: %u)", m_min_gcp_rep_counter_index, m_max_gcp_rep_counter_index); return; } found: bool drop = false; Uint32 flags = (m_missing_data) ? rep->flags | SubGcpCompleteRep::MISSING_DATA : rep->flags; if (ERROR_INSERTED(13036)) { jam(); CLEAR_ERROR_INSERT_VALUE; ndbout_c("Simulating out of event buffer at node failure"); flags |= SubGcpCompleteRep::MISSING_DATA; } #ifdef VM_TRACE if (m_gcp_monitor == 0) { } else if (gci_hi == Uint32(m_gcp_monitor >> 32)) { ndbrequire(gci_lo == Uint32(m_gcp_monitor) + 1); } else { ndbrequire(gci_hi == Uint32(m_gcp_monitor >> 32) + 1); ndbrequire(gci_lo == 0); } m_gcp_monitor = gci; #endif m_last_complete_gci = gci; checkMaxBufferedEpochs(signal); m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); /** * */ if(!m_switchover_buckets.isclear()) { bool unlock = false; Uint32 i = m_switchover_buckets.find(0); for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1)) { if(gci > c_buckets[i].m_switchover_gci) { Uint32 state = c_buckets[i].m_state; m_switchover_buckets.clear(i); printf("%u/%u (%u/%u) switchover complete bucket %d state: %x", Uint32(gci >> 32), Uint32(gci), Uint32(c_buckets[i].m_switchover_gci >> 32), Uint32(c_buckets[i].m_switchover_gci), i, state); if(state & Bucket::BUCKET_STARTING) { /** * NR case */ jam(); m_active_buckets.set(i); c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING; ndbout_c("starting"); m_gcp_complete_rep_count++; unlock = true; } else if(state & Bucket::BUCKET_TAKEOVER) { /** * NF case */ jam(); Bucket* bucket= c_buckets + i; Page_pos pos= bucket->m_buffer_head; ndbrequire(pos.m_max_gci < gci); Buffer_page* page= c_page_pool.getPtr(pos.m_page_id); ndbout_c("takeover %d", pos.m_page_id); page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32); page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF); ndbassert(pos.m_max_gci != 0); page->m_words_used = pos.m_page_pos; page->m_next_page = RNIL; memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head)); bucket->m_buffer_head.m_page_id = RNIL; bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1; m_active_buckets.set(i); m_gcp_complete_rep_count++; c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER; } else if (state & Bucket::BUCKET_HANDOVER) { /** * NR, living node */ jam(); c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER; m_gcp_complete_rep_count--; ndbout_c("handover"); } else if (state & Bucket::BUCKET_CREATED_MASK) { jam(); Uint32 cnt = state >> 8; Uint32 mask = Uint32(Bucket::BUCKET_CREATED_MASK) | (cnt << 8); c_buckets[i].m_state &= ~mask; flags |= SubGcpCompleteRep::ADD_CNT; flags |= (cnt << 16); ndbout_c("add %u %s", cnt, state & Bucket::BUCKET_CREATED_SELF ? "self" : "other"); if (state & Bucket::BUCKET_CREATED_SELF && get_responsible_node(i) == getOwnNodeId()) { jam(); m_active_buckets.set(i); m_gcp_complete_rep_count++; } } else if (state & Bucket::BUCKET_DROPPED_MASK) { jam(); Uint32 cnt = state >> 8; Uint32 mask = Uint32(Bucket::BUCKET_DROPPED_MASK) | (cnt << 8); c_buckets[i].m_state &= ~mask; flags |= SubGcpCompleteRep::SUB_CNT; flags |= (cnt << 16); ndbout_c("sub %u %s", cnt, state & Bucket::BUCKET_DROPPED_SELF ? "self" : "other"); if (state & Bucket::BUCKET_DROPPED_SELF) { m_active_buckets.clear(i); drop = true; } } else if (state & Bucket::BUCKET_SHUTDOWN) { jam(); Uint32 nodeId = c_buckets[i].m_switchover_node; ndbrequire(nodeId == getOwnNodeId()); m_active_buckets.clear(i); m_gcp_complete_rep_count--; ndbout_c("shutdown handover"); c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN; } else if (state & Bucket::BUCKET_SHUTDOWN_TO) { jam(); Uint32 nodeId = c_buckets[i].m_switchover_node; NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask; nodegroup.clear(nodeId); ndbrequire(get_responsible_node(i) == nodeId && get_responsible_node(i, nodegroup) == getOwnNodeId()); m_active_buckets.set(i); m_gcp_complete_rep_count++; ndbout_c("shutdown takover"); c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN_TO; } } } if (m_switchover_buckets.isclear()) { jam(); if(getNodeState().startLevel == NodeState::SL_STARTING && c_startup.m_handover_nodes.isclear()) { jam(); sendSTTORRY(signal); } else if (getNodeState().startLevel >= NodeState::SL_STOPPING_1) { jam(); ndbrequire(c_shutdown.m_wait_handover); StopMeConf * conf = CAST_PTR(StopMeConf, signal->getDataPtrSend()); conf->senderData = c_shutdown.m_senderData; conf->senderRef = reference(); sendSignal(c_shutdown.m_senderRef, GSN_STOP_ME_CONF, signal, StopMeConf::SignalLength, JBB); c_shutdown.m_wait_handover = false; infoEvent("Suma: handover complete"); } } if (unlock) { jam(); send_dict_unlock_ord(signal, DictLockReq::SumaHandOver); } } if(ERROR_INSERTED(13010)) { CLEAR_ERROR_INSERT_VALUE; ndbout_c("Don't send GCP_COMPLETE_REP(%llu)", gci); return; } /** * Signal to subscribers */ rep->gci_hi = gci_hi; rep->gci_lo = gci_lo; rep->flags = flags; rep->senderRef = reference(); rep->gcp_complete_rep_count = m_gcp_complete_rep_count; if(m_gcp_complete_rep_count && !c_subscriber_nodes.isclear()) { CRASH_INSERTION(13033); NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes); sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal, SubGcpCompleteRep::SignalLength, JBB); Ptr<Gcp_record> gcp; if(c_gcp_list.seize(gcp)) { gcp.p->m_gci = gci; gcp.p->m_subscribers = c_subscriber_nodes; } else { char buf[100]; c_subscriber_nodes.getText(buf); g_eventLogger->error("c_gcp_list.seize() failed: gci: %llu nodes: %s", gci, buf); } } /** * Add GCP COMPLETE REP to buffer */ bool subscribers = !c_subscriber_nodes.isclear(); for(Uint32 i = 0; i<c_no_of_buckets; i++) { if(m_active_buckets.get(i)) continue; if (subscribers || (c_buckets[i].m_state & Bucket::BUCKET_RESEND)) { //Uint32* dst; get_buffer_ptr(signal, i, gci, 0); } } if(m_out_of_buffer_gci && gci > m_out_of_buffer_gci) { jam(); infoEvent("Reenable event buffer"); m_out_of_buffer_gci = 0; m_missing_data = false; } if (unlikely(drop)) { jam(); m_gcp_complete_rep_count = 0; c_nodeGroup = RNIL; c_nodes_in_nodegroup_mask.clear(); fix_nodegroup(); } } void Suma::execCREATE_TAB_CONF(Signal *signal) { jamEntry(); DBUG_ENTER("Suma::execCREATE_TAB_CONF"); DBUG_VOID_RETURN; } void Suma::execDROP_TAB_CONF(Signal *signal) { jamEntry(); ndbassert(signal->getNoOfSections() == 0); DropTabConf * const conf = (DropTabConf*)signal->getDataPtr(); Uint32 senderRef= conf->senderRef; Uint32 tableId= conf->tableId; TablePtr tabPtr; if (!c_tables.find(tabPtr, tableId)) { jam(); return; } DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i)); const Table::State old_state = tabPtr.p->m_state; tabPtr.p->m_state = Table::DROPPED; c_tables.remove(tabPtr); if (senderRef != 0) { jam(); // dict coordinator sends info to API const Uint64 gci = get_current_gci(signal); SubTableData * data = (SubTableData*)signal->getDataPtrSend(); data->gci_hi = Uint32(gci >> 32); data->gci_lo = Uint32(gci); data->tableId = tableId; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, NdbDictionary::Event::_TE_DROP); SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef)); Ptr<Subscription> subPtr; LocalDLList<Subscription> subList(c_subscriptionPool, tabPtr.p->m_subscriptions); for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr)) { jam(); if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) { jam(); continue; //continue in for-loop if the table is not part of //the subscription. Otherwise, send data to subscriber. } if (subPtr.p->m_options & Subscription::NO_REPORT_DDL) { jam(); continue; } Ptr<Subscriber> ptr; LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); for(list.first(ptr); !ptr.isNull(); list.next(ptr)) { jam(); data->senderData= ptr.p->m_senderData; sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB); } } } if (old_state == Table::DEFINING) { jam(); return; } if (tabPtr.p->m_subscriptions.isEmpty()) { jam(); tabPtr.p->release(* this); c_tablePool.release(tabPtr); return; } else { /** * check_release_subscription create a subList... * weirdness below is to make sure that it's not created twice */ Ptr<Subscription> subPtr; { LocalDLList<Subscription> subList(c_subscriptionPool, tabPtr.p->m_subscriptions); subList.first(subPtr); } while (!subPtr.isNull()) { Ptr<Subscription> tmp = subPtr; { LocalDLList<Subscription> subList(c_subscriptionPool, tabPtr.p->m_subscriptions); subList.next(subPtr); } check_release_subscription(signal, tmp); } } } /** * This receives DICT_TAB_INFO in long signal section 1, and releases the data * after use. */ void Suma::execALTER_TAB_REQ(Signal *signal) { jamEntry(); AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr(); Uint32 senderRef= req->senderRef; Uint32 tableId= req->tableId; Uint32 changeMask= req->changeMask; TablePtr tabPtr; // Copy DICT_TAB_INFO to local linear buffer SectionHandle handle(this, signal); SegmentedSectionPtr tabInfoPtr; handle.getSection(tabInfoPtr, 0); if (!c_tables.find(tabPtr, tableId)) { jam(); releaseSections(handle); return; } if (senderRef == 0) { jam(); releaseSections(handle); return; } // dict coordinator sends info to API #ifndef DBUG_OFF ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz); SimplePropertiesSectionReader reader(handle.m_ptr[0], getSectionSegmentPool()); reader.printAll(ndbout); #endif copy(b_dti_buf, tabInfoPtr); releaseSections(handle); LinearSectionPtr lptr[3]; lptr[0].p = b_dti_buf; lptr[0].sz = tabInfoPtr.sz; const Uint64 gci = get_current_gci(signal); SubTableData * data = (SubTableData*)signal->getDataPtrSend(); data->gci_hi = Uint32(gci >> 32); data->gci_lo = Uint32(gci); data->tableId = tableId; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, NdbDictionary::Event::_TE_ALTER); SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef)); data->flags = 0; data->changeMask = changeMask; data->totalLen = tabInfoPtr.sz; Ptr<Subscription> subPtr; LocalDLList<Subscription> subList(c_subscriptionPool, tabPtr.p->m_subscriptions); for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr)) { if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) { jam(); continue; //continue in for-loop if the table is not part of //the subscription. Otherwise, send data to subscriber. } if (subPtr.p->m_options & Subscription::NO_REPORT_DDL) { jam(); continue; } Ptr<Subscriber> ptr; LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); for(list.first(ptr); !ptr.isNull(); list.next(ptr)) { jam(); data->senderData= ptr.p->m_senderData; Callback c = { 0, 0 }; sendFragmentedSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB, lptr, 1, c); } } } void Suma::execSUB_GCP_COMPLETE_ACK(Signal* signal) { jamEntry(); ndbassert(signal->getNoOfSections() == 0); SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr(); Uint32 gci_hi = ack->rep.gci_hi; Uint32 gci_lo = ack->rep.gci_lo; Uint32 senderRef = ack->rep.senderRef; if (unlikely(signal->getLength() < SubGcpCompleteAck::SignalLength)) { jam(); ndbassert(!ndb_check_micro_gcp(getNodeInfo(refToNode(senderRef)).m_version)); gci_lo = 0; } Uint64 gci = gci_lo | (Uint64(gci_hi) << 32); m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); if (ERROR_INSERTED(13037)) { jam(); ndbout_c("Simulating exceeding the MaxBufferedEpochs, ignoring ack"); return; } if (refToBlock(senderRef) == SUMA) { jam(); // Ack from other SUMA Uint32 nodeId= refToNode(senderRef); for(Uint32 i = 0; i<c_no_of_buckets; i++) { if(m_active_buckets.get(i) || (m_switchover_buckets.get(i) && (check_switchover(i, gci))) || (!m_switchover_buckets.get(i) && get_responsible_node(i) == nodeId)) { release_gci(signal, i, gci); } } return; } // Ack from User and not an ack from other SUMA, redistribute in nodegroup Uint32 nodeId = refToNode(senderRef); if (ERROR_INSERTED(13023)) { ndbout_c("Throwing SUB_GCP_COMPLETE_ACK gci: %u/%u from %u", Uint32(gci>>32), Uint32(gci), nodeId); return; } jam(); Ptr<Gcp_record> gcp; for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp)) { if(gcp.p->m_gci == gci) { gcp.p->m_subscribers.clear(nodeId); gcp.p->m_subscribers.bitAND(c_subscriber_nodes); if(!gcp.p->m_subscribers.isclear()) { jam(); return; } break; } } if(gcp.isNull()) { g_eventLogger->warning("ACK wo/ gcp record (gci: %u/%u) ref: %.8x from: %.8x", Uint32(gci >> 32), Uint32(gci), senderRef, signal->getSendersBlockRef()); } else { c_gcp_list.release(gcp); } CRASH_INSERTION(13011); if(ERROR_INSERTED(13012)) { CLEAR_ERROR_INSERT_VALUE; ndbout_c("Don't redistribute SUB_GCP_COMPLETE_ACK"); return; } ack->rep.senderRef = reference(); NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask); sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal, SubGcpCompleteAck::SignalLength, JBB); } /************************************************************** * * Removing subscription * */ void Suma::execSUB_REMOVE_REQ(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSUB_REMOVE_REQ"); CRASH_INSERTION(13021); const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = req.subscriptionId; key.m_subscriptionKey = req.subscriptionKey; if (c_startup.m_restart_server_node_id == RNIL) { jam(); /** * We havent started syncing yet */ sendSubRemoveRef(signal, req, SubRemoveRef::NotStarted); return; } bool found = c_subscriptions.find(subPtr, key); if(!found) { jam(); sendSubRemoveRef(signal, req, SubRemoveRef::NoSuchSubscription); return; } switch(subPtr.p->m_state){ case Subscription::UNDEFINED: jam(); ndbrequire(false); case Subscription::DEFINING: jam(); sendSubRemoveRef(signal, req, SubRemoveRef::Defining); return; case Subscription::DEFINED: if (subPtr.p->m_options & Subscription::MARKED_DROPPED) { /** * already dropped */ jam(); sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped); return; } break; } subPtr.p->m_options |= Subscription::MARKED_DROPPED; check_release_subscription(signal, subPtr); SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = req.senderData; conf->subscriptionId = req.subscriptionId; conf->subscriptionKey = req.subscriptionKey; sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal, SubRemoveConf::SignalLength, JBB); return; } void Suma::check_release_subscription(Signal* signal, Ptr<Subscription> subPtr) { if (!subPtr.p->m_subscribers.isEmpty()) { jam(); return; } if (!subPtr.p->m_start_req.isEmpty()) { jam(); return; } if (!subPtr.p->m_stop_req.isEmpty()) { jam(); return; } switch(subPtr.p->m_trigger_state){ case Subscription::T_UNDEFINED: jam(); goto do_release; case Subscription::T_CREATING: jam(); /** * Wait for completion */ return; case Subscription::T_DEFINED: jam(); subPtr.p->m_trigger_state = Subscription::T_DROPPING; drop_triggers(signal, subPtr); return; case Subscription::T_DROPPING: jam(); /** * Wait for completion */ return; case Subscription::T_ERROR: jam(); /** * Wait for completion */ return; } ndbrequire(false); do_release: TablePtr tabPtr; c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI); if (tabPtr.p->m_state == Table::DROPPED) { jam(); subPtr.p->m_options |= Subscription::MARKED_DROPPED; } if ((subPtr.p->m_options & Subscription::MARKED_DROPPED) == 0) { jam(); return; } { LocalDLList<Subscription> list(c_subscriptionPool, tabPtr.p->m_subscriptions); list.remove(subPtr); } if (tabPtr.p->m_subscriptions.isEmpty()) { jam(); switch(tabPtr.p->m_state){ case Table::UNDEFINED: ndbrequire(false); case Table::DEFINING: break; case Table::DEFINED: jam(); c_tables.remove(tabPtr); // Fall through case Table::DROPPED: jam(); tabPtr.p->release(* this); c_tablePool.release(tabPtr); }; } c_subscriptions.release(subPtr); } void Suma::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req, Uint32 errCode) { jam(); DBUG_ENTER("Suma::sendSubRemoveRef"); SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend(); ref->senderRef = reference(); ref->senderData = req.senderData; ref->subscriptionId = req.subscriptionId; ref->subscriptionKey = req.subscriptionKey; ref->errorCode = errCode; sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF, signal, SubRemoveRef::SignalLength, JBB); DBUG_VOID_RETURN; } void Suma::Table::release(Suma & suma){ jamBlock(&suma); m_state = UNDEFINED; } void Suma::SyncRecord::release(){ jam(); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments); fragBuf.release(); LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList); attrBuf.release(); LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo); boundBuf.release(); } /************************************************************** * * Restarting remote node functions, master functionality * (slave does nothing special) * - triggered on INCL_NODEREQ calling startNode * - included node will issue START_ME when it's ready to start * the subscribers * */ void Suma::execSUMA_START_ME_REQ(Signal* signal) { jamEntry(); Uint32 retref = signal->getSendersBlockRef(); if (c_restart.m_ref) { jam(); SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend(); ref->errorCode = SumaStartMeRef::Busy; sendSignal(retref, GSN_SUMA_START_ME_REF, signal, SumaStartMeRef::SignalLength, JBB); return; } if (getNodeState().getStarted() == false) { jam(); SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend(); ref->errorCode = SumaStartMeRef::NotStarted; sendSignal(retref, GSN_SUMA_START_ME_REF, signal, SumaStartMeRef::SignalLength, JBB); return; } Ptr<SubOpRecord> subOpPtr; if (c_subOpPool.seize(subOpPtr) == false) { jam(); SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend(); ref->errorCode = SumaStartMeRef::Busy; sendSignal(retref, GSN_SUMA_START_ME_REF, signal, SumaStartMeRef::SignalLength, JBB); return; } subOpPtr.p->m_opType = SubOpRecord::R_START_ME_REQ; c_restart.m_abort = 0; c_restart.m_waiting_on_self = 0; c_restart.m_ref = retref; c_restart.m_max_seq = c_current_seq; c_restart.m_subOpPtrI = subOpPtr.i; DLHashTable<Subscription>::Iterator it; if (c_subscriptions.first(it)) { jam(); /** * We only need to handle subscriptions with seq <= c_current_seq * all subscriptions(s) created after this, will be handled by * starting suma directly */ c_current_seq++; } copySubscription(signal, it); } void Suma::copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator it) { jam(); Ptr<SubOpRecord> subOpPtr; c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI); Ptr<Subscription> subPtr = it.curr; if (!subPtr.isNull()) { jam(); c_restart.m_subPtrI = subPtr.i; c_restart.m_bucket = it.bucket; LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req); bool empty = list.isEmpty(); list.add(subOpPtr); if (!empty) { /** * Wait for lock */ jam(); c_restart.m_waiting_on_self = 1; return; } sendSubCreateReq(signal, subPtr); } else { jam(); SumaStartMeConf* conf = (SumaStartMeConf*)signal->getDataPtrSend(); conf->unused = 0; sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_CONF, signal, SumaStartMeConf::SignalLength, JBB); c_subOpPool.release(subOpPtr); c_restart.m_ref = 0; return; } } void Suma::sendSubCreateReq(Signal* signal, Ptr<Subscription> subPtr) { jam(); if (c_restart.m_abort) { jam(); abort_start_me(signal, subPtr, true); return; } c_restart.m_waiting_on_self = 0; SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = subPtr.i; req->subscriptionId = subPtr.p->m_subscriptionId; req->subscriptionKey = subPtr.p->m_subscriptionKey; req->subscriptionType = subPtr.p->m_subscriptionType; req->tableId = subPtr.p->m_tableId; req->schemaTransId = 0; if (subPtr.p->m_options & Subscription::REPORT_ALL) { req->subscriptionType |= SubCreateReq::ReportAll; } if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE) { req->subscriptionType |= SubCreateReq::ReportSubscribe; } if (subPtr.p->m_options & Subscription::NO_REPORT_DDL) { req->subscriptionType |= SubCreateReq::NoReportDDL; } if (subPtr.p->m_options & Subscription::MARKED_DROPPED) { req->subscriptionType |= SubCreateReq::NR_Sub_Dropped; ndbout_c("copying dropped sub: %u", subPtr.i); } Ptr<Table> tabPtr; c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); if (tabPtr.p->m_state != Table::DROPPED) { jam(); c_restart.m_waiting_on_self = 0; if (!ndbd_suma_dictlock_startme(getNodeInfo(refToNode(c_restart.m_ref)).m_version)) { jam(); /** * Downgrade * * In pre suma v2, SUB_CREATE_REQ::SignalLength is one greater * but code checks length and set a default value... * so we dont need to do anything... * Thank you Ms. Fortuna */ } sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal, SubCreateReq::SignalLength, JBB); } else { jam(); ndbout_c("not copying sub %u with dropped table: %u/%u", subPtr.i, tabPtr.p->m_tableId, tabPtr.i); c_restart.m_waiting_on_self = 1; SubCreateConf * conf = (SubCreateConf *)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = subPtr.i; sendSignal(reference(), GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB); } } void Suma::execSUB_CREATE_REF(Signal* signal) { jamEntry(); SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr(); Uint32 error= ref->errorCode; { SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend(); ref->errorCode = error; sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal, SumaStartMeRef::SignalLength, JBB); } Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI); abort_start_me(signal, subPtr, true); } void Suma::execSUB_CREATE_CONF(Signal* signal) { jamEntry(); /** * We have lock...start all subscriber(s) */ Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI); c_restart.m_waiting_on_self = 0; /** * Check if we were aborted... * this signal is sent to self in case of DROPPED subscription... */ if (c_restart.m_abort) { jam(); abort_start_me(signal, subPtr, true); return; } Ptr<Table> tabPtr; c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); Ptr<Subscriber> ptr; if (tabPtr.p->m_state != Table::DROPPED) { jam(); LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); list.first(ptr); } else { jam(); ptr.setNull(); ndbout_c("not copying subscribers on sub: %u with dropped table %u/%u", subPtr.i, tabPtr.p->m_tableId, tabPtr.i); } copySubscriber(signal, subPtr, ptr); } void Suma::copySubscriber(Signal* signal, Ptr<Subscription> subPtr, Ptr<Subscriber> ptr) { if (!ptr.isNull()) { jam(); SubStartReq* req = (SubStartReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = ptr.i; req->subscriptionId = subPtr.p->m_subscriptionId; req->subscriptionKey = subPtr.p->m_subscriptionKey; req->part = SubscriptionData::TableData; req->subscriberData = ptr.p->m_senderData; req->subscriberRef = ptr.p->m_senderRef; sendSignal(c_restart.m_ref, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength, JBB); return; } else { // remove lock from this subscription Ptr<SubOpRecord> subOpPtr; c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI); check_remove_queue(signal, subPtr, subOpPtr, true, false); check_release_subscription(signal, subPtr); DLHashTable<Subscription>::Iterator it; it.curr = subPtr; it.bucket = c_restart.m_bucket; c_subscriptions.next(it); copySubscription(signal, it); } } void Suma::execSUB_START_CONF(Signal* signal) { jamEntry(); SubStartConf * const conf = (SubStartConf*)signal->getDataPtr(); Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI); Ptr<Subscriber> ptr; c_subscriberPool.getPtr(ptr, conf->senderData); LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); list.next(ptr); copySubscriber(signal, subPtr, ptr); } void Suma::execSUB_START_REF(Signal* signal) { jamEntry(); SubStartRef * sig = (SubStartRef*)signal->getDataPtr(); Uint32 errorCode = sig->errorCode; { SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend(); ref->errorCode = errorCode; sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal, SumaStartMeRef::SignalLength, JBB); } Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI); abort_start_me(signal, subPtr, true); } void Suma::abort_start_me(Signal* signal, Ptr<Subscription> subPtr, bool lockowner) { Ptr<SubOpRecord> subOpPtr; c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI); check_remove_queue(signal, subPtr, subOpPtr, lockowner, true); check_release_subscription(signal, subPtr); c_restart.m_ref = 0; } void Suma::execSUMA_HANDOVER_REQ(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ"); // Uint32 sumaRef = signal->getSendersBlockRef(); const SumaHandoverReq * req = CAST_CONSTPTR(SumaHandoverReq, signal->getDataPtr()); Uint32 gci = req->gci; Uint32 nodeId = req->nodeId; Uint32 new_gci = Uint32(m_last_complete_gci >> 32) + MAX_CONCURRENT_GCP + 1; Uint32 requestType = req->requestType; if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version)) { jam(); requestType = SumaHandoverReq::RT_START_NODE; } Uint32 start_gci = (gci > new_gci ? gci : new_gci); // mark all active buckets really belonging to restarting SUMA Bucket_mask tmp; if (requestType == SumaHandoverReq::RT_START_NODE) { jam(); c_alive_nodes.set(nodeId); if (DBG_3R) ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, nodeId); for( Uint32 i = 0; i < c_no_of_buckets; i++) { if(get_responsible_node(i) == nodeId) { if (m_active_buckets.get(i)) { // I'm running this bucket but it should really be the restarted node tmp.set(i); m_active_buckets.clear(i); m_switchover_buckets.set(i); c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1; c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER; c_buckets[i].m_switchover_node = nodeId; ndbout_c("prepare to handover bucket: %d", i); } else if(m_switchover_buckets.get(i)) { ndbout_c("dont handover bucket: %d %d", i, nodeId); } } } } else if (requestType == SumaHandoverReq::RT_STOP_NODE) { jam(); for( Uint32 i = 0; i < c_no_of_buckets; i++) { NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask; nodegroup.clear(nodeId); if(get_responsible_node(i) == nodeId && get_responsible_node(i, nodegroup) == getOwnNodeId()) { // I'm will be running this bucket when nodeId shutdown jam(); tmp.set(i); m_switchover_buckets.set(i); c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1; c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN_TO; c_buckets[i].m_switchover_node = nodeId; ndbout_c("prepare to takeover bucket: %d", i); } } } else { jam(); goto ref; } { SumaHandoverConf *conf= CAST_PTR(SumaHandoverConf,signal->getDataPtrSend()); tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask); conf->gci = start_gci; conf->nodeId = getOwnNodeId(); conf->requestType = requestType; sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal, SumaHandoverConf::SignalLength, JBB); } DBUG_VOID_RETURN; ref: signal->theData[0] = 111; signal->theData[1] = getOwnNodeId(); signal->theData[2] = nodeId; sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_REF, signal, 3, JBB); DBUG_VOID_RETURN; } // only run on all but restarting suma void Suma::execSUMA_HANDOVER_REF(Signal* signal) { ndbrequire(false); } void Suma::execSUMA_HANDOVER_CONF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF"); const SumaHandoverConf * conf = CAST_CONSTPTR(SumaHandoverConf, signal->getDataPtr()); CRASH_INSERTION(13043); Uint32 gci = conf->gci; Uint32 nodeId = conf->nodeId; Uint32 requestType = conf->requestType; Bucket_mask tmp; tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask); #ifdef HANDOVER_DEBUG ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci); #endif if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version)) { jam(); requestType = SumaHandoverReq::RT_START_NODE; } if (requestType == SumaHandoverReq::RT_START_NODE) { jam(); for (Uint32 i = 0; i < c_no_of_buckets; i++) { if (tmp.get(i)) { if (DBG_3R) ndbout_c("%u : %u %u", i, get_responsible_node(i), getOwnNodeId()); ndbrequire(get_responsible_node(i) == getOwnNodeId()); // We should run this bucket, but _nodeId_ is c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1; c_buckets[i].m_state |= Bucket::BUCKET_STARTING; } } char buf[255]; tmp.getText(buf); infoEvent("Suma: handover from node %u gci: %u buckets: %s (%u)", nodeId, gci, buf, c_no_of_buckets); g_eventLogger->info("Suma: handover from node %u gci: %u buckets: %s (%u)", nodeId, gci, buf, c_no_of_buckets); m_switchover_buckets.bitOR(tmp); c_startup.m_handover_nodes.clear(nodeId); DBUG_VOID_RETURN; } else if (requestType == SumaHandoverReq::RT_STOP_NODE) { jam(); for (Uint32 i = 0; i < c_no_of_buckets; i++) { if (tmp.get(i)) { ndbrequire(get_responsible_node(i) == getOwnNodeId()); // We should run this bucket, but _nodeId_ is c_buckets[i].m_switchover_node = getOwnNodeId(); c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1; c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN; } } char buf[255]; tmp.getText(buf); infoEvent("Suma: handover to node %u gci: %u buckets: %s (%u)", nodeId, gci, buf, c_no_of_buckets); g_eventLogger->info("Suma: handover to node %u gci: %u buckets: %s (%u)", nodeId, gci, buf, c_no_of_buckets); m_switchover_buckets.bitOR(tmp); c_startup.m_handover_nodes.clear(nodeId); DBUG_VOID_RETURN; } } void Suma::execSTOP_ME_REQ(Signal* signal) { jam(); StopMeReq req = * CAST_CONSTPTR(StopMeReq, signal->getDataPtr()); ndbrequire(refToNode(req.senderRef) == getOwnNodeId()); ndbrequire(c_shutdown.m_wait_handover == false); c_shutdown.m_wait_handover = true; c_shutdown.m_senderRef = req.senderRef; c_shutdown.m_senderData = req.senderData; for (Uint32 i = c_nodes_in_nodegroup_mask.find(0); i != c_nodes_in_nodegroup_mask.NotFound ; i = c_nodes_in_nodegroup_mask.find(i + 1)) { /** * Check that all SUMA nodes support graceful shutdown... * and it's too late to stop it... * Shutdown instead... */ if (!ndbd_suma_stop_me(getNodeInfo(i).m_version)) { jam(); char buf[255]; BaseString::snprintf(buf, sizeof(buf), "Not all versions support graceful shutdown (suma)." " Shutdown directly instead"); progError(__LINE__, NDBD_EXIT_GRACEFUL_SHUTDOWN_ERROR, buf); ndbrequire(false); } } send_handover_req(signal, SumaHandoverReq::RT_STOP_NODE); } #ifdef NOT_USED static NdbOut& operator<<(NdbOut & out, const Suma::Page_pos & pos) { out << "[ Page_pos:" << " m_page_id: " << pos.m_page_id << " m_page_pos: " << pos.m_page_pos << " m_max_gci: " << pos.m_max_gci << " ]"; return out; } #endif Uint32* Suma::get_buffer_ptr(Signal* signal, Uint32 buck, Uint64 gci, Uint32 sz) { sz += 1; // len Bucket* bucket= c_buckets+buck; Page_pos pos= bucket->m_buffer_head; Buffer_page* page = 0; Uint32 *ptr = 0; if (likely(pos.m_page_id != RNIL)) { page= c_page_pool.getPtr(pos.m_page_id); ptr= page->m_data + pos.m_page_pos; } const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022)); pos.m_page_pos += sz; pos.m_last_gci = gci; Uint64 max = pos.m_max_gci > gci ? pos.m_max_gci : gci; if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS)) { pos.m_max_gci = max; bucket->m_buffer_head = pos; * ptr++ = (0x8000 << 16) | sz; // Same gci return ptr; } else if(pos.m_page_pos + Buffer_page::GCI_SZ32 <= Buffer_page::DATA_WORDS) { loop: pos.m_max_gci = max; pos.m_page_pos += Buffer_page::GCI_SZ32; bucket->m_buffer_head = pos; * ptr++ = (sz + Buffer_page::GCI_SZ32); * ptr++ = (Uint32)(gci >> 32); * ptr++ = (Uint32)(gci & 0xFFFFFFFF); return ptr; } else { /** * new page * 1) save header on last page * 2) seize new page */ Uint32 next; if(unlikely((next= seize_page()) == RNIL)) { /** * Out of buffer */ out_of_buffer(signal); return 0; } if(likely(pos.m_page_id != RNIL)) { page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32); page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF); page->m_words_used = pos.m_page_pos - sz; page->m_next_page= next; ndbassert(pos.m_max_gci != 0); } else { bucket->m_buffer_tail = next; } memset(&pos, 0, sizeof(pos)); pos.m_page_id = next; pos.m_page_pos = sz; pos.m_last_gci = gci; page= c_page_pool.getPtr(pos.m_page_id); page->m_next_page= RNIL; ptr= page->m_data; goto loop; // } } void Suma::out_of_buffer(Signal* signal) { if(m_out_of_buffer_gci) { return; } m_out_of_buffer_gci = m_last_complete_gci - 1; infoEvent("Out of event buffer: nodefailure will cause event failures"); m_missing_data = false; out_of_buffer_release(signal, 0); } void Suma::out_of_buffer_release(Signal* signal, Uint32 buck) { Bucket* bucket= c_buckets+buck; Uint32 tail= bucket->m_buffer_tail; if(tail != RNIL) { Buffer_page* page= c_page_pool.getPtr(tail); bucket->m_buffer_tail = page->m_next_page; free_page(tail, page); signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE; signal->theData[1] = buck; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB); return; } /** * Clear head */ bucket->m_buffer_head.m_page_id = RNIL; bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1; buck++; if(buck != c_no_of_buckets) { signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE; signal->theData[1] = buck; sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB); return; } /** * Finished will all release * prepare for inclusion */ m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci ? m_max_seen_gci : m_last_complete_gci; m_missing_data = false; } Uint32 Suma::seize_page() { if (ERROR_INSERTED(13038)) { jam(); CLEAR_ERROR_INSERT_VALUE; ndbout_c("Simulating out of event buffer"); m_out_of_buffer_gci = m_max_seen_gci; } if(unlikely(m_out_of_buffer_gci)) { return RNIL; } loop: Ptr<Page_chunk> ptr; Uint32 ref= m_first_free_page; if(likely(ref != RNIL)) { m_first_free_page = (c_page_pool.getPtr(ref))->m_next_page; Uint32 chunk = (c_page_pool.getPtr(ref))->m_page_chunk_ptr_i; c_page_chunk_pool.getPtr(ptr, chunk); ndbassert(ptr.p->m_free); ptr.p->m_free--; return ref; } if(!c_page_chunk_pool.seize(ptr)) return RNIL; Uint32 count = 16; m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &ref, &count, 1); if (count == 0) return RNIL; ndbout_c("alloc_chunk(%d %d) - ", ref, count); m_first_free_page = ptr.p->m_page_id = ref; ptr.p->m_size = count; ptr.p->m_free = count; Buffer_page* page; LINT_INIT(page); for(Uint32 i = 0; i<count; i++) { page = c_page_pool.getPtr(ref); page->m_page_state= SUMA_SEQUENCE; page->m_page_chunk_ptr_i = ptr.i; page->m_next_page = ++ref; } page->m_next_page = RNIL; goto loop; } void Suma::free_page(Uint32 page_id, Buffer_page* page) { Ptr<Page_chunk> ptr; ndbrequire(page->m_page_state == SUMA_SEQUENCE); Uint32 chunk= page->m_page_chunk_ptr_i; c_page_chunk_pool.getPtr(ptr, chunk); ptr.p->m_free ++; page->m_next_page = m_first_free_page; ndbrequire(ptr.p->m_free <= ptr.p->m_size); m_first_free_page = page_id; } void Suma::release_gci(Signal* signal, Uint32 buck, Uint64 gci) { Bucket* bucket= c_buckets+buck; Uint32 tail= bucket->m_buffer_tail; Page_pos head= bucket->m_buffer_head; Uint64 max_acked = bucket->m_max_acked_gci; const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND; if(unlikely(bucket->m_state & mask)) { jam(); ndbout_c("release_gci(%d, %u/%u) 0x%x-> node failure -> abort", buck, Uint32(gci >> 32), Uint32(gci), bucket->m_state); return; } bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci); if(unlikely(tail == RNIL)) { return; } if(tail == head.m_page_id) { if(gci >= head.m_max_gci) { jam(); if (ERROR_INSERTED(13034)) { jam(); SET_ERROR_INSERT_VALUE(13035); return; } if (ERROR_INSERTED(13035)) { CLEAR_ERROR_INSERT_VALUE; NodeReceiverGroup rg(CMVMI, c_nodes_in_nodegroup_mask); rg.m_nodes.clear(getOwnNodeId()); signal->theData[0] = 9999; sendSignal(rg, GSN_NDB_TAMPER, signal, 1, JBA); return; } head.m_page_pos = 0; head.m_max_gci = gci; head.m_last_gci = 0; bucket->m_buffer_head = head; } return; } else { jam(); Buffer_page* page= c_page_pool.getPtr(tail); Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32); Uint32 next_page = page->m_next_page; ndbassert(max_gci != 0); if(gci >= max_gci) { jam(); free_page(tail, page); bucket->m_buffer_tail = next_page; signal->theData[0] = SumaContinueB::RELEASE_GCI; signal->theData[1] = buck; signal->theData[2] = (Uint32)(gci >> 32); signal->theData[3] = (Uint32)(gci & 0xFFFFFFFF); sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 4, JBB); return; } else { //ndbout_c("do nothing..."); } } } static Uint32 g_cnt = 0; void Suma::start_resend(Signal* signal, Uint32 buck) { printf("start_resend(%d, ", buck); /** * Resend from m_max_acked_gci + 1 until max_gci + 1 */ Bucket* bucket= c_buckets + buck; Page_pos pos= bucket->m_buffer_head; if(m_out_of_buffer_gci) { Ptr<Gcp_record> gcp; c_gcp_list.last(gcp); signal->theData[0] = NDB_LE_SubscriptionStatus; signal->theData[1] = 2; // INCONSISTENT; signal->theData[2] = 0; // Not used signal->theData[3] = (Uint32) pos.m_max_gci; signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32); sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB); m_missing_data = true; return; } if(pos.m_page_id == RNIL) { jam(); m_active_buckets.set(buck); m_gcp_complete_rep_count ++; ndbout_c("empty bucket(RNIL) -> active max_acked: %u/%u max_gci: %u/%u", Uint32(bucket->m_max_acked_gci >> 32), Uint32(bucket->m_max_acked_gci), Uint32(pos.m_max_gci >> 32), Uint32(pos.m_max_gci)); return; } Uint64 min= bucket->m_max_acked_gci + 1; Uint64 max = m_max_seen_gci; ndbrequire(max <= m_max_seen_gci); if(min > max) { ndbrequire(pos.m_page_id == bucket->m_buffer_tail); m_active_buckets.set(buck); m_gcp_complete_rep_count ++; ndbout_c("empty bucket (%u/%u %u/%u) -> active", Uint32(min >> 32), Uint32(min), Uint32(max >> 32), Uint32(max)); return; } g_cnt = 0; bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND); bucket->m_switchover_node = get_responsible_node(buck); bucket->m_switchover_gci = max; m_switchover_buckets.set(buck); signal->theData[0] = SumaContinueB::RESEND_BUCKET; signal->theData[1] = buck; signal->theData[2] = (Uint32)(min >> 32); signal->theData[3] = 0; signal->theData[4] = 0; signal->theData[5] = (Uint32)(min & 0xFFFFFFFF); signal->theData[6] = 0; sendSignal(reference(), GSN_CONTINUEB, signal, 7, JBB); ndbout_c("min: %u/%u - max: %u/%u) page: %d", Uint32(min >> 32), Uint32(min), Uint32(max >> 32), Uint32(max), bucket->m_buffer_tail); ndbrequire(max >= min); } void Suma::resend_bucket(Signal* signal, Uint32 buck, Uint64 min_gci, Uint32 pos, Uint64 last_gci) { Bucket* bucket= c_buckets+buck; Uint32 tail= bucket->m_buffer_tail; Buffer_page* page= c_page_pool.getPtr(tail); Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32); Uint32 next_page = page->m_next_page; Uint32 *ptr = page->m_data + pos; Uint32 *end = page->m_data + page->m_words_used; bool delay = false; ndbrequire(tail != RNIL); if(tail == bucket->m_buffer_head.m_page_id) { max_gci= bucket->m_buffer_head.m_max_gci; end= page->m_data + bucket->m_buffer_head.m_page_pos; next_page= RNIL; if(ptr == end) { delay = true; goto next; } } else if(pos == 0 && min_gci > max_gci) { free_page(tail, page); tail = bucket->m_buffer_tail = next_page; goto next; } #if 0 for(Uint32 i = 0; i<page->m_words_used; i++) { printf("%.8x ", page->m_data[i]); if(((i + 1) % 8) == 0) printf("\n"); } printf("\n"); #endif while(ptr < end) { Uint32 *src = ptr; Uint32 tmp = * src++; Uint32 sz = tmp & 0xFFFF; ptr += sz; if(! (tmp & (0x8000 << 16))) { ndbrequire(sz >= Buffer_page::GCI_SZ32); sz -= Buffer_page::GCI_SZ32; Uint32 last_gci_hi = * src++; Uint32 last_gci_lo = * src++; last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32); } else { ndbrequire(ptr - sz > page->m_data); } if(last_gci < min_gci) { continue; } ndbrequire(sz); sz --; // remove *len* part of sz if(sz == 0) { SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend(); rep->gci_hi = (Uint32)(last_gci >> 32); rep->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF); rep->flags = (m_missing_data) ? SubGcpCompleteRep::MISSING_DATA : 0; rep->senderRef = reference(); rep->gcp_complete_rep_count = 1; if (ERROR_INSERTED(13036)) { jam(); CLEAR_ERROR_INSERT_VALUE; ndbout_c("Simulating out of event buffer at node failure"); rep->flags |= SubGcpCompleteRep::MISSING_DATA; } char buf[255]; c_subscriber_nodes.getText(buf); if (g_cnt) { ndbout_c("resending GCI: %u/%u rows: %d -> %s", Uint32(last_gci >> 32), Uint32(last_gci), g_cnt, buf); } g_cnt = 0; NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes); sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal, SubGcpCompleteRep::SignalLength, JBB); } else { const uint buffer_header_sz = 6; g_cnt++; Uint32 subPtrI = * src++ ; Uint32 schemaVersion = * src++; Uint32 event = * src >> 16; Uint32 sz_1 = (* src ++) & 0xFFFF; Uint32 any_value = * src++; Uint32 transId1 = * src++; Uint32 transId2 = * src++; ndbassert(sz - buffer_header_sz >= sz_1); LinearSectionPtr ptr[3]; const Uint32 nptr= reformat(signal, ptr, src, sz_1, src + sz_1, sz - buffer_header_sz - sz_1); Uint32 ptrLen= 0; for(Uint32 i =0; i < nptr; i++) ptrLen+= ptr[i].sz; /** * Signal to subscriber(s) */ Ptr<Subscription> subPtr; c_subscriptionPool.getPtr(subPtr, subPtrI); Ptr<Table> tabPtr; c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI); Uint32 table = subPtr.p->m_tableId; if (table_version_major(tabPtr.p->m_schemaVersion) == table_version_major(schemaVersion)) { SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; data->gci_hi = (Uint32)(last_gci >> 32); data->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF); data->tableId = table; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, event); data->flags = 0; data->anyValue = any_value; data->totalLen = ptrLen; data->transId1 = transId1; data->transId2 = transId2; { LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers); SubscriberPtr subbPtr; for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr)) { data->senderData = subbPtr.p->m_senderData; sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLengthWithTransId, JBB, ptr, nptr); } } } } break; } if(ptr == end && (tail != bucket->m_buffer_head.m_page_id)) { /** * release... */ free_page(tail, page); tail = bucket->m_buffer_tail = next_page; pos = 0; last_gci = 0; } else { pos = Uint32(ptr - page->m_data); } next: if(tail == RNIL) { bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND; ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER)); ndbout_c("resend done..."); return; } signal->theData[0] = SumaContinueB::RESEND_BUCKET; signal->theData[1] = buck; signal->theData[2] = (Uint32)(min_gci >> 32); signal->theData[3] = pos; signal->theData[4] = (Uint32)(last_gci >> 32); signal->theData[5] = (Uint32)(min_gci & 0xFFFFFFFF); signal->theData[6] = (Uint32)(last_gci & 0xFFFFFFFF); if(!delay) sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 7, JBB); else sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 10, 7); } void Suma::execGCP_PREPARE(Signal *signal) { jamEntry(); const GCPPrepare *prep = (const GCPPrepare *)signal->getDataPtr(); m_current_gci = prep->gci_lo | (Uint64(prep->gci_hi) << 32); } Uint64 Suma::get_current_gci(Signal*) { return m_current_gci; } void Suma::execCREATE_NODEGROUP_IMPL_REQ(Signal* signal) { CreateNodegroupImplReq reqCopy = *(CreateNodegroupImplReq*) signal->getDataPtr(); CreateNodegroupImplReq *req = &reqCopy; Uint32 err = 0; Uint32 rt = req->requestType; NdbNodeBitmask tmp; for (Uint32 i = 0; i<NDB_ARRAY_SIZE(req->nodes) && req->nodes[i]; i++) { tmp.set(req->nodes[i]); } Uint32 cnt = tmp.count(); Uint32 group = req->nodegroupId; switch(rt){ case CreateNodegroupImplReq::RT_ABORT: jam(); break; case CreateNodegroupImplReq::RT_PARSE: jam(); break; case CreateNodegroupImplReq::RT_PREPARE: jam(); break; case CreateNodegroupImplReq::RT_COMMIT: jam(); break; case CreateNodegroupImplReq::RT_COMPLETE: jam(); CRASH_INSERTION(13043); Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo; ndbrequire(gci > m_last_complete_gci); Uint32 state = 0; if (c_nodeGroup != RNIL) { jam(); NdbNodeBitmask check = tmp; check.bitAND(c_nodes_in_nodegroup_mask); ndbrequire(check.isclear()); ndbrequire(c_nodeGroup != group); ndbrequire(cnt == c_nodes_in_nodegroup_mask.count()); state = Bucket::BUCKET_CREATED_OTHER; } else if (tmp.get(getOwnNodeId())) { jam(); c_nodeGroup = group; c_nodes_in_nodegroup_mask.assign(tmp); fix_nodegroup(); state = Bucket::BUCKET_CREATED_SELF; } if (state != 0) { for (Uint32 i = 0; i<c_no_of_buckets; i++) { jam(); m_switchover_buckets.set(i); c_buckets[i].m_switchover_gci = gci - 1; // start from gci c_buckets[i].m_state = state | (c_no_of_buckets << 8); } } } { CreateNodegroupImplConf* conf = (CreateNodegroupImplConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = req->senderData; sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_CONF, signal, CreateNodegroupImplConf::SignalLength, JBB); } return; //error: CreateNodegroupImplRef *ref = (CreateNodegroupImplRef*)signal->getDataPtrSend(); ref->senderRef = reference(); ref->senderData = req->senderData; ref->errorCode = err; sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_REF, signal, CreateNodegroupImplRef::SignalLength, JBB); return; } void Suma::execDROP_NODEGROUP_IMPL_REQ(Signal* signal) { DropNodegroupImplReq reqCopy = *(DropNodegroupImplReq*) signal->getDataPtr(); DropNodegroupImplReq *req = &reqCopy; Uint32 err = 0; Uint32 rt = req->requestType; Uint32 group = req->nodegroupId; switch(rt){ case DropNodegroupImplReq::RT_ABORT: jam(); break; case DropNodegroupImplReq::RT_PARSE: jam(); break; case DropNodegroupImplReq::RT_PREPARE: jam(); break; case DropNodegroupImplReq::RT_COMMIT: jam(); break; case DropNodegroupImplReq::RT_COMPLETE: jam(); CRASH_INSERTION(13043); Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo; ndbrequire(gci > m_last_complete_gci); Uint32 state; if (c_nodeGroup != group) { jam(); state = Bucket::BUCKET_DROPPED_OTHER; break; } else { jam(); state = Bucket::BUCKET_DROPPED_SELF; } for (Uint32 i = 0; i<c_no_of_buckets; i++) { jam(); m_switchover_buckets.set(i); if (c_buckets[i].m_state != 0) { jamLine(c_buckets[i].m_state); ndbout_c("c_buckets[%u].m_state: %u", i, c_buckets[i].m_state); } ndbrequire(c_buckets[i].m_state == 0); // XXX todo c_buckets[i].m_switchover_gci = gci - 1; // start from gci c_buckets[i].m_state = state | (c_no_of_buckets << 8); } break; } { DropNodegroupImplConf* conf = (DropNodegroupImplConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = req->senderData; sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_CONF, signal, DropNodegroupImplConf::SignalLength, JBB); } return; //error: DropNodegroupImplRef *ref = (DropNodegroupImplRef*)signal->getDataPtrSend(); ref->senderRef = reference(); ref->senderData = req->senderData; ref->errorCode = err; sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_REF, signal, DropNodegroupImplRef::SignalLength, JBB); return; } template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);