hoodini.utils.ncbi_api

  1import concurrent.futures
  2import itertools
  3import time
  4from concurrent.futures import ProcessPoolExecutor
  5from functools import partial
  6from pathlib import Path
  7
  8import polars as pl
  9import requests
 10import taxoniq
 11
 12from hoodini.utils.classes import IPGXMLFile
 13from hoodini.utils.logging_utils import error, info, warn
 14
 15
 16def nuc2ass(nucleotide_ids, apikey=None, temp_dir="temp", chunk_size=10, max_concurrent=9):
 17    """
 18    Fetch nucleotide summaries and link nucleotide IDs to assemblies.
 19
 20    Parameters:
 21        nucleotide_ids (list): List of nucleotide IDs to query.
 22        api_key (str): NCBI API key.
 23        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
 24        chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.
 25
 26    Returns:
 27        pl.DataFrame: DataFrame containing columns `AccessionVersion`, `AssemblyAccession`, `Taxid`, and `superkingdom`.
 28    """
 29    link_list = create_ncbi_links(
 30        chunk_list=nucleotide_ids,
 31        engine="efetch",
 32        chunk_size=chunk_size,
 33        db="nuccore",
 34        rettype="docsum",
 35        retmode="xml",
 36        apikey=apikey,
 37    )
 38    download_files(
 39        urls=link_list, folder=f"{temp_dir}/nucsum", max_concurrent_downloads=max_concurrent
 40    )
 41
 42    df_nucsum = parseXML(f"{temp_dir}/nucsum", "nucsum")
 43    df_nucsum = df_nucsum[df_nucsum["doc_id"] != "0"]
 44
 45    df_nucsum = df_nucsum[["doc_id", "AccessionVersion"]]
 46
 47    link_list = create_ncbi_links(
 48        chunk_list=df_nucsum["AccessionVersion"],
 49        engine="elink",
 50        chunk_size=chunk_size,
 51        db="nuccore",
 52        dbto="assembly",
 53        retmode="xml",
 54        apikey=apikey,
 55    )
 56    download_files(
 57        urls=link_list, folder=f"{temp_dir}/elink", max_concurrent_downloads=max_concurrent
 58    )
 59
 60    nuc2ass = parseXML(f"{temp_dir}/elink", "nuc2ass")
 61    df_nucsum = df_nucsum.join(
 62        nuc2ass[["id_list", "linked_id"]], left_on="doc_id", right_on="id_list", how="left"
 63    )
 64
 65    assembly_ids = df_nucsum["linked_id"].drop_nulls().unique().to_list()
 66    link_list = create_ncbi_links(
 67        chunk_list=assembly_ids,
 68        engine="efetch",
 69        chunk_size=chunk_size,
 70        db="assembly",
 71        rettype="docsum",
 72        retmode="xml",
 73        apikey=apikey,
 74    )
 75    download_files(
 76        urls=link_list, folder=f"{temp_dir}/asssum", max_concurrent_downloads=max_concurrent
 77    )
 78
 79    df_asssum = parseXML(f"{temp_dir}/asssum", "asssum")
 80
 81    dicc_tax = {}
 82    for taxid in df_asssum["Taxid"].unique():
 83        t = taxoniq.Taxon(taxid)
 84        dicc_tax[taxid] = {t.rank.name: t.scientific_name for t in t.ranked_lineage}
 85
 86    taxdf = pl.DataFrame(dicc_tax).T
 87    df_asssum = df_asssum.join(taxdf, left_on="Taxid", right_index=True, how="left")
 88
 89    df_nucsum = df_nucsum.join(
 90        df_asssum[["uid", "AssemblyAccession", "Taxid", "superkingdom"]],
 91        left_on="linked_id",
 92        right_on="uid",
 93        how="left",
 94    )
 95    df_nucsum = df_nucsum.dropna(subset=["AssemblyAccession"])
 96    df_nucsum["AssemblyAccession"] = df_nucsum.apply(
 97        lambda x: (
 98            f"GCF_{x['AssemblyAccession'].split('_')[1]}"
 99            if "_" in x["AccessionVersion"]
100            else f"GCA_{x['AssemblyAccession'].split('_')[1]}"
101        ),
102        how="horizontal",
103    )
104
105    return df_nucsum[["AccessionVersion", "AssemblyAccession", "Taxid", "superkingdom"]]
106
107
def nuc2len(nucleotide_ids, apikey, temp_dir="temp", chunk_size=100, max_concurrent=10):
    """
    Fetch nucleotide summaries and return the length of each nucleotide record.

    Parameters:
        nucleotide_ids (list): List of nucleotide IDs to query.
        apikey (str): NCBI API key.
        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
        chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.
        max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 10.

    Returns:
        pl.DataFrame: DataFrame containing columns `nucleotide_id` and `nucleotide_length`.
    """
    link_list = create_ncbi_links(
        chunk_list=nucleotide_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="nuccore",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/nuclen", max_concurrent_downloads=max_concurrent
    )

    df_nucsum = parseXML(f"{temp_dir}/nuclen", "nucsum")
    # polars does not support boolean-mask indexing; rows are selected with filter().
    df_nucsum = df_nucsum.filter(pl.col("doc_id") != "0")

    return df_nucsum.select(["AccessionVersion", "Length"]).rename(
        {"AccessionVersion": "nucleotide_id", "Length": "nucleotide_length"}
    )
