testSuite/scripts/utility.py (496 lines of code) (raw):
import ctypes
import os
import platform
import shutil
import subprocess
import shlex
import uuid
import random
import json
from pathlib import Path
from collections import namedtuple
# Command Class is used to create azcopy commands and validator commands.
class Command(object):
    def __init__(self, command_type):
        # e.g. "copy", "sync", "clean", "create", "list" ...
        self.command_type = command_type
        # dictionary of flag name -> value, rendered as --flag="value".
        self.flags = dict()
        # list of positional arguments for azcopy / validator, rendered quoted.
        self.args = list()

    # this api is used by command class instance to add arguments.
    def add_arguments(self, argument):
        """Append a positional argument and auto-set MD5 flags for copy/sync/create.

        Returns self so calls can be chained. A None argument is skipped
        (bug fix: previously returned None here, which broke chaining).
        """
        if argument is None:
            return self
        self.args.append(argument)
        # auto-set MD5 checking flags, so that we always check when testing
        ct = self.command_type.lower()
        is_copy_or_sync = ct in ("copy", "cp", "sync")
        if is_copy_or_sync and not argument.startswith("http"):  # this is a local location
            if len(self.args) == 1:
                self.add_flags("put-md5", "true")  # this is an upload
            else:
                # this is a download; S3 objects may legitimately lack an MD5
                if "s3" in self.args[0]:
                    self.add_flags("check-md5", "FailIfDifferent")
                else:
                    self.add_flags("check-md5", "FailIfDifferentOrMissing")
        if ct == "create":
            if len(self.args) == 1:
                self.add_flags("generate-md5", "true")  # We want to generate an MD5 on the way up.
        return self

    def add_flags(self, flag, value):
        """Set --flag=value; returns self for chaining."""
        self.flags[flag] = value
        return self

    # returns the command by combining arguments and flags.
    def string(self):
        """Render the command line: type, quoted args, then --flag="value" pairs."""
        parts = [self.command_type]
        for arg in self.args:
            if len(arg) > 0:
                # add '"' at start and end of each argument.
                parts.append('"' + arg + '"')
        # iterating through all the values in dict and combining them.
        for key, value in self.flags.items():
            parts.append("--" + key + "=" + '"' + str(value) + '"')
        return " ".join(parts)

    # this api is used to execute a azcopy copy command.
    # by default, command execute a upload command.
    # return true or false for success or failure of command.
    def execute_azcopy_copy_command(self):
        return execute_azcopy_command(self.string())

    # this api is used to execute a azcopy copy command.
    # by default, command execute a upload command.
    # return azcopy console output on successful execution.
    def execute_azcopy_copy_command_get_output(self):
        return execute_azcopy_command_get_output(self.string())

    def execute_azcopy_command_interactive(self):
        return execute_azcopy_command_interactive(self.string())

    # api execute other azcopy commands like cancel, pause, resume or list.
    def execute_azcopy_operation_get_output(self):
        return execute_azcopy_command_get_output(self.string())

    # api executes the azcopy validator to verify the azcopy operation.
    def execute_azcopy_verify(self):
        return verify_operation(self.string())

    # api executes the clean command to delete the blob/container/file/share contents.
    def execute_azcopy_clean(self):
        return verify_operation(self.string())

    # api executes the create command to create the blob/container/file/share/directory contents.
    def execute_azcopy_create(self):
        return verify_operation(self.string())

    # api executes the info command to get AzCopy binary embedded infos.
    def execute_azcopy_info(self):
        return verify_operation_get_output(self.string())

    # api executes the testSuite's upload command to upload(prepare) data to source URL.
    def execute_testsuite_upload(self):
        return verify_operation(self.string())
# processes oauth command according to switches
def process_oauth_command(
        cmd,
        fromTo=""):
    """Apply oauth-related switches to cmd: sets --from-to when fromTo is given."""
    if fromTo != "":
        cmd.add_flags("from-to", fromTo)
# api executes the clean command on validator which deletes all the contents of the container.
def clean_test_container(container):
    """Delete everything in the given blob container; returns True on success."""
    succeeded = (Command("clean")
                 .add_arguments(container)
                 .add_flags("serviceType", "Blob")
                 .add_flags("resourceType", "Bucket")
                 .execute_azcopy_clean())
    if succeeded:
        return True
    print("error cleaning the container. please check the container sas provided")
    return False
