in libminifi/src/sitetosite/RawSocketProtocol.cpp [239:334]
bool RawSiteToSiteClient::handShake() {
if (peer_state_ != ESTABLISHED) {
logger_->log_error("Site2Site peer state is not established while handshake");
return false;
}
logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string());
_commsIdentifier = id_generator_->generate();
{
const auto ret = peer_->write(_commsIdentifier);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
std::map<std::string, std::string> properties;
properties[HandShakePropertyStr[GZIP]] = "false";
properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_.to_string();
properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeout.load().count());
if (_currentVersion >= 5) {
if (_batchCount > 0)
properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(_batchCount);
if (_batchSize > 0)
properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(_batchSize);
if (_batchDuration.load() > 0ms)
properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(_batchDuration.load().count());
}
if (_currentVersion >= 3) {
const auto ret = peer_->write(peer_->getURL());
if (ret == 0 || io::isError(ret)) {
return false;
}
}
{
const auto size = gsl::narrow<uint32_t>(properties.size());
const auto ret = peer_->write(size);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
std::map<std::string, std::string>::iterator it;
for (it = properties.begin(); it != properties.end(); it++) {
{
const auto ret = peer_->write(it->first);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
{
const auto ret = peer_->write(it->second);
if (ret == 0 || io::isError(ret)) {
return false;
}
}
logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", it->first, it->second);
}
RespondCode code;
std::string message;
{
const auto ret = readRespond(nullptr, code, message);
if (ret <= 0) {
return false;
}
}
auto logPortStateError = [this](const std::string& error) {
logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
};
switch (code) {
case PROPERTIES_OK:
logger_->log_debug("Site2Site HandShake Completed");
peer_state_ = HANDSHAKED;
return true;
case PORT_NOT_IN_VALID_STATE:
logPortStateError("in invalid state");
return false;
case UNKNOWN_PORT:
logPortStateError("an unknown port");
return false;
case PORTS_DESTINATION_FULL:
logPortStateError("full");
return false;
case UNAUTHORIZED:
logger_->log_error("Site2Site HandShake on port %s failed: UNAUTHORIZED", port_id_.to_string());
return false;
default:
logger_->log_error("Site2Site HandShake on port %s failed: unknown respond code %d", port_id_.to_string(), code);
return false;
}
}