142
143
def chunked_iterable(iterable, size):
    """
    Yield consecutive tuples of at most `size` items taken from `iterable`.

    The final tuple may be shorter than `size`; nothing is yielded for an
    empty iterable.
    """
    source = iter(iterable)
    # iter(callable, sentinel) stops as soon as a slice comes back empty.
    for chunk in iter(lambda: tuple(itertools.islice(source, size)), ()):
        yield chunk
151
152
def download_file(url, index, folder):
    """
    Download `url` into `<folder>/<index>.txt`, retrying until a usable body arrives.

    A response is retried when it is empty or when its text contains "error",
    "Error" or "ERROR". Network errors raised by `requests` are retried as
    well. Every retry waits 5 seconds first. Any other exception is logged and
    the download is abandoned without writing a file.

    Args:
    url (str): URL to fetch.
    index (int): Used as the output file stem.
    folder (str | Path): Existing directory that receives the file.
    """
    filename = Path(folder) / f"{index}.txt"
    error_markers = ("error", "Error", "ERROR")
    while True:
        try:
            response = requests.get(url, timeout=65)
            payload = response.content
            if not payload:
                warn("No response received, retrying...")
            else:
                # Decode once; undecodable bytes must not abort the download.
                text = payload.decode("utf-8", errors="replace")
                if not any(marker in text for marker in error_markers):
                    break
                warn("Response contains an error message, retrying...")
        except requests.exceptions.ChunkedEncodingError as e:
            warn(f"Download interrupted (ChunkedEncodingError): {e}, retrying...")
        except requests.RequestException as e:
            warn(f"General network error: {e}, retrying...")
        except Exception as e:
            # Nothing valid was received, so return instead of writing a file.
            error(f"Unexpected error: {e}, aborting...")
            return
        # Back off before every retry, including error-bearing responses.
        time.sleep(5)

    with open(filename, "wb") as file:
        file.write(payload)
    time.sleep(2)
182
183
def download_files(urls, folder, max_concurrent_downloads):
    """
    Download every URL in `urls` into `folder` using a thread pool.

    The URL at position `i` is written to `<folder>/<i>.txt`. The folder is
    created if needed. The call returns once all downloads have finished;
    an exception raised by a worker is logged rather than silently dropped.

    Args:
    urls (iterable of str): URLs to download.
    folder (str | Path): Destination directory.
    max_concurrent_downloads (int): Size of the thread pool.
    """
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_downloads) as executor:
        pending = {}
        for index, url in enumerate(urls):
            # NOTE(review): the logged URL includes the api_key query parameter
            # when one was supplied -- consider redacting it.
            info(f"Downloading {url} to {folder / f'{index}.txt'}")
            pending[executor.submit(download_file, url, index, folder)] = url

        # Surface worker failures that would otherwise stay inside the futures.
        for future in concurrent.futures.as_completed(pending):
            exc = future.exception()
            if exc is not None:
                error(f"Download of {pending[future]} failed: {exc}")
