src/data_preprocess/subword.py (190 lines of code) (raw):

#!/usr/bin/env python
# encoding: utf-8
'''
*Copyright (c) 2023, Alibaba Group;
*Licensed under the Apache License, Version 2.0 (the "License");
*you may not use this file except in compliance with the License.
*You may obtain a copy of the License at

*   http://www.apache.org/licenses/LICENSE-2.0

*Unless required by applicable law or agreed to in writing, software
*distributed under the License is distributed on an "AS IS" BASIS,
*WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*See the License for the specific language governing permissions and
*limitations under the License.

@author: Hey
@email: sanyuan.**@**.com
@tel: 137****6540
@datetime: 2022/12/1 11:20
@project: DeepProtFunc
@file: subword
@desc: command line wrapper around subword-nmt: learn BPE codes from a
       sequence corpus, apply them, build vocabularies and tokenize a sequence
Reference: https://github.com/kexinhuang12345/ESPF/blob/master/
'''
import argparse
import codecs
import os.path

from subword_nmt.apply_bpe import BPE
from subword_nmt.get_vocab import get_vocab
from subword_nmt.learn_bpe import learn_bpe


def get_args():
    '''
    parse the command line arguments
    :return: the argparse namespace holding all options
    '''
    parser = argparse.ArgumentParser()
    # NOTE: "--func" is required, so its default value is never actually used.
    parser.add_argument(
        "--func",
        default="learn_bpe",
        type=str,
        required=True,
        choices=[
            "learn_bpe",
            "apply_bpe",
            "learn_joint_bpe_and_vocab",
            "tokenize",
            "get_vocab",
            "subword_vocab_2_token_vocab"
        ],
        help="subword running type."
    )

    # For Learn and Apply
    parser.add_argument("--infile", type=str, default="../subword/rdrp/all_sequence.txt", help="corpus")
    parser.add_argument("--outfile", type=str, default="../subword/rdrp/protein_codes_rdrp_1000.txt",
                        help="output filepath")

    # For Learn
    parser.add_argument("--min_frequency", type=int, default=2, help="min frequency")
    parser.add_argument("--verbose", action="store_true", help="verbose")
    parser.add_argument("--is_dict", action="store_true", help="is dict")
    parser.add_argument("--num_symbols", type=int, default=10000, help="the specified vocab size")
    parser.add_argument("--num_workers", type=int, default=8, help="worker number")

    # For Tokenize and Apply
    parser.add_argument("--codes_file", type=str, default="../subword/rdrp/protein_codes_rdrp_1000.txt",
                        help="subword codes filepath")

    # For Tokenize
    parser.add_argument("--seq", type=str, default=None, help="the sequence that want to tokenize")

    run_args = parser.parse_args()
    return run_args


def _is_excluded(header, exclude_ids):
    '''
    tell whether a fasta header refers to an excluded protein
    :param header: the fasta header line (the id is the 2nd "|"-separated field)
    :param exclude_ids: set of protein ids to drop
    :return: True when the header has a 2nd field and it is in exclude_ids
    '''
    if not exclude_ids:
        return False
    fields = header.strip().split("|")
    # headers without a "|" carry no protein id and are therefore never excluded
    return len(fields) > 1 and fields[1] in exclude_ids


def read_fasta(filepath, exclude):
    '''
    read fasta file
    :param filepath: fasta filepath (str) or a list of fasta filepaths
    :param exclude: exclude filepath (str), a list of them, or None; the 2nd
                    "|"-separated field of each line is taken as a protein id
    :return: list of [header, sequence] pairs, in file order
    '''
    exclude_ids = set()
    if exclude:
        if isinstance(exclude, str):
            exclude = [exclude]
        for p in exclude:
            with open(p, "r") as rfp:
                for line in rfp:
                    fields = line.strip().split("|")
                    # blank lines / lines without "|" have no id: skip them
                    # instead of failing with an IndexError
                    if len(fields) > 1:
                        exclude_ids.add(fields[1])

    if isinstance(filepath, str):
        filepath = [filepath]

    dataset = []
    for cur_filepath in filepath:
        total = 0
        with open(cur_filepath, "r") as rfp:
            uuid = ""
            seq_parts = []
            for line in rfp:
                line = line.strip()
                if line.startswith(">"):
                    # a new header closes the record collected so far
                    seq = "".join(seq_parts)
                    if seq and not _is_excluded(uuid, exclude_ids):
                        dataset.append([uuid, seq])
                        total += 1
                    uuid = line
                    seq_parts = []
                else:
                    seq_parts.append(line)
            # last record of the file; unlike inside the loop, a record
            # without any header is dropped here (kept as in the original code)
            seq = "".join(seq_parts)
            if seq and uuid and not _is_excluded(uuid, exclude_ids):
                dataset.append([uuid, seq])
                total += 1
        print("%s: %d" % (cur_filepath, total))
    return dataset


