nightMARE/src/nightmare/analysis/compression/lznt1.py (59 lines of code) (raw):
# Rekall Memory Forensics
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Author: Michael Cohen scudette@google.com.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Decompression support for the LZNT1 compression algorithm.
Reference:
http://msdn.microsoft.com/en-us/library/jj665697.aspx
(2.5 LZNT1 Algorithm Details)
https://github.com/libyal/reviveit/
https://github.com/sleuthkit/sleuthkit/blob/develop/tsk/fs/ntfs.c
"""
import array
import struct
from io import BytesIO
class Lznt1:
def decompress(self, buf: bytes) -> bytes:
return decompress_data(buf)
__call__ = decompress
lznt1 = Lznt1()
def get_displacement(offset: int) -> int:
"""Calculate the displacement."""
result = 0
while offset >= 0x10:
offset >>= 1
result += 1
return result
DISPLACEMENT_TABLE = array.array("B", [get_displacement(x) for x in range(8192)])
COMPRESSED_MASK = 1 << 15
SIGNATURE_MASK = 3 << 12
SIZE_MASK = (1 << 12) - 1
TAG_MASKS = [(1 << i) for i in range(0, 8)]
def decompress_data(cdata: bytes) -> bytes:
"""Decompresses the data."""
in_fd = BytesIO(cdata)
output_fd = BytesIO()
block_end = 0
while in_fd.tell() < len(cdata):
block_offset = in_fd.tell()
uncompressed_chunk_offset = output_fd.tell()
block_header = struct.unpack("<H", in_fd.read(2))[0]
if block_header & SIGNATURE_MASK != SIGNATURE_MASK:
break
size = block_header & SIZE_MASK
block_end = block_offset + size + 3
if block_header & COMPRESSED_MASK:
while in_fd.tell() < block_end:
header = ord(in_fd.read(1))
for mask in TAG_MASKS:
if in_fd.tell() >= block_end:
break
if header & mask:
pointer = struct.unpack("<H", in_fd.read(2))[0]
displacement = DISPLACEMENT_TABLE[
output_fd.tell() - uncompressed_chunk_offset - 1
]
symbol_offset = (pointer >> (12 - displacement)) + 1
symbol_length = (pointer & (0xFFF >> displacement)) + 3
output_fd.seek(-symbol_offset, 2)
data = output_fd.read(symbol_length)
# Pad the data to make it fit.
if 0 < len(data) < symbol_length:
data = data * (symbol_length // len(data) + 1)
data = data[:symbol_length]
output_fd.seek(0, 2)
output_fd.write(data)
else:
data = in_fd.read(1)
output_fd.write(data)
else:
# Block is not compressed
data = in_fd.read(size + 1)
output_fd.write(data)
result = output_fd.getvalue()
return result