int S3fsCurl::ParallelMixMultipartUploadRequest()

in src/curl.cpp [1302:1439]


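// Uploads tpath as a single multipart upload: pages flagged as modified are
// uploaded in parallel from the duplicated file descriptor, while unmodified
// pages are reused through server-side "Upload Part Copy" requests.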
int S3fsCurl::ParallelMixMultipartUploadRequest(const char* tpath, headers_t& meta, int fd, const fdpage_list_t& mixuppages)
{
    int            result;
    std::string    upload_id;
    struct stat    st;
    int            fd2;
    etaglist_t     list;
    S3fsCurl       s3fscurl(true);

    S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

    // duplicate fd and rewind it so the parts are read from the beginning
    if(-1 == (fd2 = dup(fd)) || 0 != lseek(fd2, 0, SEEK_SET)){
        int saved_errno = errno;    // keep the error code before close()/logging can clobber it
        S3FS_PRN_ERR("Could not duplicate or rewind file descriptor(errno=%d)", saved_errno);
        if(-1 != fd2){
            close(fd2);
        }
        return -saved_errno;
    }
    if(-1 == fstat(fd2, &st)){
        int saved_errno = errno;
        S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", saved_errno);
        close(fd2);
        return -saved_errno;
    }

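    // start the multipart upload and obtain its upload id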
    if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id, true))){
        close(fd2);
        return result;
    }
    s3fscurl.DestroyCurlHandle();

    // for copy multipart
    std::string srcresource;
    std::string srcurl;
    MakeUrlResource(get_realpath(tpath).c_str(), srcresource, srcurl);
    meta["Content-Type"]      = S3fsCurl::LookupMimeType(std::string(tpath));
    meta["x-oss-copy-source"] = srcresource;

    // Initialize S3fsMultiCurl
    S3fsMultiCurl curlmulti(GetMaxParallelCount());
    curlmulti.SetSuccessCallback(S3fsCurl::MixMultipartPostCallback);
    curlmulti.SetRetryCallback(S3fsCurl::MixMultipartPostRetryCallback);

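    // queue one request per page: modified pages are uploaded from fd2,
    // unmodified pages are copied server-side in chunks of GetMultipartCopySize()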
    for(fdpage_list_t::const_iterator iter = mixuppages.begin(); iter != mixuppages.end(); ++iter){
        if(iter->modified){
            // Multipart upload
            S3fsCurl* s3fscurl_para              = new S3fsCurl(true);

            s3fscurl_para->partdata.fd         = fd2;
            s3fscurl_para->partdata.startpos   = iter->offset;
            s3fscurl_para->partdata.size       = iter->bytes;
            s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
            s3fscurl_para->b_partdata_size     = s3fscurl_para->partdata.size;
            s3fscurl_para->partdata.add_etag_list(list);

            S3FS_PRN_INFO3("Upload Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset), static_cast<long long>(iter->bytes), s3fscurl_para->partdata.get_part_number());

            // initiate upload part for parallel
            if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
                S3FS_PRN_ERR("failed uploading part setup(%d)", result);
                close(fd2);
                delete s3fscurl_para;
                return result;
            }

            // set into parallel object
            if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
                S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
                close(fd2);
                delete s3fscurl_para;
                return -EIO;
            }
        }else{
            // Multipart copy
            for(off_t i = 0, bytes = 0; i < iter->bytes; i += bytes){
                S3fsCurl* s3fscurl_para              = new S3fsCurl(true);

                bytes = std::min(static_cast<off_t>(GetMultipartCopySize()), iter->bytes - i);
                // every copy part must be at least MIN_MULTIPART_SIZE and at most FIVE_GB;
                // if the remaining tail would fall below MIN_MULTIPART_SIZE, fold it into
                // this part, splitting evenly when that would exceed FIVE_GB
                off_t remain_bytes = iter->bytes - i - bytes;

                if((MIN_MULTIPART_SIZE > remain_bytes) && (0 < remain_bytes)){
                    if(FIVE_GB < (bytes + remain_bytes)){
                        bytes = (bytes + remain_bytes) / 2;
                    }else{
                        bytes += remain_bytes;
                    }
                }

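                // copy only this byte range of the source object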
                std::ostringstream strrange;
                strrange << "bytes=" << (iter->offset + i) << "-" << (iter->offset + i + bytes - 1);
                meta["x-oss-copy-source-range"] = strrange.str();

                s3fscurl_para->b_from   = SAFESTRPTR(tpath);
                s3fscurl_para->b_meta   = meta;
                s3fscurl_para->partdata.add_etag_list(list);

                S3FS_PRN_INFO3("Copy Part [tpath=%s][start=%lld][size=%lld][part=%d]", SAFESTRPTR(tpath), static_cast<long long>(iter->offset + i), static_cast<long long>(bytes), s3fscurl_para->partdata.get_part_number());

                // initiate upload part for parallel
                if(0 != (result = s3fscurl_para->CopyMultipartPostSetup(tpath, tpath, s3fscurl_para->partdata.get_part_number(), upload_id, meta))){
                    S3FS_PRN_ERR("failed uploading part setup(%d)", result);
                    close(fd2);
                    delete s3fscurl_para;
                    return result;
                }

                // set into parallel object
                if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
                    S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
                    close(fd2);
                    delete s3fscurl_para;
                    return -EIO;
                }
            }
        }
    }

    // Multi request
    if(0 != (result = curlmulti.Request())){
        S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

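        // abort the incomplete upload so no orphaned parts are left on the server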
        S3fsCurl s3fscurl_abort(true);
        int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
        s3fscurl_abort.DestroyCurlHandle();
        if(result2 != 0){
            S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
        }
        close(fd2);
        return result;
    }
    close(fd2);

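    // complete the multipart upload with the collected part ETags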
    if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
        return result;
    }
    return 0;
}
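
A minimal call-site sketch, for illustration only: the real caller lives in the fd cache layer and is not shown here. The path and the pre-populated meta and pages values are hypothetical, and it assumes the function is callable as a static member, as the other parallel request helpers in this file appear to be:

    headers_t     meta;      // object headers to carry over to the new upload
    fdpage_list_t pages;     // page list: modified pages are uploaded, the rest copied server-side
    int           fd = -1;   // assumed: an open descriptor for the local cache file

    int result = S3fsCurl::ParallelMixMultipartUploadRequest("/dir/file.bin", meta, fd, pages);
    if(0 != result){
        S3FS_PRN_ERR("mix multipart upload failed(%d)", result);
    }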