utils.py:

import time
import torch
import random
import boto3
import gzip
import numpy as np
from urllib.parse import urlparse


def sigmoid(x):
    return 1 / (1 + np.exp(-x))


# class for getting binaries from s3 or local filesystem as a
# torch dataset generator
class BinaryDataLoader(torch.utils.data.Dataset):
    def __init__(self, uri_path, balance=True):
        self.s3_client = None

        # get uris and labels to train on
        uris, labels = [], []
        for l in open(uri_path):
            uri, label = l.rstrip("\n").split(",")
            label = float(label)
            uris.append(uri)
            labels.append(label)

        # convert to np arrays for easier indexing
        uris = np.asarray(uris)
        labels = np.asarray(labels)

        # split into benign and malicious uris
        benign_uris = uris[labels == 0]
        malicious_uris = uris[labels == 1]

        # balance the dataset by throwing away samples in the majority class
        if balance:
            if len(benign_uris) > len(malicious_uris):
                benign_idxs = np.random.permutation(len(benign_uris))
                benign_uris = benign_uris[benign_idxs[:len(malicious_uris)]]
            else:
                malicious_idxs = np.random.permutation(len(malicious_uris))
                malicious_uris = malicious_uris[malicious_idxs[:len(benign_uris)]]

        # finally, stitch everything together
        self.uris = np.concatenate([malicious_uris, benign_uris])
        self.labels = np.concatenate([np.ones(len(malicious_uris)),
                                      np.zeros(len(benign_uris))]).astype(np.float32)

    def __len__(self):
        return len(self.uris)

    def __getitem__(self, index):
        # grab a uri and label
        uri = self.uris[index]
        label = self.labels[index]

        # check if URI looks like an s3 key. if so, download key from s3.
        if uri[:5] == "s3://":
            # init s3 client if not yet done
            if self.s3_client is None:
                self.s3_client = boto3.Session().client('s3')

            # split s3 uri into bucket and key
            uri_parsed = urlparse(uri, allow_fragments=False)
            bucket, key = uri_parsed.netloc, uri_parsed.path[1:]

            # download sample from s3
            sample_bytes = self.s3_client.get_object(Bucket=bucket, Key=key,
                                                     RequestPayer='requester')["Body"].read()
        # otherwise, open from filesystem
        else:
            with open(uri, "rb") as f:
                sample_bytes = f.read()

        # if the sample is gzip'd, decompress it.
        if sample_bytes[:2] == b"\x1f\x8b":
            sample_bytes = gzip.decompress(sample_bytes)

        return sample_bytes, uri, float(label)
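

# -------------------------------------------------------------------------
# Usage sketch (not part of the original module): __getitem__ returns
# variable-length byte strings, so torch's default collate function cannot
# stack a batch. A minimal way to drive BinaryDataLoader is a DataLoader with
# a small custom collate_fn; the csv path and batch size below are
# illustrative assumptions.
def _example_binary_dataloader(uri_path="train_uris.csv"):
    def collate_samples(batch):
        # keep the raw bytes and uris as python lists; stack only the labels
        sample_bytes, uris, labels = zip(*batch)
        return list(sample_bytes), list(uris), np.asarray(labels, dtype=np.float32)

    loader = torch.utils.data.DataLoader(BinaryDataLoader(uri_path),
                                         batch_size=16,
                                         collate_fn=collate_samples)
    for sample_bytes, uris, labels in loader:
        print(len(sample_bytes[0]), uris[0], labels[0])
        break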


# class for the "replay buffer"
#
# computing scores for every chunk in a large binary is expensive. to speed things up,
# we keep track of a "replay buffer" of entire binaries and scores that we can sample
# from.
#
# the "timeout" argument controls how long to keep samples around for (seconds).
class RpBuffer():
    def __init__(self, timeout=600):
        self.blocks = dict()
        self.timeout = timeout

    # delete samples in the replay buffer that are older than "timeout" seconds.
    def cull_blocks(self):
        to_delete = set()

        # collect samples that are older than self.timeout
        curr_t = time.time()
        for block in self.blocks:
            block_t, sha, data, label, yps = self.blocks[block]
            if curr_t - block_t > self.timeout:
                to_delete.add(block)

        # print an informative message
        if len(to_delete) > 0:
            print("** Clearing out %d blocks from replay buffer **" % len(to_delete))

        # clear out the blocks
        for block in to_delete:
            del self.blocks[block]

    # add a sample to the replay buffer
    def add_block(self, sha, data, label, yps):
        self.blocks[sha] = (time.time(), sha, data, label, yps)

    # update the score for a block of a sample in the replay buffer
    def update_score(self, sha, block_idx, yp_new):
        block_t, sha, data, label, yps = self.blocks[sha]
        yps[block_idx] = yp_new
        self.blocks[sha] = (block_t, sha, data, label, yps)

    # sample data from the replay buffer
    def get_samples(self, nsamples):
        # random sampling
        samples = [self.blocks[k] for k in self.blocks]
        random.shuffle(samples)
        return samples[:nsamples]
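

# -------------------------------------------------------------------------
# Usage sketch (not part of the original module): a rough outline of how the
# replay buffer is driven from a training loop. The chunks, per-chunk scores,
# and sha string below are illustrative placeholders, not real data.
def _example_replay_buffer():
    rpbuffer = RpBuffer(timeout=600)

    # score a binary once, then stash its chunks and their per-chunk scores
    seqs = [b"\x00" * 4096, b"\x90" * 4096]   # placeholder chunks
    yps = np.zeros(len(seqs))                 # placeholder per-chunk scores
    rpbuffer.add_block("example_sha", seqs, 0.0, yps)

    # later: draw cached samples instead of re-scoring everything from scratch
    for block_t, sha, data, label, yps in rpbuffer.get_samples(nsamples=1):
        # after re-scoring chunk 0 with the current model, record the new score
        rpbuffer.update_score(sha, 0, 0.25)

    # periodically drop samples older than the timeout
    rpbuffer.cull_blocks()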


# helper function for turning a sequence of bytes into a sequence of int64's for torch's
# embedding layer
def unpackbits(sample_bytes, seqlen):
    x = np.zeros((len(sample_bytes), seqlen), dtype=np.int64)
    for i in range(len(sample_bytes)):
        b = np.frombuffer(sample_bytes[i], dtype=np.uint8)
        x[i, :len(b)] = b
    # 0 is reserved for padding -- shift byte values into the range 1-256
    return x + 1


# helper function for softmax computation.
def softmax(x):
    x = x - np.max(x)
    x = np.exp(x)
    x = x / np.sum(x)
    return x


# take a batch of binaries, feed them through the model, and save them off to the replay buffer.
def update_buffers(model, sample_bytes, shas, labels, seqlen, nlen, rpbuffer, det_file):
    full_yps = np.zeros(len(sample_bytes))
    full_labels = np.zeros(len(sample_bytes))

    for idx, (s, sha, l) in enumerate(zip(sample_bytes, shas, labels)):
        # chunk the sample up into seqlen-byte chunks.
        offset_idxs = list(range(int(np.ceil(len(s) / seqlen))))
        seqs = [s[o*seqlen:(o+1)*seqlen] for o in offset_idxs]

        # align last seq with end of file so we don't have a bunch of blocks
        # with empty stuff at the end
        if len(seqs) > 1:
            seqs[-1] = s[-seqlen:]

        # init storage for max score of each chunk
        yps = np.zeros(len(seqs))

        # keep track of max output and sig associated with output
        max_yp = None
        max_sig = b""

        # iterate through the chunks
        for b in range(len(seqs)):
            # if the chunk is smaller than the convolutional receptive field, we won't
            # get any scores. skip it.
            if len(seqs[b]) < nlen:
                continue

            seq = seqs[b]

            # no gradients are needed here, so skip autograd bookkeeping to save memory
            with torch.no_grad():
                # feed forward the sample
                ngrams = torch.from_numpy(unpackbits([seq], seqlen))
                yp = model.forward(ngrams)
                yp = yp.detach().cpu().numpy().squeeze()[:len(seq)-nlen+1]

            # get the sig and max score for the chunk
            max_sig_idx = np.argmax(yp)
            yp = yp[max_sig_idx]

            # update the max score/sig
            if max_yp is None or yp > max_yp:
                max_yp = yp
                max_sig = seq[max_sig_idx:max_sig_idx+nlen]

            # fill in the max score for the chunk
            yps[b] = yp

        # final score of the sample: max chunk score over the whole sample.
        full_yps[idx] = np.max(yps)
        full_labels[idx] = l

        # add whole sample to replay buffer
        rpbuffer.add_block(sha, seqs, l, yps)

        # write out information to the detections file
        det_file.write("%d %s %0.3f %s %s\n" % (l, sha, max_yp, max_sig.hex(), str(max_sig)))

    return full_yps, full_labels


# helper function to update tensorboard plots/print stuff to stdout
def write_stats(writer, niter, full_yp, labels):
    # compute accuracy across good and bad
    y = labels.flatten()
    acc_good = np.mean(full_yp[y < 0.5] < 0)
    acc_bad = np.mean(full_yp[y > 0.5] > 0)
    acc = (acc_good + acc_bad) / 2

    print(niter, acc, acc_good, acc_bad)
    writer.add_scalar("acc/yp", acc, niter)
    writer.add_scalar("acc/good", acc_good, niter)
    writer.add_scalar("acc/bad", acc_bad, niter)
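

# -------------------------------------------------------------------------
# Usage sketch (not part of the original module): how the pieces above might
# fit together for one scoring pass. "model", "seqlen", and "nlen" are
# assumptions -- any torch module that maps a (1, seqlen) int64 tensor of
# shifted byte values to per-position scores, plus its chunk size and
# receptive field, would slot in here. The detections file name is also
# illustrative.
def _example_scoring_pass(model, loader, seqlen=4096, nlen=32):
    from torch.utils.tensorboard import SummaryWriter

    writer = SummaryWriter()
    rpbuffer = RpBuffer(timeout=600)

    with open("detections.txt", "w") as det_file:
        for niter, (sample_bytes, uris, labels) in enumerate(loader):
            # score each binary chunk-by-chunk and cache the results
            full_yps, full_labels = update_buffers(model, sample_bytes, uris, labels,
                                                   seqlen, nlen, rpbuffer, det_file)

            # log balanced accuracy to tensorboard and stdout
            write_stats(writer, niter, full_yps, full_labels)

            # keep the replay buffer from growing without bound
            rpbuffer.cull_blocks()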