packetbeat/protos/sip/plugin.go (670 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package sip

import (
	"bytes"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
	"github.com/elastic/beats/v7/packetbeat/pb"
	"github.com/elastic/beats/v7/packetbeat/procs"
	"github.com/elastic/beats/v7/packetbeat/protos"

	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
)

var (
	// debugf and detailedf log under the "sip" and "sipdetailed" debug
	// selectors respectively.
	debugf    = logp.MakeDebug("sip")
	detailedf = logp.MakeDebug("sipdetailed")

	// isDebug and isDetailed cache whether the corresponding selector is
	// enabled; they are refreshed every time New is called.
	isDebug    = false
	isDetailed = false

	// Compile-time checks that plugin implements both transport interfaces.
	_ protos.UDPPlugin                = &plugin{}
	_ protos.ExpirationAwareTCPPlugin = &plugin{}
)

// init registers the plugin constructor under the "sip" protocol name.
func init() {
	protos.Register("sip", New)
}

// connectionData is the per-TCP-connection state kept between segments:
// one parsing state per direction, indexed by the dir value handed to the
// TCP callbacks.
type connectionData struct {
	streams [2]*parsingInfo
}

// SIP application level protocol analyser UDP and TCP plugin.
type plugin struct { cfg *config results protos.Reporter watcher *procs.ProcessesWatcher } func New( testMode bool, results protos.Reporter, watcher *procs.ProcessesWatcher, cfg *conf.C, ) (protos.Plugin, error) { cfgwarn.Beta("packetbeat SIP protocol is used") isDebug = logp.IsDebug("sip") isDetailed = logp.IsDebug("sipdetailed") config := defaultConfig if !testMode { if err := cfg.Unpack(&config); err != nil { return nil, err } } tp := strings.ToLower(config.TransportProto) switch tp { case "tcp", "udp": p := &plugin{} p.init(results, watcher, &config) return p, nil } return nil, fmt.Errorf("unsupported transport_protocol: %s", tp) } // Init initializes the HTTP protocol analyser. func (p *plugin) init(results protos.Reporter, watcher *procs.ProcessesWatcher, config *config) { p.results = results p.watcher = watcher p.cfg = config } func (p *plugin) GetPorts() []int { return p.cfg.Ports } func (p *plugin) ParseUDP(pkt *protos.Packet) { pi := newParsingInfo(pkt, pkt.Tuple.BaseTuple) if _, err := p.doParse(pi); err != nil { logp.Error(err) } } // Parse function is used to process TCP payloads. func (p *plugin) Parse( pkt *protos.Packet, tcptuple *common.TCPTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData { conn := ensureConnection(private) st := conn.streams[dir] if st == nil { st = newParsingInfo(pkt, tcptuple.BaseTuple) conn.streams[dir] = st } else { st.data = append(st.data, pkt.Payload...) st.message.rawData = append(st.message.rawData, pkt.Payload...) } ok, err := p.doParse(st) if err != nil { logp.Error(err) } if !ok { // drop this tcp stream. 
Will retry parsing with the next // segment in it conn.streams[dir] = nil } return private } func ensureConnection(private protos.ProtocolData) *connectionData { conn := getConnection(private) if conn == nil { conn = &connectionData{} } return conn } func getConnection(private protos.ProtocolData) *connectionData { if private == nil { return nil } priv, ok := private.(*connectionData) if !ok { logp.Warn("connection data type error") return nil } if priv == nil { logp.Warn("Unexpected: connection data not set") return nil } return priv } // ReceivedFin will be called when TCP transaction is terminating. func (p *plugin) ReceivedFin( tcptuple *common.TCPTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData { debugf("Received FIN") conn := getConnection(private) if conn == nil { return private } pi := conn.streams[dir] if pi == nil { return conn } if pi.message != nil { evt, err := p.buildEvent(p.cfg, pi) if err != nil { return conn } p.publish(*evt) // and reset stream for next message pi.prepareForNewMessage() } return conn } // GapInStream is called when a gap of nbytes bytes is found in the stream (due // to packet loss). 
func (p *plugin) GapInStream( tcptuple *common.TCPTuple, dir uint8, nbytes int, private protos.ProtocolData, ) (priv protos.ProtocolData, drop bool, ) { conn := getConnection(private) if conn == nil { return private, false } st := conn.streams[dir] if st == nil || st.message == nil { // nothing to do return private, false } ok, complete := p.messageGap(st, nbytes) if isDetailed { detailedf("messageGap returned ok=%v complete=%v", ok, complete) } if !ok { // on errors, drop stream conn.streams[dir] = nil return conn, true } if complete { // Current message is complete, we need to publish from here evt, err := p.buildEvent(p.cfg, st) if err != nil { return conn, true } p.publish(*evt) // and reset stream for next message st.prepareForNewMessage() } // don't drop the stream, we can ignore the gap return private, false } func (p *plugin) messageGap(pi *parsingInfo, nbytes int) (ok bool, complete bool) { m := pi.message switch pi.state { case stateStart, stateHeaders: // we know we cannot recover from these return false, false case stateBody: if isDebug { debugf("gap in body: %d", nbytes) } if len(pi.data)+nbytes >= m.contentLength-pi.bodyReceived { // we're done, but the last portion of the data is gone return true, true } else { pi.bodyReceived += nbytes return true, false } } // assume we cannot recover return false, false } // ConnectionTimeout returns the configured HTTP transaction timeout. 
func (p *plugin) ConnectionTimeout() time.Duration { return p.cfg.TransactionTimeout } func (p *plugin) Expired(tuple *common.TCPTuple, private protos.ProtocolData) { conn := getConnection(private) if conn == nil { return } if isDebug { debugf("expired connection %s", tuple) } // terminate streams for _, st := range conn.streams { // Do not send incomplete or empty messages if st != nil && st.message != nil && st.message.headerOffset > 0 { if isDebug { debugf("got message %+v", st.message) } // Current message is complete, we need to publish from here evt, err := p.buildEvent(p.cfg, st) if err != nil { return } p.publish(*evt) // and reset stream for next message st.prepareForNewMessage() } } } func (p *plugin) doParse(pi *parsingInfo) (bool, error) { if isDetailed { detailedf("Payload received: [%s]", pi.pkt.Payload) } for len(pi.data) > 0 { if pi.message == nil { pi.message = &message{ts: pi.pkt.Ts, rawData: append([]byte{}, pi.data...)} } ok, complete := parse(pi) if !ok { return false, nil } if !complete { // wait for more data break } evt, err := p.buildEvent(p.cfg, pi) if err != nil { return true, err } p.publish(*evt) // and reset stream for next message pi.prepareForNewMessage() } return true, nil } func (p *plugin) publish(evt beat.Event) { if p.results != nil { p.results(evt) } } func (p *plugin) buildEvent(cfg *config, pi *parsingInfo) (*beat.Event, error) { m := pi.message status := common.OK_STATUS if m.statusCode >= 400 { status = common.ERROR_STATUS } evt, pbf := pb.NewBeatEvent(m.ts) fields := evt.Fields fields["type"] = "sip" fields["status"] = status var sipFields ProtocolFields if m.isRequest { populateRequestFields(m, pbf, &sipFields) } else { populateResponseFields(m, &sipFields) } populateHeadersFields(cfg, m, evt, pbf, &sipFields) if cfg.ParseBody { populateBodyFields(m, pbf, &sipFields) } pbf.Network.IANANumber = "17" pbf.Network.Application = "sip" pbf.Network.Protocol = "sip" pbf.Network.Transport = cfg.TransportProto cmdlineTuple := 
p.watcher.FindProcessesTupleTCP(&pi.pkt.Tuple) src, dst := getEndpoints(pi.pkt.Tuple.BaseTuple, cmdlineTuple) pbf.SetSource(src) pbf.AddIP(src.IP) pbf.SetDestination(dst) pbf.AddIP(dst.IP) populateEventFields(cfg, pi, pbf, sipFields) if err := pb.MarshalStruct(evt.Fields, "sip", sipFields); err != nil { return nil, err } return &evt, nil } func getEndpoints(tuple common.BaseTuple, cmdTuple *common.ProcessTuple) (src *common.Endpoint, dst *common.Endpoint) { source, destination := common.MakeEndpointPair(tuple, cmdTuple) src, dst = &source, &destination return src, dst } func populateRequestFields(m *message, pbf *pb.Fields, fields *ProtocolFields) { fields.Type = "request" fields.Method = bytes.ToUpper(m.method) fields.URIOriginal = m.requestURI scheme, username, host, port, _ := parseURI(fields.URIOriginal) fields.URIScheme = scheme fields.URIHost = host if !bytes.Equal(username, []byte(" ")) && !bytes.Equal(username, []byte("-")) { fields.URIUsername = username pbf.AddUser(string(username)) } fields.URIPort = port fields.Version = m.version.String() pbf.AddHost(string(host)) } func populateResponseFields(m *message, fields *ProtocolFields) { fields.Type = "response" fields.Code = int(m.statusCode) fields.Status = m.statusPhrase fields.Version = m.version.String() } func populateHeadersFields(cfg *config, m *message, evt beat.Event, pbf *pb.Fields, fields *ProtocolFields) { fields.Allow = m.allow fields.CallID = m.callID fields.ContentLength = m.contentLength fields.ContentType = bytes.ToLower(m.contentType) fields.MaxForwards = m.maxForwards fields.Supported = m.supported fields.UserAgentOriginal = m.userAgent fields.ViaOriginal = m.via privateURI, found := m.headers["p-associated-uri"] if found && len(privateURI) > 0 { scheme, username, host, port, _ := parseURI(privateURI[0]) fields.PrivateURIOriginal = privateURI[0] fields.PrivateURIScheme = scheme fields.PrivateURIHost = host if !bytes.Equal(username, []byte(" ")) && !bytes.Equal(username, []byte("-")) { 
fields.PrivateURIUsername = username pbf.AddUser(string(username)) } fields.PrivateURIPort = port pbf.AddHost(string(host)) } if accept, found := m.headers["accept"]; found && len(accept) > 0 { fields.Accept = bytes.ToLower(accept[0]) } cseqParts := bytes.Split(m.cseq, []byte(" ")) if len(cseqParts) == 2 { fields.CseqCode, _ = strconv.Atoi(string(cseqParts[0])) fields.CseqMethod = bytes.ToUpper(cseqParts[1]) } populateFromFields(m, pbf, fields) populateToFields(m, pbf, fields) populateContactFields(m, pbf, fields) if cfg.ParseAuthorization { populateAuthFields(m, evt, pbf, fields) } } func populateFromFields(m *message, pbf *pb.Fields, fields *ProtocolFields) { if len(m.from) > 0 { displayInfo, uri, params := parseFromToContact(m.from) fields.FromDisplayInfo = displayInfo fields.FromTag = params["tag"] scheme, username, host, port, _ := parseURI(uri) fields.FromURIOriginal = uri fields.FromURIScheme = scheme fields.FromURIHost = host if !bytes.Equal(username, []byte(" ")) && !bytes.Equal(username, []byte("-")) { fields.FromURIUsername = username pbf.AddUser(string(username)) } fields.FromURIPort = port pbf.AddHost(string(host)) } } func populateToFields(m *message, pbf *pb.Fields, fields *ProtocolFields) { if len(m.to) > 0 { displayInfo, uri, params := parseFromToContact(m.to) fields.ToDisplayInfo = displayInfo fields.ToTag = params["tag"] scheme, username, host, port, _ := parseURI(uri) fields.ToURIOriginal = uri fields.ToURIScheme = scheme fields.ToURIHost = host if !bytes.Equal(username, []byte(" ")) && !bytes.Equal(username, []byte("-")) { fields.ToURIUsername = username pbf.AddUser(string(username)) } fields.ToURIPort = port pbf.AddHost(string(host)) } } func populateContactFields(m *message, pbf *pb.Fields, fields *ProtocolFields) { if contact, found := m.headers["contact"]; found && len(contact) > 0 { displayInfo, uri, params := parseFromToContact(m.to) fields.ContactDisplayInfo = displayInfo fields.ContactExpires, _ = strconv.Atoi(string(params["expires"])) 
fields.ContactQ, _ = strconv.ParseFloat(string(params["q"]), 64) scheme, username, host, port, urlparams := parseURI(uri) fields.ContactURIOriginal = uri fields.ContactURIScheme = scheme fields.ContactURIHost = host if !bytes.Equal(username, []byte(" ")) && !bytes.Equal(username, []byte("-")) { fields.ContactURIUsername = username pbf.AddUser(string(username)) } fields.ContactURIPort = port fields.ContactLine = urlparams["line"] fields.ContactTransport = bytes.ToLower(urlparams["transport"]) pbf.AddHost(string(host)) } } func populateEventFields(cfg *config, pi *parsingInfo, pbf *pb.Fields, fields ProtocolFields) { m := pi.message pbf.Event.Kind = "event" pbf.Event.Type = []string{"info", "protocol"} pbf.Event.Dataset = "sip" pbf.Event.Sequence = int64(fields.CseqCode) // TODO: Get these values from body pbf.Event.Start = m.ts pbf.Event.End = m.ts // if cfg.KeepOriginal { pbf.Event.Original = string(m.rawData) } pbf.Event.Category = []string{"network"} if _, found := m.headers["authorization"]; found { pbf.Event.Category = append(pbf.Event.Category, "authentication") } pbf.Event.Action = func() string { if m.isRequest { return fmt.Sprintf("sip-%s", strings.ToLower(string(m.method))) } return fmt.Sprintf("sip-%s", strings.ToLower(string(fields.CseqMethod))) }() pbf.Event.Outcome = func() string { switch { case m.statusCode < 200: return "" case m.statusCode > 299: return "failure" } return "success" }() pbf.Event.Reason = string(fields.Status) } func populateAuthFields(m *message, evt beat.Event, pbf *pb.Fields, fields *ProtocolFields) { auths, found := m.headers["authorization"] if !found || len(auths) == 0 { if isDetailed { detailedf("sip packet without authorization header") } return } if isDetailed { detailedf("sip packet with authorization header") } auth := bytes.TrimSpace(auths[0]) pos := bytes.IndexByte(auth, ' ') if pos == -1 { if isDebug { debugf("malformed authorization header: missing scheme") } return } fields.AuthScheme = auth[:pos] pos += 1 for _, 
param := range bytes.Split(auth[pos:], []byte(",")) { kv := bytes.SplitN(param, []byte("="), 2) if len(kv) != 2 { continue } kv[1] = bytes.Trim(kv[1], "'\" \t") switch string(bytes.ToLower(bytes.TrimSpace(kv[0]))) { case "realm": fields.AuthRealm = kv[1] case "username": username := string(kv[1]) if username != "" && username != "-" { _, _ = evt.Fields.Put("user.name", username) pbf.AddUser(username) } case "uri": scheme, _, host, port, _ := parseURI(kv[1]) fields.AuthURIOriginal = kv[1] fields.AuthURIScheme = scheme fields.AuthURIHost = host fields.AuthURIPort = port } } } var constSDPContentType = []byte("application/sdp") func populateBodyFields(m *message, pbf *pb.Fields, fields *ProtocolFields) { if !m.hasContentLength { return } if !bytes.Equal(m.contentType, constSDPContentType) { if isDebug { debugf("body content-type: %s is not supported", m.contentType) } return } if _, found := m.headers["content-encoding"]; found { if isDebug { debugf("body decoding is not supported yet if content-endcoding is present") } return } fields.SDPBodyOriginal = m.body var isInMedia bool for _, line := range bytes.Split(m.body, []byte("\r\n")) { kv := bytes.SplitN(line, []byte("="), 2) if len(kv) != 2 { continue } kv[1] = bytes.TrimSpace(kv[1]) ch := string(bytes.ToLower(bytes.TrimSpace(kv[0]))) switch ch { case "v": fields.SDPVersion = string(kv[1]) case "o": var pos int if kv[1][pos] == '"' { endUserPos := bytes.IndexByte(kv[1][pos+1:], '"') if !bytes.Equal(kv[1][pos+1:endUserPos], []byte("-")) { fields.SDPOwnerUsername = kv[1][pos+1 : endUserPos] } pos = endUserPos + 1 } nParts := func() int { if pos == 0 { return 4 } return 3 // already have user }() parts := bytes.SplitN(kv[1][pos:], []byte(" "), nParts) if len(parts) != nParts { if isDebug { debugf("malformed owner SDP line") } continue } if nParts == 4 { if !bytes.Equal(parts[0], []byte("-")) { fields.SDPOwnerUsername = parts[0] } parts = parts[1:] } fields.SDPOwnerSessID = parts[0] fields.SDPOwnerVersion = parts[1] 
fields.SDPOwnerIP = func() common.NetString { p := bytes.Split(parts[2], []byte(" ")) return p[len(p)-1] }() pbf.AddUser(string(fields.SDPOwnerUsername)) pbf.AddIP(string(fields.SDPOwnerIP)) case "s": if !bytes.Equal(kv[1], []byte("-")) { fields.SDPSessName = kv[1] } case "c": if isInMedia { continue } fields.SDPConnInfo = kv[1] fields.SDPConnAddr = func() common.NetString { p := bytes.Split(kv[1], []byte(" ")) return p[len(p)-1] }() pbf.AddHost(string(fields.SDPConnAddr)) case "m": isInMedia = true // TODO case "i", "u", "e", "p", "b", "t", "r", "z", "k", "a": // TODO } } } func parseFromToContact(fromTo common.NetString) (displayInfo, uri common.NetString, params map[string]common.NetString) { params = make(map[string]common.NetString) fromTo = bytes.TrimSpace(fromTo) var uriIsWrapped bool pos := func() int { // look for the beginning of a url wrapped in <...> if pos := bytes.IndexByte(fromTo, '<'); pos > -1 { uriIsWrapped = true return pos } // if there is no < char, it means there is no display info, and // that the url starts from the beginning // https://tools.ietf.org/html/rfc3261#section-20.10 return 0 }() displayInfo = bytes.Trim(fromTo[:pos], "'\"\t ") endURIPos := func() int { if uriIsWrapped { return bytes.IndexByte(fromTo, '>') } return bytes.IndexByte(fromTo, ';') }() // not wrapped and no header params if endURIPos == -1 { uri = fromTo[pos:] return displayInfo, uri, params } // if wrapped, we want to get over the < char if uriIsWrapped { pos += 1 } // if wrapped, we will get the string between <...> // if not wrapped, we will get the value before the header params (until ;) uri = fromTo[pos:endURIPos] // parse the header params pos = endURIPos + 1 for _, param := range bytes.Split(fromTo[pos:], []byte(";")) { kv := bytes.SplitN(param, []byte("="), 2) if len(kv) != 2 { continue } params[string(bytes.ToLower(bytes.TrimSpace(kv[0])))] = kv[1] } return displayInfo, uri, params } func parseURI(uri common.NetString) (scheme, username, host 
common.NetString, port int, params map[string]common.NetString) { var ( prevChar rune inIPv6 bool idx int hasParams bool ) uri = bytes.TrimSpace(uri) prevChar = ' ' pos := -1 ppos := -1 epos := len(uri) params = make(map[string]common.NetString) loop: for idx = 0; idx < len(uri); idx++ { curChar := rune(uri[idx]) switch { case idx == 0: colonIdx := bytes.Index(uri, []byte(":")) if colonIdx == -1 { break loop } scheme = uri[:colonIdx] idx += colonIdx pos = idx + 1 case curChar == '[' && prevChar != '\\': inIPv6 = true case curChar == ']' && prevChar != '\\': inIPv6 = false case curChar == ';' && prevChar != '\\': // we found end of URI hasParams = true epos = idx break loop default: // select which part switch curChar { case '@': if len(host) > 0 { pos = ppos host = nil } username = uri[pos:idx] ppos = pos pos = idx + 1 case ':': if !inIPv6 { host = uri[pos:idx] ppos = pos pos = idx + 1 } } } prevChar = curChar } if pos > 0 && epos <= len(uri) && pos <= epos { if len(host) == 0 { host = bytes.TrimSpace(uri[pos:epos]) } else { port, _ = strconv.Atoi(string(bytes.TrimSpace(uri[pos:epos]))) } } if hasParams { for _, param := range bytes.Split(uri[epos+1:], []byte(";")) { kv := bytes.Split(param, []byte("=")) if len(kv) != 2 { continue } params[string(bytes.ToLower(bytes.TrimSpace(kv[0])))] = kv[1] } } return scheme, username, host, port, params }