in apisix/plugins/ext-plugin/init.lua [481:717]
function (conf, ctx, sock, entry)
    -- Handler for the RPC_HTTP_REQ_CALL phase: serializes the current request
    -- (path, source IP, query args, headers, method) into a flatbuffers message,
    -- sends it over `sock`, answers any RPC_EXTRA_INFO requests that come back,
    -- and finally applies the returned Stop or Rewrite action.
    --
    -- Returns:
    --   nil, err              on any failure
    --   true, nil, code, body when the reply carries a Stop action
    --   true                  otherwise (Rewrite applied, or no known action)

    -- The conf token is obtained through the plugin-ctx lrucache; rpc_call and
    -- RPC_PREPARE_CONF are handed over as the creator and its arguments
    -- (presumably only invoked on a cache miss -- confirm in core.lrucache).
    local cache_id = core.lrucache.plugin_ctx_id(ctx, entry)
    local token, token_err = core.lrucache.plugin_ctx(lrucache, ctx, entry, rpc_call,
                                                      constants.RPC_PREPARE_CONF, conf, ctx,
                                                      cache_id)
    if not token then
        return nil, token_err
    end

    builder:Clear()
    local var = ctx.var

    -- Pick the path to report: the upstream uri when one is set, otherwise the
    -- request uri. A query string embedded in the upstream uri is split off and
    -- installed as the request's uri args.
    local uri = var.upstream_uri
    if uri == "" then
        uri = var.uri
    else
        local qmark = core.string.find(uri, "?")
        if qmark then
            local query = str_sub(uri, qmark + 1)
            uri = str_sub(uri, 1, qmark - 1)
            core.request.set_uri_args(ctx, query)
        end
    end

    -- Strings and vectors are created before http_req_call_req.Start is called.
    local path = builder:CreateString(uri)
    local src_ip = builder:CreateByteVector(var.binary_remote_addr)

    -- Query args: a multi-valued arg produces one entry per value.
    local args = core.request.get_uri_args(ctx)
    local offsets = {}
    for arg_name, arg_val in pairs(args) do
        if type(arg_val) == "table" then
            for _, item in ipairs(arg_val) do
                core.table.insert(offsets, build_args(builder, arg_name, item))
            end
        else
            core.table.insert(offsets, build_args(builder, arg_name, arg_val))
        end
    end

    -- Offsets are prepended from the last entry to the first.
    local arg_count = #offsets
    http_req_call_req.StartArgsVector(builder, arg_count)
    for i = arg_count, 1, -1 do
        builder:PrependUOffsetTRelative(offsets[i])
    end
    local args_vec = builder:EndVector(arg_count)

    -- Headers: same scheme, reusing the scratch table.
    local hdrs = core.request.headers(ctx)
    core.table.clear(offsets)
    for hdr_name, hdr_val in pairs(hdrs) do
        if type(hdr_val) == "table" then
            for _, item in ipairs(hdr_val) do
                core.table.insert(offsets, build_headers(var, builder, hdr_name, item))
            end
        else
            core.table.insert(offsets, build_headers(var, builder, hdr_name, hdr_val))
        end
    end

    local hdr_count = #offsets
    http_req_call_req.StartHeadersVector(builder, hdr_count)
    for i = hdr_count, 1, -1 do
        builder:PrependUOffsetTRelative(offsets[i])
    end
    local hdrs_vec = builder:EndVector(hdr_count)

    -- Assemble the request table and ship it.
    local id = generate_id()
    local method = var.method

    http_req_call_req.Start(builder)
    http_req_call_req.AddId(builder, id)
    http_req_call_req.AddConfToken(builder, token)
    http_req_call_req.AddSrcIp(builder, src_ip)
    http_req_call_req.AddPath(builder, path)
    http_req_call_req.AddArgs(builder, args_vec)
    http_req_call_req.AddHeaders(builder, hdrs_vec)
    http_req_call_req.AddMethod(builder, encode_a6_method(method))
    local req = http_req_call_req.End(builder)
    builder:Finish(req)

    local sent, send_err = send(sock, constants.RPC_HTTP_REQ_CALL, builder:Output())
    if not sent then
        return nil, "failed to send RPC_HTTP_REQ_CALL: " .. send_err
    end

    -- Keep reading until something other than RPC_EXTRA_INFO arrives; every
    -- RPC_EXTRA_INFO message is answered on the same socket before reading on.
    local ty, resp
    while true do
        ty, resp = receive(sock)
        if ty == nil then
            -- on failure the second return value of receive is used as the message
            return nil, "failed to receive RPC_HTTP_REQ_CALL: " .. resp
        end

        if ty == constants.RPC_EXTRA_INFO then
            local reply, info_err = handle_extra_info(ctx, resp)
            if not reply then
                return nil, "failed to handle RPC_EXTRA_INFO: " .. info_err
            end

            local replied, reply_err = send(sock, constants.RPC_EXTRA_INFO, reply)
            if not replied then
                return nil, "failed to reply RPC_EXTRA_INFO: " .. reply_err
            end
        else
            break
        end
    end

    if ty ~= constants.RPC_HTTP_REQ_CALL then
        return nil, "failed to receive RPC_HTTP_REQ_CALL: unexpected type " .. ty
    end

    local buf = flatbuffers.binaryArray.New(resp)
    local call_resp = http_req_call_resp.GetRootAsResp(buf, 0)
    local action_type = call_resp:ActionType()

    -- Stop: set the response headers and hand status/body back to the caller.
    if action_type == http_req_call_action.Stop then
        local action = call_resp:Action()
        local stop = http_req_call_stop.New()
        stop:Init(action.bytes, action.pos)

        local stop_hdr_count = stop:HeadersLength()
        if stop_hdr_count > 0 then
            -- first occurrence of a (lower-cased) name sets it, repeats are added
            local seen = {}
            for i = 1, stop_hdr_count do
                local hdr = stop:Headers(i)
                local name = str_lower(hdr:Name())
                if seen[name] then
                    core.response.add_header(name, hdr:Value())
                else
                    core.response.set_header(name, hdr:Value())
                    seen[name] = true
                end
            end
        end

        local body
        if stop:BodyLength() > 0 then
            body = stop:BodyAsString()
        end

        -- a status of 0 is reported as 200
        local code = stop:Status()
        if code == 0 then
            code = 200
        end

        return true, nil, code, body
    end

    -- Anything that is not a Rewrite needs no further work.
    if action_type ~= http_req_call_action.Rewrite then
        return true
    end

    local action = call_resp:Action()
    local rewrite = http_req_call_rewrite.New()
    rewrite:Init(action.bytes, action.pos)

    -- New upstream path, passed through uri_safe_encode before being stored.
    local new_path = rewrite:Path()
    if new_path then
        new_path = core.utils.uri_safe_encode(new_path)
        var.upstream_uri = new_path
    end

    -- Request headers; a "host" header (any letter case) also sets upstream_host.
    local req_hdr_count = rewrite:HeadersLength()
    if req_hdr_count > 0 then
        for i = 1, req_hdr_count do
            local hdr = rewrite:Headers(i)
            local name = hdr:Name()
            core.request.set_header(ctx, name, hdr:Value())

            if str_lower(name) == "host" then
                var.upstream_host = hdr:Value()
            end
        end
    end

    -- Request body replacement.
    if rewrite:BodyLength() > 0 then
        local new_body = rewrite:BodyAsString()
        ngx.req.read_body()
        ngx.req.set_body_data(new_body)
    end

    -- Response headers; names present in exclude_resp_header are skipped.
    local resp_hdr_count = rewrite:RespHeadersLength()
    if resp_hdr_count > 0 then
        local seen = {}
        for i = 1, resp_hdr_count do
            local hdr = rewrite:RespHeaders(i)
            local name = str_lower(hdr:Name())
            if exclude_resp_header[name] == nil then
                if seen[name] then
                    core.response.add_header(name, hdr:Value())
                else
                    core.response.set_header(name, hdr:Value())
                    seen[name] = true
                end
            end
        end
    end

    -- Query args: a nil value deletes the arg; the first value seen for a name
    -- replaces the existing arg, later values for that name are appended.
    local new_arg_count = rewrite:ArgsLength()
    if new_arg_count > 0 then
        local changed = {}
        for i = 1, new_arg_count do
            local arg = rewrite:Args(i)
            local name = arg:Name()
            local value = arg:Value()

            if value == nil then
                args[name] = nil
            else
                if not changed[name] then
                    args[name] = arg:Value()
                elseif type(args[name]) == "table" then
                    core.table.insert(args[name], value)
                else
                    args[name] = {args[name], arg:Value()}
                end

                changed[name] = true
            end
        end

        core.request.set_uri_args(ctx, args)

        -- when the path was rewritten too, re-attach the updated query string
        if new_path then
            var.upstream_uri = new_path .. '?' .. var.args
        end
    end

    return true
end,