in Sources/aliyun-log-c-sdk/log_persistent_manager.c [275:447]
/*
 * Replay the persisted ring file into the producer manager.
 *
 * Steps:
 *   1. Load the checkpoint with recover_log_checkpoint(); a non-zero result
 *      is returned unchanged.
 *   2. If both the start and now file offsets are 0, nothing is replayed and
 *      0 is returned.
 *   3. Starting at checkpoint.start_file_offset, read records laid out as a
 *      log_persistent_item_header followed by header.log_size payload bytes.
 *
 * Replay stops at end of file (the header read returns 0). It also stops on
 * the first header that looks invalid: wrong magic, a uuid lower than the
 * previous one, or a size outside (0, 10 MiB]. A short payload read also
 * stops it.
 *
 * For each accepted record:
 *   - in_buffer_log_sizes is zeroed for any skipped uuids;
 *   - the record's total size (header + payload) is stored in the slot for
 *     its uuid;
 *   - the payload is handed to log_producer_manager_add_log_raw(). If that
 *     fails, the failure is logged and the record is dropped.
 *
 * @param manager           persistent manager whose checkpoint and ring file
 *                          are replayed
 * @param producer_manager  destination for replayed log payloads
 * @return the recover_log_checkpoint() error if it fails;
 *         0 if there is nothing to replay;
 *         -1 if a header read fails (other than a clean EOF) or the replay
 *            buffer cannot be allocated;
 *         -3 if the uuid jumps by more than 1024*1024;
 *         -4 if replay ended before reaching checkpoint.now_log_uuid - 1;
 *         otherwise the result of save_log_checkpoint().
 */
