in common/dbinterface.cpp [244:290]
bool DBInterface::_unavailable_data_handler(const std::string& dbName, const char *data)
{
auto start = system_clock::now();
SWSS_LOG_DEBUG("Listening on pubsub channel '%s'", dbName.c_str());
auto wait = duration<float>(PUB_SUB_MAXIMUM_DATA_WAIT);
while (system_clock::now() - start < wait)
{
auto& channel = keyspace_notification_channels.at(dbName);
auto ctx = channel->getContext();
redisReply *reply;
ctx->err = REDIS_OK; // Stop later redisGetReply early return on no data after first redisReply timeout
int rc = redisGetReply(ctx, reinterpret_cast<void**>(&reply));
if (rc == REDIS_ERR && ctx->err == REDIS_ERR_IO && errno == EAGAIN)
{
// Timeout
continue;
}
if (rc != REDIS_OK)
{
throw RedisError("Failed to redisGetReply with on pubsub channel on dbName=" + dbName, ctx);
}
RedisReply r(reply);
// r is an array of:
// 0. 'type': 'pmessage',
// 1. 'pattern': '__key*__:*'
// 2. 'channel':
// 3. 'data':
redisReply& r3 = *r.getChild(3);
if (r3.type != REDIS_REPLY_STRING)
{
throw system_error(make_error_code(errc::io_error),
"Wrong expected type of result");
}
if (strcmp(r3.str, data) == 0)
{
SWSS_LOG_INFO("'%s' acquired via pub-sub dbName=%s. Unblocking...", data, dbName.c_str());
// Wait for a "settling" period before releasing the wait.
sleep(DATA_RETRIEVAL_WAIT_TIME);
return true;
}
}
SWSS_LOG_WARN("No notification for '%s' from '%s' received before timeout.", data, dbName.c_str());
return false;
}