def clean_test_blob_account(account):
    """Delete everything in the given blob account; returns True on success."""
    succeeded = (Command("clean")
                 .add_arguments(account)
                 .add_flags("serviceType", "Blob")
                 .add_flags("resourceType", "Account")
                 .execute_azcopy_clean())
    if succeeded:
        return True
    print("error cleaning the blob account. please check the account sas provided")
    return False
def clean_test_s3_account(account):
    """Delete everything in the given S3 account; no-op success when S3 tests are disabled."""
    if os.environ.get('S3_TESTS_OFF', "") != "":
        return True
    succeeded = (Command("clean")
                 .add_arguments(account)
                 .add_flags("serviceType", "S3")
                 .add_flags("resourceType", "Account")
                 .execute_azcopy_clean())
    if succeeded:
        return True
    print("error cleaning the S3 account.")
    return False
def clean_test_gcp_account(account):
    """Delete everything in the given GCP account; no-op success when GCP tests are disabled."""
    if os.environ.get('GCP_TESTS_OFF', "") != "":
        return True
    succeeded = (Command("clean")
                 .add_arguments(account)
                 .add_flags("serviceType", "GCP")
                 .add_flags("resourceType", "Account")
                 .execute_azcopy_clean())
    if succeeded:
        return True
    print("error cleaning the GCP account.")
    return False
def clean_test_file_account(account):
    """Delete everything in the given file-service account; returns True on success."""
    succeeded = (Command("clean")
                 .add_arguments(account)
                 .add_flags("serviceType", "File")
                 .add_flags("resourceType", "Account")
                 .execute_azcopy_clean())
    if succeeded:
        return True
    print("error cleaning the file account. please check the account sas provided")
    return False
# api executes the clean command on validator which deletes all the contents of the share.
def clean_test_share(shareURLStr):
    """Delete everything in the given file share; returns True on success."""
    succeeded = (Command("clean")
                 .add_arguments(shareURLStr)
                 .add_flags("serviceType", "File")
                 .add_flags("resourceType", "Bucket")
                 .execute_azcopy_clean())
    if succeeded:
        return True
    print("error cleaning the share. please check the share sas provided")
    return False
def clean_test_filesystem(fileSystemURLStr):
    """Delete everything in the given BlobFS filesystem; returns True on success."""
    succeeded = (Command("clean")
                 .add_arguments(fileSystemURLStr)
                 .add_flags("serviceType", "BlobFS")
                 .add_flags("resourceType", "Bucket")
                 .execute_azcopy_clean())
    if succeeded:
        return True
    print("error cleaning the filesystem. please check the filesystem URL, user and key provided")
    return False