static int log_persistent_manager_recover_inner(log_persistent_manager *manager,
                                                log_producer_manager *producer_manager)
{
    int rst = recover_log_checkpoint(manager);
    if (rst != 0)
    {
        return rst;
    }
    aos_info_log("project %s, logstore %s, recover persistent checkpoint success, checkpoint %lld %lld %lld %lld",
                 manager->config->project,
                 manager->config->logstore,
                 (long long)manager->checkpoint.start_file_offset,
                 (long long)manager->checkpoint.now_file_offset,
                 (long long)manager->checkpoint.start_log_uuid,
                 (long long)manager->checkpoint.now_log_uuid);
    if (manager->checkpoint.start_file_offset == 0 && manager->checkpoint.now_file_offset == 0)
    {
        // no need to recover
        return 0;
    }

    // try recover ring file
    log_persistent_item_header header;
    uint64_t fileOffset = manager->checkpoint.start_file_offset;
    int64_t logUUID = manager->checkpoint.start_log_uuid;
    char *buffer = NULL;
    int max_buffer_size = 0;
    while (1)
    {
        rst = log_ring_file_read(manager->ring_file, fileOffset, &header, sizeof(log_persistent_item_header));
        if (rst != sizeof(log_persistent_item_header))
        {
            if (rst == 0)
            {
                aos_info_log("project %s, logstore %s, read end of file",
                             manager->config->project,
                             manager->config->logstore);
                break;
            }
            aos_error_log("project %s, logstore %s, read binlog file header failed, offset %lld, result %d",
                          manager->config->project,
                          manager->config->logstore,
                          (long long)fileOffset,
                          rst);
            rst = -1;
            goto fail;
        }
        // An invalid header marks the end of the valid replay region.
        if (header.magic_code != LOG_PERSISTENT_HEADER_MAGIC ||
            header.log_uuid < logUUID ||
            header.log_size <= 0 || header.log_size > 10*1024*1024)
        {
            aos_info_log("project %s, logstore %s, read binlog file success, uuid %lld %lld",
                         manager->config->project,
                         manager->config->logstore,
                         (long long)header.log_uuid,
                         (long long)logUUID);
            break;
        }
        // Grow the scratch buffer with 2x headroom to reduce reallocations.
        if (buffer == NULL || max_buffer_size < header.log_size)
        {
            free(buffer);
            max_buffer_size = 0;
            buffer = malloc(header.log_size * 2);
            if (buffer == NULL)
            {
                aos_error_log("project %s, logstore %s, alloc replay buffer failed, size %lld",
                              manager->config->project,
                              manager->config->logstore,
                              (long long)header.log_size * 2);
                rst = -1;
                goto fail;
            }
            max_buffer_size = header.log_size * 2;
        }
        rst = log_ring_file_read(manager->ring_file, fileOffset + sizeof(log_persistent_item_header), buffer, header.log_size);
        if (rst != header.log_size)
        {
            // if read fail, just break
            aos_warn_log("project %s, logstore %s, read binlog file content failed, offset %lld, result %d",
                         manager->config->project,
                         manager->config->logstore,
                         (long long)(fileOffset + sizeof(log_persistent_item_header)),
                         rst);
            break;
        }
        if (header.log_uuid - logUUID > 1024*1024)
        {
            aos_error_log("project %s, logstore %s, log uuid jump, %lld %lld",
                          manager->config->project,
                          manager->config->logstore,
                          (long long)header.log_uuid,
                          (long long)logUUID);
            rst = -3;
            goto fail;
        }
        // set empty log uuid len 0
        for (int64_t emptyUUID = logUUID + 1; emptyUUID < header.log_uuid; ++emptyUUID)
        {
            manager->in_buffer_log_sizes[emptyUUID % manager->config->maxPersistentLogCount] = 0;
        }
        manager->in_buffer_log_sizes[header.log_uuid % manager->config->maxPersistentLogCount] = header.log_size + sizeof(log_persistent_item_header);
        logUUID = header.log_uuid;
        fileOffset += header.log_size + sizeof(log_persistent_item_header);
        rst = log_producer_manager_add_log_raw(producer_manager, buffer, header.log_size, 0, header.log_uuid);
        if (rst != 0)
        {
            // Best effort: a record the producer rejects is dropped, replay continues.
            aos_error_log("project %s, logstore %s, add log to producer manager failed, this log will been dropped",
                          manager->config->project,
                          manager->config->logstore);
        }
    }
    free(buffer);
    buffer = NULL;

    if (logUUID < manager->checkpoint.now_log_uuid - 1)
    {
        // replay fail
        aos_fatal_log("project %s, logstore %s, replay bin log failed, now log uuid %lld, expected min log uuid %lld, start uuid %lld, start offset %lld, now offset %lld, replayed offset %lld",
                      manager->config->project,
                      manager->config->logstore,
                      (long long)logUUID,
                      (long long)manager->checkpoint.now_log_uuid,
                      (long long)manager->checkpoint.start_log_uuid,
                      (long long)manager->checkpoint.start_file_offset,
                      (long long)manager->checkpoint.now_file_offset,
                      (long long)fileOffset);
        return -4;
    }
    // update new checkpoint when replay bin log success
    if (fileOffset > manager->checkpoint.start_file_offset)
    {
        manager->checkpoint.now_log_uuid = logUUID + 1;
        manager->checkpoint.now_file_offset = fileOffset;
    }
    aos_info_log("project %s, logstore %s, replay bin log success, now checkpoint %lld %lld %lld %lld",
                 manager->config->project,
                 manager->config->logstore,
                 (long long)manager->checkpoint.start_log_uuid,
                 (long long)manager->checkpoint.now_log_uuid,
                 (long long)manager->checkpoint.start_file_offset,
                 (long long)manager->checkpoint.now_file_offset);
    // save new checkpoint
    rst = save_log_checkpoint(manager);
    if (rst != 0)
    {
        aos_error_log("project %s, logstore %s, save checkpoint failed, reason %d",
                      manager->config->project,
                      manager->config->logstore,
                      rst);
    }
    return rst;

fail:
    // Error exit: release the replay buffer and return the code set in rst.
    free(buffer);
    return rst;
}