191
192
def create_ncbi_links(
    chunk_list, chunk_size, db, retmode, apikey, engine=None, rettype=None, dbto=None
):
    """
    Build NCBI E-utilities URLs for `chunk_list`, `chunk_size` IDs per URL.

    Args:
    chunk_list (iterable): IDs to query. Values are converted with `str()` and
        stripped; `None` and blank values are skipped.
    chunk_size (int): Maximum number of IDs per URL.
    db (str): Database queried (efetch) or source database (elink).
    retmode (str): Value of the `retmode` query parameter.
    apikey (str | None): NCBI API key; omitted from the URL when falsy.
    engine (str): Either "efetch" or "elink".
    rettype (str, optional): Value of the efetch `rettype` parameter; omitted when None.
    dbto (str, optional): Target database, required for elink.

    Returns:
    list[str]: One URL per non-empty chunk of IDs.

    Raises:
    ValueError: If `engine` is not supported, or `dbto` is missing for elink.
    """
    # Explicit checks instead of assert, which is stripped under `python -O`.
    if engine not in ("efetch", "elink"):
        raise ValueError(f"engine must be 'efetch' or 'elink', got {engine!r}")
    if engine == "elink" and dbto is None:
        raise ValueError("dbto parameter should be provided when using engine=elink")

    base_url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{engine}.fcgi?"
    if engine == "efetch":
        base_url += f"&db={db}"
        if rettype is not None:
            base_url += f"&rettype={rettype}"
        base_url += "&id="
        separator = ","
    else:
        # elink takes one id parameter per ID.
        base_url += f"&dbfrom={db}&db={dbto}&id="
        separator = "&id="

    end_url = "&retmode=" + retmode
    if apikey:
        end_url += "&api_key=" + apikey

    link_list = []
    for c in chunked_iterable(chunk_list, size=chunk_size):
        ids = [str(n).strip() for n in c if n is not None]
        ids = [i for i in ids if i]
        if not ids:
            # A URL without IDs would request nothing useful; skip it.
            continue
        link_list.append(base_url + separator.join(ids) + end_url)
    info(f"Created {len(link_list)} links for {engine} with chunk size {chunk_size}.")
    return link_list
215
216
def process_xml(file_path, mode):
    """
    Load one XML file and convert it to a dataframe.

    Args:
    file_path (str): Path to the XML file to be processed.
    mode (str): Parsing mode forwarded to `IPGXMLFile.to_dataframe` ("ipg", "nucsum", etc.).

    Returns:
    DataFrame: The dataframe produced for this file.
    """
    return IPGXMLFile(file_path).to_dataframe(mode=mode)
230
231
def parseXML(folder_path, mode):
    """
    Parses XML files in a given folder based on the specified mode and returns a concatenated DataFrame.

    Every regular file directly inside `folder_path` is processed in a worker
    process via `process_xml`. Files are handled in sorted path order so the
    row order of the result is reproducible.

    Args:
    folder_path (str): Path to the folder containing XML files.
    mode (str): The mode to use for parsing ("ipg", "nucsum", "asssum", etc.).

    Returns:
    DataFrame: A concatenated DataFrame with the contents of all processed XML files.
    If no file yields any rows, the last (empty) frame is returned, or an
    empty `pl.DataFrame()` when the folder holds no files.
    """
    folder_path = Path(folder_path)
    all_file_paths = sorted(p for p in folder_path.iterdir() if p.is_file())

    with ProcessPoolExecutor() as executor:
        frames = list(executor.map(partial(process_xml, mode=mode), all_file_paths))

    # Concatenate once, and leave out empty frames so they cannot cause a
    # schema mismatch against the populated ones.
    populated = [df for df in frames if df.height > 0]
    if populated:
        return pl.concat(populated, how="vertical")
    return frames[-1] if frames else pl.DataFrame()
