mini/mini_client_cb.c (327 lines of code) (raw):
/**
 * @file mini_client_cb.c contains callback definitions for mini_client, including:
 * 1. engine callbacks
 * 2. hq callbacks
 * 3. h3 callbacks
 */
#include "mini_client_cb.h"
/**
* @brief engine callbacks to trigger engine main logic
*/
/* single newline that the log, qlog and keylog writers in this file append after each record */
const char *line_break = "\n";
/**
 * @brief event callback that runs the engine main logic
 * @param fd   unused
 * @param what unused
 * @param arg  client context (xqc_mini_cli_ctx_t *)
 */
void
xqc_mini_cli_engine_cb(int fd, short what, void *arg)
{
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)arg;

    xqc_engine_main_logic(cli_ctx->engine);
}
/**
 * @brief open the log file configured in env_cfg.log_path for appending,
 *        creating it with mode 0644 if it does not exist
 * @param arg client context (xqc_mini_cli_ctx_t *)
 * @return whatever open() returns
 */
int
xqc_mini_cli_open_log_file(void *arg)
{
    const int open_flags = O_WRONLY | O_APPEND | O_CREAT;
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)arg;

    return open(cli_ctx->args->env_cfg.log_path, open_flags, 0644);
}
/**
 * @brief close the log file descriptor, if one is open, and reset it to 0
 * @param arg client context (xqc_mini_cli_ctx_t *)
 */
void
xqc_mini_cli_close_log_file(void *arg)
{
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)arg;

    /* a descriptor value <= 0 is treated as "not open" */
    if (cli_ctx->log_fd <= 0) {
        return;
    }

    close(cli_ctx->log_fd);
    cli_ctx->log_fd = 0;
}
/**
 * @brief log callback: append one record plus a trailing newline to the log file
 * @param lvl              log level (not used here)
 * @param buf              record bytes
 * @param size             number of bytes in buf
 * @param engine_user_data client context (xqc_mini_cli_ctx_t *)
 */
void
xqc_mini_cli_write_log_file(xqc_log_level_t lvl, const void *buf, size_t size, void *engine_user_data)
{
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)engine_user_data;
    int rc;

    /* silently drop the record when no log file is open */
    if (cli_ctx->log_fd > 0) {
        rc = write(cli_ctx->log_fd, buf, size);
        if (rc >= 0) {
            /* the newline is only written when the record itself went out */
            rc = write(cli_ctx->log_fd, line_break, 1);
        }

        if (rc < 0) {
            printf("write log failed, errno: %d\n", get_sys_errno());
        }
    }
}
/**
 * @brief open the key log file configured in env_cfg.key_out_path for appending,
 *        creating it with mode 0644 if it does not exist
 * @param arg client context (xqc_mini_cli_ctx_t *)
 * @return whatever open() returns
 */
int
xqc_mini_cli_open_keylog_file(void *arg)
{
    const int open_flags = O_WRONLY | O_APPEND | O_CREAT;
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)arg;

    return open(cli_ctx->args->env_cfg.key_out_path, open_flags, 0644);
}
/**
 * @brief close the key log file descriptor, if one is open, and reset it to 0
 * @param arg client context (xqc_mini_cli_ctx_t *)
 */
void
xqc_mini_cli_close_keylog_file(void *arg)
{
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)arg;

    /* a descriptor value <= 0 is treated as "not open" */
    if (cli_ctx->keylog_fd <= 0) {
        return;
    }

    close(cli_ctx->keylog_fd);
    cli_ctx->keylog_fd = 0;
}
/**
 * @brief qlog callback: append one qlog record plus a trailing newline
 * @param imp              event importance (not used here)
 * @param buf              record bytes
 * @param size             number of bytes in buf
 * @param engine_user_data client context (xqc_mini_cli_ctx_t *)
 *
 * Note: the record is written to ctx->log_fd, i.e. the same descriptor the
 * regular log writer uses.
 */
