mini/mini_server_cb.c (275 lines of code) (raw):

/**
 * @file mini_server_cb.c
 * @brief callback definitions for mini_server, including:
 * 1. engine callbacks
 * 2. hq callbacks
 * 3. h3 callbacks
 */

#include "mini_server_cb.h"

/* engine callbacks */

/* one-byte line terminator appended after every log / qlog / keylog record */
const char *line_break = "\n";

/**
 * @brief write one record followed by a line break to an open descriptor
 * @param fd   descriptor to write to
 * @param buf  record bytes
 * @param size number of bytes in buf
 * @param what label inserted into the failure message ("log", "qlog")
 *
 * A failure of either write() is reported on stdout and otherwise ignored:
 * logging is best-effort and must not disturb the caller.
 */
static void
xqc_mini_svr_write_record(int fd, const void *buf, size_t size, const char *what)
{
    /* write() returns ssize_t; keep the full width instead of narrowing to int */
    ssize_t written = write(fd, buf, size);
    if (written < 0) {
        printf("write %s failed, errno: %d\n", what, get_sys_errno());
        return;
    }

    written = write(fd, line_break, 1);
    if (written < 0) {
        printf("write %s failed, errno: %d\n", what, get_sys_errno());
    }
}

/**
 * @brief event callback that drives the engine main logic
 * @param fd   unused
 * @param what unused
 * @param arg  xqc_mini_svr_ctx_t owning the engine
 */
void
xqc_mini_svr_engine_cb(int fd, short what, void *arg)
{
    xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t *)arg;

    xqc_engine_main_logic(ctx->engine);
}

/**
 * @brief callback to (re)arm the engine event timer
 * @param wake_after delay before the engine event fires; split into seconds
 *                   and microseconds, i.e. treated as a microsecond count
 * @param arg        xqc_mini_svr_ctx_t owning ev_engine
 */
void
xqc_mini_svr_set_event_timer(xqc_msec_t wake_after, void *arg)
{
    xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t *)arg;

    struct timeval tv;
    tv.tv_sec = wake_after / 1000000;
    tv.tv_usec = wake_after % 1000000;

    event_add(ctx->ev_engine, &tv);
}

/**
 * @brief open the log file in append mode, creating it (mode 0644) if needed
 * @param arg xqc_mini_svr_ctx_t whose args->env_cfg.log_path names the file
 * @return the value returned by open(): a descriptor, or -1 on failure
 */
int
xqc_mini_svr_open_log_file(void *arg)
{
    xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t *)arg;

    return open(ctx->args->env_cfg.log_path, (O_WRONLY | O_APPEND | O_CREAT), 0644);
}

/**
 * @brief log callback: append one log record plus a line break to the log file
 * @param lvl  log level of the record (not used here)
 * @param buf  record bytes
 * @param size number of bytes in buf
 * @param arg  xqc_mini_svr_ctx_t holding log_fd
 *
 * Does nothing when log_fd is not positive (0 is used as the "closed" marker
 * by xqc_mini_svr_close_log_file).
 */
void
xqc_mini_svr_write_log_file(xqc_log_level_t lvl, const void *buf, size_t size, void *arg)
{
    xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t *)arg;

    if (ctx->log_fd <= 0) {
        return;
    }

    xqc_mini_svr_write_record(ctx->log_fd, buf, size, "log");
}

/**
 * @brief close the log file if it is open and mark it closed (log_fd = 0)
 * @param arg xqc_mini_svr_ctx_t holding log_fd
 */
void
xqc_mini_svr_close_log_file(void *arg)
{
    xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t *)arg;

    if (ctx->log_fd > 0) {
        close(ctx->log_fd);
        ctx->log_fd = 0;
    }
}

/**
 * @brief qlog callback: append one qlog event plus a line break
 * @param imp  importance of the event (not used here)
 * @param buf  event bytes
 * @param size number of bytes in buf
 * @param arg  xqc_mini_svr_ctx_t holding log_fd
 *
 * qlog events go to the same descriptor (log_fd) as ordinary log records.
 */
