Sources/aliyun-log-c-sdk/log_persistent_manager.c (428 lines of code) (raw):

// // Created by davidzhang on 2020/8/23. // #include "log_persistent_manager.h" #include "log_producer_manager.h" #include "inner_log.h" #include "log_builder.h" #include "log_sds.h" #define MAX_CHECKPOINT_FILE_SIZE (sizeof(log_persistent_checkpoint) * 1024) #define LOG_PERSISTENT_HEADER_MAGIC (0xf7216a5b76df67f5) static int32_t is_valid_log_checkpoint(log_persistent_checkpoint * checkpoint) { return checkpoint->check_sum == checkpoint->start_log_uuid + checkpoint->now_log_uuid + checkpoint->start_file_offset + checkpoint->now_file_offset; } static int32_t recover_log_checkpoint(log_persistent_manager * manager) { FILE * tmpFile = fopen(manager->checkpoint_file_path, "rb"); if (tmpFile == NULL) { if (errno == ENOENT) { return 0; } return -1; } fseek(tmpFile, 0, SEEK_END); long pos = ftell(tmpFile); if (pos == 0) { // empty file return 0; } long fixedPos = pos - pos%sizeof(log_persistent_checkpoint); long lastPos = fixedPos == 0? 0 : fixedPos - sizeof(log_persistent_checkpoint); fseek(tmpFile, lastPos, SEEK_SET); if (1 != fread((void *)&(manager->checkpoint), sizeof(log_persistent_checkpoint), 1, tmpFile)) { fclose(tmpFile); return -2; } if (!is_valid_log_checkpoint(&(manager->checkpoint))) { fclose(tmpFile); return -3; } fclose(tmpFile); manager->checkpoint_file_size = pos; return 0; } int save_log_checkpoint(log_persistent_manager * manager) { log_persistent_checkpoint * checkpoint = &(manager->checkpoint); checkpoint->check_sum = checkpoint->start_log_uuid + checkpoint->now_log_uuid + checkpoint->start_file_offset + checkpoint->now_file_offset; if (manager->checkpoint_file_size >= MAX_CHECKPOINT_FILE_SIZE) { if (manager->checkpoint_file_ptr != NULL) { fclose(manager->checkpoint_file_ptr); manager->checkpoint_file_ptr = NULL; } char tmpFilePath[256]; strcpy(tmpFilePath, manager->checkpoint_file_path); strcat(tmpFilePath, ".bak"); aos_info_log("start switch checkpoint index file %s \n", manager->checkpoint_file_path); FILE * tmpFile = fopen(tmpFilePath, "wb+"); if (tmpFile == NULL) return -1; if (1 != fwrite((const void *)(&manager->checkpoint), sizeof(log_persistent_checkpoint), 1, tmpFile)) { fclose(tmpFile); return -2; } if (fclose(tmpFile) != 0) return -3; if (rename(tmpFilePath, manager->checkpoint_file_path) != 0 ) return -4; manager->checkpoint_file_size = sizeof(log_persistent_checkpoint); return 0; } if (manager->checkpoint_file_ptr == NULL) { manager->checkpoint_file_ptr = fopen(manager->checkpoint_file_path, "ab+"); if (manager->checkpoint_file_ptr == NULL) return -5; } if (1 != fwrite((const void *)(&manager->checkpoint), sizeof(log_persistent_checkpoint), 1, manager->checkpoint_file_ptr)) return -6; if (fflush(manager->checkpoint_file_ptr) != 0) return -7; manager->checkpoint_file_size += sizeof(log_persistent_checkpoint); return 0; } void on_log_persistent_manager_send_done_uuid(const char * config_name, log_producer_result result, size_t log_bytes, size_t compressed_bytes, const char * req_id, const char * error_message, const unsigned char * raw_buffer, void *persistent_manager, int64_t startId, int64_t endId) { if (result != LOG_PRODUCER_OK && result != LOG_PRODUCER_DROP_ERROR && result != LOG_PRODUCER_INVALID) { return; } log_persistent_manager * manager = (log_persistent_manager *)persistent_manager; if (manager == NULL) { return; } if (manager->is_invalid) { return; } if (startId < 0 || endId < 0 || startId > endId || endId - startId > 1024 * 1024) { aos_fatal_log("invalid id range %lld %lld", startId, endId); manager->is_invalid = 1; return; } // multi thread send is not allowed, and this should never happen if (startId > manager->checkpoint.start_log_uuid) { aos_fatal_log("project %s, logstore %s, invalid checkpoint start log uuid %lld %lld", manager->config->project, manager->config->logstore, startId, manager->checkpoint.start_log_uuid); manager->is_invalid = 1; return; } CS_ENTER(manager->lock); // cal log size uint64_t totalOffset = 0; for (int64_t id = startId; id <= endId; ++id) { totalOffset += manager->in_buffer_log_sizes[id % manager->config->maxPersistentLogCount]; } manager->checkpoint.start_file_offset += totalOffset; manager->checkpoint.start_log_uuid = endId + 1; int rst = save_log_checkpoint(manager); if (rst != 0) { aos_error_log("project %s, logstore %s, save checkpoint failed, reason %d", manager->config->project, manager->config->logstore, rst); } log_ring_file_clean(manager->ring_file, manager->checkpoint.start_file_offset - totalOffset, manager->checkpoint.start_file_offset); CS_LEAVE(manager->lock); } static void log_persistent_manager_init(log_persistent_manager * manager, log_producer_config *config) { memset(manager, 0, sizeof(log_persistent_manager)); manager->builder = log_group_create(); manager->checkpoint.start_log_uuid = (int64_t)(time(NULL)) * 1000LL * 1000LL * 1000LL; manager->checkpoint.now_log_uuid = manager->checkpoint.start_log_uuid; manager->config = config; manager->lock = CreateCriticalSection(); manager->in_buffer_log_sizes = (uint32_t *)malloc(sizeof(uint32_t) * config->maxPersistentLogCount); manager->checkpoint_file_path = log_sdscat(log_sdsdup(config->persistentFilePath), ".idx"); memset(manager->in_buffer_log_sizes, 0, sizeof(uint32_t) * config->maxPersistentLogCount); manager->ring_file = log_ring_file_open(config->persistentFilePath, config->maxPersistentFileCount, config->maxPersistentFileSize, config->forceFlushDisk); } static void log_persistent_manager_clear(log_persistent_manager * manager) { log_group_destroy(manager->builder); ReleaseCriticalSection(manager->lock); if (manager->checkpoint_file_ptr != NULL) { fclose(manager->checkpoint_file_ptr); manager->checkpoint_file_ptr = NULL; } free(manager->in_buffer_log_sizes); log_sdsfree(manager->checkpoint_file_path); log_ring_file_close(manager->ring_file); } log_persistent_manager * create_log_persistent_manager(log_producer_config *config) { if (!log_producer_persistent_config_is_enabled(config)) { return NULL; } log_persistent_manager * manager = (log_persistent_manager *)malloc(sizeof(log_persistent_manager)); log_persistent_manager_init(manager, config); return manager; } void destroy_log_persistent_manager(log_persistent_manager *manager) { if (manager == NULL) { return; } log_persistent_manager_clear(manager); free(manager); } int log_persistent_manager_save_log(log_persistent_manager *manager, const char *logBuf, size_t logSize) { // save binlog const void * buffer[2]; size_t bufferSize[2]; log_persistent_item_header header; header.magic_code = LOG_PERSISTENT_HEADER_MAGIC; header.log_uuid = manager->checkpoint.now_log_uuid; header.log_size = logSize; header.preserved = 0; buffer[0] = &header; buffer[1] = logBuf; bufferSize[0] = sizeof(log_persistent_item_header); bufferSize[1] = logSize; int rst = log_ring_file_write(manager->ring_file, manager->checkpoint.now_file_offset, 2, buffer, bufferSize); if (rst != bufferSize[0] + bufferSize[1]) { aos_error_log("project %s, logstoe %s, write bin log failed, rst %d", manager->config->project, manager->config->logstore, rst); return LOG_PRODUCER_PERSISTENT_ERROR; } // update in memory checkpoint manager->in_buffer_log_sizes[manager->checkpoint.now_log_uuid % manager->config->maxPersistentLogCount] = rst; manager->checkpoint.now_file_offset += rst; ++manager->checkpoint.now_log_uuid; aos_debug_log("project %s, logstore %s, write bin log success, offset %lld, uuid %lld, log size %d", manager->config->project, manager->config->logstore, manager->checkpoint.now_file_offset, manager->checkpoint.now_log_uuid, rst); if (manager->first_checkpoint_saved == 0) { save_log_checkpoint(manager); manager->first_checkpoint_saved = 1; } return 0; } int log_persistent_manager_is_buffer_enough(log_persistent_manager *manager, size_t logSize) { if (manager->checkpoint.now_file_offset - manager->checkpoint.start_file_offset + logSize + 1024 > (uint64_t)manager->config->maxPersistentFileCount * manager->config->maxPersistentFileSize && manager->checkpoint.now_log_uuid - manager->checkpoint.start_log_uuid < manager->config->maxPersistentLogCount - 1) { return 0; } return 1; } static int log_persistent_manager_recover_inner(log_persistent_manager *manager, log_producer_manager *producer_manager) { int rst = recover_log_checkpoint(manager); if (rst != 0) { return rst; } aos_info_log("project %s, logstore %s, recover persistent checkpoint success, checkpoint %lld %lld %lld %lld", manager->config->project, manager->config->logstore, manager->checkpoint.start_file_offset, manager->checkpoint.now_file_offset, manager->checkpoint.start_log_uuid, manager->checkpoint.now_log_uuid); if (manager->checkpoint.start_file_offset == 0 && manager->checkpoint.now_file_offset == 0) { // no need to recover return 0; } // try recover ring file log_persistent_item_header header; uint64_t fileOffset = manager->checkpoint.start_file_offset; int64_t logUUID = manager->checkpoint.start_log_uuid; char * buffer = NULL; int max_buffer_size = 0; while (1) { rst = log_ring_file_read(manager->ring_file, fileOffset, &header, sizeof(log_persistent_item_header)); if (rst != sizeof(log_persistent_item_header)) { if (rst == 0) { aos_info_log("project %s, logstore %s, read end of file", manager->config->project, manager->config->logstore); if (buffer != NULL) { free(buffer); buffer = NULL; } break; } aos_error_log("project %s, logstore %s, read binlog file header failed, offset %lld, result %d", manager->config->project, manager->config->logstore, fileOffset, rst); if (buffer != NULL) { free(buffer); buffer = NULL; } return -1; } if (header.magic_code != LOG_PERSISTENT_HEADER_MAGIC || header.log_uuid < logUUID || header.log_size <= 0 || header.log_size > 10*1024*1024 ) { aos_info_log("project %s, logstore %s, read binlog file success, uuid %lld %lld", manager->config->project, manager->config->logstore, header.log_uuid, logUUID); break; } if (buffer == NULL || max_buffer_size < header.log_size) { if (buffer != NULL) { free(buffer); } buffer = (char *)malloc(header.log_size * 2); max_buffer_size = header.log_size * 2; } rst = log_ring_file_read(manager->ring_file, fileOffset + sizeof(log_persistent_item_header), buffer, header.log_size); if (rst != header.log_size) { // if read fail, just break aos_warn_log("project %s, logstore %s, read binlog file content failed, offset %lld, result %d", manager->config->project, manager->config->logstore, fileOffset + sizeof(log_persistent_item_header), rst); break; } if (header.log_uuid - logUUID > 1024*1024) { aos_error_log("project %s, logstore %s, log uuid jump, %lld %lld", manager->config->project, manager->config->logstore, header.log_uuid, logUUID); if (buffer != NULL) { free(buffer); buffer = NULL; } return -3; } // set empty log uuid len 0 for (int64_t emptyUUID = logUUID + 1; emptyUUID < header.log_uuid; ++emptyUUID) { manager->in_buffer_log_sizes[emptyUUID % manager->config->maxPersistentLogCount] = 0; } manager->in_buffer_log_sizes[header.log_uuid % manager->config->maxPersistentLogCount] = header.log_size + sizeof(log_persistent_item_header); logUUID = header.log_uuid; fileOffset += header.log_size + sizeof(log_persistent_item_header); rst = log_producer_manager_add_log_raw(producer_manager, buffer, header.log_size, 0, header.log_uuid); if (rst != 0) { aos_error_log("project %s, logstore %s, add log to producer manager failed, this log will been dropped", manager->config->project, manager->config->logstore); } } if (buffer != NULL) { free(buffer); buffer = NULL; } if (logUUID < manager->checkpoint.now_log_uuid - 1) { // replay fail aos_fatal_log("project %s, logstore %s, replay bin log failed, now log uuid %lld, expected min log uuid %lld, start uuid %lld, start offset %lld, now offset %lld, replayed offset %lld", manager->config->project, manager->config->logstore, logUUID, manager->checkpoint.now_log_uuid, manager->checkpoint.start_log_uuid, manager->checkpoint.start_file_offset, manager->checkpoint.now_file_offset, fileOffset); return -4; } // update new checkpoint when replay bin log success if (fileOffset > manager->checkpoint.start_file_offset) { manager->checkpoint.now_log_uuid = logUUID + 1; manager->checkpoint.now_file_offset = fileOffset; } aos_info_log("project %s, logstore %s, replay bin log success, now checkpoint %lld %lld %lld %lld", manager->config->project, manager->config->logstore, manager->checkpoint.start_log_uuid, manager->checkpoint.now_log_uuid, manager->checkpoint.start_file_offset, manager->checkpoint.now_file_offset); // save new checkpoint rst = save_log_checkpoint(manager); if (rst != 0) { aos_error_log("project %s, logstore %s, save checkpoint failed, reason %d", manager->config->project, manager->config->logstore, rst); } return rst; } static void log_persistent_manager_reset(log_persistent_manager * manager) { log_producer_config * config = manager->config; log_persistent_manager_clear(manager); log_persistent_manager_init(manager, config); manager->checkpoint.start_log_uuid = (int64_t)(time(NULL)) * 1000LL * 1000LL * 1000LL + 500LL * 1000LL * 1000LL; manager->checkpoint.now_log_uuid = manager->checkpoint.start_log_uuid; manager->is_invalid = 0; } int log_persistent_manager_recover(log_persistent_manager *manager, log_producer_manager *producer_manager) { aos_info_log("project %s, logstore %s, start recover persistent manager", manager->config->project, manager->config->logstore); CS_ENTER(manager->lock); int rst = log_persistent_manager_recover_inner(manager, producer_manager); if (rst != 0) { // if recover failed, reset persistent manager manager->is_invalid = 1; log_persistent_manager_reset(manager); } else { manager->is_invalid = 0; } CS_LEAVE(manager->lock); return rst; }