management/python/lib/qlslibs/utils.py (139 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Module: qlslibs.utils
Contains helper functions for qpid_qls_analyze.
"""
import os
import qlslibs.jrnl
import stat
import string
import struct
import subprocess
import zlib
# Default journal geometry and record-format constants.
DEFAULT_DBLK_SIZE = 128
DEFAULT_SBLK_SIZE = 4096  # 32 dblks
# Floor division keeps this an integer (4) whether or not true division is in
# effect; plain "/" would yield the float 4.0 under Python 3 semantics.
DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE // 1024
DEFAULT_RECORD_VERSION = 2
DEFAULT_HEADER_SIZE_SBLKS = 1
def adler32(data):
    """Compute the Adler-32 checksum of data as an unsigned 32-bit value."""
    raw_checksum = zlib.adler32(data)
    # Masking to 32 bits guarantees a non-negative result.
    return raw_checksum & 0xffffffff
def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
    """Build a journal record with its xid, data (where applicable) and a consistent, checksummed tail.

    The record class is looked up in qlslibs.jrnl.CLASSES using the last
    character of magic. The record's serial is taken from
    journal_file.file_header.serial. dequeue_record_id is only used for
    dequeue records, and the data length only for enqueue records.

    Raises qlslibs.err.InvalidClassError if the created record is not an
    enqueue, dequeue or transaction record.
    """
    rec_type = qlslibs.jrnl.CLASSES.get(magic[-1])
    serial = journal_file.file_header.serial
    new_record = rec_type(0, magic, DEFAULT_RECORD_VERSION, uflags, serial, record_id)
    xid_size = 0 if xid is None else len(xid)

    # Each record type takes a different set of header fields in init().
    if isinstance(new_record, qlslibs.jrnl.EnqueueRecord):
        data_size = 0 if data is None else len(data)
        new_record.init(None, xid_size, data_size)
    elif isinstance(new_record, qlslibs.jrnl.DequeueRecord):
        new_record.init(None, dequeue_record_id, xid_size)
    elif isinstance(new_record, qlslibs.jrnl.TransactionRecord):
        new_record.init(None, xid_size)
    else:
        raise qlslibs.err.InvalidClassError(new_record.__class__.__name__)

    # Attach the payloads and mark them as fully present.
    if xid is not None:
        new_record.xid = xid
        new_record.xid_complete = True
    if data is not None:
        new_record.data = data
        new_record.data_complete = True

    # The tail is built last because its checksum is computed from the record.
    new_record.record_tail = _mk_record_tail(new_record)
    return new_record
def efp_directory_size(directory_name):
    """Decode a directory name in the format NNNk to a numeric size, where NNN is a number string.

    Returns int(NNN) when directory_name ends in 'k' and the part before the
    'k' parses as an integer. Returns 0 for anything else, including an empty
    or None name (which previously raised instead of returning 0).
    """
    if not directory_name or not directory_name.endswith('k'):
        return 0
    try:
        return int(directory_name[:-1])
    except ValueError:
        # The part before the trailing 'k' is not a number (e.g. "k", "abck").
        return 0
def format_data(data, data_size=None, show_data_flag=True, txtest_flag=False):
    """Return a printable description of binary record data.

    Delegates to _format_binary with the 'data' prefix, using
    qlslibs.err.DataSizeError for size mismatches and with hex-number
    rendering disabled.
    """
    size_error = qlslibs.err.DataSizeError
    return _format_binary(data, data_size, show_data_flag, 'data', size_error, False, txtest_flag)
def format_xid(xid, xid_size=None, show_xid_flag=True):
    """Return a printable description of a binary XID.

    Delegates to _format_binary with the 'xid' prefix, using
    qlslibs.err.XidSizeError for size mismatches, with hex-number rendering
    enabled and the txtest mode disabled.
    """
    size_error = qlslibs.err.XidSizeError
    return _format_binary(xid, xid_size, show_xid_flag, 'xid', size_error, True, False)
def get_avail_disk_space(path):
    """Return the space available on the filesystem containing path, in 1024-byte blocks.

    The value is read from the "Available" column of `df -P -k path`.
    Raises IndexError if df produces no data line (e.g. path does not exist).
    """
    # -P selects the POSIX output format (one line per filesystem, never
    # wrapped for long device names); -k pins the unit to 1024-byte blocks.
    # universal_newlines=True makes the output text rather than bytes, so the
    # string handling below behaves the same on Python 2 and Python 3.
    df_proc = subprocess.Popen(["df", "-P", "-k", path], stdout=subprocess.PIPE, universal_newlines=True)
    output = df_proc.communicate()[0]
    # Line 0 is the header; column 3 of the first data line is "Available".
    return int(output.splitlines()[1].split()[3])
def has_write_permission(path):
    """Return True if the current process is allowed to write to path.

    The previous implementation tested the group-read mode bit
    (stat.S_IRGRP), which says nothing about write access. The check is now
    delegated to os.access(path, os.W_OK).

    Raises OSError if path does not exist or cannot be stat'ed.
    """
    # Kept so that a missing path still raises OSError as it always has,
    # rather than being silently reported as "not writable".
    os.stat(path)
    return os.access(path, os.W_OK)
def inv_str(in_string):
    """Return in_string with every character's bits inverted (binary 1's complement)."""
    # Each character is inverted and masked back to a single byte value.
    return ''.join(chr(~ord(char) & 0xff) for char in in_string)
def load(file_handle, klass):
    """Read a record of class klass (or the subclass it discriminates to) from file_handle.

    The fields of klass are read first and passed to klass.discriminate() to
    select the concrete class. If that class differs from klass, its own
    fields are read next and passed to the new instance's init().
    """
    base_args = load_args(file_handle, klass)
    record_class = klass.discriminate(base_args)
    record = record_class(*base_args)
    needs_extra_fields = record_class != klass
    if needs_extra_fields:
        extra_args = load_args(file_handle, record_class)
        record.init(*extra_args)
    return record
def load_args(file_handle, klass):
    """Read and unpack the fields described by klass.FORMAT from file_handle.

    Returns a tuple whose first element is the file offset at which the read
    started, followed by the values unpacked according to klass.FORMAT.

    Raises qlslibs.err.UnexpectedEndOfFileError if fewer bytes than the
    format requires could be read.
    """
    record_format = klass.FORMAT
    expected_size = struct.calcsize(record_format)
    # Held as a 1-tuple so it can be concatenated with the unpacked fields.
    offset = (file_handle.tell(),)
    raw_bytes = file_handle.read(expected_size)
    actual_size = len(raw_bytes)
    if actual_size != expected_size:
        # NOTE(review): the offset is passed as a 1-tuple, not an int -
        # confirm that UnexpectedEndOfFileError expects this.
        raise qlslibs.err.UnexpectedEndOfFileError(actual_size, expected_size, offset, file_handle.name)
    return offset + struct.unpack(record_format, raw_bytes)
def load_data(file_handle, element, element_size):
    """Read from file_handle until element holds element_size bytes.

    element may be None (nothing read yet) or a partial value from an earlier
    call, in which case only the missing bytes are requested. Returns a tuple
    (element, complete) where complete is True when element has reached
    element_size. When element_size is 0 nothing is read.
    """
    if element_size == 0:
        return element, True
    if element is not None:
        # Continue a partial read: fetch only what is still missing.
        missing = element_size - len(element)
        element += file_handle.read(missing)
    else:
        element = file_handle.read(element_size)
    is_complete = len(element) == element_size
    return element, is_complete
def skip(file_handle, boundary):
    """Advance file_handle to the next multiple of boundary by reading and discarding bytes.

    Does nothing if file_handle is closed.
    """
    if file_handle.closed:
        return
    padding = _rem_bytes_in_block(file_handle, boundary)
    file_handle.read(padding)
#--- protected functions ---
def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag, txtest_flag):
"""Format binary XID for printing"""
if bin_str is None and bin_size is not None:
if bin_size > 0:
raise err_class(bin_size, len(bin_str), bin_str)
return ''
if bin_size is None:
bin_size = len(bin_str)
elif bin_size != len(bin_str):
raise err_class(bin_size, len(bin_str), bin_str)
out_str = '%s(%d)' % (prefix, bin_size)
if txtest_flag:
out_str += '=\'%s\'' % _txtest_msg_str(bin_str)
elif show_bin_flag:
if _is_printable(bin_str):
binstr = '"%s"' % _split_str(bin_str)
elif hex_num_flag:
binstr = '0x%s' % _str_to_hex_num(bin_str)
else:
binstr = _hex_split_str(bin_str, 50, 10, 10)
out_str += '=\'%s\'' % binstr
return out_str
def _hex_str(in_str, begin, end):
    """Render in_str[begin:end] with non-printable characters shown as \\xx hex escapes.

    Characters for which _is_printable() is true are copied unchanged; every
    other character is replaced by a backslash and its two-digit hex code.
    """
    return ''.join(
        char if _is_printable(char) else '\\%02x' % ord(char)
        for char in (in_str[index] for index in range(begin, end))
    )
def _hex_split_str(in_str, split_size, head_size, tail_size):
    """Hex-format in_str, abbreviating it with an ellipsis if it is longer than split_size.

    Strings of up to split_size characters are formatted whole. Longer ones
    are reduced to their first head_size and last tail_size characters,
    joined by ' ... '.
    """
    length = len(in_str)
    if length > split_size:
        head = _hex_str(in_str, 0, head_size)
        tail = _hex_str(in_str, length - tail_size, length)
        return head + ' ... ' + tail
    return _hex_str(in_str, 0, length)
def _txtest_msg_str(bin_str):
"""Extract the message number used in qpid-txtest"""
msg_index = bin_str.find('msg')
if msg_index >= 0:
end_index = bin_str.find('\x00', msg_index)
assert end_index >= 0
return bin_str[msg_index:end_index]
return None
def _is_printable(in_str):
"""Return True if in_str in printable; False otherwise."""
for this_char in in_str:
if this_char not in string.letters and this_char not in string.digits and this_char not in string.punctuation:
return False
return True
def _mk_record_tail(record):
    """Create a qlslibs.jrnl.RecordTail consistent with record.

    The tail carries the bit-inverted record magic, the Adler-32 checksum of
    record.checksum_encode(), and the record's serial and record id.
    """
    tail = qlslibs.jrnl.RecordTail(None)
    tail.xmagic = inv_str(record.magic)
    checksum_input = record.checksum_encode()
    tail.checksum = adler32(checksum_input)
    tail.serial = record.serial
    tail.record_id = record.record_id
    return tail
def _rem_bytes_in_block(file_handle, block_size):
    """Return how many bytes lie between the current file position and the end of its block."""
    position = file_handle.tell()
    # End offset of the block containing the position; equals the position
    # itself when already on a block boundary.
    block_end = _size_in_blocks(position, block_size) * block_size
    return block_end - position
def _size_in_blocks(size, block_size):
"""Return the size in terms of data blocks"""
return int((size + block_size - 1) / block_size)
def _split_str(in_str, split_size = 50):
"""Split a string into two parts separated by an ellipsis if it is longer than split_size"""
if len(in_str) < split_size:
return in_str
return in_str[:25] + ' ... ' + in_str[-25:]
def _str_to_hex_num(in_str):
"""Turn a string into a hex number representation, little endian assumed (ie LSB is first, MSB is last)"""
return ''.join(x.encode('hex') for x in reversed(in_str))