# initialize_test_suite initializes the setup for executing test cases.
def initialize_test_suite(test_dir_path, container_sas, container_oauth, container_oauth_validate, share_sas_url, premium_container_sas, filesystem_url, filesystem_sas_url,
                          s2s_src_blob_account_url, s2s_src_file_account_url, s2s_src_s3_service_url, s2s_src_gcp_service_url, s2s_dst_blob_account_url, azcopy_exec_location, test_suite_exec_location):
    """Set module globals, recreate the scratch directory, copy executables, pre-clean resources.

    Returns True on success; False when either executable path is invalid.
    Cleaning failures are reported but do not abort setup.

    Bug fixes vs. original: the trailing-separator checks used
    `endswith("/") and endswith("\\")` (never true, so "/" was always
    appended) and a misspelled `endwith()` that raised AttributeError
    for URLs already ending in "/".
    """
    # test_directory_path holds the scratch directory used by every test case;
    # contents are created, copied, uploaded and downloaded to and from it only.
    global test_directory_path
    # container SAS used for most upload / download tests.
    global test_container_url
    # container used for oAuth testing.
    global test_oauth_container_url
    # same container as test_oauth_container_url, but with SAS, for validation.
    global test_oauth_container_validate_sas_url
    # container SAS on a premium storage account (historical misspelling kept for compatibility).
    global test_premium_account_contaier_url
    # share SAS used for file-service tests.
    global test_share_url
    # names of the executables copied into the scratch directory.
    global azcopy_executable_name
    global test_suite_executable_name
    # filesystem urls used for blob fs service operations.
    global test_bfs_account_url
    global test_bfs_sas_account_url
    # accounts for s2s copy tests.
    global test_s2s_src_blob_account_url
    global test_s2s_dst_blob_account_url
    global test_s2s_src_file_account_url
    global test_s2s_src_s3_service_url
    global test_s2s_src_gcp_service_url
    # recreate the test directory from scratch so stale data never leaks between runs.
    new_dir_path = os.path.join(test_dir_path, "test_data")
    if os.path.isdir(new_dir_path):
        shutil.rmtree(new_dir_path)
    os.mkdir(new_dir_path)
    # copy the executables into the scratch directory so the copies at the
    # user-provided locations stay untouched while the suite runs.
    if not os.path.isfile(azcopy_exec_location):
        print("please verify the azcopy executable location")
        return False
    shutil.copy2(azcopy_exec_location, new_dir_path)
    azcopy_executable_name = parse_out_executable_name(azcopy_exec_location)
    if not os.path.isfile(test_suite_exec_location):
        print("please verify the test suite executable location")
        return False
    shutil.copy2(test_suite_exec_location, new_dir_path)
    test_suite_executable_name = parse_out_executable_name(test_suite_exec_location)
    test_directory_path = new_dir_path
    test_bfs_account_url = filesystem_url
    test_bfs_sas_account_url = filesystem_sas_url
    if not test_bfs_account_url.endswith(("/", "\\")):
        test_bfs_account_url = test_bfs_account_url + "/"
    test_container_url = container_sas
    test_oauth_container_url = container_oauth
    if not test_oauth_container_url.endswith(("/", "\\")):
        test_oauth_container_url = test_oauth_container_url + "/"
    test_oauth_container_validate_sas_url = container_oauth_validate
    test_premium_account_contaier_url = premium_container_sas
    test_s2s_src_blob_account_url = s2s_src_blob_account_url
    test_s2s_src_file_account_url = s2s_src_file_account_url
    test_s2s_dst_blob_account_url = s2s_dst_blob_account_url
    test_s2s_src_s3_service_url = s2s_src_s3_service_url
    test_s2s_src_gcp_service_url = s2s_src_gcp_service_url
    test_share_url = share_sas_url
    # pre-clean every resource; failures are logged but non-fatal.
    if not clean_test_filesystem(test_bfs_account_url.rstrip("/").rstrip("\\")):  # rstrip because clean fails if trailing /
        print("failed to clean test filesystem.")
    if not clean_test_container(test_container_url):
        print("failed to clean test blob container.")
    if not clean_test_container(test_oauth_container_validate_sas_url):
        print("failed to clean OAuth test blob container.")
    if not clean_test_container(test_premium_account_contaier_url):
        print("failed to clean premium container.")
    if not clean_test_blob_account(test_s2s_src_blob_account_url):
        print("failed to clean s2s blob source account.")
    if not clean_test_file_account(test_s2s_src_file_account_url):
        print("failed to clean s2s file source account.")
    if not clean_test_blob_account(test_s2s_dst_blob_account_url):
        print("failed to clean s2s blob destination account.")
    if not clean_test_s3_account(test_s2s_src_s3_service_url):
        print("failed to clean s3 account.")
    if not clean_test_gcp_account(test_s2s_src_gcp_service_url):
        print("failed to clean GCS account")
    if not clean_test_share(test_share_url):
        print("failed to clean test share.")
    return True
