sample/video_frame_producer_sample.c (168 lines of code) (raw):

// // Created by ZhangCheng on 26/02/2018. // #include "log_api.h" #include "log_producer_config.h" #include "log_producer_client.h" #include "log_multi_thread.h" /** * 发送数据结果的回调函数 * @param config_name * @param result * @param log_bytes * @param compressed_bytes * @param req_id * @param message */ void on_log_send_done(const char * config_name, log_producer_result result, size_t log_bytes, size_t compressed_bytes, const char * req_id, const char * message, const unsigned char * raw_buffer) { if (result == LOG_PRODUCER_OK) { printf("send success, config : %s, result : %d, log bytes : %d, compressed bytes : %d, request id : %s \n", config_name, (result), (int)log_bytes, (int)compressed_bytes, req_id); } else { printf("send fail, config : %s, result : %d, log bytes : %d, compressed bytes : %d, request id : %s, error message : %s\n", config_name, (result), (int)log_bytes, (int)compressed_bytes, req_id, message); } } /** * create producer 的一层封装 * @param on_send_done * @return */ log_producer * create_log_producer_wrapper(on_log_producer_send_done_function on_send_done) { log_producer_config * config = create_log_producer_config(); // endpoint list: https://help.aliyun.com/document_detail/29008.html log_producer_config_set_endpoint(config, "${your_endpoint}"); log_producer_config_set_project(config, "${your_project}"); log_producer_config_set_logstore(config, "${your_logstore}"); log_producer_config_set_access_id(config, "${your_access_key_id}"); log_producer_config_set_access_key(config, "${your_access_key_secret}"); // if you do not need topic or tag, comment it // 设置主题 log_producer_config_set_topic(config, "test_topic"); // 设置tag信息,此tag会附加在每个关键帧上,建议固定的meta信息使用tag设置 log_producer_config_add_tag(config, "os", "linux"); log_producer_config_add_tag(config, "version", "0.1.0"); log_producer_config_add_tag(config, "type", "type_xxx"); // set resource param log_producer_config_set_packet_log_bytes(config, 4*1024*1024); log_producer_config_set_packet_log_count(config, 4096); // 设置后台数据聚合上传的时间,默认为3000ms,如有需求可自定义设置 log_producer_config_set_packet_timeout(config, 3000); // 最大缓存的数据大小,超出缓存时add_log接口会立即返回失败 log_producer_config_set_max_buffer_limit(config, 64*1024*1024); // 设置不使用压缩 log_producer_config_set_compress_type(config, 0); // set send thread count // 发送线程数,为1即可 log_producer_config_set_send_thread_count(config, 1); // on_send_done 为发送的回调函数(发送成功/失败都会返回) return create_log_producer(config, on_send_done); } #define FRAME_FLAG_KEY_FRAME 1 #define FRAME_FLAG_NONE 0 /** * 帧数据的一种封装形式,您可以根据需求自定义调整 */ typedef struct _video_frame { uint64_t duration; uint8_t * frame_data; uint32_t frame_data_size; uint32_t frame_flag; uint64_t decoding_timestamp; uint64_t presentation_timestamp; uint8_t * extra_info; uint8_t extra_info_size; }video_frame; /** * 发送帧数据,这里是一种示例的封装形式,您可以根据需求进行更改 * @param client producer Client * @param frame 帧数据 */ void post_frame(log_producer_client * client, video_frame * frame) { if (client == NULL || frame == NULL) { return; } // 帧数据封装 start char * g_frame_keys[] = { (char *)"duration", (char *)"frame_data", (char *)"frame_flag", (char *)"decoding_timestamp", (char *)"presentation_timestamp", (char *)"extra_info" }; size_t g_frame_keys_len[] = { strlen("duration"), strlen("frame_data"), strlen("frame_flag"), strlen("decoding_timestamp"), strlen("presentation_timestamp"), strlen("extra_info") }; char * values[6]; char duration[32]; sprintf(duration, "%llu", (long long unsigned int)frame->duration); char frame_flag[32]; sprintf(frame_flag, "%u", frame->frame_flag); char decoding_timestamp[32]; sprintf(decoding_timestamp, "%llu", (long long unsigned int)frame->decoding_timestamp); char presentation_timestamp[32]; sprintf(presentation_timestamp, "%llu", (long long unsigned int)frame->presentation_timestamp); values[0] = duration; values[1] = (char *)frame->frame_data; values[2] = frame_flag; values[3] = decoding_timestamp; values[4] = presentation_timestamp; values[5] = (char *)frame->extra_info; size_t values_len[] = { strlen(duration), frame->frame_data_size, strlen(frame_flag), strlen(decoding_timestamp), strlen(presentation_timestamp), frame->extra_info_size }; // 帧数据封装 end // 调用producer接口发送数据 log_producer_result rst = log_producer_client_add_log_with_len(client, 6, g_frame_keys, g_frame_keys_len, values, values_len); // 异常处理 if (rst != LOG_PRODUCER_OK) { // 这边可以加入一些异常处理,如果是关键帧发送失败,可以每隔10ms重试一次,超过10次继续失败时再丢弃 if ((frame->frame_flag & FRAME_FLAG_KEY_FRAME) != 0) { int try_time = 0; for (; try_time < 10; ++try_time) { usleep(10000); if (log_producer_client_add_log_with_len(client, 6, g_frame_keys, g_frame_keys_len, values, values_len) == LOG_PRODUCER_OK) { break; } } if (try_time == 10) { printf("add key frame error %d \n", rst); } } else { printf("add normal frame error %d \n", rst); } } } /** * 示例函数 */ void producer_post_frames() { if (log_producer_env_init(LOG_GLOBAL_ALL) != LOG_PRODUCER_OK) { exit(1); } log_producer * producer = create_log_producer_wrapper(on_log_send_done); if (producer == NULL) { printf("create producer by config fail \n"); exit(1); } log_producer_client * client = get_log_producer_client(producer, NULL); if (client == NULL) { printf("create producer client by config fail \n"); exit(1); } srand(time(NULL)); int i = 0; for (; i < 100; ++i) { video_frame frame; // 每10帧有一个为关键帧 frame.frame_flag = i % 10 == 0 ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE; // mock 帧数据 uint32_t data_size = 960*720; uint8_t * frame_data = (uint8_t *)malloc(data_size); memset(frame_data, i % 256, data_size); frame.frame_data_size = data_size; frame.frame_data = frame_data; frame.duration = rand() % 500; frame.decoding_timestamp = time(NULL); frame.presentation_timestamp = time(NULL); char extra_info[32]; sprintf(extra_info, "extra_%d", i); frame.extra_info = (uint8_t *)extra_info; frame.extra_info_size = strlen(extra_info); // 发送帧数据 post_frame(client, &frame); free(frame_data); usleep(30000); } destroy_log_producer(producer); log_producer_env_destroy(); } int main(int argc, char *argv[]) { producer_post_frames(); return 0; }