function _M.access()

in apisix/pubsub/kafka.lua [55:146]


function _M.access(api_ctx)
    local pubsub, err = core.pubsub.new()
    if not pubsub then
        core.log.error("failed to initialize pubsub module, err: ", err)
        core.response.exit(400)
        return
    end

    local up_nodes = api_ctx.matched_upstream.nodes

    
    local broker_list = {}
    for i, node in ipairs(up_nodes) do
        broker_list[i] = {
            host = node.host,
            port = node.port,
        }

        if api_ctx.kafka_consumer_enable_sasl then
            broker_list[i].sasl_config = {
                mechanism = "PLAIN",
                user = api_ctx.kafka_consumer_sasl_username,
                password = api_ctx.kafka_consumer_sasl_password,
            }
        end
    end

    local client_config = {refresh_interval = 30 * 60 * 1000}
    if api_ctx.matched_upstream.tls then
        client_config.ssl = true
        client_config.ssl_verify = api_ctx.matched_upstream.tls.verify
    end

    
    
    local consumer = bconsumer:new(broker_list, client_config)

    pubsub:on("cmd_kafka_list_offset", function (params)
        
        
        
        
        
        
        local timestamp = pb_convert_to_int64(params.timestamp)

        local offset, err = consumer:list_offset(params.topic, params.partition, timestamp)

        if not offset then
            return nil, "failed to list offset, topic: " .. params.topic ..
                ", partition: " .. params.partition .. ", err: " .. err
        end

        offset = tostring(offset)
        return {
            kafka_list_offset_resp = {
                offset = str_sub(offset, 1, #offset - 2)
            }
        }
    end)

    pubsub:on("cmd_kafka_fetch", function (params)
        local offset = pb_convert_to_int64(params.offset)

        local ret, err = consumer:fetch(params.topic, params.partition, offset)
        if not ret then
            return nil, "failed to fetch message, topic: " .. params.topic ..
                ", partition: " .. params.partition .. ", err: " .. err
        end

        
        
        local messages = ret.records

        
        for _, message in ipairs(messages) do
            local timestamp = tostring(message.timestamp)
            message.timestamp = str_sub(timestamp, 1, #timestamp - 2)
            local offset = tostring(message.offset)
            message.offset = str_sub(offset, 1, #offset - 2)
        end

        return {
            kafka_fetch_resp = {
                messages = messages,
            },
        }
    end)

    
    pubsub:wait()
end