static int log_persistent_manager_recover_inner()

in Sources/aliyun-log-c-sdk/log_persistent_manager.c [275:447]


static int log_persistent_manager_recover_inner(log_persistent_manager *manager,
                                                log_producer_manager *producer_manager)
{
    int rst = recover_log_checkpoint(manager);
    if (rst != 0)
    {
        return rst;
    }

    aos_info_log("project %s, logstore %s, recover persistent checkpoint success, checkpoint %lld %lld %lld %lld",
                 manager->config->project,
                 manager->config->logstore,
                 manager->checkpoint.start_file_offset,
                 manager->checkpoint.now_file_offset,
                 manager->checkpoint.start_log_uuid,
                 manager->checkpoint.now_log_uuid);

    if (manager->checkpoint.start_file_offset == 0 && manager->checkpoint.now_file_offset == 0)
    {
        // no need to recover
        return 0;
    }

    // try recover ring file

    log_persistent_item_header header;

    uint64_t fileOffset = manager->checkpoint.start_file_offset;
    int64_t logUUID = manager->checkpoint.start_log_uuid;

    char * buffer = NULL;
    int max_buffer_size = 0;

    while (1)
    {
        rst = log_ring_file_read(manager->ring_file, fileOffset, &header, sizeof(log_persistent_item_header));
        if (rst != sizeof(log_persistent_item_header))
        {
            if (rst == 0)
            {
                aos_info_log("project %s, logstore %s, read end of file",
                             manager->config->project,
                             manager->config->logstore);
                if (buffer != NULL)
                {
                    free(buffer);
                    buffer = NULL;
                }
                break;
            }
            aos_error_log("project %s, logstore %s, read binlog file header failed, offset %lld, result %d",
                          manager->config->project,
                          manager->config->logstore,
                          fileOffset,
                          rst);
            if (buffer != NULL)
            {
                free(buffer);
                buffer = NULL;
            }
            return -1;
        }
        if (header.magic_code != LOG_PERSISTENT_HEADER_MAGIC ||
            header.log_uuid < logUUID ||
            header.log_size <= 0 || header.log_size > 10*1024*1024 )
        {
            aos_info_log("project %s, logstore %s, read binlog file success, uuid %lld %lld",
                          manager->config->project,
                          manager->config->logstore,
                          header.log_uuid,
                          logUUID);
            break;
        }
        if (buffer == NULL || max_buffer_size < header.log_size)
        {
            if (buffer != NULL)
            {
                free(buffer);
            }
            buffer = (char *)malloc(header.log_size * 2);
            max_buffer_size = header.log_size * 2;
        }
        rst = log_ring_file_read(manager->ring_file, fileOffset + sizeof(log_persistent_item_header), buffer, header.log_size);
        if (rst != header.log_size)
        {
            // if read fail, just break
            aos_warn_log("project %s, logstore %s, read binlog file content failed, offset %lld, result %d",
                          manager->config->project,
                          manager->config->logstore,
                          fileOffset + sizeof(log_persistent_item_header),
                          rst);
            break;
        }
        if (header.log_uuid - logUUID > 1024*1024)
        {
            aos_error_log("project %s, logstore %s, log uuid jump, %lld %lld",
                          manager->config->project,
                          manager->config->logstore,
                          header.log_uuid,
                          logUUID);
            if (buffer != NULL)
            {
                free(buffer);
                buffer = NULL;
            }
            return -3;
        }
        // set empty log uuid len 0
        for (int64_t emptyUUID = logUUID + 1; emptyUUID < header.log_uuid; ++emptyUUID)
        {
            manager->in_buffer_log_sizes[emptyUUID % manager->config->maxPersistentLogCount] = 0;
        }
        manager->in_buffer_log_sizes[header.log_uuid % manager->config->maxPersistentLogCount] = header.log_size + sizeof(log_persistent_item_header);

        logUUID = header.log_uuid;
        fileOffset += header.log_size + sizeof(log_persistent_item_header);

        rst = log_producer_manager_add_log_raw(producer_manager, buffer, header.log_size, 0, header.log_uuid);
        if (rst != 0)
        {
            aos_error_log("project %s, logstore %s, add log to producer manager failed, this log will been dropped",
                          manager->config->project,
                          manager->config->logstore);
        }

    }
    if (buffer != NULL)
    {
        free(buffer);
        buffer = NULL;
    }

    if (logUUID < manager->checkpoint.now_log_uuid - 1)
    {
        // replay fail
        aos_fatal_log("project %s, logstore %s, replay bin log failed, now log uuid %lld, expected min log uuid %lld, start uuid %lld, start offset  %lld, now offset  %lld, replayed offset %lld",
                      manager->config->project,
                      manager->config->logstore,
                      logUUID,
                      manager->checkpoint.now_log_uuid,
                      manager->checkpoint.start_log_uuid,
                      manager->checkpoint.start_file_offset,
                      manager->checkpoint.now_file_offset,
                      fileOffset);
        return -4;
    }

    // update new checkpoint when replay bin log success
    if (fileOffset > manager->checkpoint.start_file_offset)
    {
        manager->checkpoint.now_log_uuid = logUUID + 1;
        manager->checkpoint.now_file_offset = fileOffset;
    }

    aos_info_log("project %s, logstore %s, replay bin log success, now checkpoint %lld %lld %lld %lld",
                 manager->config->project,
                 manager->config->logstore,
                 manager->checkpoint.start_log_uuid,
                 manager->checkpoint.now_log_uuid,
                 manager->checkpoint.start_file_offset,
                 manager->checkpoint.now_file_offset);

    // save new checkpoint
    rst = save_log_checkpoint(manager);
    if (rst != 0)
    {
        aos_error_log("project %s, logstore %s, save checkpoint failed, reason %d",
                      manager->config->project,
                      manager->config->logstore,
                      rst);
    }
    return rst;
}