in common/producerstatetable.cpp [57:121]
void ProducerStateTable::reloadRedisScript()
{
// Set m_flushPub to remove publish from a single lua string and let pipeline do publish once per flush
// However, if m_buffered is false, follow the original one publish per lua design
// Hence we need to check both m_buffered and m_flushPub, and reload the redis script once setBuffered() changes m_buffered
/* 1. Inform the pipeline of what channel to publish, when flushPub feature is enabled */
if (m_buffered && m_flushPub)
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));
/* 2. Setup lua strings: determine whether to attach luaPub after each lua string */
// num in luaSet and luaDel means number of elements that were added to the key set,
// not including all the elements already present into the set.
string luaSet =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n";
string luaDel =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[4], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n";
string luaBatchedSet =
"local added = 0\n"
"local idx = 2\n"
"for i = 0, #KEYS - 4 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[4 + i])\n"
" for j = 0, tonumber(ARGV[idx]) - 1 do\n"
" local attr = ARGV[idx + j * 2 + 1]\n"
" local val = ARGV[idx + j * 2 + 2]\n"
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n";
string luaBatchedDel =
"local added = 0\n"
"for i = 0, #KEYS - 5 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n";
if (!m_flushPub || !m_buffered)
{
string luaPub =
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
luaSet += luaPub;
luaDel += luaPub;
luaBatchedSet += luaPub;
luaBatchedDel += luaPub;
}
/* 3. load redis script based on the lua string */
m_shaSet = m_pipe->loadRedisScript(luaSet);
m_shaDel = m_pipe->loadRedisScript(luaDel);
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
}