def fasta_to_corpus(input_filepath, save_filepath):
    '''
    fasta to sequence corpus (one sequence per line, headers dropped)
    :param input_filepath: fasta filepath (or list of filepaths)
    :param save_filepath: the corpus filepath to write
    :return:
    '''
    dataset = read_fasta(input_filepath, exclude=None)
    with open(save_filepath, "w") as wfp:
        for item in dataset:
            wfp.write(item[1] + "\n")


def learn(args):
    '''
    learn the BPE codes from the corpus args.infile and write them to args.outfile
    :param args: parsed command line arguments
    :return:
    '''
    # context managers make sure the codes file is flushed and closed
    with open(args.infile, "r") as rfp, open(args.outfile, "w") as wfp:
        learn_bpe(
            infile=rfp,
            outfile=wfp,
            min_frequency=args.min_frequency,
            verbose=args.verbose,
            is_dict=args.is_dict,
            num_symbols=args.num_symbols,
            num_workers=args.num_workers
        )


def apply(args):
    '''
    apply the BPE codes in args.codes_file to the corpus args.infile and
    write the segmented corpus to args.outfile
    :param args: parsed command line arguments
    :return:
    '''
    with codecs.open(args.codes_file) as bpe_codes_prot:
        bpe = BPE(codes=bpe_codes_prot)
        # the output is opened only after the codes were loaded, so a broken
        # codes file does not truncate an existing output file
        with open(args.outfile, "w") as wfp:
            bpe.process_lines(
                args.infile,
                wfp,
                num_workers=args.num_workers
            )


def vocab(args):
    '''
    build the subword vocab of the segmented corpus args.infile into args.outfile
    :param args: parsed command line arguments
    :return:
    '''
    with open(args.infile, "r") as rfp, open(args.outfile, "w") as wfp:
        get_vocab(rfp, wfp)


def subword_vocab_2_token_vocab(args):
    '''
    transform subword results into vocab
    :param args: parsed command line arguments (uses infile and outfile)
    :return:
    '''
    vocabs = set()
    with open(args.infile, "r") as rfp:
        for line in rfp:
            fields = line.strip().split()
            if not fields:
                # skip blank lines instead of failing with an IndexError
                continue
            # first column is the subword; strip the BPE continuation marker
            vocabs.add(fields[0].replace("@@", ""))
    # special tokens first, then subwords ordered by (length, alphabet)
    vocabs = ['[PAD]', '[UNK]', '[SEP]', '[CLS]', '[MASK]'] + sorted(vocabs, key=lambda x: (len(x), x))
    with open(args.outfile, "w") as wfp:
        for v in vocabs:
            wfp.write(v + "\n")


def tokenize(args):
    '''
    tokenize the sequence args.seq with the BPE codes in args.codes_file
    :param args: parsed command line arguments (uses codes_file and seq)
    :return: list of subword tokens
    :raises ValueError: when args.seq is None
    '''
    if args.seq is None:
        raise ValueError("--seq is required when --func is tokenize")
    with codecs.open(args.codes_file) as bpe_codes_prot:
        bpe = BPE(bpe_codes_prot, merges=-1, separator='')
        p = bpe.process_line(args.seq).split()
    return p


def _looks_like_fasta(filepath):
    '''
    tell whether the file name (not its directories) contains ".fas"
    :param filepath: the filepath to check
    :return: True for names such as xxx.fas or xxx.fasta
    '''
    return ".fas" in os.path.basename(filepath)


