bpf/include/socket_data.h
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "socket_opts.h"
#include "protocol_analyzer.h"
#include "queue.h"
#define SOCKET_UPLOAD_CHUNK_LIMIT 6
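// one uploaded chunk of socket data together with its connection, timing and sequencing metadata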
struct socket_data_upload_event {
__u8 protocol;
__u8 have_reduce_after_chunk;
__u8 direction;
__u8 finished;
__u16 sequence;
__u16 data_len;
__u64 start_time;
__u64 end_time;
__u64 conid;
__u64 randomid;
__u64 data_id;
__u64 prev_data_id;
__u64 total_size;
char buffer[MAX_TRANSMIT_SOCKET_READ_LENGTH];
};
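// one-entry per-CPU array used as scratch storage for building a socket_data_upload_event outside the BPF stack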
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct socket_data_upload_event);
__uint(max_entries, 1);
} socket_data_upload_event_per_cpu_map SEC(".maps");
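// queue (declared via queue.h) that delivers the upload events to user space, used by rover_reserve_buf/rover_submit_buf below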
DATA_QUEUE(socket_data_upload_queue);
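// per-connection record of the current data id and the last sequence number handed out for it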
struct socket_data_sequence_t {
__u64 data_id;
__u16 sequence;
};
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 1000);
__type(key, __u64);
__type(value, struct socket_data_sequence_t);
} socket_data_sequence_generator SEC(".maps");
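// returns the next chunk sequence for the given connection and data id: 0 when the connection or data id
// is seen for the first time, otherwise the previous sequence plus one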
static __inline __u16 generate_socket_sequence(__u64 conid, __u64 data_id) {
struct socket_data_sequence_t *seq = bpf_map_lookup_elem(&socket_data_sequence_generator, &conid);
if (seq == NULL) {
struct socket_data_sequence_t data = {};
data.data_id = data_id;
data.sequence = 0;
bpf_map_update_elem(&socket_data_sequence_generator, &conid, &data, BPF_NOEXIST);
return 0;
}
if (seq->data_id != data_id) {
seq->data_id = data_id;
seq->sequence = 0;
} else {
seq->sequence++;
}
return seq->sequence;
}
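// everything a probe fills in to upload one socket read/write: timing, connection identity,
// the data itself (flat buffer or iovec) and the protocol/SSL flags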
struct upload_data_args {
__u64 start_time;
__u64 end_time;
__u64 con_id;
__u64 random_id;
__u64 socket_data_id;
__u64 prev_socket_data_id;
struct iovec *socket_data_iovec;
size_t socket_data_iovlen;
ssize_t bytes_count;
char* socket_data_buf;
__u32 data_direction;
__u8 connection_protocol;
__u8 connection_ssl;
__u8 socket_ssl_buffer_force_unfinished;
    // set when the data upload should be skipped because of the protocol (such as HTTP/2)
__u8 connection_skip_data_upload;
bool socket_data_ssl;
};
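// one-entry per-CPU array holding the upload_data_args, so the struct does not have to live on the limited BPF stack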
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct upload_data_args);
__uint(max_entries, 1);
} socket_data_upload_args_per_cpu_map SEC(".maps");
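// returns the per-CPU upload_data_args slot (NULL only if the map lookup fails)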
static __always_inline struct upload_data_args* generate_socket_upload_args() {
__u32 kZero = 0;
return bpf_map_lookup_elem(&socket_data_upload_args_per_cpu_map, &kZero);
}
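// reserve an event from the upload queue, fill it with the chunk metadata from args plus up to
// MAX_TRANSMIT_SOCKET_READ_LENGTH bytes from buf, and submit it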
static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct upload_data_args *args) {
struct socket_data_upload_event *socket_data_event;
socket_data_event = rover_reserve_buf(&socket_data_upload_queue, sizeof(*socket_data_event));
if (socket_data_event == NULL) {
return;
}
if (size > sizeof(socket_data_event->buffer)) {
size = sizeof(socket_data_event->buffer);
}
if (size <= 0) {
rover_discard_buf(socket_data_event);
return;
}
    // fill in the event metadata from the upload arguments
socket_data_event->start_time = args->start_time;
socket_data_event->end_time = args->end_time;
socket_data_event->protocol = args->connection_protocol;
socket_data_event->direction = args->data_direction;
socket_data_event->conid = args->con_id;
socket_data_event->randomid = args->random_id;
socket_data_event->total_size = args->bytes_count;
socket_data_event->data_id = args->socket_data_id;
socket_data_event->prev_data_id = args->prev_socket_data_id;
socket_data_event->sequence = index;
socket_data_event->data_len = size;
socket_data_event->finished = is_finished;
socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
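    // clamp the size with an inline-asm mask so the verifier can prove the read below stays within the event buffer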
asm volatile("%[size] &= 0x7ff;\n" ::[size] "+r"(size) :);
bpf_probe_read(&socket_data_event->buffer, size, buf);
rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event));
}
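// upload a contiguous buffer, splitting it into at most SOCKET_UPLOAD_CHUNK_LIMIT chunks;
// when force_unfinished is set (SSL buffers), every chunk is marked unfinished and its sequence
// comes from the per-connection generator instead of the loop index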
static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct upload_data_args *args, __u8 force_unfinished) {
ssize_t already_send = 0;
#pragma unroll
for (__u8 index = 0; index < SOCKET_UPLOAD_CHUNK_LIMIT; index++) {
        // calculate the number of bytes to send in this chunk
ssize_t remaining = size - already_send;
size_t need_send_in_chunk = 0;
__u8 have_reduce_after_chunk = 0;
if (remaining > MAX_TRANSMIT_SOCKET_READ_LENGTH) {
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;
have_reduce_after_chunk = 1;
} else {
need_send_in_chunk = remaining;
}
__u32 is_finished = (need_send_in_chunk + already_send) >= size || index == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false;
__u8 sequence = index;
if (force_unfinished == 1 && need_send_in_chunk > 0) {
is_finished = 0;
sequence = generate_socket_sequence(args->con_id, args->socket_data_id);
}
__upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, args);
already_send += need_send_in_chunk;
}
}
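// upload the next chunk of the iovec array, advancing iov_index/cur_iov_sended as needed;
// kept as a macro so the chunk loop below stays fully unrolled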
#define UPLOAD_PER_SOCKET_DATA_IOV() \
if (iov_index < iovlen) { \
struct iovec cur_iov; \
bpf_probe_read(&cur_iov, sizeof(cur_iov), &iov[iov_index]); \
ssize_t remaining = size - already_send; \
size_t need_send_in_chunk = remaining - cur_iov_sended; \
__u8 have_reduce_after_chunk = 0; \
if (cur_iov_sended + need_send_in_chunk > cur_iov.iov_len) { \
need_send_in_chunk = cur_iov.iov_len - cur_iov_sended; \
if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) { \
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH; \
have_reduce_after_chunk = 1; \
} else { \
iov_index++; \
cur_iov_sended = 0; \
} \
} else if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) { \
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH; \
have_reduce_after_chunk = 1; \
} \
__u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \
__upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, args); \
already_send += need_send_in_chunk; \
loop_count++; \
}
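// iovec counterpart of upload_socket_data_buf: uploads at most SOCKET_UPLOAD_CHUNK_LIMIT chunks
// spread across the iovec entries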
static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct upload_data_args *args) {
ssize_t already_send = 0;
ssize_t cur_iov_sended = 0;
__u8 iov_index = 0;
__u8 loop_count = 0;
    // the macro is expanded SOCKET_UPLOAD_CHUNK_LIMIT times, one expansion per uploaded chunk
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
}
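// remembers, per connection, the random id and the id of the most recently uploaded socket data
// so the next upload can reference its predecessor via prev_data_id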
struct socket_data_last_id_t {
__u64 random_id;
__u64 socket_data_id;
};
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 10000);
__type(key, __u64);
__type(value, struct socket_data_last_id_t);
} socket_data_last_id_map SEC(".maps");
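// entry point for the data upload: filter out data that should not be uploaded, resolve prev_data_id
// from the last-id map, dispatch to the flat-buffer or iovec variant, then record the current data id
// for the next call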
static __inline void upload_socket_data(void *ctx, struct upload_data_args *args) {
    // the protocol must be recognized and the data's SSL state must match the connection (both plain or both encrypted)
    // also skip when the connection is explicitly marked to skip the data upload
if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) {
return;
}
struct socket_data_last_id_t *latest = bpf_map_lookup_elem(&socket_data_last_id_map, &args->con_id);
args->prev_socket_data_id = 0;
if (latest != NULL && latest->random_id == args->random_id) {
args->prev_socket_data_id = latest->socket_data_id;
}
if (args->socket_data_buf != NULL) {
upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished);
} else if (args->socket_data_iovec != NULL) {
upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args);
}
if (latest == NULL || latest->socket_data_id != args->socket_data_id) {
struct socket_data_last_id_t data = {};
data.random_id = args->random_id;
data.socket_data_id = args->socket_data_id;
bpf_map_update_elem(&socket_data_last_id_map, &args->con_id, &data, BPF_ANY);
}
}