in apisix/pubsub/kafka.lua [55:146]
function _M.access(api_ctx)
local pubsub, err = core.pubsub.new()
if not pubsub then
core.log.error("failed to initialize pubsub module, err: ", err)
core.response.exit(400)
return
end
local up_nodes = api_ctx.matched_upstream.nodes
local broker_list = {}
for i, node in ipairs(up_nodes) do
broker_list[i] = {
host = node.host,
port = node.port,
}
if api_ctx.kafka_consumer_enable_sasl then
broker_list[i].sasl_config = {
mechanism = "PLAIN",
user = api_ctx.kafka_consumer_sasl_username,
password = api_ctx.kafka_consumer_sasl_password,
}
end
end
local client_config = {refresh_interval = 30 * 60 * 1000}
if api_ctx.matched_upstream.tls then
client_config.ssl = true
client_config.ssl_verify = api_ctx.matched_upstream.tls.verify
end
local consumer = bconsumer:new(broker_list, client_config)
pubsub:on("cmd_kafka_list_offset", function (params)
local timestamp = pb_convert_to_int64(params.timestamp)
local offset, err = consumer:list_offset(params.topic, params.partition, timestamp)
if not offset then
return nil, "failed to list offset, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end
offset = tostring(offset)
return {
kafka_list_offset_resp = {
offset = str_sub(offset, 1, #offset - 2)
}
}
end)
pubsub:on("cmd_kafka_fetch", function (params)
local offset = pb_convert_to_int64(params.offset)
local ret, err = consumer:fetch(params.topic, params.partition, offset)
if not ret then
return nil, "failed to fetch message, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end
local messages = ret.records
for _, message in ipairs(messages) do
local timestamp = tostring(message.timestamp)
message.timestamp = str_sub(timestamp, 1, #timestamp - 2)
local offset = tostring(message.offset)
message.offset = str_sub(offset, 1, #offset - 2)
end
return {
kafka_fetch_resp = {
messages = messages,
},
}
end)
pubsub:wait()
end