void
xqc_mini_cli_write_qlog_file(qlog_event_importance_t imp, const void *buf, size_t size, void *engine_user_data)
{
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)engine_user_data;
    int rc;

    /* silently drop the record when no log file is open */
    if (cli_ctx->log_fd > 0) {
        rc = write(cli_ctx->log_fd, buf, size);
        if (rc >= 0) {
            /* the newline is only written when the record itself went out */
            rc = write(cli_ctx->log_fd, line_break, 1);
        }

        if (rc < 0) {
            printf("write qlog failed, errno: %d\n", get_sys_errno());
        }
    }
}
/**
 * @brief keylog callback: append one key log line plus a trailing newline
 * @param scid             source connection id (not used here)
 * @param line             NUL-terminated key log line
 * @param engine_user_data client context (xqc_mini_cli_ctx_t *)
 */
void
xqc_mini_cli_keylog_cb(const xqc_cid_t *scid, const char *line, void *engine_user_data)
{
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)engine_user_data;
    int rc;

    /* unlike the log writers, a missing key log file is reported */
    if (cli_ctx->keylog_fd <= 0) {
        printf("write keys error!\n");
        return;
    }

    rc = write(cli_ctx->keylog_fd, line, strlen(line));
    if (rc >= 0) {
        /* the newline is only written when the line itself went out */
        rc = write(cli_ctx->keylog_fd, line_break, 1);
    }

    if (rc < 0) {
        printf("write keys failed, errno: %d\n", get_sys_errno());
    }
}
/**
 * @brief h3 connection created: remember the h3 connection handle and its cid
 * @param conn      the new h3 connection
 * @param cid       connection id, copied into the user connection
 * @param user_data user connection (xqc_mini_cli_user_conn_t *)
 * @return XQC_OK
 */
int
xqc_mini_cli_h3_conn_create_notify(xqc_h3_conn_t *conn, const xqc_cid_t *cid, void *user_data)
{
    xqc_mini_cli_user_conn_t *uc = (xqc_mini_cli_user_conn_t *)user_data;

    memcpy(&uc->cid, cid, sizeof(xqc_cid_t));
    uc->h3_conn = conn;

    return XQC_OK;
}
/**
 * @brief h3 connection closed: call event_base_loopbreak() on the context's event base
 * @param conn      the closed h3 connection (not used here)
 * @param cid       connection id (not used here)
 * @param user_data user connection (xqc_mini_cli_user_conn_t *)
 * @return XQC_OK
 */
int
xqc_mini_cli_h3_conn_close_notify(xqc_h3_conn_t *conn, const xqc_cid_t *cid, void *user_data)
{
    xqc_mini_cli_user_conn_t *uc = (xqc_mini_cli_user_conn_t *)user_data;
    xqc_mini_cli_ctx_t *cli_ctx = uc->ctx;

    event_base_loopbreak(cli_ctx->eb);
    printf("[stats] xqc_mini_cli_h3_conn_close_notify success \n");

    return XQC_OK;
}
/**
 * @brief h3 handshake finished notification; intentionally a no-op
 * @param h3_conn   unused
 * @param user_data unused
 */
void
xqc_mini_cli_h3_conn_handshake_finished(xqc_h3_conn_t *h3_conn, void *user_data)
{
    (void)h3_conn;
    (void)user_data;
}
/**
 * @brief h3 request created notification; nothing is done here
 * @param h3_request    unused
 * @param h3s_user_data unused
 * @return always 0
 */
int
xqc_mini_cli_h3_request_create_notify(xqc_h3_request_t *h3_request, void *h3s_user_data)
{
    (void)h3_request;
    (void)h3s_user_data;

    return 0;
}
/**
 * @brief h3 request closed: close the owning h3 connection, release the user
 *        stream and print the request's cwnd_blocked_ms statistic
 * @param h3_request the closed request
 * @param user_data  user stream (xqc_mini_cli_user_stream_t *); freed here
 * @return always 0
 */
