deploy/platform/kubernetes/templates/feature-postgresql-monitor/fluent-bit.yaml (121 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
{{- if .Values.features.postgresqlMonitor.enabled }}
# @feature: slowsql-psql; fluent bit log configurations
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: slowsql-psql-fluent-bit
  labels:
    app: slowsql-psql-fluent-bit
data:
  # Fluent Bit pipeline: tail the slow log, keep only slow-statement lines,
  # rewrite each record with the Lua filter below, then print it to stdout
  # and POST it to oap:12800/v3/logs.
  fluent-bit-conf: |
    [SERVICE]
        flush          1
        log_level      info
        parsers_File   fluent-bit-parser.conf
    [INPUT]
        name           tail
        path           /data/log/slow.log
        parser         my-log-format
    [FILTER]
        name           grep
        match          *
        regex          log \w*-\w*-\w* \w*:\w*:\w*.\w* UTCLOG: duration: \w*.\w* ms statement.*
    [FILTER]
        name           lua
        match          *
        script         fluent-bit-script.lua
        call           rewrite_body
    [OUTPUT]
        name           stdout
        match          *
        format         json
    [OUTPUT]
        name           http
        match          *
        host           oap
        port           12800
        uri            /v3/logs
        format         json
  # Lua filter script. NOTE: this file is rendered by Helm, so the Lua code
  # must never contain two consecutive opening braces (always write "{ {").
  fluent-bit-script-lua: |
    -- Seed the generator once at load time; without this the id sequence
    -- produced by uuid() may repeat after every restart.
    math.randomseed(os.time())

    --- Fluent Bit Lua filter callback: turn a raw slow-log line into a
    -- SLOW_SQL log record.
    -- Reads record["log"], extracts the duration (ms) and the statement text,
    -- and replaces the record content with layer/service/tags plus a JSON
    -- string stored at record["body"]["json"]["json"].
    -- @param tag        tag of the record (unused)
    -- @param timestamp  record timestamp, returned unchanged
    -- @param record     record table; modified in place on success
    -- @return code, timestamp, record -- code 1 when the record was rewritten,
    --         code 0 (record left untouched) when the line cannot be parsed
    function rewrite_body(tag, timestamp, record)
        local log = record["log"]
        if type(log) ~= "string" then
            return 0, timestamp, record
        end

        -- Validate everything before touching the record so that a failure
        -- never leaves it half rewritten.
        local duration = tonumber(string.match(log, "duration: ([%d%.]+) ms"))
        local _, statement_end = string.find(log, "statement: ", 1, true)
        if duration == nil or statement_end == nil then
            return 0, timestamp, record
        end

        local service_name = os.getenv("SW_SERVICE")
        if service_name == nil then
            error("SW_SERVICE environment variable is not set")
        end
        local service = "postgresql::" .. service_name

        local payload = table2json({
            id = uuid(),
            layer = "POSTGRESQL",
            -- whole milliseconds; the fractional part is dropped
            query_time = math.floor(duration),
            service = service,
            statement = string.sub(log, statement_end + 1),
            time = os.time(),
        })

        record["log"] = nil
        record["date"] = nil
        record["layer"] = "POSTGRESQL"
        record["service"] = service
        record["tags"] = { data = { { key = "LOG_KIND", value = "SLOW_SQL" } } }
        record["body"] = { json = { json = payload } }
        return 1, timestamp, record
    end

    --- Split a string on a literal (non-pattern) delimiter.
    -- Not called by rewrite_body; kept as a helper of this script.
    -- @param input      value to split (converted with tostring)
    -- @param delimiter  separator (converted with tostring)
    -- @return array of pieces, or false when the delimiter is empty
    function split(input, delimiter)
        input = tostring(input)
        delimiter = tostring(delimiter)
        if delimiter == "" then
            return false
        end

        local parts = {}
        local pos = 1
        while true do
            -- 4th argument `true`: plain search, no pattern magic
            local first, last = string.find(input, delimiter, pos, true)
            if first == nil then
                break
            end
            parts[#parts + 1] = string.sub(input, pos, first - 1)
            pos = last + 1
        end
        parts[#parts + 1] = string.sub(input, pos)
        return parts
    end

    --- Build a random identifier of 32 lowercase hex digits grouped 8-4-4-4-12.
    -- Every digit 0-f is equally likely.
    -- @return string such as "3f2a9c01-7b4e-0d15-a6c8-92e1f0b37d44"
    function uuid()
        local digits = {}
        for i = 1, 32 do
            digits[i] = string.format("%x", math.random(0, 15))
        end
        local sid = table.concat(digits)
        return string.format("%s-%s-%s-%s-%s",
            string.sub(sid, 1, 8),
            string.sub(sid, 9, 12),
            string.sub(sid, 13, 16),
            string.sub(sid, 17, 20),
            string.sub(sid, 21, 32))
    end

    --- Serialize a Lua table to a JSON string.
    -- Tables with a positive numeric key become JSON arrays (elements emitted
    -- in index order); all other tables become JSON objects built from their
    -- string keys. Strings are escaped; values that are not table, string,
    -- number or boolean are skipped.
    -- @param t  table to serialize (asserted)
    -- @return JSON text
    function table2json(t)
        local escapes = {
            ['"'] = '\\"', ['\\'] = '\\\\', ['\b'] = '\\b', ['\f'] = '\\f',
            ['\n'] = '\\n', ['\r'] = '\\r', ['\t'] = '\\t',
        }

        -- Quote a string, escaping quotes, backslashes and control characters.
        local function quote(s)
            local escaped = string.gsub(s, '[%c"\\]', function(c)
                return escapes[c] or string.format("\\u%04x", string.byte(c))
            end)
            return '"' .. escaped .. '"'
        end

        local serialize

        -- Serialize one value; returns nil for unsupported types.
        local function encode(v)
            local v_type = type(v)
            if v_type == "table" then
                return serialize(v)
            elseif v_type == "string" then
                return quote(v)
            elseif v_type == "number" or v_type == "boolean" then
                return tostring(v)
            end
            return nil
        end

        serialize = function(tbl)
            local parts = {}
            -- table.maxn is part of the Lua 5.1 API this script already relied on.
            local last = table.maxn(tbl)
            if last == 0 then
                for k, v in pairs(tbl) do
                    local value = encode(v)
                    if type(k) == "string" and value then
                        parts[#parts + 1] = quote(k) .. ":" .. value
                    end
                end
                return "{" .. table.concat(parts, ",") .. "}"
            end
            for i = 1, last do
                local value = encode(tbl[i])
                if value then
                    parts[#parts + 1] = value
                end
            end
            return "[" .. table.concat(parts, ",") .. "]"
        end

        assert(type(t) == "table")
        return serialize(t)
    end
  fluent-bit-parser-conf: |
    [PARSER]
        name           my-log-format
        format         regex
        regex          \w*-\w*-\w* \w*:\w*:\w*.\w* UTCLOG: duration: \w*.\w* ms statement.*
{{- end }}