in cppcache/src/TcrConnection.cpp [109:476]
/**
 * Creates the socket connection to the given endpoint and performs the
 * client/server handshake on it.
 *
 * The handshake request is assembled in this order: communication mode byte,
 * client version, REPLY_OK, port information (for subscription channels) or
 * the read timeout (for client channels), the ClientProxyMembershipID, the
 * overrides byte, the security mode byte and - for secured subscription
 * channels - the credentials obtained from the configured AuthInitialize.
 * The reply is then read field by field and the acceptance code decides
 * whether the handshake succeeded.
 *
 * @param endpointObj endpoint to connect to; stored in endpointObj_.
 * @param ports set of local ports. A client (non-notification) connection
 *        inserts its own port; a notification connection writes the size of
 *        the set and every port into the handshake while holding the set's
 *        lock.
 * @param isClientNotification true when the connection is a subscription
 *        (server-to-client) channel.
 * @param isSecondary selects the secondary instead of the primary
 *        server-to-client mode byte. Precondition: isSecondary implies
 *        isClientNotification.
 * @param connectTimeout timeout used for creating the connection and for
 *        every handshake send/read.
 * @return true when this is a non-notification connection with security
 *         enabled and the endpoint is either not pool based or not in
 *         multi-user mode; false otherwise. NOTE(review): presumably tells
 *         the caller that the endpoint still has to be authenticated - see
 *         the TODO before the return statement; confirm against callers.
 * @throws AuthenticationRequiredException if the server demands SSL or
 *         authentication, or if no credentials could be obtained for a
 *         secured subscription channel.
 * @throws AuthenticationFailedException if the server rejects the
 *         credentials or obtaining them failed with another Exception.
 * @throws DuplicateDurableClientException on REPLY_DUPLICATE_DURABLE_CLIENT.
 * @throws CacheServerException if the server refuses the handshake.
 * @throws MessageException on an unknown acceptance code.
 * @throws TimeoutException if sending the handshake timed out.
 * @throws GeodeIOException if sending the handshake failed otherwise.
 */