# initialize_interactive_test_suite initializes the setup for executing interactive (oauth) test cases.
def initialize_interactive_test_suite(test_dir_path, container_oauth, container_oauth_validate,
                                      filesystem_url, oauth_tenant_id, oauth_aad_endpoint, azcopy_exec_location, test_suite_exec_location):
    """Set oauth-related globals, recreate the scratch directory, copy executables, pre-clean.

    Returns True on success, False on invalid executable paths or clean failures.

    Bug fixes vs. original: trailing-separator checks used
    `endswith(...) and endswith(...)` (never true) and a misspelled
    `endwith()` that raised AttributeError for URLs ending in "/".
    """
    # test_directory_path holds the scratch directory used by every test case.
    global test_directory_path
    # container used for oAuth testing.
    global test_oauth_container_url
    # same container as test_oauth_container_url, but with SAS, for validation.
    global test_oauth_container_validate_sas_url
    # names of the executables copied into the scratch directory.
    global azcopy_executable_name
    global test_suite_executable_name
    # filesystem url used for blob fs service operations.
    global test_bfs_account_url
    # oauth tenant id and AAD endpoint.
    global test_oauth_tenant_id
    global test_oauth_aad_endpoint
    # recreate the test directory from scratch so stale data never leaks between runs.
    new_dir_path = os.path.join(test_dir_path, "test_data")
    if os.path.isdir(new_dir_path):
        shutil.rmtree(new_dir_path)
    os.mkdir(new_dir_path)
    # copy the executables into the scratch directory so the copies at the
    # user-provided locations stay untouched while the suite runs.
    if not os.path.isfile(azcopy_exec_location):
        print("please verify the azcopy executable location")
        return False
    shutil.copy2(azcopy_exec_location, new_dir_path)
    azcopy_executable_name = parse_out_executable_name(azcopy_exec_location)
    if not os.path.isfile(test_suite_exec_location):
        print("please verify the test suite executable location")
        return False
    shutil.copy2(test_suite_exec_location, new_dir_path)
    test_suite_executable_name = parse_out_executable_name(test_suite_exec_location)
    test_directory_path = new_dir_path
    test_oauth_tenant_id = oauth_tenant_id
    test_oauth_aad_endpoint = oauth_aad_endpoint
    # set the filesystem url; clean before the trailing separator is appended.
    test_bfs_account_url = filesystem_url
    if not clean_test_filesystem(test_bfs_account_url):
        return False
    if not test_bfs_account_url.endswith(("/", "\\")):
        test_bfs_account_url = test_bfs_account_url + "/"
    test_oauth_container_url = container_oauth
    if not test_oauth_container_url.endswith(("/", "\\")):
        test_oauth_container_url = test_oauth_container_url + "/"
    # as validate container URL points to the same URL as the oauth container URL,
    # do clean up with the validate container URL.
    test_oauth_container_validate_sas_url = container_oauth_validate
    if not clean_test_container(test_oauth_container_validate_sas_url):
        return False
    return True
# given a path, parse out the name of the executable
def parse_out_executable_name(full_path):
    """Return the file-name component of full_path."""
    return os.path.split(full_path)[1]
# todo : find better way
# create_test_file creates a file with given file name and of given size inside the test directory.
# returns the local file path.
def create_test_file(filename, size):
    """Create <test dir>/<filename> containing `size` '0' characters; returns the path.

    An existing file of the same name is removed first. Fix vs. original:
    the file is opened with a context manager so the handle cannot leak.
    """
    file_path = os.path.join(test_directory_path, filename)
    # if file already exists, then removing the file.
    if os.path.isfile(file_path):
        os.remove(file_path)
    # write in 1MB blocks so very large sizes never materialize one huge string.
    with open(file_path, 'w') as f:
        remaining = size
        block = 1024 * 1024
        while remaining > 0:
            num_chars = min(block, remaining)
            f.write('0' * num_chars)
            remaining -= num_chars
    return file_path
def create_json_file(filename, jsonData):
    """Serialize jsonData as JSON to <test dir>/<filename>.json; returns the path.

    Fix vs. original: dropped the redundant close() inside the with-block
    (the context manager already closes the file).
    """
    file_path = os.path.join(test_directory_path, filename + ".json")
    # if file already exists, then removing the file.
    if os.path.isfile(file_path):
        os.remove(file_path)
    with open(file_path, 'w') as outfile:
        json.dump(jsonData, outfile)
    return file_path
def create_new_list_of_files(filename, list):
    """Write the given lines to <test dir>/<filename>.txt; returns the path.

    NOTE: the parameter name `list` shadows the builtin; it is kept
    unchanged for interface compatibility with existing callers.
    Fix vs. original: dropped the redundant close() inside the with-block.
    """
    file_path = os.path.join(test_directory_path, filename + ".txt")
    if os.path.isfile(file_path):
        os.remove(file_path)
    with open(file_path, 'w') as outfile:
        outfile.writelines(list)
    return file_path
# creates a test html file inside the test directory.
# returns the local file path.
def create_test_html_file(filename):
    """Create a small fixed HTML file inside the test directory; returns the path.

    Fix vs. original: the file is opened with a context manager so the
    handle cannot leak on a write error.
    """
    file_path = os.path.join(test_directory_path, filename)
    # if file already exists, then removing the file.
    if os.path.isfile(file_path):
        os.remove(file_path)
    message = """<html>
<head></head>
<body><p>Hello World!</p></body>
</html>"""
    with open(file_path, 'w') as f:
        f.write(message)
    return file_path