int
xqc_mini_cli_h3_request_close_notify(xqc_h3_request_t *h3_request, void *user_data)
{
    xqc_mini_cli_user_stream_t *stream = (xqc_mini_cli_user_stream_t *)user_data;
    xqc_mini_cli_user_conn_t *owner = stream->user_conn;

    /* stats are copied by value, so they stay usable after the stream is freed */
    xqc_request_stats_t req_stats = xqc_h3_request_get_stats(h3_request);

    xqc_h3_conn_close(owner->ctx->engine, &owner->cid);
    free(stream);

    printf("[stats] xqc_mini_cli_h3_request_close_notify success, cwnd_blocked:%"PRIu64"\n", req_stats.cwnd_blocked_ms);
    return 0;
}
/**
 * @brief h3 request readable: print received headers and drain the body
 * @param h3_request    request to read from
 * @param flag          bitmask; XQC_REQ_NOTIFY_READ_HEADER and
 *                      XQC_REQ_NOTIFY_READ_BODY are handled here
 * @param h3s_user_data user stream (xqc_mini_cli_user_stream_t *)
 * @return XQC_ERROR when headers cannot be fetched, XQC_OK otherwise
 *         (a body read error is only reported, the return value stays XQC_OK)
 */
int
xqc_mini_cli_h3_request_read_notify(xqc_h3_request_t *h3_request,
    xqc_request_notify_flag_t flag, void *h3s_user_data)
{
    char recv_buff[XQC_MAX_BUFF_SIZE] = {0};
    size_t recv_buff_size = XQC_MAX_BUFF_SIZE;
    ssize_t n_read = 0;     /* result of the most recent recv_body call */
    ssize_t read_sum = 0;   /* body bytes received during this notification */
    unsigned char fin = 0;
    xqc_mini_cli_user_stream_t *user_stream = (xqc_mini_cli_user_stream_t *)h3s_user_data;

    if (flag & XQC_REQ_NOTIFY_READ_HEADER) {
        xqc_http_headers_t *headers;

        headers = xqc_h3_request_recv_headers(h3_request, &fin);
        if (headers == NULL) {
            printf("[error] xqc_h3_request_recv_headers error\n");
            return XQC_ERROR;
        }

        for (int i = 0; i < headers->count; i++) {
            printf("[receive report] %s = %s\n", (char *)headers->headers[i].name.iov_base,
                   (char *)headers->headers[i].value.iov_base);
        }

        if (fin) {
            /* only header in request */
            user_stream->recv_fin = 1;
            printf("[stats] h3 request read header finish \n");
            return XQC_OK;
        }
    }

    /* continue to recv body */
    if (!(flag & XQC_REQ_NOTIFY_READ_BODY)) {
        return XQC_OK;
    }

    do {
        n_read = xqc_h3_request_recv_body(h3_request, recv_buff, recv_buff_size, &fin);
        if (n_read == -XQC_EAGAIN) {
            break;

        } else if (n_read < 0) {
            printf("xqc_h3_request_recv_body error %zd\n", n_read);
            return XQC_OK;
        }

        read_sum += n_read;
        user_stream->recv_body_len += n_read;
    } while (n_read > 0 && !fin);

    /*
     * report the accumulated size: the last call's result is 0 or -XQC_EAGAIN
     * whenever the loop ends without fin, which says nothing about how much was read
     */
    printf("[report] xqc_h3_request_recv_body size %zd, fin:%d\n", read_sum, fin);

    if (fin) {
        printf("[stats] read h3 request finish. \n");
    }

    return XQC_OK;
}
/**
 * @brief h3 request writable: continue sending via xqc_mini_cli_request_send()
 * @param h3_request    request that became writable
 * @param h3s_user_data user stream (xqc_mini_cli_user_stream_t *)
 * @return the value returned by xqc_mini_cli_request_send()
 */
int
xqc_mini_cli_h3_request_write_notify(xqc_h3_request_t *h3_request, void *h3s_user_data)
{
    xqc_mini_cli_user_stream_t *stream = (xqc_mini_cli_user_stream_t *)h3s_user_data;
    int rc = xqc_mini_cli_request_send(h3_request, stream);

    printf("[stats] finish h3 request write notify!:%"PRIu64"\n", xqc_h3_stream_id(h3_request));

    return rc;
}
/**
 * @brief schedule the engine event to fire after the requested delay
 * @param wake_after delay in microseconds (split into seconds and microseconds below)
 * @param user_data  client context (xqc_mini_cli_ctx_t *)
 */
