Sources/aliyun-log-c-sdk/log_ring_file.c (228 lines of code) (raw):

// // Created by davidzhang on 2020/8/23. // #include "log_ring_file.h" #include "log_sds.h" #include "inner_log.h" #include <fcntl.h> static void get_ring_file_offset(log_ring_file * file, uint64_t offset, int32_t * fileIndex, int32_t * fileOffset) { *fileIndex = (offset / file->maxFileSize) % file->maxFileCount; *fileOffset = offset % file->maxFileSize; } static int log_ring_file_open_fd(log_ring_file *file, uint64_t offset, int32_t fileIndex, int32_t fileOffset) { if (file->nowFD > 0 && file->nowFileIndex == fileIndex && file->nowOffset % file->maxFileSize == fileOffset) { return 0; } if (file->nowFD > 0) { close(file->nowFD); file->nowFD = -1; } file->fileRemoveFlags[fileIndex] = 0; char filePath[256] = {0}; snprintf(filePath, 255, "%s_%03d", file->filePath, fileIndex); int openFlag = O_RDWR|O_CREAT; if (file->syncWrite) { openFlag |= O_SYNC; } file->nowFD = open(filePath, openFlag, 0644); if (file->nowFD < 0) { aos_error_log("open file failed %s, error %s", filePath, strerror(errno)); return -1; } if (fileOffset != 0) { lseek(file->nowFD, fileOffset, SEEK_SET); } file->nowFileIndex = fileIndex; file->nowOffset = offset; return 0; } log_ring_file * log_ring_file_open(const char *filePath, int maxFileCount, int maxFileSize, int syncWrite) { log_ring_file * file = (log_ring_file *)malloc(sizeof(log_ring_file)); memset(file, 0, sizeof(log_ring_file)); file->filePath = log_sdsdup((const log_sds)filePath); file->nowFD = -1; file->maxFileCount = maxFileCount; file->maxFileSize = maxFileSize; file->syncWrite = syncWrite; file->fileRemoveFlags = (int *)malloc(sizeof(int) * file->maxFileCount); memset(file->fileRemoveFlags, 0, sizeof(int) * file->maxFileCount); file->fileUseFlags = (int *)malloc(sizeof(int) * file->maxFileCount); memset(file->fileUseFlags, 0, sizeof(int) * file->maxFileCount); return file; } int log_ring_file_write_single(log_ring_file *file, uint64_t offset, const void *buffer, size_t buffer_size) { // { // int openFlag = O_RDWR|O_CREAT; // if (file->syncWrite) // { // openFlag |= O_SYNC; // } // file->nowFD = open(file->filePath, openFlag, 0644); // if (file->nowFD < 0) // { // aos_error_log("open file failed %s, error %s", file->filePath, strerror(errno)); // return -1; // } // if (offset != 0) // { // lseek(file->nowFD, offset, SEEK_SET); // } // int rst = write(file->nowFD, (char *)buffer, buffer_size); // close(file->nowFD); // file->nowFD = 0; // return rst; // } int32_t fileIndex = 0; int32_t fileOffset = 0; size_t nowOffset = 0; while (nowOffset < buffer_size) { get_ring_file_offset(file, offset + nowOffset, &fileIndex, &fileOffset); if (log_ring_file_open_fd(file, offset, fileIndex, fileOffset) != 0) { return -1; } int writeSize = buffer_size - nowOffset; if (file->maxFileSize - fileOffset <= writeSize) { writeSize = file->maxFileSize - fileOffset; } int rst = write(file->nowFD, (char *)buffer + nowOffset, writeSize); if (rst != writeSize) { aos_error_log("write buffer to file failed, file %s, offset %d, size %d, error %s", file->filePath, fileIndex + nowOffset, file->maxFileSize - fileOffset, strerror(errno)); return -1; } nowOffset += rst; file->nowOffset += rst; } return buffer_size; } int log_ring_file_write(log_ring_file *file, uint64_t offset, int buffer_count, const void **buffer, size_t *buffer_size) { uint64_t inner_offset = 0; for (int (i) = 0; (i) < buffer_count; ++(i)) { if (log_ring_file_write_single(file, offset + inner_offset, buffer[i], buffer_size[i]) != buffer_size[i]) { return -1; } inner_offset += buffer_size[i]; } return inner_offset; } int log_ring_file_read(log_ring_file *file, uint64_t offset, void *buffer, size_t buffer_size) { // { // int openFlag = O_RDWR|O_CREAT; // if (file->syncWrite) // { // openFlag |= O_SYNC; // } // file->nowFD = open(file->filePath, openFlag, 0644); // if (file->nowFD < 0) // { // aos_error_log("open file failed %s, error %s", file->filePath, strerror(errno)); // return -1; // } // if (offset != 0) // { // lseek(file->nowFD, offset, SEEK_SET); // } // int rst = read(file->nowFD, buffer, buffer_size); // close(file->nowFD); // file->nowFD = 0; // return rst; // } int32_t fileIndex = 0; int32_t fileOffset = 0; size_t nowOffset = 0; while (nowOffset < buffer_size) { get_ring_file_offset(file, offset + nowOffset, &fileIndex, &fileOffset); if (log_ring_file_open_fd(file, offset, fileIndex, fileOffset) != 0) { return -1; } int rst = 0; int readSize = buffer_size - nowOffset; if (readSize > file->maxFileSize - fileOffset) { readSize = file->maxFileSize - fileOffset; } if ((rst = read(file->nowFD, buffer + nowOffset, readSize)) != readSize) { if (errno == ENOENT) { return 0; } if (rst > 0) { file->nowOffset += rst; nowOffset += rst; continue; } if (rst == 0) { return file->nowOffset - offset; } aos_error_log("read buffer from file failed, file %s, offset %d, size %d, error %s", file->filePath, fileIndex + nowOffset, file->maxFileSize - fileOffset, strerror(errno)); return -1; } nowOffset += file->maxFileSize - fileOffset; file->nowOffset += file->maxFileSize - fileOffset; } return buffer_size; } int log_ring_file_flush(log_ring_file *file) { if (file->nowFD > 0) { return fsync(file->nowFD); } return -1; } void log_ring_file_remove_file(log_ring_file *file, int32_t fileIndex) { if (file->fileRemoveFlags[fileIndex] > 0) { return; } char filePath[256] = {0}; snprintf(filePath, 255, "%s_%03d", file->filePath, fileIndex); remove(filePath); aos_info_log("remove file %s", filePath); file->fileRemoveFlags[fileIndex] = 1; } int log_ring_file_clean(log_ring_file *file, uint64_t startOffset, uint64_t endOffset) { if (endOffset > file->nowOffset) { aos_error_log("try to clean invalid ring file %s, start %lld, end %lld, now %lld", file->filePath, startOffset, endOffset, file->nowOffset); return -1; } if ((file->nowOffset - endOffset) / file->maxFileSize >= file->maxFileCount - 1) { // no need to clean return 0; } memset(file->fileUseFlags, 0, sizeof(int) * file->maxFileCount); for (int64_t i = endOffset / file->maxFileSize; i <= file->nowOffset / file->maxFileSize; ++i) { file->fileUseFlags[i % file->maxFileCount] = 1; } aos_info_log("remove file %s , offset from %lld to %lld, file offset %lld, index from %d to %d", file->filePath, startOffset, endOffset, file->nowOffset, endOffset / file->maxFileSize, file->nowOffset / file->maxFileSize); for (int i = 0; i < file->maxFileCount; ++i) { if (file->fileUseFlags[i] != 0) continue; log_ring_file_remove_file(file, i); } return 0; } int log_ring_file_close(log_ring_file *file) { if (file != NULL) { log_sdsfree(file->filePath); if (file->nowFD > 0) { close(file->nowFD); file->nowFD = -1; } free(file->fileRemoveFlags); free(file->fileUseFlags); free(file); } return 0; }