def nuc2ass( nucleotide_ids, apikey=None, temp_dir='temp', chunk_size=10, max_concurrent=9):
 17def nuc2ass(nucleotide_ids, apikey=None, temp_dir="temp", chunk_size=10, max_concurrent=9):
 18    """
 19    Fetch nucleotide summaries and link nucleotide IDs to assemblies.
 20
 21    Parameters:
 22        nucleotide_ids (list): List of nucleotide IDs to query.
 23        api_key (str): NCBI API key.
 24        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
 25        chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.
 26
 27    Returns:
 28        pl.DataFrame: DataFrame containing columns `AccessionVersion`, `AssemblyAccession`, `Taxid`, and `superkingdom`.
 29    """
 30    link_list = create_ncbi_links(
 31        chunk_list=nucleotide_ids,
 32        engine="efetch",
 33        chunk_size=chunk_size,
 34        db="nuccore",
 35        rettype="docsum",
 36        retmode="xml",
 37        apikey=apikey,
 38    )
 39    download_files(
 40        urls=link_list, folder=f"{temp_dir}/nucsum", max_concurrent_downloads=max_concurrent
 41    )
 42
 43    df_nucsum = parseXML(f"{temp_dir}/nucsum", "nucsum")
 44    df_nucsum = df_nucsum[df_nucsum["doc_id"] != "0"]
 45
 46    df_nucsum = df_nucsum[["doc_id", "AccessionVersion"]]
 47
 48    link_list = create_ncbi_links(
 49        chunk_list=df_nucsum["AccessionVersion"],
 50        engine="elink",
 51        chunk_size=chunk_size,
 52        db="nuccore",
 53        dbto="assembly",
 54        retmode="xml",
 55        apikey=apikey,
 56    )
 57    download_files(
 58        urls=link_list, folder=f"{temp_dir}/elink", max_concurrent_downloads=max_concurrent
 59    )
 60
 61    nuc2ass = parseXML(f"{temp_dir}/elink", "nuc2ass")
 62    df_nucsum = df_nucsum.join(
 63        nuc2ass[["id_list", "linked_id"]], left_on="doc_id", right_on="id_list", how="left"
 64    )
 65
 66    assembly_ids = df_nucsum["linked_id"].drop_nulls().unique().to_list()
 67    link_list = create_ncbi_links(
 68        chunk_list=assembly_ids,
 69        engine="efetch",
 70        chunk_size=chunk_size,
 71        db="assembly",
 72        rettype="docsum",
 73        retmode="xml",
 74        apikey=apikey,
 75    )
 76    download_files(
 77        urls=link_list, folder=f"{temp_dir}/asssum", max_concurrent_downloads=max_concurrent
 78    )
 79
 80    df_asssum = parseXML(f"{temp_dir}/asssum", "asssum")
 81
 82    dicc_tax = {}
 83    for taxid in df_asssum["Taxid"].unique():
 84        t = taxoniq.Taxon(taxid)
 85        dicc_tax[taxid] = {t.rank.name: t.scientific_name for t in t.ranked_lineage}
 86
 87    taxdf = pl.DataFrame(dicc_tax).T
 88    df_asssum = df_asssum.join(taxdf, left_on="Taxid", right_index=True, how="left")
 89
 90    df_nucsum = df_nucsum.join(
 91        df_asssum[["uid", "AssemblyAccession", "Taxid", "superkingdom"]],
 92        left_on="linked_id",
 93        right_on="uid",
 94        how="left",
 95    )
 96    df_nucsum = df_nucsum.dropna(subset=["AssemblyAccession"])
 97    df_nucsum["AssemblyAccession"] = df_nucsum.apply(
 98        lambda x: (
 99            f"GCF_{x['AssemblyAccession'].split('_')[1]}"
100            if "_" in x["AccessionVersion"]
101            else f"GCA_{x['AssemblyAccession'].split('_')[1]}"
102        ),
103        how="horizontal",
104    )
105
106    return df_nucsum[["AccessionVersion", "AssemblyAccession", "Taxid", "superkingdom"]]

