s3_diver/s3_diver_03-test.py [126:268]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
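    # the "snowball-auto-extract" metadata asks Snowball to unpack the tar into its
    # individual files when the data is imported into the S3 bucket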
    mpu = s3.create_multipart_upload(Bucket=bucket_name, Key=key_name, StorageClass=s3_class, Metadata={"snowball-auto-extract": "true"})
    mpu_id = mpu["UploadId"]
    return mpu_id

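# Uploads a single part and appends its PartNumber/ETag to the global 'parts' list,
# which complete_multipart_upload needs later to assemble the object.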
def upload_mpu(key_name, mpu_id, data, index):
    #part = s3.upload_part(Body=data, Bucket=bucket_name, Key=key_name, UploadId=mpu_id, PartNumber=index, ContentLength=max_buf_size)
    part = s3.upload_part(Body=data, Bucket=bucket_name, Key=key_name, UploadId=mpu_id, PartNumber=index)
    parts.append({"PartNumber": index, "ETag": part["ETag"]})
    #print ('parts list: %s' % str(parts))
    return parts

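# Tells S3 that all parts are uploaded so it can assemble them into the final tar object.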
def complete_mpu(key_name, mpu_id, parts):
    result = s3.complete_multipart_upload(
        Bucket=bucket_name,
        Key=key_name,
        UploadId=mpu_id,
        MultipartUpload={"Parts": parts})
    return result

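# CompleteMultipartUpload requires the part list sorted by PartNumber in ascending order.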
def adjusting_parts_order(mpu_parts):
    return sorted(mpu_parts, key=lambda item: item['PartNumber'])

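# Keeps only the bytes that have not been uploaded yet: the unread remainder is copied
# to a temporary buffer, the original buffer is truncated, and the remainder is written
# back so the tar stream keeps appending after it.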
def buf_fifo(buf):
    tmp_buf = io.BytesIO()            # added for FIFO operation
    tmp_buf.write(buf.read())    # added for FIFO operation
    #print ('3. before fifo, recv_buf_size: %s' % len(buf.getvalue()))
    #print('3.before fifo, recv_buf_pos : %s' % buf.tell())
    buf.seek(0,0)
    buf.truncate(0)
    tmp_buf.seek(0,0)
    buf.write(tmp_buf.read())
    return buf

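# Streams the files in org_files_list into an in-memory tar archive (recv_buf) and uploads
# it to the bucket as a multipart upload, one max_part_size chunk at a time, so the whole
# tar never has to exist on local disk.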
def copy_to_snowball(error_log, success_log, key_name, org_files_list):
    tar_file_size = 0
    recv_buf = io.BytesIO()
    mpu_id = create_mpu(key_name)
    parts_index = 1
    s_log = success_log
    e_log = error_log
    with tarfile.open(fileobj=recv_buf, mode="w") as tar:
        for org_file in org_files_list:
            if os.path.isfile(org_file):
                tar.add(org_file)
                #print ('1. recv_buf_size: %s' % len(recv_buf.getvalue()))
                log_success(s_log, org_file, " is being archived\n")
                recv_buf_size = recv_buf.tell()
                #print ('1. recv_buf_pos: %s' % recv_buf.tell())
                if recv_buf_size > max_part_size:
                    print('multipart uploading: part %s / %s , size: %s' % (parts_index, max_part_count, recv_buf_size))
                    chunk_count = recv_buf_size // max_part_size
                    tar_file_size = tar_file_size + recv_buf_size
                    print('%s is accumulating, size: %s' % (key_name, tar_file_size))
                    #print('chunk_count: %s ' % chunk_count)
                    for buf_index in range(chunk_count):
                        start_pos = buf_index * max_part_size
                        recv_buf.seek(start_pos,0)
                        mpu_parts = upload_mpu(key_name, mpu_id, recv_buf.read(max_part_size), parts_index)
                        parts_index += 1
                    ####################
                    buf_fifo(recv_buf)
                    recv_buf_size = recv_buf.tell()
                    #print('3.after fifo, recv_buf_pos : %s' % recv_buf.tell())
                    #print ('3. after fifo, recv_buf_size: %s' % len(recv_buf.getvalue()))
                else:
                    pass
                    #print('accumulating files...')
            else:
                log_error(e_log, org_file," does not exist\n")
                print (org_file + ' does not exist\n')
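    # all files are archived; upload whatever remains in the buffer as the final part
    # (S3 allows the last part of a multipart upload to be smaller than the minimum part size)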
    recv_buf.seek(0,0)
    mpu_parts = upload_mpu(key_name, mpu_id, recv_buf.read(), parts_index)
    parts_index += 1
    mpu_parts = adjusting_parts_order(mpu_parts)
    complete_mpu(key_name, mpu_id, mpu_parts)
    ### print metadata
    meta_out = s3.head_object(Bucket=bucket_name, Key=key_name)
    print ('\n metadata info: %s' % str(meta_out)) 
    log_success(s_log, str(meta_out), '!!\n')
    print ("\n tar file: %s \n" % key_name)
    log_success(s_log, key_name, ' is uploaded successfully\n')

def snowball_uploader_help(**args):
    print ("Usage: %s 'genlist | cp_snowball | help'" % sys.argv[0])
    print ("use python3; this script is not compatible with python2")
    print ('genlist: ')
    print ('generates filelist files containing the names of the target files, under %s' % (filelist_dir))
    print ('cp_snowball: ')
    print ('copies the files on the server to Snowball efficiently')
    print ('the mechanism is as follows:')
    print ('1. reads the target file names from one filelist file in the filelist directory')
    print ('2. accumulates the files into an in-memory buffer up to max_part_size')
    print ('3. when the buffer reaches max_part_size, sends that chunk to Snowball via multipart upload')
    print ('4. uploads the tar files to Snowball concurrently, up to max_process processes')
    print ('5. after the upload completes, the tar file is created on Snowball')
    print ('6. then moves on to the next filelist file')

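# Usage sketch (hypothetical file name; substitute whatever this script is saved as):
#   python3 snowball_uploader.py genlist       # build the filelist files under filelist_dir
#   python3 snowball_uploader.py cp_snowball   # tar up and multipart-upload the listed files
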
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print ("Usage: %s genlist | cp_snowball | help" % sys.argv[0])
        print ("use python3; this script is not compatible with python2")
        sys.exit()
    elif sys.argv[1] == "genlist":
        gen_filelist()
    elif sys.argv[1] == "cp_snowball":
        session = boto3.Session(profile_name=profile_name)
        s3 = session.client('s3', endpoint_url=endpoint)
        #source_files =  [ f for f in os.listdir(filelist_dir) if os.path.isfile(f)]
        source_files =  os.listdir(filelist_dir)
        max_source_files = len(source_files)
        source_files_count = 0
        task_process = []
        task_index = 0
        for sf in source_files:
            error_log = ('error_%s_%s.log' % (sf, current_time))
            success_log = ('success_%s_%s.log' % (sf, current_time))
            source_file = os.path.join(filelist_dir, sf)
            #org_files_list = open(source_file, encoding='utf8').readlines()
            org_files_list = get_org_files_list(source_file)
            key_name = ("snowball-%s-%s.tar" % (sf[:-4], current_time))
            #print ("key_name:", key_name)
            #print ('\n0. ###########################')
            #print ('0. %s is starting' % sf)
            #print ('0. ###########################')
            #copy_to_snowball(org_files_list)
            task_process.append(multiprocessing.Process(target=copy_to_snowball, args=(error_log, success_log, key_name, org_files_list,)))
            task_process[-1].start()
            source_files_count += 1
            #print ('1. ###########################')
            print ('1. %s is processing, transferred tar files: %s / %s' % (sf, source_files_count, max_source_files))
            #print ('1. ###########################')
            parts = []    # reset the global parts list so the next forked worker starts with an empty list
            task_index += 1
            if task_index >= max_process:
                # keep at most max_process concurrent uploads: wait for the running workers before launching more
                for proc in task_process:
                    proc.join()
                task_index = 0
                task_process = []
        # wait for any workers still running from the last (partial) batch
        for proc in task_process:
            proc.join()
        #print ('part progress of tar file could not reach the max, sorry for the inconvenience')
        print ("Uploading Finished")
    else:
        snowball_uploader_help()
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



snowball_uploader_27-success-prod.py [177:319]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
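        # the "snowball-auto-extract" metadata asks Snowball to unpack the tar into its
        # individual files when the data is imported into the S3 bucket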
        mpu = s3.create_multipart_upload(Bucket=bucket_name, Key=key_name, StorageClass=s3_class, Metadata={"snowball-auto-extract": "true"})
    mpu_id = mpu["UploadId"]
    return mpu_id

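# Uploads a single part and appends its PartNumber/ETag to the global 'parts' list,
# which complete_multipart_upload needs later to assemble the object.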
def upload_mpu(key_name, mpu_id, data, index):
    #part = s3.upload_part(Body=data, Bucket=bucket_name, Key=key_name, UploadId=mpu_id, PartNumber=index, ContentLength=max_buf_size)
    part = s3.upload_part(Body=data, Bucket=bucket_name, Key=key_name, UploadId=mpu_id, PartNumber=index)
    parts.append({"PartNumber": index, "ETag": part["ETag"]})
    #print ('parts list: %s' % str(parts))
    return parts

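# Tells S3 that all parts are uploaded so it can assemble them into the final tar object.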
def complete_mpu(key_name, mpu_id, parts):
    result = s3.complete_multipart_upload(
        Bucket=bucket_name,
        Key=key_name,
        UploadId=mpu_id,
        MultipartUpload={"Parts": parts})
    return result

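# CompleteMultipartUpload requires the part list sorted by PartNumber in ascending order.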
def adjusting_parts_order(mpu_parts):
    return sorted(mpu_parts, key=lambda item: item['PartNumber'])

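# Keeps only the bytes that have not been uploaded yet: the unread remainder is copied
# to a temporary buffer, the original buffer is truncated, and the remainder is written
# back so the tar stream keeps appending after it.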
def buf_fifo(buf):
    tmp_buf = io.BytesIO()            # added for FIFO operation
    tmp_buf.write(buf.read())    # added for FIFO operation
    #print ('3. before fifo, recv_buf_size: %s' % len(buf.getvalue()))
    #print('3.before fifo, recv_buf_pos : %s' % buf.tell())
    buf.seek(0,0)
    buf.truncate(0)
    tmp_buf.seek(0,0)
    buf.write(tmp_buf.read())
    return buf

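# Streams the files in org_files_list into an in-memory tar archive (recv_buf) and uploads
# it to the bucket as a multipart upload, one max_part_size chunk at a time, so the whole
# tar never has to exist on local disk.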
def copy_to_snowball(error_log, success_log, key_name, org_files_list):
    tar_file_size = 0
    recv_buf = io.BytesIO()
    mpu_id = create_mpu(key_name)
    parts_index = 1
    s_log = success_log
    e_log = error_log
    with tarfile.open(fileobj=recv_buf, mode="w") as tar:
        for org_file in org_files_list:
            if os.path.isfile(org_file):
                tar.add(org_file)
                #print ('1. recv_buf_size: %s' % len(recv_buf.getvalue()))
                log_success(s_log, org_file, " is being archived\n")
                recv_buf_size = recv_buf.tell()
                #print ('1. recv_buf_pos: %s' % recv_buf.tell())
                if recv_buf_size > max_part_size:
                    print('multipart uploading: part %s / %s , size: %s' % (parts_index, max_part_count, recv_buf_size))
                    chunk_count = recv_buf_size // max_part_size
                    tar_file_size = tar_file_size + recv_buf_size
                    print('%s is accumulating, size: %s' % (key_name, tar_file_size))
                    #print('chunk_count: %s ' % chunk_count)
                    for buf_index in range(chunk_count):
                        start_pos = buf_index * max_part_size
                        recv_buf.seek(start_pos,0)
                        mpu_parts = upload_mpu(key_name, mpu_id, recv_buf.read(max_part_size), parts_index)
                        parts_index += 1
                    ####################
                    buf_fifo(recv_buf)
                    recv_buf_size = recv_buf.tell()
                    #print('3.after fifo, recv_buf_pos : %s' % recv_buf.tell())
                    #print ('3. after fifo, recv_buf_size: %s' % len(recv_buf.getvalue()))
                else:
                    pass
                    #print('accumulating files...')
            else:
                log_error(e_log, org_file," does not exist\n")
                print (org_file + ' does not exist\n')
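    # all files are archived; upload whatever remains in the buffer as the final part
    # (S3 allows the last part of a multipart upload to be smaller than the minimum part size)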
    recv_buf.seek(0,0)
    mpu_parts = upload_mpu(key_name, mpu_id, recv_buf.read(), parts_index)
    parts_index += 1
    mpu_parts = adjusting_parts_order(mpu_parts)
    complete_mpu(key_name, mpu_id, mpu_parts)
    ### print metadata
    meta_out = s3.head_object(Bucket=bucket_name, Key=key_name)
    print ('\n metadata info: %s' % str(meta_out)) 
    log_success(s_log, str(meta_out), '!!\n')
    print ("\n tar file: %s \n" % key_name)
    log_success(s_log, key_name, ' is uploaded successfully\n')

def snowball_uploader_help(**args):
    print ("Usage: %s 'genlist | cp_snowball | help'" % sys.argv[0])
    print ("use python3; this script is not compatible with python2")
    print ('genlist: ')
    print ('generates filelist files containing the names of the target files, under %s' % (filelist_dir))
    print ('cp_snowball: ')
    print ('copies the files on the server to Snowball efficiently')
    print ('the mechanism is as follows:')
    print ('1. reads the target file names from one filelist file in the filelist directory')
    print ('2. accumulates the files into an in-memory buffer up to max_part_size')
    print ('3. when the buffer reaches max_part_size, sends that chunk to Snowball via multipart upload')
    print ('4. uploads the tar files to Snowball concurrently, up to max_process processes')
    print ('5. after the upload completes, the tar file is created on Snowball')
    print ('6. then moves on to the next filelist file')

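# Usage sketch (hypothetical file name; substitute whatever this script is saved as):
#   python3 snowball_uploader.py genlist       # build the filelist files under filelist_dir
#   python3 snowball_uploader.py cp_snowball   # tar up and multipart-upload the listed files
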
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print ("Usage: %s genlist | cp_snowball | help" % sys.argv[0])
        print ("use python3; this script is not compatible with python2")
        sys.exit()
    elif sys.argv[1] == "genlist":
        gen_filelist()
    elif sys.argv[1] == "cp_snowball":
        session = boto3.Session(profile_name=profile_name)
        s3 = session.client('s3', endpoint_url=endpoint)
        #source_files =  [ f for f in os.listdir(filelist_dir) if os.path.isfile(f)]
        source_files =  os.listdir(filelist_dir)
        max_source_files = len(source_files)
        source_files_count = 0
        task_process = []
        task_index = 0
        for sf in source_files:
            error_log = ('error_%s_%s.log' % (sf, current_time))
            success_log = ('success_%s_%s.log' % (sf, current_time))
            source_file = os.path.join(filelist_dir, sf)
            #org_files_list = open(source_file, encoding='utf8').readlines()
            org_files_list = get_org_files_list(source_file)
            key_name = ("snowball-%s-%s.tar" % (sf[:-4], current_time))
            #print ("key_name:", key_name)
            #print ('\n0. ###########################')
            #print ('0. %s is starting' % sf)
            #print ('0. ###########################')
            #copy_to_snowball(org_files_list)
            task_process.append(multiprocessing.Process(target=copy_to_snowball, args=(error_log, success_log, key_name, org_files_list,)))
            task_process[-1].start()
            source_files_count += 1
            #print ('1. ###########################')
            print ('1. %s is processing, transferred tar files: %s / %s' % (sf, source_files_count, max_source_files))
            #print ('1. ###########################')
            parts = []    # reset the global parts list so the next forked worker starts with an empty list
            task_index += 1
            if task_index >= max_process:
                # keep at most max_process concurrent uploads: wait for the running workers before launching more
                for proc in task_process:
                    proc.join()
                task_index = 0
                task_process = []
        # wait for any workers still running from the last (partial) batch
        for proc in task_process:
            proc.join()
        #print ('part progress of tar file could not reach the max, sorry for the inconvenience')
        print ("Uploading Finished")
    else:
        snowball_uploader_help()
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