bool TcrConnection::initTcrConnection(
    std::shared_ptr<TcrEndpoint> endpointObj,
    synchronized_set<std::unordered_set<uint16_t>>& ports,
    bool isClientNotification, bool isSecondary,
    std::chrono::microseconds connectTimeout) {
  endpointObj_ = endpointObj;
  poolDM_ = dynamic_cast<ThinClientPoolDM*>(endpointObj_->getPoolHADM());
  hasServerQueue_ = NON_REDUNDANT_SERVER;
  queueSize_ = 0;
  lastAccessed_ = creationTime_ = std::chrono::steady_clock::now();
  connectionId = INITIAL_CONNECTION_ID;
  auto cacheImpl = poolDM_->getConnectionManager().getCacheImpl();
  const auto& distributedSystem = cacheImpl->getDistributedSystem();
  const auto& sysProp = distributedSystem.getSystemProperties();
  bool isPool = false;
  isBeingUsed_ = false;

  // Precondition:
  // 1. isSecondary ==> isClientNotification

  // Create TcpConn object which manages a socket connection with the endpoint.
  // NOTE(review): endpointObj_ and poolDM_ are already dereferenced above, so
  // a null endpoint or pool would have failed before reaching this check -
  // confirm whether the non-pool branch is still reachable.
  if (endpointObj && endpointObj->getPoolHADM()) {
    createConnection(endpointObj_->name().c_str(), connectTimeout,
                     static_cast<int32_t>(
                         endpointObj->getPoolHADM()->getSocketBufferSize()));
    isPool = true;
  } else {
    createConnection(endpointObj_->name().c_str(), connectTimeout,
                     sysProp.maxSocketBufferSize());
  }

  auto handShakeMsg = cacheImpl->createDataOutput();
  bool isNotificationChannel = false;

  // Send byte Acceptor.CLIENT_TO_SERVER = (byte) 100;
  // Send byte Acceptor.SERVER_TO_CLIENT = (byte) 101;
  if (isClientNotification) {
    isNotificationChannel = true;
    if (isSecondary) {
      handShakeMsg.write(static_cast<int8_t>(SECONDARY_SERVER_TO_CLIENT));
    } else {
      handShakeMsg.write(static_cast<int8_t>(PRIMARY_SERVER_TO_CLIENT));
    }
  } else {
    handShakeMsg.write(static_cast<int8_t>(CLIENT_TO_SERVER));
  }

  Version::write(handShakeMsg, Version::current());
  LOGFINE("Client version ordinal is %d", Version::current().getOrdinal());

  // Send byte REPLY_OK = (byte)58;
  handShakeMsg.write(static_cast<int8_t>(REPLY_OK));

  if (!isClientNotification) {
    // Remember the local port of this client connection.
    port_ = conn_->getPort();
    ports.insert(port_);
  } else {
    // Hold the lock while iterating so the set cannot change underneath us.
    auto&& lock = ports.make_lock();
    handShakeMsg.writeInt(static_cast<int32_t>(ports.size()));
    for (const auto& port : ports) {
      handShakeMsg.writeInt(static_cast<int32_t>(port));
    }
  }

  // Writing handshake readtimeout value for CSVER_51+.
  if (!isClientNotification) {
    // SW: The timeout has been artificially raised to the highest
    // permissible value for bug #232 for now.
    //  minus 10 sec because the GFE 5.7 gridDev branch adds a
    //  5 sec buffer which was causing an int overflow.
    handShakeMsg.writeInt(static_cast<int32_t>(0x7fffffff) - 10000);
  }

  // Write header for byte FixedID since GFE 5.7
  handShakeMsg.write(static_cast<int8_t>(DSCode::FixedIDByte));
  // Writing byte for ClientProxyMembershipID class id=38 as registered on the
  // java server.
  handShakeMsg.write(static_cast<int8_t>(DSFid::ClientProxyMembershipId));

  if (endpointObj->getPoolHADM()) {
    ClientProxyMembershipID* memId =
        endpointObj->getPoolHADM()->getMembershipId();
    const auto& dsMemberId = memId->getDSMemberId();
    handShakeMsg.writeBytes(
        reinterpret_cast<const uint8_t*>(dsMemberId.c_str()),
        static_cast<int32_t>(dsMemberId.size()));
  } else {
    // Add 3 durable Subscription properties to ClientProxyMembershipID
    auto&& durableId = sysProp.durableClientId();
    auto&& durableTimeOut = sysProp.durableTimeout();

    // Write ClientProxyMembershipID serialized object.
    auto memId = cacheImpl->getClientProxyMembershipIDFactory().create(
        durableId.c_str(), durableTimeOut);
    const auto& dsMemberId = memId->getDSMemberId();
    handShakeMsg.writeBytes(
        reinterpret_cast<const uint8_t*>(dsMemberId.c_str()),
        static_cast<int32_t>(dsMemberId.size()));
  }
  handShakeMsg.writeInt(static_cast<int32_t>(1));

  std::shared_ptr<Properties> credentials;

  // Write overrides (just conflation for now)
  handShakeMsg.write(getOverrides(&sysProp));

  bool tmpIsSecurityOn = nullptr != cacheImpl->getAuthInitialize();
  if (endpointObj_) {
    tmpIsSecurityOn = tmpIsSecurityOn || endpointObj_->isMultiUserMode();
  }

  LOGDEBUG(
      "TcrConnection tmpIsSecurityOn = %d isNotificationChannel = "
      "%d ",
      tmpIsSecurityOn, isNotificationChannel);

  // A multi-user subscription channel carries no credentials in the
  // handshake and is flagged with its own security mode byte.
  bool doIneedToSendCreds = true;
  if (isNotificationChannel && endpointObj_ &&
      this->endpointObj_->isMultiUserMode()) {
    tmpIsSecurityOn = false;
    doIneedToSendCreds = false;
  }

  if (isNotificationChannel && !doIneedToSendCreds) {
    handShakeMsg.write(
        static_cast<uint8_t>(SECURITY_MULTIUSER_NOTIFICATIONCHANNEL));
  } else if (tmpIsSecurityOn) {
    handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NORMAL));
  } else {
    handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NONE));
  }

  if (tmpIsSecurityOn) {
    try {
      LOGFINER("TcrConnection: about to invoke authloader");
      const auto& tmpSecurityProperties = sysProp.getSecurityProperties();
      if (tmpSecurityProperties == nullptr) {
        LOGWARN("TcrConnection: security properties not found.");
      }
      // only for backward connection
      if (isClientNotification) {
        if (const auto& authInitialize = cacheImpl->getAuthInitialize()) {
          LOGFINER(
              "TcrConnection: acquired handle to authLoader, "
              "invoking getCredentials");

          const auto& tmpAuthIniSecurityProperties =
              authInitialize->getCredentials(tmpSecurityProperties,
                                             endpointObj_->name().c_str());
          LOGFINER("TcrConnection: after getCredentials ");
          credentials = tmpAuthIniSecurityProperties;
        }

        // The security mode byte already announced credentials, so there is
        // nothing valid to send without them; fail instead of dereferencing
        // a null pointer.
        if (!credentials) {
          throw AuthenticationRequiredException(
              "TcrConnection: no credentials available for subscription "
              "handshake");
        }
        credentials->toData(handShakeMsg);
      }
    } catch (const AuthenticationRequiredException&) {
      LOGDEBUG("AuthenticationRequiredException got");
      throw;
    } catch (const AuthenticationFailedException&) {
      LOGDEBUG("AuthenticationFailedException got");
      throw;
    } catch (const Exception& ex) {
      LOGWARN("TcrConnection: failed to acquire handle to authLoader: [%s] %s",
              ex.getName().c_str(), ex.what());
      auto message =
          std::string("TcrConnection: failed to load authInit library: ") +
          ex.what();
      throwException(AuthenticationFailedException(message));
    }
  }

  size_t msgLength;
  auto data = reinterpret_cast<char*>(
      const_cast<uint8_t*>(handShakeMsg.getBuffer(&msgLength)));
  LOGFINE("Attempting handshake with endpoint %s for %s%s connection",
          endpointObj->name().c_str(),
          isClientNotification ? (isSecondary ? "secondary " : "primary ") : "",
          isClientNotification ? "subscription" : "client");
  // msgLength is a size_t, hence %zu.
  LOGDEBUG("%s(%p): Handshake bytes: (%zu): %s", __GNFN__, this, msgLength,
           Utils::convertBytesToString(data, msgLength).c_str());

  ConnErrType error = sendData(data, msgLength, connectTimeout);

  if (error == CONN_NOERR) {
    // Every byte received is collected here for the debug log below.
    std::vector<int8_t> recdBytes;

    auto acceptanceCode = readHandshakeData(1, connectTimeout);
    recdBytes.push_back(acceptanceCode[0]);

    if (acceptanceCode[0] == REPLY_SSL_ENABLED && !sysProp.sslEnabled()) {
      LOGERROR("SSL is enabled on server, enable SSL in client as well");
      AuthenticationRequiredException ex(
          "SSL is enabled on server, enable SSL in client as well");
      conn_.reset();
      throwException(ex);
    }

    auto serverQueueStatus = readHandshakeData(1, connectTimeout);
    recdBytes.push_back(serverQueueStatus[0]);

    //  TESTING: Durable clients - set server queue status.
    // 0 - Non-Redundant , 1- Redundant , 2- Primary
    if (serverQueueStatus[0] == 1) {
      hasServerQueue_ = REDUNDANT_SERVER;
    } else if (serverQueueStatus[0] == 2) {
      hasServerQueue_ = PRIMARY_SERVER;
    } else {
      hasServerQueue_ = NON_REDUNDANT_SERVER;
    }

    auto queueSizeMsg = readHandshakeData(4, connectTimeout);
    recdBytes.insert(std::end(recdBytes), std::begin(queueSizeMsg),
                     std::end(queueSizeMsg));

    // Assemble the big-endian 32-bit value from unsigned bytes: widening a
    // signed byte >= 0x80 directly would sign-extend and corrupt the higher
    // bytes of the result.
    const uint32_t rawQueueSize =
        static_cast<uint32_t>(static_cast<uint8_t>(queueSizeMsg[0])) << 24 |
        static_cast<uint32_t>(static_cast<uint8_t>(queueSizeMsg[1])) << 16 |
        static_cast<uint32_t>(static_cast<uint8_t>(queueSizeMsg[2])) << 8 |
        static_cast<uint32_t>(static_cast<uint8_t>(queueSizeMsg[3]));
    queueSize_ = static_cast<int32_t>(rawQueueSize);
    // Negative values are treated as an empty queue.
    queueSize_ = queueSize_ > 0 ? queueSize_ : 0;

    endpointObj_->setServerQueueStatus(hasServerQueue_, queueSize_);

    // Set the pool specific queue size when
    //   1. ServerQStatus is Primary or
    //   2. ServerQStatus is Non-Redundant but
    //   3. ONLY when the handshake is for a subscription
    if ((hasServerQueue_ == PRIMARY_SERVER ||
         hasServerQueue_ == NON_REDUNDANT_SERVER) &&
        isClientNotification) {
      poolDM_->setPrimaryServerQueueSize(queueSize_);
    }

    if (!isClientNotification) {
      // Read the DistributedMember object
      auto recvMsgLen = readHandshakeArraySize(connectTimeout);
      recdBytes.push_back((recvMsgLen & 0xff000000) >> 24);
      recdBytes.push_back((recvMsgLen & 0x00ff0000) >> 16);
      recdBytes.push_back((recvMsgLen & 0x0000ff00) >> 8);
      recdBytes.push_back(recvMsgLen & 0x000000ff);

      auto recvMessage = readHandshakeData(recvMsgLen, connectTimeout);
      recdBytes.insert(std::end(recdBytes), std::begin(recvMessage),
                       std::end(recvMessage));

      // If the distributed member has not been set yet, set it.
      if (getEndpointObject()->getDistributedMemberID() == 0) {
        LOGDEBUG("Deserializing distributed member Id");
        auto dataInputForClient = cacheImpl->createDataInput(
            reinterpret_cast<const uint8_t*>(recvMessage.data()),
            recvMessage.size());
        auto member = std::dynamic_pointer_cast<ClientProxyMembershipID>(
            dataInputForClient.readObject());
        auto memId = cacheImpl->getMemberListForVersionStamp()->add(member);
        getEndpointObject()->setDistributedMemberID(memId);
        LOGDEBUG("Deserialized distributed member Id %d", memId);
      }
    }

    // Two byte big-endian length of the server's message, again assembled
    // from unsigned bytes so that a low byte >= 0x80 cannot sign-extend into
    // the high byte.
    auto recvMsgLenBytes = readHandshakeData(2, connectTimeout);
    const uint16_t recvMsgLen2 = static_cast<uint16_t>(
        (static_cast<uint8_t>(recvMsgLenBytes[0]) << 8) |
        static_cast<uint8_t>(recvMsgLenBytes[1]));
    recdBytes.insert(std::end(recdBytes), std::begin(recvMsgLenBytes),
                     std::end(recvMsgLenBytes));

    auto recvMessage = readHandshakeData(recvMsgLen2, connectTimeout);
    recdBytes.insert(std::end(recdBytes), std::begin(recvMessage),
                     std::end(recvMessage));

    if (!isClientNotification) {
      auto deltaEnabledMsg = readHandshakeData(1, connectTimeout);
      recdBytes.push_back(deltaEnabledMsg[0]);
      ThinClientBaseDM::setDeltaEnabledOnServer(deltaEnabledMsg[0] ? true
                                                                   : false);
    }

    // recdBytes.size() is a size_t, hence %zu.
    LOGDEBUG(
        "%s(%p): isClientNotification=%s, Handshake response bytes: (%zu) %s",
        __GNFN__, this, isClientNotification ? "true" : "false",
        recdBytes.size(),
        Utils::convertBytesToString(recdBytes.data(), recdBytes.size())
            .c_str());

    // The received bytes are not NUL terminated (and the buffer may be empty),
    // so copy them into a string before using them as message text.
    const std::string serverMessage(std::begin(recvMessage),
                                    std::end(recvMessage));

    switch (acceptanceCode[0]) {
      case REPLY_OK:
      case SUCCESSFUL_SERVER_TO_CLIENT:
        LOGFINER("Handshake reply: %u,%u,%u", acceptanceCode[0],
                 serverQueueStatus[0], recvMsgLen2);
        if (isClientNotification) {
          readHandshakeInstantiatorMsg(connectTimeout);
        }
        break;
      case REPLY_AUTHENTICATION_FAILED: {
        AuthenticationFailedException ex(serverMessage.c_str());
        conn_.reset();
        throwException(ex);
      }
      case REPLY_AUTHENTICATION_REQUIRED: {
        AuthenticationRequiredException ex(serverMessage.c_str());
        conn_.reset();
        throwException(ex);
      }
      case REPLY_DUPLICATE_DURABLE_CLIENT: {
        DuplicateDurableClientException ex(serverMessage.c_str());
        conn_.reset();
        throwException(ex);
      }
      case REPLY_REFUSED:
      case REPLY_INVALID:
      case UNSUCCESSFUL_SERVER_TO_CLIENT: {
        LOGERROR("Handshake rejected by server[%s]: %s",
                 endpointObj_->name().c_str(), serverMessage.c_str());
        auto message = std::string("TcrConnection::TcrConnection: ") +
                       "Handshake rejected by server: " + serverMessage;
        CacheServerException ex(message);
        conn_.reset();
        throw ex;
      }
      default: {
        LOGERROR(
            "Unknown error[%d] received from server [%s] in handshake: "
            "%s",
            acceptanceCode[0], endpointObj_->name().c_str(),
            serverMessage.c_str());
        auto message =
            std::string("TcrConnection::TcrConnection: Unknown error") +
            " received from server in handshake: " + serverMessage;
        MessageException ex(message);
        conn_.reset();
        throw ex;
      }
    }
  } else {
    // Sending the handshake failed: drop the connection and report why.
    conn_.reset();
    if (error & CONN_TIMEOUT) {
      throw TimeoutException(
          "TcrConnection::TcrConnection: "
          "connection timed out during handshake");
    } else {
      throw GeodeIOException(
          "TcrConnection::TcrConnection: "
          "Handshake failure");
    }
  }

  // TODO: we can authenticate endpoint here if pool is not in multiuser mode.
  // for backward connection we send credentials to server in handshake itself.
  // for forward connection we need to send credentials to server
  //---if pool is not in multiuser mode
  //---or old endpoint case.
  if (this->endpointObj_ && !isNotificationChannel && tmpIsSecurityOn &&
      (!isPool || !this->endpointObj_->isMultiUserMode())) {
    // this->endpointObj_->authenticateEndpoint(this);
    return true;
  }

  return false;
}