void
xqc_mini_cli_set_event_timer(xqc_usec_t wake_after, void *user_data)
{
    xqc_mini_cli_ctx_t *cli_ctx = (xqc_mini_cli_ctx_t *)user_data;
    struct timeval delay;

    delay.tv_usec = wake_after % 1000000;
    delay.tv_sec = wake_after / 1000000;

    event_add(cli_ctx->ev_engine, &delay);
}
/**
 * @brief socket write callback; forwards to xqc_mini_cli_write_socket_ex() with path id 0
 * @param buf            datagram payload
 * @param size           payload length in bytes
 * @param peer_addr      destination address
 * @param peer_addrlen   length of peer_addr
 * @param conn_user_data user connection, passed through unchanged
 * @return the value returned by xqc_mini_cli_write_socket_ex()
 */
ssize_t
xqc_mini_cli_write_socket(const unsigned char *buf, size_t size, const struct sockaddr *peer_addr,
    socklen_t peer_addrlen, void *conn_user_data)
{
    const uint64_t default_path_id = 0;
    ssize_t sent;

    sent = xqc_mini_cli_write_socket_ex(default_path_id, buf, size, peer_addr,
                                        peer_addrlen, conn_user_data);
    return sent;
}
/**
 * @brief socket write callback with path id: send one datagram with sendto(),
 *        retrying while the call is interrupted (EINTR)
 * @param path_id        path identifier (not used here)
 * @param buf            datagram payload
 * @param size           payload length in bytes
 * @param peer_addr      destination address
 * @param peer_addrlen   length of peer_addr
 * @param conn_user_data user connection (xqc_mini_cli_user_conn_t *), supplies the fd
 * @return bytes sent on success, XQC_SOCKET_EAGAIN when sendto() fails with
 *         EAGAIN, otherwise the negative sendto() result
 */
ssize_t
xqc_mini_cli_write_socket_ex(uint64_t path_id, const unsigned char *buf, size_t size,
    const struct sockaddr *peer_addr, socklen_t peer_addrlen, void *conn_user_data)
{
    xqc_mini_cli_user_conn_t *user_conn = (xqc_mini_cli_user_conn_t *)conn_user_data;
    int fd = user_conn->fd;
    int err = 0;
    ssize_t res = 0;

    do {
        set_sys_errno(0);
        res = sendto(fd, buf, size, 0, peer_addr, peer_addrlen);

        /*
         * capture the error code right away: printf()/strerror() below may
         * overwrite errno, which would break the EAGAIN / EINTR decisions
         */
        err = get_sys_errno();

        if (res < 0) {
            /*
             * sa_data holds raw address bytes, not a NUL-terminated string;
             * the precision keeps printf from reading past the array
             */
            printf("xqc_mini_cli_write_socket err %zd %s, fd: %d, buf: %p, size: %zu, "
                   "server_addr: %.*s\n", res, strerror(err), fd, (const void *)buf, size,
                   (int)sizeof(user_conn->peer_addr->sa_data), user_conn->peer_addr->sa_data);

            if (err == EAGAIN) {
                res = XQC_SOCKET_EAGAIN;
            }
        }
    } while ((res < 0) && (err == EINTR));

    return res;
}
/**
 * @brief read a previously saved token from TOKEN_FILE
 * @param token     destination buffer
 * @param token_len capacity of token in bytes
 * @return -1 when the file cannot be opened, otherwise the read() result
 */
int
xqc_mini_cli_read_token(unsigned char *token, unsigned token_len)
{
    ssize_t n_bytes;
    int token_fd = open(TOKEN_FILE, O_RDONLY);

    if (token_fd >= 0) {
        n_bytes = read(token_fd, token, token_len);
        close(token_fd);
        return n_bytes;
    }

    return -1;
}
/**
 * @brief token callback: overwrite TOKEN_FILE with the received token
 * @param token     token bytes
 * @param token_len number of bytes in token
 * @param user_data user connection (not used here)
 */
