deploy/platform/kubernetes/templates/feature-postgresql-monitor/fluent-bit.yaml (121 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
{{- if .Values.features.postgresqlMonitor.enabled }}
# @feature: slowsql-psql; fluent bit log configurations
---
# ConfigMap carrying the Fluent Bit setup for the PostgreSQL slow-SQL feature.
# Its data section holds three entries: the main Fluent Bit configuration
# (fluent-bit-conf), the Lua filter script (fluent-bit-script-lua) and the
# parser definition (fluent-bit-parser-conf).
apiVersion: v1
kind: ConfigMap
metadata:
name: slowsql-psql-fluent-bit
labels:
app: slowsql-psql-fluent-bit
data:
# Main pipeline, in order:
#   SERVICE - flush interval 1, log level info, parsers read from fluent-bit-parser.conf
#   INPUT   - tails /data/log/slow.log using the my-log-format parser
#   FILTER  - grep: keeps only records whose "log" key matches the
#             "duration: ... ms statement..." pattern
#   FILTER  - lua: calls rewrite_body from fluent-bit-script.lua on every record
#   OUTPUT  - prints every record to stdout as JSON
#   OUTPUT  - sends every record as JSON over HTTP to host "oap", port 12800, URI /v3/logs
fluent-bit-conf: |
[SERVICE]
flush 1
log_level info
parsers_File fluent-bit-parser.conf
[INPUT]
name tail
path /data/log/slow.log
parser my-log-format
[FILTER]
name grep
match *
regex log \w*-\w*-\w* \w*:\w*:\w*.\w* UTCLOG: duration: \w*.\w* ms statement.*
[FILTER]
name lua
match *
script fluent-bit-script.lua
call rewrite_body
[OUTPUT]
name stdout
match *
format json
[OUTPUT]
name http
match *
host oap
port 12800
uri /v3/logs
format json
fluent-bit-script-lua: |
-- Fluent Bit Lua filter callback: turns one PostgreSQL slow-query log record
-- into the structured record forwarded by the outputs.
--
-- The raw line is read from record["log"] and is expected to contain
-- "duration: <number> ms" followed by "statement: <sql>". From it the function
-- builds a JSON payload (time, layer, statement, query_time, service, id),
-- stores it under record["body"]["json"]["json"], and sets the "tags", "layer"
-- and "service" fields. The original "log" and "date" fields are removed.
--
-- Must stay a global function: it is looked up by name through the
-- "call rewrite_body" setting of the Lua filter.
--
-- @param tag        tag of the record (unused)
-- @param timestamp  timestamp of the record; returned unchanged
-- @param record     table holding the record fields
-- @return code, timestamp, record
--         code 1: the record was rewritten;
--         code 0: the line could not be parsed and the record is left
--                 untouched (return-code meaning per the Fluent Bit Lua filter
--                 documentation).
-- Raises an error when the SW_SERVICE environment variable is not set.
function rewrite_body(tag, timestamp, record)
  local log = record["log"]
  if type(log) ~= "string" then
    return 0, timestamp, record
  end

  -- Lazy capture: everything between "duration: " and the first " ms" after it.
  local _, ms_end, duration_text = string.find(log, "duration: (.-) ms")
  local duration = duration_text and tonumber(duration_text)
  -- Plain-text search for the statement marker, starting after the duration.
  local statement_end = ms_end and select(2, string.find(log, "statement: ", ms_end + 1, true))
  if not duration or not statement_end then
    -- Nothing has been modified yet, so the record can be passed through as is.
    return 0, timestamp, record
  end

  local service_name = assert(os.getenv("SW_SERVICE"), "SW_SERVICE environment variable is not set")
  local service = "postgresql::" .. service_name

  local payload = {
    time = os.time() * 1000, -- os.time() is in seconds; stored as milliseconds
    layer = "POSTGRESQL",
    statement = string.sub(log, statement_end + 1),
    query_time = math.floor(duration), -- fractional part of the duration is dropped
    service = service,
    id = uuid(),
  }

  record["log"] = nil
  record["date"] = nil
  -- Keep a space between consecutive opening braces: this file is a Helm
  -- template, and two adjacent opening braces would start a template action.
  record["tags"] = { data = { { key = "LOG_KIND", value = "SLOW_SQL" } } }
  record["layer"] = "POSTGRESQL"
  record["service"] = service
  record["body"] = { json = { json = table2json(payload) } }
  return 1, timestamp, record
end
-- Splits a value into pieces around a literal (non-pattern) delimiter.
--
-- Both arguments are converted with tostring() first. Empty pieces are kept,
-- so leading, trailing and consecutive delimiters produce "" entries.
--
-- @param input      value to split
-- @param delimiter  separator, matched as plain text
-- @return array of pieces, or false when the delimiter is the empty string
function split(input, delimiter)
  local text = tostring(input)
  local sep = tostring(delimiter)
  if sep == "" then
    return false
  end

  local pieces = {}
  local cursor = 1
  while true do
    local first, last = string.find(text, sep, cursor, true)
    if not first then
      break
    end
    pieces[#pieces + 1] = string.sub(text, cursor, first - 1)
    cursor = last + 1
  end
  -- Whatever follows the last delimiter (possibly "") is the final piece.
  pieces[#pieces + 1] = string.sub(text, cursor)
  return pieces
end
-- Set once math.random has been seeded by uuid().
local uuid_seeded = false

-- Generates a random identifier made of 32 lowercase hexadecimal digits,
-- formatted in 8-4-4-4-12 groups separated by dashes.
--
-- The digits are drawn with math.random, so this is a random id in UUID
-- layout rather than an RFC-compliant UUID (no version/variant bits are set).
--
-- @return string of 36 characters, e.g. "3f0a9c1e-7b2d-4e5f-8a6b-0c1d2e3f4a5b"
function uuid()
  if not uuid_seeded then
    -- Lua 5.1-5.3 and LuaJIT start math.random from a fixed seed, which would
    -- make every process produce the same id sequence. Seed once from the
    -- wall clock plus CPU time. Note this reseeds the shared math.random state.
    math.randomseed(os.time() + math.floor(os.clock() * 1000000))
    uuid_seeded = true
  end

  local digits = "0123456789abcdef"
  local chars = {}
  for i = 1, 32 do
    local n = math.random(1, 16)
    chars[i] = string.sub(digits, n, n)
  end
  local sid = table.concat(chars)

  return string.format('%s-%s-%s-%s-%s',
    string.sub(sid, 1, 8),
    string.sub(sid, 9, 12),
    string.sub(sid, 13, 16),
    string.sub(sid, 17, 20),
    string.sub(sid, 21, 32)
  )
end
-- Serializes a Lua table to a JSON string.
--
-- Supported values: strings, numbers, booleans and nested tables; entries
-- with any other value type, or with a key that is neither a string nor a
-- number, are skipped. String keys and string values are JSON-escaped.
--
-- A table is written as a JSON array ("[...]") when it has at least one
-- positive numeric key, otherwise as a JSON object ("{...}"). Values stored
-- under numeric keys are written without a key, in ascending key order;
-- string-keyed entries follow in pairs() order. Tables mixing both key kinds
-- get no special handling.
--
-- @param t  table to serialize; any other type raises an assertion error
-- @return JSON text
function table2json(t)
  local escape_map = {
    ['"'] = '\\"',
    ["\\"] = "\\\\",
    ["\b"] = "\\b",
    ["\f"] = "\\f",
    ["\n"] = "\\n",
    ["\r"] = "\\r",
    ["\t"] = "\\t",
  }

  -- Short escape when JSON defines one, \u00XX for other control characters.
  local function escape_char(c)
    return escape_map[c] or string.format("\\u%04x", string.byte(c))
  end

  -- Wraps a string in double quotes, escaping quotes, backslashes and
  -- control characters so the result is a valid JSON string.
  local function quote(s)
    local escaped = string.gsub(s, '[%c"\\]', escape_char)
    return '"' .. escaped .. '"'
  end

  local serialize

  -- Returns the JSON text for one value, or nil for unsupported types.
  local function encode(v)
    local v_type = type(v)
    if v_type == "table" then
      return serialize(v)
    elseif v_type == "string" then
      return quote(v)
    elseif v_type == "number" or v_type == "boolean" then
      return tostring(v)
    end
    return nil
  end

  serialize = function(tbl)
    local numeric_keys, named = {}, {}
    -- Largest positive numeric key; computed here because table.maxn is not
    -- available in newer Lua versions.
    local max_index = 0
    for k, v in pairs(tbl) do
      local k_type = type(k)
      if k_type == "number" then
        numeric_keys[#numeric_keys + 1] = k
        if k > max_index then
          max_index = k
        end
      elseif k_type == "string" then
        local value = encode(v)
        if value then
          named[#named + 1] = quote(k) .. ":" .. value
        end
      end
    end
    -- pairs() gives no ordering guarantee, so order array elements explicitly.
    table.sort(numeric_keys)

    local parts = {}
    for _, k in ipairs(numeric_keys) do
      local value = encode(tbl[k])
      if value then
        parts[#parts + 1] = value
      end
    end
    for _, entry in ipairs(named) do
      parts[#parts + 1] = entry
    end

    if max_index == 0 then
      return "{" .. table.concat(parts, ",") .. "}"
    end
    return "[" .. table.concat(parts, ",") .. "]"
  end

  assert(type(t) == "table")
  return serialize(t)
end
# Parser definition: declares the regex parser "my-log-format", the name the
# tail input refers to through its "parser" setting. The expression is the
# same "duration: ... ms statement..." line pattern used by the grep filter.
fluent-bit-parser-conf: |
[PARSER]
name my-log-format
format regex
regex \w*-\w*-\w* \w*:\w*:\w*.\w* UTCLOG: duration: \w*.\w* ms statement.*
{{- end }}