void
xqc_mini_svr_write_qlog_file(qlog_event_importance_t imp, const void *buf, size_t size, void *arg)
{
    xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t *)arg;

    if (ctx->log_fd <= 0) {
        return;
    }

    xqc_mini_svr_write_record(ctx->log_fd, buf, size, "qlog");
}
int xqc_mini_svr_open_keylog_file(void *arg) { xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t*)arg; return open(ctx->args->env_cfg.key_out_path, (O_WRONLY | O_APPEND | O_CREAT), 0644); } void xqc_mini_svr_keylog_cb(const xqc_cid_t *scid, const char *line, void *arg) { xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t*)arg; if (ctx->keylog_fd <= 0) { printf("write keys error!\n"); return; } int write_len = write(ctx->keylog_fd, line, strlen(line)); if (write_len < 0) { printf("write keys failed, errno: %d\n", get_sys_errno()); return; } write_len = write(ctx->keylog_fd, line_break, 1); if (write_len < 0) { printf("write keys failed, errno: %d\n", get_sys_errno()); } } void xqc_mini_svr_close_keylog_file(void *arg) { xqc_mini_svr_ctx_t *ctx = (xqc_mini_svr_ctx_t*)arg; if (ctx->keylog_fd > 0) { close(ctx->keylog_fd); ctx->keylog_fd = 0; } } int xqc_mini_svr_accept(xqc_engine_t *engine, xqc_connection_t *conn, const xqc_cid_t *cid, void *arg) { DEBUG; return 0; } ssize_t xqc_mini_svr_write_socket(const unsigned char *buf, size_t size, const struct sockaddr *peer_addr, socklen_t peer_addrlen, void *arg) { ssize_t res = XQC_OK; xqc_mini_svr_user_conn_t *user_conn = (xqc_mini_svr_user_conn_t *)arg; int fd = user_conn->ctx->current_fd; do { set_sys_errno(0); res = sendto(fd, buf, size, 0, peer_addr, peer_addrlen); if (res < 0) { printf("[error] xqc_mini_svr_write_socket err %zd %s, fd: %d\n", res, strerror(get_sys_errno()), fd); if (get_sys_errno() == EAGAIN) { res = XQC_SOCKET_EAGAIN; } } } while ((res < 0) && (get_sys_errno() == EINTR)); // printf("[report] xqc_mini_svr_write_socket success size=%lu\n", size); return res; } ssize_t xqc_mini_svr_write_socket_ex(uint64_t path_id, const unsigned char *buf, size_t size, const struct sockaddr *peer_addr,socklen_t peer_addrlen, void *conn_user_data) { return xqc_mini_svr_write_socket(buf, size, peer_addr, peer_addrlen, conn_user_data); } void xqc_mini_svr_conn_update_cid_notify(xqc_connection_t *conn, const xqc_cid_t *retire_cid, 
const xqc_cid_t *new_cid, void *user_data) { DEBUG; // xqc_mini_svr_user_conn_t *user_conn = (xqc_mini_svr_user_conn_t *)user_data; // memcpy(&user_conn->cid, new_cid, sizeof(*new_cid)); } /* h3 callbacks */ int xqc_mini_svr_h3_conn_create_notify(xqc_h3_conn_t *h3_conn, const xqc_cid_t *cid, void *conn_user_data) { DEBUG; xqc_mini_svr_user_conn_t *user_conn = (xqc_mini_svr_user_conn_t *)conn_user_data; xqc_h3_conn_set_user_data(h3_conn, user_conn); xqc_h3_conn_get_peer_addr(h3_conn, (struct sockaddr *)user_conn->peer_addr, sizeof(struct sockaddr_in), &user_conn->peer_addrlen); memcpy(&user_conn->cid, cid, sizeof(*cid)); printf("[stats] xqc_mini_svr_h3_conn_create_notify \n"); return 0; } int xqc_mini_svr_h3_conn_close_notify(xqc_h3_conn_t *h3_conn, const xqc_cid_t *cid, void *conn_user_data) { DEBUG; xqc_mini_svr_user_conn_t *user_conn = (xqc_mini_svr_user_conn_t*)conn_user_data; printf("[stats] xqc_mini_svr_h3_conn_close_notify success \n"); return 0; } void xqc_mini_svr_h3_conn_handshake_finished(xqc_h3_conn_t *h3_conn, void *conn_user_data) { DEBUG; xqc_mini_svr_user_conn_t *user_conn = (xqc_mini_svr_user_conn_t *)conn_user_data; xqc_conn_stats_t stats = xqc_conn_get_stats(user_conn->ctx->engine, &user_conn->cid); } int xqc_mini_svr_h3_request_create_notify(xqc_h3_request_t *h3_request, void *strm_user_data) { DEBUG; xqc_mini_svr_user_stream_t *user_stream = calloc(1, sizeof(*user_stream)); user_stream->h3_request = h3_request; xqc_h3_request_set_user_data(h3_request, user_stream); user_stream->recv_buf = calloc(1, REQ_BUF_SIZE); printf("[stats] xqc_mini_svr_h3_request_create_notify success \n"); return 0; } int xqc_mini_svr_h3_request_close_notify(xqc_h3_request_t *h3_request, void *strm_user_data) { DEBUG; xqc_request_stats_t stats = xqc_h3_request_get_stats(h3_request); printf("[stats] xqc_mini_svr_h3_request_close_notify success, cwnd_blocked:%"PRIu64"\n", stats.cwnd_blocked_ms); xqc_mini_svr_user_stream_t *user_stream = 
(xqc_mini_svr_user_stream_t*)strm_user_data; free(user_stream); return 0; } int xqc_mini_cli_handle_h3_request(xqc_mini_svr_user_stream_t *user_stream) { DEBUG; ssize_t ret = 0; /* response header buf list */ xqc_http_header_t rsp_hdr[] = { { .name = {.iov_base = "content-type", .iov_len = 12}, .value = {.iov_base = "text/plain", .iov_len = 10}, .flags = 0, } }; /* response header */ xqc_http_headers_t rsp_hdrs; rsp_hdrs.headers = rsp_hdr; rsp_hdrs.count = sizeof(rsp_hdr) / sizeof(rsp_hdr[0]); if (user_stream->header_sent == 0) { ret = xqc_h3_request_send_headers(user_stream->h3_request, &rsp_hdrs, 0); if (ret < 0) { printf("[error] xqc_h3_request_send_headers error %zd\n", ret); return ret; } else { printf("[stats] xqc_h3_request_send_headers success \n"); user_stream->header_sent = 1; } } ret = xqc_mini_svr_send_body(user_stream); return ret; } int xqc_mini_svr_h3_request_read_notify(xqc_h3_request_t *h3_request, xqc_request_notify_flag_t flag, void *strm_user_data) { DEBUG; int ret; char recv_buff[4096] = {0}; ssize_t recv_buff_size, read, read_sum; unsigned char fin = 0; xqc_http_headers_t *headers = NULL; xqc_mini_svr_user_stream_t *user_stream = (xqc_mini_svr_user_stream_t *)strm_user_data; read = read_sum = 0; recv_buff_size = 4096; /* recv headers */ if (flag & XQC_REQ_NOTIFY_READ_HEADER) { headers = xqc_h3_request_recv_headers(h3_request, &fin); if (headers == NULL) { printf("[error] xqc_h3_request_recv_headers error\n"); return XQC_ERROR; } /* TODO: if recv headers once for all? */ user_stream->header_recvd = 1; } else if (flag & XQC_REQ_NOTIFY_READ_BODY) { /* recv body */ do { read = xqc_h3_request_recv_body(h3_request, recv_buff, recv_buff_size, &fin); if (read == -XQC_EAGAIN) { break; } else if (read < 0) { printf("[error] xqc_h3_request_recv_body error %zd\n", read); return XQC_OK; } read_sum += read; user_stream->recv_body_len += read; } while (read > 0 && !fin); } if (fin) { printf("[stats] read h3 request finish. 
\n"); xqc_mini_cli_handle_h3_request(user_stream); } return 0; } int xqc_mini_svr_send_body(xqc_mini_svr_user_stream_t *user_stream) { int fin = 1, send_buf_size, ret; char send_buf[REQ_BUF_SIZE]; send_buf_size = REQ_BUF_SIZE; memset(send_buf, 'D', send_buf_size); ret = xqc_h3_request_send_body(user_stream->h3_request, send_buf, send_buf_size, fin); printf("[reports] xqc_mini_svr_send_body success, size:%d \n", ret); return ret; } int xqc_mini_svr_h3_request_write_notify(xqc_h3_request_t *h3_request, void *strm_user_data) { DEBUG; xqc_mini_svr_user_stream_t *user_stream = (xqc_mini_svr_user_stream_t *)strm_user_data; int ret = xqc_mini_svr_send_body(user_stream); printf("[stats] write h3 request notify finish \n"); return ret; }