def main():
    '''
    command line entry point: print the arguments and run the selected --func
    :return:
    '''
    input_args = get_args()
    for attr, value in sorted(vars(input_args).items()):
        print("\t{}={}".format(attr, value))

    if input_args.func != "tokenize":
        dir_path = os.path.dirname(input_args.outfile)
        # dir_path is "" when the output file is in the current directory;
        # os.makedirs("") would fail, so only create real directories
        if dir_path and not os.path.exists(dir_path):
            print("Warning: ouput dir %s not exists, created!" % dir_path)
            os.makedirs(dir_path, exist_ok=True)

    if input_args.func == "learn_bpe":
        # python subword.py --func learn_bpe --infile ../subword/rdrp/all_sequence.txt --outfile ../subword/rdrp/protein/binary_class/protein_codes_rdrp_1000.txt --verbose
        # python subword.py --func learn_bpe --infile ../data/rdrp/all_rdrp_domain.fas --outfile ../subword/rdrp/protein/binary_class/all_rdrp_domain_codes_1000.txt --verbose --num_symbols 1000
        # python subword.py --func learn_bpe --infile ../data/rdrp/all_rdrp_motifABC.fas --outfile ../subword/rdrp/protein/binary_class/all_rdrp_motif_codes_1000.txt --verbose --num_symbols 1000
        # python subword.py --func learn_bpe --infile ../data/rdrp/RdRp20211115.fasta --outfile ../subword/rdrp/protein/binary_class/all_rdrp_codes_1000.txt --verbose --num_symbols 1000
        if _looks_like_fasta(input_args.infile):
            # transform fasta to corpus
            print('fasta convect to corpus txt')
            infile_name = os.path.basename(input_args.infile)
            savepath = os.path.join(
                os.path.dirname(input_args.infile),
                infile_name.rsplit(".", 1)[0] + ".txt"
            )
            # never overwrite an existing corpus file
            if os.path.exists(savepath):
                raise Exception("Save path :%s exsits!" % savepath)
            fasta_to_corpus(input_args.infile, savepath)
            input_args.infile = savepath
        learn(input_args)
    elif input_args.func == "tokenize":
        # python subword.py --func tokenize --seq IPKIDNPEFASQYRPISCCNIFYKCISKMFCSRLKAVVLHLVAENQAAFVQGSQARGGAMDRITTTTRKFE --codes_file ../subword/rdrp/protein_codes_rdrp_1000.txt
        if input_args.seq is None:
            raise ValueError("--seq is required when --func is tokenize")
        print("input seq:")
        print(input_args.seq)
        print("input seq size:")
        print(len(input_args.seq))
        token = tokenize(input_args)
        print("seq tokenize output:")
        print(token)
        print("seq tokenize size:")
        print(len(token))
    elif input_args.func == "apply_bpe":
        # python subword.py --func apply_bpe --infile ../subword/rdrp/all_sequence.txt --codes_file ../subword/rdrp/protein/binary_class/protein_codes_rdrp_1000.txt --outfile ../subword/rdrp/protein/binary_class/all_sequence_token_1000.txt
        # python subword.py --func apply_bpe --infile ../data/rdrp/all_rdrp_domain.txt --codes_file ../subword/rdrp/protein/binary_class/all_rdrp_domain_codes_1000.txt --outfile ../subword/rdrp/protein/binary_class/all_rdrp_domain_token_1000.txt
        # python subword.py --func apply_bpe --infile ../data/rdrp/all_rdrp_motifABC.txt --codes_file ../subword/rdrp/protein/binary_class/all_rdrp_motif_codes_1000.txt --outfile ../subword/rdrp/protein/binary_class/all_rdrp_motif_token_1000.txt
        # python subword.py --func apply_bpe --infile ../data/rdrp/RdRp20211115.txt --codes_file ../subword/rdrp/protein/binary_class/all_rdrp_codes_1000.txt --outfile ../subword/rdrp/protein/binary_class/RdRp20211115_token_1000.txt
        if _looks_like_fasta(input_args.infile):
            # fasta,not sequence tokenization corpus
            raise Exception("the input file is fasta,not sequence tokenization corpus")
        apply(input_args)
    elif input_args.func == "get_vocab":
        # python subword.py --func get_vocab --infile ../subword/rdrp/protein/binary_class/all_sequence_token_1000.txt --outfile ../subword/rdrp/protein/binary_class/subword_vocab_1000.txt
        # python subword.py --func get_vocab --infile ../subword/rdrp/protein/binary_class/all_rdrp_domain_token_1000.txt --outfile ../subword/rdrp/protein/binary_class/all_rdrp_domain_vocab_1000.txt
        # python subword.py --func get_vocab --infile ../subword/rdrp/protein/binary_class/all_rdrp_motif_token_1000.txt --outfile ../subword/rdrp/protein/binary_class/all_rdrp_motif_vocab_1000.txt
        # python subword.py --func get_vocab --infile ../subword/rdrp/protein/binary_class/RdRp20211115_token_1000.txt --outfile ../subword/rdrp/protein/binary_class/RdRp20211115_vocab_1000.txt
        vocab(input_args)
    elif input_args.func == "subword_vocab_2_token_vocab":
        # python subword.py --func subword_vocab_2_token_vocab --infile ../subword/rdrp/protein/binary_class/subword_vocab_1000.txt --outfile ../vocab/rdrp/protein/binary_class/subword_vocab_1000.txt
        # python subword.py --func subword_vocab_2_token_vocab --infile ../subword/rdrp/protein/binary_class/all_rdrp_domain_vocab_1000.txt --outfile ../vocab/rdrp/protein/binary_class/all_rdrp_domain_vocab_1000.txt
        # python subword.py --func subword_vocab_2_token_vocab --infile ../subword/rdrp/protein/binary_class/all_rdrp_motif_vocab_1000.txt --outfile ../vocab/rdrp/protein/binary_class/all_rdrp_motif_vocab_1000.txt
        # python subword.py --func subword_vocab_2_token_vocab --infile ../subword/rdrp/protein/binary_class/RdRp20211115_vocab_1000.txt --outfile ../vocab/rdrp/protein/binary_class/RdRp20211115_vocab_1000.txt
        subword_vocab_2_token_vocab(input_args)


if __name__ == "__main__":
    main()