void ProducerStateTable::reloadRedisScript()

in common/producerstatetable.cpp [57:121]


void ProducerStateTable::reloadRedisScript()
{
    // Set m_flushPub to remove publish from a single lua string and let pipeline do publish once per flush

    // However, if m_buffered is false, follow the original one publish per lua design
    // Hence we need to check both m_buffered and m_flushPub, and reload the redis script once setBuffered() changes m_buffered

    /* 1. Inform the pipeline of what channel to publish, when flushPub feature is enabled */
    if (m_buffered && m_flushPub)
        m_pipe->addChannel(getChannelName(m_pipe->getDbId()));

    /* 2. Setup lua strings: determine whether to attach luaPub after each lua string */

    // num in luaSet and luaDel means number of elements that were added to the key set,
    // not including all the elements already present into the set.
    string luaSet =
        "local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
        "for i = 0, #KEYS - 3 do\n"
        "    redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
        "end\n";

    string luaDel =
        "local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
        "redis.call('SADD', KEYS[4], ARGV[2])\n"
        "redis.call('DEL', KEYS[3])\n";

    string luaBatchedSet =
        "local added = 0\n"
        "local idx = 2\n"
        "for i = 0, #KEYS - 4 do\n"
        "    added = added + redis.call('SADD', KEYS[2], KEYS[4 + i])\n"
        "    for j = 0, tonumber(ARGV[idx]) - 1 do\n"
        "        local attr = ARGV[idx + j * 2 + 1]\n"
        "        local val = ARGV[idx + j * 2 + 2]\n"
        "        redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
        "    end\n"
        "    idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
        "end\n";

    string luaBatchedDel =
        "local added = 0\n"
        "for i = 0, #KEYS - 5 do\n"
        "    added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
        "    redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
        "    redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
        "end\n";

    if (!m_flushPub || !m_buffered)
    {
        string luaPub =
            "if added > 0 then \n"
            "    redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
            "end\n";
        luaSet += luaPub;
        luaDel += luaPub;
        luaBatchedSet += luaPub;
        luaBatchedDel += luaPub;
    }

    /* 3. load redis script based on the lua string */
    m_shaSet = m_pipe->loadRedisScript(luaSet);
    m_shaDel = m_pipe->loadRedisScript(luaDel);
    m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
    m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
}