hoodini.utils.ncbi_api
import concurrent.futures
import itertools
import time
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from pathlib import Path

import polars as pl
import requests
import taxoniq

from hoodini.utils.classes import IPGXMLFile
from hoodini.utils.logging_utils import error, info, warn


def nuc2ass(nucleotide_ids, apikey=None, temp_dir="temp", chunk_size=10, max_concurrent=9):
    """
    Fetch nucleotide summaries and link nucleotide IDs to assemblies.

    Parameters:
        nucleotide_ids (list): List of nucleotide IDs to query.
        apikey (str, optional): NCBI API key. Defaults to None.
        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
        chunk_size (int, optional): Number of IDs to query per request. Defaults to 10.
        max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 9.

    Returns:
        pl.DataFrame: DataFrame with columns `AccessionVersion`, `AssemblyAccession`,
            `Taxid`, and `superkingdom`.
    """
    # Step 1: fetch docsum records for the nucleotide IDs.
    link_list = create_ncbi_links(
        chunk_list=nucleotide_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="nuccore",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/nucsum", max_concurrent_downloads=max_concurrent
    )

    df_nucsum = parseXML(f"{temp_dir}/nucsum", "nucsum")
    df_nucsum = df_nucsum.filter(pl.col("doc_id") != "0")
    df_nucsum = df_nucsum[["doc_id", "AccessionVersion"]]

    # Step 2: link each nucleotide record to its assembly via elink.
    link_list = create_ncbi_links(
        chunk_list=df_nucsum["AccessionVersion"],
        engine="elink",
        chunk_size=chunk_size,
        db="nuccore",
        dbto="assembly",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/elink", max_concurrent_downloads=max_concurrent
    )

    df_links = parseXML(f"{temp_dir}/elink", "nuc2ass")
    df_nucsum = df_nucsum.join(
        df_links[["id_list", "linked_id"]], left_on="doc_id", right_on="id_list", how="left"
    )

    # Step 3: fetch docsum records for the linked assemblies.
    assembly_ids = df_nucsum["linked_id"].drop_nulls().unique().to_list()
    link_list = create_ncbi_links(
        chunk_list=assembly_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="assembly",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/asssum", max_concurrent_downloads=max_concurrent
    )

    df_asssum = parseXML(f"{temp_dir}/asssum", "asssum")

    # Step 4: resolve each assembly Taxid to its ranked lineage.
    dicc_tax = {}
    for taxid in df_asssum["Taxid"].unique():
        taxon = taxoniq.Taxon(taxid)
        dicc_tax[taxid] = {t.rank.name: t.scientific_name for t in taxon.ranked_lineage}

    taxdf = pl.DataFrame(
        [{"Taxid": taxid, **ranks} for taxid, ranks in dicc_tax.items()],
        infer_schema_length=None,
    )
    df_asssum = df_asssum.join(taxdf, on="Taxid", how="left")

    df_nucsum = df_nucsum.join(
        df_asssum[["uid", "AssemblyAccession", "Taxid", "superkingdom"]],
        left_on="linked_id",
        right_on="uid",
        how="left",
    )
    df_nucsum = df_nucsum.drop_nulls(subset=["AssemblyAccession"])
    # RefSeq nucleotide accessions contain an underscore, so they map to
    # GCF_ assembly accessions; all others map to GCA_.
    df_nucsum = df_nucsum.with_columns(
        pl.when(pl.col("AccessionVersion").str.contains("_"))
        .then(pl.lit("GCF_") + pl.col("AssemblyAccession").str.split("_").list.get(1))
        .otherwise(pl.lit("GCA_") + pl.col("AssemblyAccession").str.split("_").list.get(1))
        .alias("AssemblyAccession")
    )

    return df_nucsum[["AccessionVersion", "AssemblyAccession", "Taxid", "superkingdom"]]


def nuc2len(nucleotide_ids, apikey, temp_dir="temp", chunk_size=100, max_concurrent=10):
    """
    Fetch nucleotide summaries and return the sequence length of each nucleotide ID.

    Parameters:
        nucleotide_ids (list): List of nucleotide IDs to query.
        apikey (str): NCBI API key.
        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
        chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.
        max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 10.

    Returns:
        pl.DataFrame: DataFrame with columns `nucleotide_id` and `nucleotide_length`.
    """
    link_list = create_ncbi_links(
        chunk_list=nucleotide_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="nuccore",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/nuclen", max_concurrent_downloads=max_concurrent
    )

    df_nucsum = parseXML(f"{temp_dir}/nuclen", "nucsum")
    df_nucsum = df_nucsum.filter(pl.col("doc_id") != "0")

    df_nucsum = df_nucsum[["AccessionVersion", "Length"]]
    df_nucsum = df_nucsum.rename(
        {"AccessionVersion": "nucleotide_id", "Length": "nucleotide_length"}
    )
    return df_nucsum


def chunked_iterable(iterable, size):
    # Yield successive tuples of at most `size` items from `iterable`.
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk


def download_file(url, index, folder):
    folder = Path(folder)
    filename = folder / f"{index}.txt"
    while True:
        try:
            response = requests.get(url, timeout=65)
            if response.content:
                body = response.content.decode("utf-8", errors="replace")
                # NCBI intermittently returns error pages with a 200 status,
                # so inspect the body before accepting the response.
                if "error" not in body.lower():
                    break
                warn("Error in response body, retrying...")
            else:
                warn("No response received, retrying...")
            time.sleep(5)
        except requests.exceptions.ChunkedEncodingError as e:
            warn(f"Download interrupted (ChunkedEncodingError): {e}, retrying...")
            time.sleep(5)
        except requests.RequestException as e:
            warn(f"General network error: {e}, retrying...")
            time.sleep(5)
        except Exception as e:
            error(f"Unexpected error: {e}, aborting...")
            return

    with open(filename, "wb") as file:
        file.write(response.content)
    # Pause between requests to stay within NCBI's rate limits.
    time.sleep(2)


def download_files(urls, folder, max_concurrent_downloads):
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_downloads) as executor:
        futures = []
        for index, url in enumerate(urls):
            info(f"Downloading {url} to {folder / f'{index}.txt'}")
            futures.append(executor.submit(download_file, url, index, folder))
        # Surface any exception raised in a worker instead of failing silently.
        for future in concurrent.futures.as_completed(futures):
            future.result()


def create_ncbi_links(
    chunk_list, chunk_size, db, retmode, apikey, engine=None, rettype=None, dbto=None
):
    assert engine in ["efetch", "elink"]
    link_list = []
    for c in chunked_iterable(chunk_list, size=chunk_size):
        base_url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{engine}.fcgi?"
        if engine == "efetch":
            # efetch accepts a single comma-separated id parameter.
            chunk = ",".join([str(n).strip() for n in c])
            base_url += f"db={db}&rettype={rettype}&id="
        elif engine == "elink":
            if dbto is None:
                raise ValueError("dbto parameter should be provided when using engine=elink")
            # elink takes one id parameter per record to preserve the mapping.
            chunk = "&id=".join([str(n).strip() for n in c])
            base_url += f"dbfrom={db}&db={dbto}&id="
        end_url = "&retmode=" + retmode
        if apikey:
            end_url += "&api_key=" + apikey
        link_list.append(base_url + chunk + end_url)
    info(f"Created {len(link_list)} links for {engine} with chunk size {chunk_size}.")
    return link_list


def process_xml(file_path, mode):
    """
    Process a single XML file and return the resulting DataFrame.

    Args:
        file_path (str): Path to the XML file to be processed.
        mode (str): The mode to use when processing the file ("ipg", "nucsum", etc.).

    Returns:
        DataFrame: Processed DataFrame from the file.
    """
    xml_file = IPGXMLFile(file_path)
    return xml_file.to_dataframe(mode=mode)


def parseXML(folder_path, mode):
    """
    Parses XML files in a given folder based on the specified mode and returns a
    concatenated DataFrame.

    Args:
        folder_path (str): Path to the folder containing XML files.
        mode (str): The mode to use for parsing ("ipg", "nucsum", "asssum", etc.).

    Returns:
        DataFrame: A concatenated DataFrame with the contents of all processed XML files.
    """
    folder_path = Path(folder_path)
    all_file_paths = [p for p in folder_path.iterdir() if p.is_file()]

    # Parse files in parallel, then concatenate once to avoid repeated copies.
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(partial(process_xml, mode=mode), all_file_paths))

    if not results:
        return pl.DataFrame()
    return pl.concat(results, how="vertical")
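chunked_iterable, which only appears in the module listing above, is the batching primitive the link builder relies on. A minimal illustration of its behaviour (the accessions are placeholders):

# Illustrative only: chunked_iterable yields fixed-size tuples, with a
# shorter final chunk when the input does not divide evenly.
ids = ["NZ_AAA00000.1", "CP000000.1", "NC_000000.1"]  # placeholder accessions
for chunk in chunked_iterable(ids, size=2):
    print(chunk)
# ('NZ_AAA00000.1', 'CP000000.1')
# ('NC_000000.1',)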
def nuc2ass(nucleotide_ids, apikey=None, temp_dir="temp", chunk_size=10, max_concurrent=9):
    """
    Fetch nucleotide summaries and link nucleotide IDs to assemblies.

    Parameters:
        nucleotide_ids (list): List of nucleotide IDs to query.
        apikey (str, optional): NCBI API key. Defaults to None.
        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
        chunk_size (int, optional): Number of IDs to query per request. Defaults to 10.
        max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 9.

    Returns:
        pl.DataFrame: DataFrame with columns `AccessionVersion`, `AssemblyAccession`,
            `Taxid`, and `superkingdom`.
    """
    # Step 1: fetch docsum records for the nucleotide IDs.
    link_list = create_ncbi_links(
        chunk_list=nucleotide_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="nuccore",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/nucsum", max_concurrent_downloads=max_concurrent
    )

    df_nucsum = parseXML(f"{temp_dir}/nucsum", "nucsum")
    df_nucsum = df_nucsum.filter(pl.col("doc_id") != "0")
    df_nucsum = df_nucsum[["doc_id", "AccessionVersion"]]

    # Step 2: link each nucleotide record to its assembly via elink.
    link_list = create_ncbi_links(
        chunk_list=df_nucsum["AccessionVersion"],
        engine="elink",
        chunk_size=chunk_size,
        db="nuccore",
        dbto="assembly",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/elink", max_concurrent_downloads=max_concurrent
    )

    df_links = parseXML(f"{temp_dir}/elink", "nuc2ass")
    df_nucsum = df_nucsum.join(
        df_links[["id_list", "linked_id"]], left_on="doc_id", right_on="id_list", how="left"
    )

    # Step 3: fetch docsum records for the linked assemblies.
    assembly_ids = df_nucsum["linked_id"].drop_nulls().unique().to_list()
    link_list = create_ncbi_links(
        chunk_list=assembly_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="assembly",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/asssum", max_concurrent_downloads=max_concurrent
    )

    df_asssum = parseXML(f"{temp_dir}/asssum", "asssum")

    # Step 4: resolve each assembly Taxid to its ranked lineage.
    dicc_tax = {}
    for taxid in df_asssum["Taxid"].unique():
        taxon = taxoniq.Taxon(taxid)
        dicc_tax[taxid] = {t.rank.name: t.scientific_name for t in taxon.ranked_lineage}

    taxdf = pl.DataFrame(
        [{"Taxid": taxid, **ranks} for taxid, ranks in dicc_tax.items()],
        infer_schema_length=None,
    )
    df_asssum = df_asssum.join(taxdf, on="Taxid", how="left")

    df_nucsum = df_nucsum.join(
        df_asssum[["uid", "AssemblyAccession", "Taxid", "superkingdom"]],
        left_on="linked_id",
        right_on="uid",
        how="left",
    )
    df_nucsum = df_nucsum.drop_nulls(subset=["AssemblyAccession"])
    # RefSeq nucleotide accessions contain an underscore, so they map to
    # GCF_ assembly accessions; all others map to GCA_.
    df_nucsum = df_nucsum.with_columns(
        pl.when(pl.col("AccessionVersion").str.contains("_"))
        .then(pl.lit("GCF_") + pl.col("AssemblyAccession").str.split("_").list.get(1))
        .otherwise(pl.lit("GCA_") + pl.col("AssemblyAccession").str.split("_").list.get(1))
        .alias("AssemblyAccession")
    )

    return df_nucsum[["AccessionVersion", "AssemblyAccession", "Taxid", "superkingdom"]]
Fetch nucleotide summaries and link nucleotide IDs to assemblies.
Parameters:
    nucleotide_ids (list): List of nucleotide IDs to query.
    apikey (str, optional): NCBI API key. Defaults to None.
    temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
    chunk_size (int, optional): Number of IDs to query per request. Defaults to 10.
    max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 9.

Returns:
    pl.DataFrame: DataFrame with columns AccessionVersion, AssemblyAccession, Taxid, and superkingdom.
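A usage sketch with hypothetical inputs; the accessions below are placeholders, and a real NCBI API key raises the permitted request rate from 3 to 10 requests per second:

# Illustrative call with placeholder values.
df = nuc2ass(
    nucleotide_ids=["NZ_AAA00000.1", "CP000000.1"],  # placeholder accessions
    apikey=None,        # optional; pass a key for the higher rate limit
    temp_dir="temp",    # intermediate XML responses land under temp/
    chunk_size=10,
    max_concurrent=9,
)
print(df.columns)
# ['AccessionVersion', 'AssemblyAccession', 'Taxid', 'superkingdom']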
def nuc2len(nucleotide_ids, apikey, temp_dir="temp", chunk_size=100, max_concurrent=10):
    """
    Fetch nucleotide summaries and return the sequence length of each nucleotide ID.

    Parameters:
        nucleotide_ids (list): List of nucleotide IDs to query.
        apikey (str): NCBI API key.
        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
        chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.
        max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 10.

    Returns:
        pl.DataFrame: DataFrame with columns `nucleotide_id` and `nucleotide_length`.
    """
    link_list = create_ncbi_links(
        chunk_list=nucleotide_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="nuccore",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/nuclen", max_concurrent_downloads=max_concurrent
    )

    df_nucsum = parseXML(f"{temp_dir}/nuclen", "nucsum")
    df_nucsum = df_nucsum.filter(pl.col("doc_id") != "0")

    df_nucsum = df_nucsum[["AccessionVersion", "Length"]]
    df_nucsum = df_nucsum.rename(
        {"AccessionVersion": "nucleotide_id", "Length": "nucleotide_length"}
    )
    return df_nucsum
Fetch nucleotide summaries and return the sequence length of each nucleotide ID.

Parameters:
    nucleotide_ids (list): List of nucleotide IDs to query.
    apikey (str): NCBI API key.
    temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
    chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.
    max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 10.

Returns:
    pl.DataFrame: DataFrame with columns nucleotide_id and nucleotide_length.
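A usage sketch along the same lines (placeholder accession and hypothetical key):

# Illustrative call with placeholder values.
df_len = nuc2len(
    nucleotide_ids=["NZ_AAA00000.1"],  # placeholder accession
    apikey="YOUR_NCBI_API_KEY",        # hypothetical key; required by the signature
)
print(df_len.columns)
# ['nucleotide_id', 'nucleotide_length']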
def download_file(url, index, folder):
    folder = Path(folder)
    filename = folder / f"{index}.txt"
    while True:
        try:
            response = requests.get(url, timeout=65)
            if response.content:
                body = response.content.decode("utf-8", errors="replace")
                # NCBI intermittently returns error pages with a 200 status,
                # so inspect the body before accepting the response.
                if "error" not in body.lower():
                    break
                warn("Error in response body, retrying...")
            else:
                warn("No response received, retrying...")
            time.sleep(5)
        except requests.exceptions.ChunkedEncodingError as e:
            warn(f"Download interrupted (ChunkedEncodingError): {e}, retrying...")
            time.sleep(5)
        except requests.RequestException as e:
            warn(f"General network error: {e}, retrying...")
            time.sleep(5)
        except Exception as e:
            error(f"Unexpected error: {e}, aborting...")
            return

    with open(filename, "wb") as file:
        file.write(response.content)
    # Pause between requests to stay within NCBI's rate limits.
    time.sleep(2)
def download_files(urls, folder, max_concurrent_downloads):
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_downloads) as executor:
        futures = []
        for index, url in enumerate(urls):
            info(f"Downloading {url} to {folder / f'{index}.txt'}")
            futures.append(executor.submit(download_file, url, index, folder))
        # Surface any exception raised in a worker instead of failing silently.
        for future in concurrent.futures.as_completed(futures):
            future.result()
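Taken together with download_file, a sketch of typical use; the URLs come from create_ncbi_links (documented next), and the folder name is arbitrary:

# Illustrative: responses are written as 0.txt, 1.txt, ... in submission order.
urls = create_ncbi_links(
    chunk_list=["NZ_AAA00000.1", "CP000000.1"],  # placeholder accessions
    engine="efetch",
    chunk_size=1,
    db="nuccore",
    rettype="docsum",
    retmode="xml",
    apikey=None,
)
download_files(urls, folder="temp/demo", max_concurrent_downloads=2)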
def create_ncbi_links(
    chunk_list, chunk_size, db, retmode, apikey, engine=None, rettype=None, dbto=None
):
    assert engine in ["efetch", "elink"]
    link_list = []
    for c in chunked_iterable(chunk_list, size=chunk_size):
        base_url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{engine}.fcgi?"
        if engine == "efetch":
            # efetch accepts a single comma-separated id parameter.
            chunk = ",".join([str(n).strip() for n in c])
            base_url += f"db={db}&rettype={rettype}&id="
        elif engine == "elink":
            if dbto is None:
                raise ValueError("dbto parameter should be provided when using engine=elink")
            # elink takes one id parameter per record to preserve the mapping.
            chunk = "&id=".join([str(n).strip() for n in c])
            base_url += f"dbfrom={db}&db={dbto}&id="
        end_url = "&retmode=" + retmode
        if apikey:
            end_url += "&api_key=" + apikey
        link_list.append(base_url + chunk + end_url)
    info(f"Created {len(link_list)} links for {engine} with chunk size {chunk_size}.")
    return link_list
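The two URL shapes this produces, sketched with placeholder IDs:

# efetch: one comma-separated id parameter per chunk.
efetch_urls = create_ncbi_links(
    chunk_list=["A1.1", "B2.1"], chunk_size=2, db="nuccore",
    retmode="xml", apikey=None, engine="efetch", rettype="docsum",
)
# .../efetch.fcgi?db=nuccore&rettype=docsum&id=A1.1,B2.1&retmode=xml

# elink: one id parameter per record, so each link stays tied to its record.
elink_urls = create_ncbi_links(
    chunk_list=["100", "200"], chunk_size=2, db="nuccore",
    retmode="xml", apikey=None, engine="elink", dbto="assembly",
)
# .../elink.fcgi?dbfrom=nuccore&db=assembly&id=100&id=200&retmode=xml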
def process_xml(file_path, mode):
    """
    Process a single XML file and return the resulting DataFrame.

    Args:
        file_path (str): Path to the XML file to be processed.
        mode (str): The mode to use when processing the file ("ipg", "nucsum", etc.).

    Returns:
        DataFrame: Processed DataFrame from the file.
    """
    xml_file = IPGXMLFile(file_path)
    return xml_file.to_dataframe(mode=mode)
Process a single XML file and return the resulting DataFrame.

Args:
    file_path (str): Path to the XML file to be processed.
    mode (str): The mode to use when processing the file ("ipg", "nucsum", etc.).

Returns:
    DataFrame: Processed DataFrame from the file.
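A minimal call, assuming a docsum response has already been downloaded to the path shown:

# Illustrative: parse one downloaded E-utilities response.
df = process_xml("temp/nucsum/0.txt", mode="nucsum")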
def parseXML(folder_path, mode):
    """
    Parses XML files in a given folder based on the specified mode and returns a
    concatenated DataFrame.

    Args:
        folder_path (str): Path to the folder containing XML files.
        mode (str): The mode to use for parsing ("ipg", "nucsum", "asssum", etc.).

    Returns:
        DataFrame: A concatenated DataFrame with the contents of all processed XML files.
    """
    folder_path = Path(folder_path)
    all_file_paths = [p for p in folder_path.iterdir() if p.is_file()]

    # Parse files in parallel, then concatenate once to avoid repeated copies.
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(partial(process_xml, mode=mode), all_file_paths))

    if not results:
        return pl.DataFrame()
    return pl.concat(results, how="vertical")
Parses XML files in a given folder based on the specified mode and returns a concatenated DataFrame.

Args:
    folder_path (str): Path to the folder containing XML files.
    mode (str): The mode to use for parsing ("ipg", "nucsum", "asssum", etc.).

Returns:
    DataFrame: A concatenated DataFrame with the contents of all processed XML files.
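And a matching sketch for the folder-level entry point; the folder is whatever download_files was pointed at:

# Illustrative: parse every downloaded assembly docsum in parallel.
df_asssum = parseXML("temp/asssum", mode="asssum")
print(df_asssum.height, "records parsed")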