void
xqc_mini_cli_save_token(const unsigned char *token, unsigned token_len, void *user_data)
{
    printf("[stats] start xqc_mini_cli_save_token, use client ip as the key.\n");

    int fd = open(TOKEN_FILE, O_TRUNC | O_CREAT | O_WRONLY, 0666);
    if (fd < 0) {
        printf("save token error %s\n", strerror(get_sys_errno()));
        return;
    }

    ssize_t n = write(fd, token, token_len);

    /*
     * test the sign first: comparing a negative ssize_t directly with an
     * unsigned length converts -1 to a huge value where ssize_t is no wider
     * than unsigned, so the failure would go unnoticed
     */
    if (n < 0 || (size_t)n < token_len) {
        printf("save token error %s\n", strerror(get_sys_errno()));
    }

    close(fd);
}
/**
 * @brief session ticket callback: overwrite SESSION_TICKET_FILE with the ticket data
 * @param data      session ticket bytes
 * @param data_len  number of bytes in data
 * @param user_data user connection (not used here)
 */
void
xqc_mini_cli_save_session_cb(const char * data, size_t data_len, void *user_data)
{
    printf("[stats] start save_session_cb. \n");

    FILE *fp = fopen(SESSION_TICKET_FILE, "wb");
    /* fopen() signals failure with NULL; a "< 0" test on the pointer never fires */
    if (fp == NULL) {
        printf("save session error %s\n", strerror(get_sys_errno()));
        return;
    }

    /* fwrite() returns size_t; keep it unnarrowed for the comparison */
    size_t write_size = fwrite(data, 1, data_len, fp);
    if (write_size != data_len) {
        printf("save _session_cb error\n");
    }

    fclose(fp);
}
/**
 * @brief transport parameter callback: overwrite TRANSPORT_PARAMS_FILE with the data
 * @param data      serialized transport parameter bytes
 * @param data_len  number of bytes in data
 * @param user_data user connection (not used here)
 */
void
xqc_mini_cli_save_tp_cb(const char * data, size_t data_len, void * user_data)
{
    printf("[stats] start save_tp_cb\n");

    FILE *fp = fopen(TRANSPORT_PARAMS_FILE, "wb");
    /* fopen() signals failure with NULL; a "< 0" test on the pointer never fires */
    if (fp == NULL) {
        printf("save transport callback error %s\n", strerror(get_sys_errno()));
        return;
    }

    /* fwrite() returns size_t; keep it unnarrowed for the comparison */
    size_t write_size = fwrite(data, 1, data_len, fp);
    if (write_size != data_len) {
        printf("save _tp_cb error\n");
    }

    fclose(fp);
}
void
xqc_mini_cli_timeout_callback(int fd, short what, void *arg)
{
int conn_timeout, last_socket_time, ret;
xqc_usec_t socket_idle_time;
struct timeval tv;
xqc_mini_cli_ctx_t *ctx;
xqc_mini_cli_user_conn_t *user_conn;
user_conn = (xqc_mini_cli_user_conn_t *)arg;
ctx = user_conn->ctx;
conn_timeout = ctx->args->net_cfg.conn_timeout;
last_socket_time = ctx->args->net_cfg.last_socket_time;
socket_idle_time = xqc_now() - last_socket_time;
if (socket_idle_time < conn_timeout * 1000000) {
tv.tv_sec = conn_timeout;
tv.tv_usec = 0;
event_add(user_conn->ev_timeout, &tv);
return;
}
conn_close:
printf("[stats] client process timeout, connection closing... \n");
ret = xqc_h3_conn_close(ctx->engine, &user_conn->cid);
if (ret) {
printf("[error] xqc_conn_close error:%d\n", ret);
return;
}
}
/**
 * @brief transport connection created: register the user connection as the
 *        ALP user data and print whether early data can be sent
 * @param conn            the new connection
 * @param cid             connection id (not used here)
 * @param user_data       user connection (xqc_mini_cli_user_conn_t *)
 * @param conn_proto_data not used here
 * @return XQC_OK
 */
int
xqc_mini_cli_conn_create_notify(xqc_connection_t *conn, const xqc_cid_t *cid, void *user_data, void *conn_proto_data)
{
    DEBUG;

    xqc_mini_cli_user_conn_t *uc = (xqc_mini_cli_user_conn_t *)user_data;

    xqc_conn_set_alp_user_data(conn, uc);

    printf("[stats] xqc_conn_is_ready_to_send_early_data:%d\n",
           xqc_conn_is_ready_to_send_early_data(conn));

    return XQC_OK;
}