packetbeat/protos/redis/redis.go (277 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package redis import ( "bytes" "strings" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) type stream struct { applayer.Stream parser parser tcptuple *common.TCPTuple } type redisConnectionData struct { streams [2]*stream requests MessageQueue responses MessageQueue } // Redis protocol plugin type redisPlugin struct { // config ports []int sendRequest bool sendResponse bool transactionTimeout time.Duration queueConfig MessageQueueConfig watcher *procs.ProcessesWatcher results protos.Reporter } var ( debugf = logp.MakeDebug("redis") isDebug = false ) var ( unmatchedResponses = monitoring.NewInt(nil, "redis.unmatched_responses") unmatchedRequests = monitoring.NewInt(nil, "redis.unmatched_requests") ) func init() { protos.Register("redis", New) } func New( testMode bool, results protos.Reporter, watcher *procs.ProcessesWatcher, cfg *conf.C, ) (protos.Plugin, error) { p := &redisPlugin{} config := defaultConfig if !testMode { if err := cfg.Unpack(&config); err != nil { return nil, err } } if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } func (redis *redisPlugin) init(results protos.Reporter, watcher *procs.ProcessesWatcher, config *redisConfig) error { redis.setFromConfig(config) redis.results = results redis.watcher = watcher isDebug = logp.IsDebug("redis") return nil } func (redis *redisPlugin) setFromConfig(config *redisConfig) { redis.ports = config.Ports redis.sendRequest = config.SendRequest redis.sendResponse = config.SendResponse redis.transactionTimeout = config.TransactionTimeout redis.queueConfig = config.QueueLimits } func (redis *redisPlugin) GetPorts() []int { return redis.ports } func (s *stream) PrepareForNewMessage() { parser := &s.parser s.Stream.Reset() parser.reset() } func (redis *redisPlugin) ConnectionTimeout() time.Duration { return redis.transactionTimeout } func (redis *redisPlugin) Parse( pkt *protos.Packet, tcptuple *common.TCPTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData { conn := redis.ensureRedisConnection(private) conn = redis.doParse(conn, pkt, tcptuple, dir) if conn == nil { return nil } return conn } func (redis *redisPlugin) newConnectionData() *redisConnectionData { return &redisConnectionData{ requests: NewMessageQueue(redis.queueConfig), responses: NewMessageQueue(redis.queueConfig), } } func (redis *redisPlugin) ensureRedisConnection(private protos.ProtocolData) *redisConnectionData { if private == nil { return redis.newConnectionData() } priv, ok := private.(*redisConnectionData) if !ok { logp.Warn("redis connection data type error, create new one") return redis.newConnectionData() } if priv == nil { logp.Warn("Unexpected: redis connection data not set, create new one") return redis.newConnectionData() } return priv } func (redis *redisPlugin) doParse( conn *redisConnectionData, pkt *protos.Packet, tcptuple *common.TCPTuple, dir uint8, ) *redisConnectionData { st := conn.streams[dir] if st == nil { st = newStream(pkt.Ts, tcptuple) conn.streams[dir] = st if isDebug { debugf("new stream: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload)) } } if err := st.Append(pkt.Payload); err != nil { if isDebug { debugf("%v, dropping TCP stream: ", err) } return nil } if isDebug { debugf("stream add data: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload)) } for st.Buf.Len() > 0 { if st.parser.message == nil { st.parser.message = newMessage(pkt.Ts) } ok, complete := st.parser.parse(&st.Buf) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it conn.streams[dir] = nil if isDebug { debugf("Ignore Redis message. Drop tcp stream. Try parsing with the next segment") } return conn } if !complete { // wait for more data break } msg := st.parser.message if isDebug { if msg.isRequest { debugf("REDIS (%p) request message: %s", conn, msg.message) } else { debugf("REDIS (%p) response message: %s", conn, msg.message) } } // all ok, go to next level and reset stream for new message redis.handleRedis(conn, msg, tcptuple, dir) st.PrepareForNewMessage() } return conn } func newStream(ts time.Time, tcptuple *common.TCPTuple) *stream { s := &stream{ tcptuple: tcptuple, } s.parser.message = newMessage(ts) s.Stream.Init(tcp.TCPMaxDataInStream) return s } func newMessage(ts time.Time) *redisMessage { return &redisMessage{ts: ts} } func (redis *redisPlugin) handleRedis( conn *redisConnectionData, m *redisMessage, tcptuple *common.TCPTuple, dir uint8, ) { m.tcpTuple = *tcptuple m.direction = dir m.cmdlineTuple = redis.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { // wait for response if evicted := conn.requests.Append(m); evicted > 0 { unmatchedRequests.Add(int64(evicted)) } } else { if evicted := conn.responses.Append(m); evicted > 0 { unmatchedResponses.Add(int64(evicted)) } redis.correlate(conn) } } func (redis *redisPlugin) correlate(conn *redisConnectionData) { // drop responses with missing requests if conn.requests.IsEmpty() { for !conn.responses.IsEmpty() { debugf("Response from unknown transaction. Ignoring") unmatchedResponses.Add(1) conn.responses.Pop() } return } // merge requests with responses into transactions for !conn.responses.IsEmpty() && !conn.requests.IsEmpty() { requ, okReq := conn.requests.Pop().(*redisMessage) resp, okResp := conn.responses.Pop().(*redisMessage) if !okReq || !okResp { logp.Err("invalid type found in message queue") continue } if redis.results != nil { event := redis.newTransaction(requ, resp) redis.results(event) } } } func (redis *redisPlugin) newTransaction(requ, resp *redisMessage) beat.Event { source, destination := common.MakeEndpointPair(requ.tcpTuple.BaseTuple, requ.cmdlineTuple) src, dst := &source, &destination if requ.direction == tcp.TCPDirectionReverse { src, dst = dst, src } evt, pbf := pb.NewBeatEvent(requ.ts) pbf.SetSource(src) pbf.SetDestination(dst) pbf.Source.Bytes = int64(requ.size) pbf.Destination.Bytes = int64(resp.size) pbf.Event.Dataset = "redis" pbf.Event.Start = requ.ts pbf.Event.End = resp.ts pbf.Network.Transport = "tcp" pbf.Network.Protocol = pbf.Event.Dataset fields := evt.Fields fields["type"] = pbf.Event.Dataset fields["method"] = common.NetString(bytes.ToUpper(requ.method)) fields["resource"] = requ.path fields["query"] = requ.message if resp.isError { evt.PutValue("status", common.ERROR_STATUS) evt.PutValue("redis.error", resp.message) } else { evt.PutValue("status", common.OK_STATUS) evt.PutValue("redis.return_value", resp.message) } if redis.sendRequest { fields["request"] = requ.message } if redis.sendResponse { fields["response"] = resp.message } pbf.Event.Action = "redis." + strings.ToLower(string(requ.method)) if resp.isError { pbf.Event.Outcome = "failure" } return evt } func (redis *redisPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, ) { // tsg: being packet loss tolerant is probably not very useful for Redis, // because most requests/response tend to fit in a single packet. return private, true } func (redis *redisPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData { // TODO: check if we have pending data that we can send up the stack return private }