# creates a dir with the given name inside the test directory
def create_test_dir(dir_name):
    """Create (or recreate empty) <test dir>/<dir_name>; returns its path.

    Raises Exception when the directory cannot be created.
    Fix vs. original: catch OSError instead of a bare except, chain the
    cause, and build a single error-message string (the original passed
    two positional args to Exception).
    """
    dir_path = os.path.join(test_directory_path, dir_name)
    # If the directory exists, remove it (and its contents).
    if os.path.isdir(dir_path):
        shutil.rmtree(dir_path)
    try:
        os.mkdir(dir_path)
    except OSError as e:
        raise Exception("error creating directory " + dir_path) from e
    return dir_path
# create_test_n_files creates given number of files of the given size
# inside a directory inside the test directory.
# returns the path of the directory in which the n files are created.
def create_test_n_files(size, n, dir_name):
    """Create n files of `size` '0' characters in <test dir>/<dir_name>; returns that dir.

    The directory is recreated from scratch. Fixes vs. original: files are
    opened with a context manager (no leaked handles), and the dead
    os.remove() check after rmtree was dropped.
    """
    dir_n_files_path = os.path.join(test_directory_path, dir_name)
    # recreate the directory from a clean slate.
    if os.path.isdir(dir_n_files_path):
        shutil.rmtree(dir_n_files_path)
    os.mkdir(dir_n_files_path)
    # creating file prefix
    filesprefix = "test" + str(n) + str(size)
    # write in 1MB blocks so very large sizes never materialize one huge string.
    block = 1024 * 1024
    for index in range(n):
        filename = filesprefix + '_' + str(index) + ".txt"
        file_path = os.path.join(dir_n_files_path, filename)
        with open(file_path, 'w') as f:
            remaining = size
            while remaining > 0:
                num_chars = min(block, remaining)
                f.write('0' * num_chars)
                remaining -= num_chars
    return dir_n_files_path
# create_complete_sparse_file creates an empty file used to
# test the page blob operations of azcopy.
def create_complete_sparse_file(filename, filesize):
    """Create an empty file of `filesize` bytes in the test directory; returns its path."""
    file_path = os.path.join(test_directory_path, filename)
    # touch to make sure the file exists, then extend it to the requested size.
    Path(file_path).touch()
    os.truncate(file_path, filesize)
    return file_path
# create_partial_sparse_file creates a sparse file in the test directory
# of size multiple of 8MB. for each 8MB, first 4MB is '0'
# and next 4MB is '\0'.
# return the local file path of the created file.
def create_partial_sparse_file(filename, filesize):
    """Create a file alternating 4MB of '0' chars and 4MB of NUL chars.

    filesize must be a positive multiple of 8MB; otherwise None is returned
    and the filesystem is left untouched. Fixes vs. original: the size is
    validated before the file is opened (previously an open handle was
    leaked and an empty file created on invalid sizes) and the handle is
    managed with a context manager.
    """
    # reject sizes that are not a positive multiple of 8MB up front.
    if filesize < 8 * 1024 * 1024 or filesize % (8 * 1024 * 1024) != 0:
        return None
    file_path = os.path.join(test_directory_path, filename)
    if os.path.isfile(file_path):
        os.remove(file_path)
    half_block = 4 * 1024 * 1024
    with open(file_path, 'w') as f:
        remaining = filesize
        while remaining > 0:
            f.write('0' * half_block)
            remaining -= half_block
            if remaining <= 0:
                break
            f.write('\0' * half_block)
            remaining -= half_block
    return file_path
# execute_azcopy_command executes the given azcopy command.
# returns true / false on success / failure of command.
def execute_azcopy_command(command):
    """Run azcopy with the given argument string; True on success, False on failure.

    Fixes vs. original: the exception variable no longer shadows the
    builtin `exec`, and the comment wrongly describing 360s as 3 minutes
    was corrected.
    """
    # azcopy executable path location.
    azspath = os.path.join(test_directory_path, azcopy_executable_name)
    cmnd = azspath + " " + command
    try:
        # executing the command with timeout set to 6 minutes / 360 sec.
        subprocess.check_output(
            cmnd, stderr=subprocess.STDOUT, shell=True, timeout=360,
            universal_newlines=True)
    except subprocess.CalledProcessError as err:
        # todo kill azcopy command in case of timeout
        print("command failed with error code ", err.returncode, " and message " + err.output)
        return False
    return True
