src/geo_map/get_biosample_from_update.py (87 lines of code) (raw):
#!/usr/bin/env python
# encoding: utf-8
'''
*Copyright (c) 2023, Alibaba Group;
*Licensed under the Apache License, Version 2.0 (the "License");
*you may not use this file except in compliance with the License.
*You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*Unless required by applicable law or agreed to in writing, software
*distributed under the License is distributed on an "AS IS" BASIS,
*WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*See the License for the specific language governing permissions and
*limitations under the License.
@author: Hey
@email: sanyuan.**@**.com
@tel: 137****6540
@datetime: 2023/3/11 20:09
@project: DeepProtFunc
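@file: get_biosample_from_update
@desc: look up the BioSample id of each SRA run in the input list from ./data/00all_SraRunInfo_new.csv and append the new (sra, biosample) pairs to the output CSV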
'''
import csv, argparse
import os.path
# Command-line interface; all three options are mandatory.
#   -i / --input      tab-separated file listing the SRA runs to look up
#   -o / --output     CSV of (sra, biosample) pairs; it is read when it
#                     already exists and is opened for appending afterwards
#   -idx / --sra_idx  column of the SRA id in the input file; the value is
#                     used directly as a list index, so it is zero-based
parser = argparse.ArgumentParser(
    description='Extract BioSample metadata from Updated File.'
)
parser.add_argument(
    '-i', '--input',
    type=str,
    required=True,
    metavar="in",
    help='Input file of sra list',
)
parser.add_argument(
    '-o', '--output',
    type=str,
    required=True,
    metavar="out",
    help='Output filename presenting lookup results',
)
parser.add_argument(
    '-idx', '--sra_idx',
    type=int,
    required=True,
    help='sra id col index in sra file',
)
args = parser.parse_args()
# Collect the SRA ids to look up: one per data row of the tab-separated
# input file, taken from column ``args.sra_idx``.
with open(args.input, 'r') as in_fp:
    tsv_rows = csv.reader(in_fp, delimiter='\t')
    next(tsv_rows)  # the first row is a header, not data
    sra_query_list = [fields[args.sra_idx] for fields in tsv_rows]
# Load the (sra, biosample) pairs already recorded in the output CSV so that
# only pairs not seen before are treated as updates.
# Maps sra id -> set of every biosample id recorded for it.
exists_sra_2_biosample = {}
if os.path.exists(args.output):
    # newline="" is how the csv module expects its files to be opened.
    with open(args.output, "r", newline="") as rfp:
        reader = csv.reader(rfp, delimiter=',')
        # Skip the header row; the default keeps a zero-byte file from
        # raising StopIteration.
        next(reader, None)
        for row in reader:
            if len(row) < 2:
                # Blank or truncated line: no usable (sra, biosample) pair.
                continue
            # Accumulate rather than overwrite: the output file is only ever
            # appended to, so one sra may own several rows with different
            # biosample ids and all of them must count as already known.
            exists_sra_2_biosample.setdefault(row[0], set()).add(row[1])
print("exists biosample sra size: %d" % len(exists_sra_2_biosample))
# Scan the refreshed run-info table and build:
#   sra_2_biosample_update  run id -> biosample id for every row in the table
#   updated                 [sra, biosample] pairs for queried runs whose
#                           pair is not yet recorded in the output file
sra_2_biosample_update = {}
updated = []
# Hash-based membership: testing against the list would cost a linear scan
# for every row of the table.
sra_query_set = set(sra_query_list)
with open("./data/00all_SraRunInfo_new.csv", "r", newline="") as rfp:
    reader = csv.reader(rfp)
    # The first row names the columns; an empty file simply yields no rows.
    header = next(reader, None)
    if header is not None:
        # Raises ValueError when a required column is absent.
        biosample_idx = header.index("BioSample")
        run_idx = header.index("Run")
        for row in reader:
            if not row:
                # csv.reader yields [] for a blank line; nothing to index.
                continue
            bioSample = row[biosample_idx]
            sra = row[run_idx]
            sra_2_biosample_update[sra] = bioSample
            if sra not in sra_query_set:
                continue
            # An sra with no recorded biosample at all compares against an
            # empty tuple, so its pair always counts as new.
            if bioSample not in exists_sra_2_biosample.get(sra, ()):
                updated.append([sra, bioSample])
# Summary statistics over the queried SRA ids.
query_sras = set(sra_query_list)
# Queried runs with no biosample recorded in the existing output file.
not_exists_biosample_sras = query_sras.difference(exists_sra_2_biosample)
# Queried runs that appear in the refreshed run-info table.
exist_updated_biosample_sras = query_sras.intersection(sra_2_biosample_update)
# Queried runs found in neither the output file nor the run-info table.
unfound_biosample_sra_after_update = not_exists_biosample_sras.difference(
    sra_2_biosample_update
)
for template, group in (
    ("not_exists_biosample_sras size: %d", not_exists_biosample_sras),
    ("exist updated biosample size: %d", exist_updated_biosample_sras),
    ("updated len: %d", updated),
    ("unfound biosample sra after update size: %d", unfound_biosample_sra_after_update),
):
    print(template % len(group))
# Append the newly found (sra, biosample) pairs to the output CSV.
# The header goes in only when the file is started from scratch: either it
# does not exist yet or it exists with zero bytes. Without the size check a
# pre-created empty file would get data rows but no header, and the next run
# would skip its first data row as if it were the header.
needs_header = (
    not os.path.exists(args.output) or os.path.getsize(args.output) == 0
)
# newline="" lets the csv writer control line endings itself, which avoids
# doubled carriage returns on platforms that translate "\n" on write.
with open(args.output, "a", newline="") as wfp:
    writer = csv.writer(wfp)
    if needs_header:
        writer.writerow(["sra", "biosample"])
    writer.writerows(updated)
# Record the queried SRA ids that still have no biosample, one per line.
# The file is always (re)created and is left empty when there are none.
with open("./data/unfound_biosample_sra_after_update.txt", "w") as wfp:
    wfp.writelines("%s\n" % sra for sra in unfound_biosample_sra_after_update)
rule = "-"*25
print(rule + "Get Biosample by Updating Done" + rule)