int S3fsCurl::ParallelMultipartUploadRequest()

in src/curl.cpp [1212:1300]


int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, int fd)
{
    int            result;
    std::string    upload_id;
    struct stat    st;
    int            fd2;
    etaglist_t     list;
    off_t          remaining_bytes;
    S3fsCurl       s3fscurl(true);

    S3FS_PRN_INFO3("[tpath=%s][fd=%d]", SAFESTRPTR(tpath), fd);

    // duplicate fd
    if(-1 == (fd2 = dup(fd)) || 0 != lseek(fd2, 0, SEEK_SET)){
        S3FS_PRN_ERR("Could not duplicate file descriptor(errno=%d)", errno);
        if(-1 != fd2){
            close(fd2);
        }
        return -errno;
    }
    if(-1 == fstat(fd2, &st)){
        S3FS_PRN_ERR("Invalid file descriptor(errno=%d)", errno);
        close(fd2);
        return -errno;
    }

    if(0 != (result = s3fscurl.PreMultipartPostRequest(tpath, meta, upload_id, false))){
        close(fd2);
        return result;
    }
    s3fscurl.DestroyCurlHandle();

    // Initialize S3fsMultiCurl
    S3fsMultiCurl curlmulti(GetMaxParallelCount());
    curlmulti.SetSuccessCallback(S3fsCurl::UploadMultipartPostCallback);
    curlmulti.SetRetryCallback(S3fsCurl::UploadMultipartPostRetryCallback);

    // cycle through open fd, pulling off 10MB chunks at a time
    for(remaining_bytes = st.st_size; 0 < remaining_bytes; ){
        off_t chunk = remaining_bytes > S3fsCurl::multipart_size ? S3fsCurl::multipart_size : remaining_bytes;

        // s3fscurl sub object
        S3fsCurl* s3fscurl_para            = new S3fsCurl(true);
        s3fscurl_para->partdata.fd         = fd2;
        s3fscurl_para->partdata.startpos   = st.st_size - remaining_bytes;
        s3fscurl_para->partdata.size       = chunk;
        s3fscurl_para->b_partdata_startpos = s3fscurl_para->partdata.startpos;
        s3fscurl_para->b_partdata_size     = s3fscurl_para->partdata.size;
        s3fscurl_para->partdata.add_etag_list(list);

        // initiate upload part for parallel
        if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, s3fscurl_para->partdata.get_part_number(), upload_id))){
            S3FS_PRN_ERR("failed uploading part setup(%d)", result);
            close(fd2);
            delete s3fscurl_para;
            return result;
        }

        // set into parallel object
        if(!curlmulti.SetS3fsCurlObject(s3fscurl_para)){
            S3FS_PRN_ERR("Could not make curl object into multi curl(%s).", tpath);
            close(fd2);
            delete s3fscurl_para;
            return -EIO;
        }
        remaining_bytes -= chunk;
    }

    // Multi request
    if(0 != (result = curlmulti.Request())){
        S3FS_PRN_ERR("error occurred in multi request(errno=%d).", result);

        S3fsCurl s3fscurl_abort(true);
        int result2 = s3fscurl_abort.AbortMultipartUpload(tpath, upload_id);
        s3fscurl_abort.DestroyCurlHandle();
        if(result2 != 0){
            S3FS_PRN_ERR("error aborting multipart upload(errno=%d).", result2);
        }

        return result;
    }

    close(fd2);

    if(0 != (result = s3fscurl.CompleteMultipartPostRequest(tpath, upload_id, list))){
        return result;
    }
    return 0;
}