# execute_azcopy_command_interactive executes the given azcopy command,
# streaming its console output live.
# returns True / False on success / failure of the command.
def execute_azcopy_command_interactive(command):
    """Run azcopy, echoing each output line as it arrives; True iff exit code is 0."""
    azspath = os.path.join(test_directory_path, azcopy_executable_name)
    cmnd = azspath + " " + command
    # on Windows pass the raw command string; elsewhere split it into argv.
    argv = cmnd if os.name == "nt" else shlex.split(cmnd)
    process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    for line in iter(process.stdout.readline, b''):
        print(line.decode('utf-8'))
    # wait() returns the exit code, so a separate poll() is unnecessary.
    return process.wait() == 0
# execute_azcopy_command_get_output executes the given azcopy command.
# returns the azcopy console output (also on failure).
def execute_azcopy_command_get_output(command):
    """Run azcopy and return its console output; on failure, the failing run's output.

    Fixes vs. original: the exception variable no longer shadows the
    builtin `exec`, and the dead `output = ""` initialization was removed.
    """
    # azcopy executable path location.
    azspath = os.path.join(test_directory_path, azcopy_executable_name)
    cmnd = azspath + " " + command
    try:
        # executing the command with timeout set to 6 minutes / 360 sec.
        return subprocess.check_output(
            cmnd, stderr=subprocess.STDOUT, shell=True, timeout=360,
            universal_newlines=True)
    except subprocess.CalledProcessError as err:
        # on failure the captured console output is still the most useful result.
        return err.output
# verify_operation executes the validator command to verify the azcopy operations.
# return true / false on success / failure of command.
def verify_operation(command):
    """Run the testSuite validator with the given arguments; True on success.

    Fix vs. original: the exception variable no longer shadows the builtin `exec`.
    """
    # testSuite executable local path inside the test directory.
    test_suite_path = os.path.join(test_directory_path, test_suite_executable_name)
    command = test_suite_path + " " + command
    try:
        # executing the command with timeout set to 6 minutes / 360 sec.
        subprocess.check_output(
            command, stderr=subprocess.STDOUT, shell=True, timeout=360,
            universal_newlines=True)
    except subprocess.CalledProcessError as err:
        print("command failed with error code ", err.returncode, " and message " + err.output)
        return False
    return True
# verify_operation_get_output executes the validator command and returns its output.
def verify_operation_get_output(command):
    """Run the testSuite validator; return its console output, or None on failure.

    Fix vs. original: the (unused) exception variable no longer shadows
    the builtin `exec`.
    """
    # testSuite executable local path inside the test directory.
    test_suite_path = os.path.join(test_directory_path, test_suite_executable_name)
    command = test_suite_path + " " + command
    try:
        # executing the command with timeout set to 10 minutes / 600 sec.
        return subprocess.check_output(
            command, stderr=subprocess.STDOUT, shell=True, timeout=600,
            universal_newlines=True)
    except subprocess.CalledProcessError:
        return None
def get_object_sas(url_with_sas, object_name):
    """Insert object_name into a container SAS URL, keeping the query string."""
    # split the container URL so the object name can be spliced in before the SAS.
    parts = url_with_sas.split("?")
    separator = "" if parts[0].endswith("/") else "/"
    return parts[0] + separator + object_name + '?' + parts[1]
def get_object_without_sas(url, object_name):
    """Append object_name to the URL's path, dropping any query string."""
    # keep only the part before any SAS query string.
    base = url.split("?")[0]
    separator = "" if base.endswith("/") else "/"
    return base + separator + object_name
# get_resource_sas returns the shared access signature URL for the given
# resource inside the test container.
def get_resource_sas(resource_name):
    """Build "<container>/<resource_name>?<sas>" from the global test container URL."""
    parts = test_container_url.split("?")
    return parts[0] + "/" + resource_name + '?' + parts[1]
