pkg/events/events.go (173 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"). // You may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and //limitations under the License. package events import ( "fmt" "os" "sync" "syscall" "unsafe" constdef "github.com/aws/aws-ebpf-sdk-go/pkg/constants" poller "github.com/aws/aws-ebpf-sdk-go/pkg/events/poll" "github.com/aws/aws-ebpf-sdk-go/pkg/logger" ebpf_maps "github.com/aws/aws-ebpf-sdk-go/pkg/maps" "golang.org/x/sys/unix" ) var log = logger.Get() type Events interface { InitRingBuffer(mapFDlist []int) (map[int]chan []byte, error) } var _ Events = &events{} func New() Events { return &events{ PageSize: os.Getpagesize(), RingCnt: 0, EventFdCnt: 0, } } type events struct { RingBuffers []*RingBuffer PageSize int RingCnt int EventFdCnt int eventsStopChannel chan struct{} wg sync.WaitGroup eventsDataChannel chan []byte epoller *poller.EventPoller } func isValidMapFDList(mapFDlist []int) bool { for _, mapFD := range mapFDlist { log.Infof("Got map FD %d", mapFD) if mapFD == -1 { return false } mapInfo, err := ebpf_maps.GetBPFmapInfo(mapFD) if err != nil { log.Errorf("failed to get map info") return false } if mapInfo.Type != constdef.BPF_MAP_TYPE_RINGBUF.Index() { log.Errorf("unsupported map type, should be - BPF_MAP_TYPE_RINGBUF") return false } } return true } func (ev *events) InitRingBuffer(mapFDlist []int) (map[int]chan []byte, error) { // Validate mapFD if !isValidMapFDList(mapFDlist) { return nil, fmt.Errorf("mapFDs passed to InitRingBuffer is invalid") } epoll, err := poller.NewEventPoller() if err != nil { return nil, fmt.Errorf("failed to create epoll instance: %s", err) } ev.epoller = epoll ringBufferChanList := make(map[int]chan []byte) for _, mapFD := range mapFDlist { mapInfo, err := ebpf_maps.GetBPFmapInfo(mapFD) if err != nil { log.Errorf("failed to get map info for mapFD %d", mapFD) return nil, fmt.Errorf("failed to map info") } eventsChan, err := ev.setupRingBuffer(mapFD, mapInfo.MaxEntries) if err != nil { ev.cleanupRingBuffer() return nil, fmt.Errorf("failed to add ring buffer: %s", err) } log.Infof("Ringbuffer setup done for %d", mapFD) ringBufferChanList[mapFD] = eventsChan } return ringBufferChanList, nil } func (ev *events) setupRingBuffer(mapFD int, maxEntries uint32) (chan []byte, error) { ringbuffer := &RingBuffer{ RingBufferMapFD: mapFD, Mask: uint64(maxEntries - 1), } // [Consumer page - 4k][Producer page - 4k][Data section - twice the size of max entries] // Refer kernel code, twice the size of max entries will help in boundary scenarios // https://github.com/torvalds/linux/blob/master/kernel/bpf/ringbuf.c#L125 consumer, err := unix.Mmap(mapFD, 0, ev.PageSize, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED) if err != nil { return nil, fmt.Errorf("failed to create Mmap for consumer -> %d: %s", mapFD, err) } ringbuffer.Consumerpos = unsafe.Pointer(&consumer[0]) ringbuffer.Consumer = consumer mmap_sz := uint32(ev.PageSize) + 2*maxEntries producer, err := unix.Mmap(mapFD, int64(ev.PageSize), int(mmap_sz), unix.PROT_READ, unix.MAP_SHARED) if err != nil { unix.Munmap(producer) return nil, fmt.Errorf("failed to create Mmap for producer -> %d: %s", mapFD, err) } ringbuffer.Producerpos = unsafe.Pointer(&producer[0]) ringbuffer.Producer = producer ringbuffer.Data = unsafe.Pointer(uintptr(unsafe.Pointer(&producer[0])) + uintptr(ev.PageSize)) ev.RingBuffers = append(ev.RingBuffers, ringbuffer) err = ev.epoller.AddEpollCtl(mapFD, ev.EventFdCnt) if err != nil { unix.Munmap(producer) return nil, fmt.Errorf("failed to Epoll event: %s", err) } ev.RingCnt++ ev.EventFdCnt++ //Start channels read ev.eventsStopChannel = make(chan struct{}) ev.eventsDataChannel = make(chan []byte) ev.wg.Add(1) go ev.reconcileEventsDataChannel() return ev.eventsDataChannel, nil } func (ev *events) cleanupRingBuffer() { for i := 0; i < ev.RingCnt; i++ { _ = unix.Munmap(ev.RingBuffers[i].Producer) _ = unix.Munmap(ev.RingBuffers[i].Consumer) ev.RingBuffers[i].Producerpos = nil ev.RingBuffers[i].Consumerpos = nil } if ev.epoller.GetEpollFD() >= 0 { _ = syscall.Close(ev.epoller.GetEpollFD()) } ev.epoller = nil ev.RingBuffers = nil return } func (ev *events) reconcileEventsDataChannelHandler(pollerCh <-chan int) { for { select { case bufferPtr, ok := <-pollerCh: if !ok { return } ev.readRingBuffer(ev.RingBuffers[bufferPtr]) case <-ev.eventsStopChannel: return } } } func (ev *events) reconcileEventsDataChannel() { pollerCh := ev.epoller.EpollStart() defer ev.wg.Done() go ev.reconcileEventsDataChannelHandler(pollerCh) <-ev.eventsStopChannel } // Similar to libbpf poll and process ring // Ref: https://github.com/torvalds/linux/blob/744a759492b5c57ff24a6e8aabe47b17ad8ee964/tools/lib/bpf/ringbuf.c#L227 func (ev *events) readRingBuffer(eventRing *RingBuffer) { consPosition := eventRing.getConsumerPosition() ev.parseBuffer(consPosition, eventRing) } func (ev *events) parseBuffer(consumerPosition uint64, eventRing *RingBuffer) { var readDone bool for { readDone = true producerPosition := eventRing.getProducerPosition() for consumerPosition < producerPosition { // Get the header - Data points to the DataPage which will be offset by consumerPosition ringdata := eventRing.ParseRingData(consumerPosition) // Check if busy then skip, Might not be committed yet // There are 2 steps -> reserve and then commit/discard if ringdata.BusyRecord { readDone = true break } readDone = false // Update the position to the next record irrespective of discard or commit of data consumerPosition += uint64(ringdata.RecordLen) //Pick the data only if committed if !ringdata.DiscardRecord { ev.eventsDataChannel <- ringdata.parseSample() } eventRing.setConsumerPosition(consumerPosition) } if readDone { break } } }