Fetch nucleotide summaries and link nucleotide IDs to assemblies.

Parameters: nucleotide_ids (list): List of nucleotide IDs to query. apikey (str, optional): NCBI API key. Defaults to None. temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp". chunk_size (int, optional): Number of IDs to query per request. Defaults to 10. max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 9.

Returns: pl.DataFrame: DataFrame containing columns AccessionVersion, AssemblyAccession, Taxid, and superkingdom.

def nuc2len( nucleotide_ids, apikey, temp_dir='temp', chunk_size=100, max_concurrent=10):
def nuc2len(nucleotide_ids, apikey, temp_dir="temp", chunk_size=100, max_concurrent=10):
    """
    Fetch nucleotide summaries and return the length of each nucleotide record.

    Parameters:
        nucleotide_ids (list): List of nucleotide IDs to query.
        apikey (str): NCBI API key.
        temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp".
        chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.
        max_concurrent (int, optional): Maximum number of concurrent downloads. Defaults to 10.

    Returns:
        pl.DataFrame: DataFrame containing columns `nucleotide_id` and `nucleotide_length`.
    """
    link_list = create_ncbi_links(
        chunk_list=nucleotide_ids,
        engine="efetch",
        chunk_size=chunk_size,
        db="nuccore",
        rettype="docsum",
        retmode="xml",
        apikey=apikey,
    )
    download_files(
        urls=link_list, folder=f"{temp_dir}/nuclen", max_concurrent_downloads=max_concurrent
    )

    df_nucsum = parseXML(f"{temp_dir}/nuclen", "nucsum")
    # polars does not support boolean-mask indexing; rows are selected with filter().
    df_nucsum = df_nucsum.filter(pl.col("doc_id") != "0")

    return df_nucsum.select(["AccessionVersion", "Length"]).rename(
        {"AccessionVersion": "nucleotide_id", "Length": "nucleotide_length"}
    )

Fetch nucleotide summaries and return the length of each nucleotide record.

Parameters: nucleotide_ids (list): List of nucleotide IDs to query. apikey (str): NCBI API key. temp_dir (str, optional): Directory to store temporary XML files. Defaults to "temp". chunk_size (int, optional): Number of IDs to query per request. Defaults to 100.

Returns: pl.DataFrame: DataFrame containing columns nucleotide_id and nucleotide_length.

def chunked_iterable(iterable, size):
def chunked_iterable(iterable, size):
    """
    Yield consecutive tuples of at most `size` items taken from `iterable`.

    The final tuple may be shorter than `size`; nothing is yielded for an
    empty iterable.
    """
    source = iter(iterable)
    # iter(callable, sentinel) stops as soon as a slice comes back empty.
    for chunk in iter(lambda: tuple(itertools.islice(source, size)), ()):
        yield chunk
def download_file(url, index, folder):
def download_file(url, index, folder):
    """
    Download `url` into `<folder>/<index>.txt`, retrying until a usable body arrives.

    A response is retried when it is empty or when its text contains "error",
    "Error" or "ERROR". Network errors raised by `requests` are retried as
    well. Every retry waits 5 seconds first. Any other exception is logged and
    the download is abandoned without writing a file.

    Args:
    url (str): URL to fetch.
    index (int): Used as the output file stem.
    folder (str | Path): Existing directory that receives the file.
    """
    filename = Path(folder) / f"{index}.txt"
    error_markers = ("error", "Error", "ERROR")
    while True:
        try:
            response = requests.get(url, timeout=65)
            payload = response.content
            if not payload:
                warn("No response received, retrying...")
            else:
                # Decode once; undecodable bytes must not abort the download.
                text = payload.decode("utf-8", errors="replace")
                if not any(marker in text for marker in error_markers):
                    break
                warn("Response contains an error message, retrying...")
        except requests.exceptions.ChunkedEncodingError as e:
            warn(f"Download interrupted (ChunkedEncodingError): {e}, retrying...")
        except requests.RequestException as e:
            warn(f"General network error: {e}, retrying...")
        except Exception as e:
            # Nothing valid was received, so return instead of writing a file.
            error(f"Unexpected error: {e}, aborting...")
            return
        # Back off before every retry, including error-bearing responses.
        time.sleep(5)

    with open(filename, "wb") as file:
        file.write(payload)
    time.sleep(2)