def get_resource_from_oauth_container_validate(resource_name):
    """Build a SAS URL for resource_name inside the oauth-validation container."""
    parts = test_oauth_container_validate_sas_url.split("?")
    return parts[0] + "/" + resource_name + '?' + parts[1]
def get_resource_from_oauth_container(resource_name):
    """Append resource_name to the oauth container URL (which already ends with '/')."""
    return "{}{}".format(test_oauth_container_url, resource_name)
def append_text_path_resource_sas(resource_sas, text):
    """Append `text` to the path of a SAS URL, keeping the query string."""
    parts = resource_sas.split("?")
    # when the path already ends with a separator, don't add another one.
    separator = "" if parts[0].endswith("/") else "/"
    return parts[0] + separator + text + '?' + parts[1]
# get_resource_sas_from_share returns the shared access signature URL for the
# given resource based on the share url.
def get_resource_sas_from_share(resource_name):
    """Build "<share>/<resource_name>?<sas>" from the global test share URL."""
    parts = test_share_url.split("?")
    return parts[0] + "/" + resource_name + '?' + parts[1]
def get_resource_sas_from_bfs(resource_name):
    """Build "<filesystem>/<resource_name>?<sas>" from the BlobFS SAS account URL."""
    parts = test_bfs_sas_account_url.split("?")
    return parts[0] + "/" + resource_name + '?' + parts[1]
# get_resource_sas_from_premium_container_sas returns the shared access signature
# URL for the given resource using the premium storage account container url.
def get_resource_sas_from_premium_container_sas(resource_name):
    """Build "<premium container>/<resource_name>?<sas>" from the premium container URL."""
    parts = test_premium_account_contaier_url.split("?")
    return parts[0] + "/" + resource_name + '?' + parts[1]
# parseAzcopyOutput parses the Azcopy Output in JSON format to give the final Azcopy Output in JSON Format.
# The final Azcopy output is the last JobSummary of the Job; the raw output can
# contain more than one summary, so only the last complete {...} block is kept.
def parseAzcopyOutput(s):
    """Extract the last JSON JobSummary from azcopy's console output.

    Returns the summary's MessageContent field (fields are exposed as
    namedtuple attributes via json's object_hook).
    """
    depth = 0
    # lines of the last JSON object, collected while scanning backwards.
    collected = []
    # Scan the output bottom-up: '}' opens a summary (depth +1), '{' closes it
    # (depth -1); once depth returns to 0 the last complete summary is captured.
    for line in reversed(s.split('\n')):
        if line == "":
            continue
        elif line == '}':
            depth += 1
        elif line == "{":
            depth -= 1
        if depth >= 0:
            collected.append(line)
        if depth == 0:
            break
    # collected is in reverse order; flip it back into the final JSON text.
    final_output = '\n'.join(reversed(collected))
    summary = json.loads(final_output, object_hook=lambda d: namedtuple('X', d.keys())(*d.values()))
    return summary.MessageContent
def get_resource_name(prefix=''):
    """Return prefix + a random 32-char lowercase hex string (a dash-less uuid4)."""
    return prefix + uuid.uuid4().hex
def get_random_bytes(size):
    """Return `size` pseudo-random bytes, each in [0, 254]."""
    rng = random.Random()
    # random() is consistent between python 2 and 3, hence the manual scaling.
    return bytes(bytearray(int(rng.random() * 255) for _ in range(size)))
def create_hidden_file(path, file_name, data):
    """Create a hidden file containing `data`: '.'-prefix on *nix, hidden attribute on Windows.

    Raises ctypes.WinError when setting the Windows hidden attribute fails.
    """
    HIDDEN_ATTRIBUTE = 0x02
    on_windows = platform.system().upper() == "WINDOWS"
    # On *nix, hiding is done by prefixing the name with a dot.
    if not on_windows:
        file_name = '.' + file_name
    file_path = os.path.join(path, file_name)
    with open(file_path, 'w') as f:
        f.write(data)
    # On Windows, hiding is a file attribute set after creation.
    if on_windows:
        if not ctypes.windll.kernel32.SetFileAttributesW(file_path, HIDDEN_ATTRIBUTE):
            raise ctypes.WinError()
def create_file_in_path(path, file_name, data):
    """Write `data` to path/file_name and return the resulting full path."""
    target = os.path.join(path, file_name)
    with open(target, 'w') as handle:
        handle.write(data)
    return target