func()

in plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go [280:322]


// MapPackets returns a channel carrying the packets read from in after their
// application-layer payload has been run through a.getCurrentPayload.
//
// The channel and its feeding goroutine are created only while
// a.mappedPacketChan is nil; any later call returns the already existing
// channel and does not use its arguments.
//
// Per packet read from in:
//   - a nil packet is forwarded as nil and ends the goroutine;
//   - a packet without an application layer is forwarded unchanged;
//   - if getCurrentPayload returns an error, the packet is forwarded wrapped by
//     common.NewFilteredPackage;
//   - otherwise the packet is forwarded, wrapped in a manipulatedPackage that
//     carries the returned payload when that payload differs from the
//     packet's own, and a.lastMapPayload is set to the returned payload.
//
// The returned channel is closed once in is closed or a nil packet was read.
func (a *Analyzer) MapPackets(in chan gopacket.Packet, packetInformationCreator func(packet gopacket.Packet) common.PacketInformation) chan gopacket.Packet {
	if a.mappedPacketChan != nil {
		return a.mappedPacketChan
	}

	a.mappedPacketChan = make(chan gopacket.Packet)
	go func() {
		defer close(a.mappedPacketChan)

		for packet := range in {
			// A nil packet is the end marker: pass it on and stop mapping.
			if packet == nil {
				log.Debug().Msg("Done reading packages. (nil returned)")
				a.mappedPacketChan <- nil
				return
			}

			// Nothing to map without an application layer.
			appLayer := packet.ApplicationLayer()
			if appLayer == nil {
				a.mappedPacketChan <- packet
				continue
			}

			info := packetInformationCreator(packet)
			onSplit := func(index int) {
				log.Warn().Stringer("packetInformation", info).
					Int("index", index).
					Msg("we have a split at index")
			}

			payload, err := a.getCurrentPayload(info, appLayer.Payload(), onSplit, a.currentPrefilterInboundPayloads, &a.lastMapPayload)
			if err != nil {
				log.Debug().Err(err).Stringer("packetInformation", info).Msg("Filtering message")
				a.mappedPacketChan <- common.NewFilteredPackage(err, packet)
				continue
			}

			// Only wrap the packet when the payload actually changed.
			if originalPayload := appLayer.Payload(); !reflect.DeepEqual(originalPayload, payload) {
				log.Debug().
					Bytes("currentPayload", originalPayload).
					Bytes("payload", payload).
					Msg("Replacing payload currentPayload with payload")
				packet = &manipulatedPackage{
					Packet:              packet,
					newApplicationLayer: gopacket.Payload(payload),
				}
			}

			a.lastMapPayload = payload
			a.mappedPacketChan <- packet
		}
	}()

	return a.mappedPacketChan
}