def download_files(urls, folder, max_concurrent_downloads):
def download_files(urls, folder, max_concurrent_downloads):
    """
    Download every URL in `urls` into `folder` using a thread pool.

    The URL at position `i` is written to `<folder>/<i>.txt`. The folder is
    created if needed. The call returns once all downloads have finished;
    an exception raised by a worker is logged rather than silently dropped.

    Args:
    urls (iterable of str): URLs to download.
    folder (str | Path): Destination directory.
    max_concurrent_downloads (int): Size of the thread pool.
    """
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_downloads) as executor:
        pending = {}
        for index, url in enumerate(urls):
            # NOTE(review): the logged URL includes the api_key query parameter
            # when one was supplied -- consider redacting it.
            info(f"Downloading {url} to {folder / f'{index}.txt'}")
            pending[executor.submit(download_file, url, index, folder)] = url

        # Surface worker failures that would otherwise stay inside the futures.
        for future in concurrent.futures.as_completed(pending):
            exc = future.exception()
            if exc is not None:
                error(f"Download of {pending[future]} failed: {exc}")
def process_xml(file_path, mode):
def process_xml(file_path, mode):
    """
    Load one XML file and convert it to a dataframe.

    Args:
    file_path (str): Path to the XML file to be processed.
    mode (str): Parsing mode forwarded to `IPGXMLFile.to_dataframe` ("ipg", "nucsum", etc.).

    Returns:
    DataFrame: The dataframe produced for this file.
    """
    return IPGXMLFile(file_path).to_dataframe(mode=mode)

Function to process a single file and return the resulting dataframe.

Args: file_path (str): Path to the XML file to be processed. mode (str): The mode to use when processing the file ("ipg", "nucsum", etc.).

Returns: DataFrame: Processed dataframe from the file.

def parseXML(folder_path, mode):
def parseXML(folder_path, mode):
    """
    Parses XML files in a given folder based on the specified mode and returns a concatenated DataFrame.

    Every regular file directly inside `folder_path` is processed in a worker
    process via `process_xml`. Files are handled in sorted path order so the
    row order of the result is reproducible.

    Args:
    folder_path (str): Path to the folder containing XML files.
    mode (str): The mode to use for parsing ("ipg", "nucsum", "asssum", etc.).

    Returns:
    DataFrame: A concatenated DataFrame with the contents of all processed XML files.
    If no file yields any rows, the last (empty) frame is returned, or an
    empty `pl.DataFrame()` when the folder holds no files.
    """
    folder_path = Path(folder_path)
    all_file_paths = sorted(p for p in folder_path.iterdir() if p.is_file())

    with ProcessPoolExecutor() as executor:
        frames = list(executor.map(partial(process_xml, mode=mode), all_file_paths))

    # Concatenate once, and leave out empty frames so they cannot cause a
    # schema mismatch against the populated ones.
    populated = [df for df in frames if df.height > 0]
    if populated:
        return pl.concat(populated, how="vertical")
    return frames[-1] if frames else pl.DataFrame()

Parses XML files in a given folder based on the specified mode and returns a concatenated DataFrame.

Args: folder_path (str): Path to the folder containing XML files. mode (str): The mode to use for parsing ("ipg", "nucsum", "asssum", etc.).

Returns: DataFrame: A concatenated DataFrame with the contents of all processed XML files.