hoodini.utils.classes
from __future__ import annotations

import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, Literal

import polars as pl

Mode = Literal["ipg", "nuc2ass", "nucsum", "asssum"]


class IPGXMLFile:
    def __init__(self, file_path: str | Path):
        """Parse an XML file from NCBI IPG/eutils responses."""

        self.file_path = Path(file_path)
        self.xml_tree = ET.parse(self.file_path)
        self.root = self.xml_tree.getroot()

    def to_dict(self, mode: Mode) -> dict[str, Any]:
        """Return the parsed XML content as a nested dictionary."""

        if mode not in ("ipg", "nuc2ass", "nucsum", "asssum"):
            raise ValueError(f"Unsupported mode '{mode}'")

        parsed_dict: dict[str, Any] = {}

        if mode == "ipg":
            # One entry per IPGReport, keyed by its "ipg" attribute.
            for ipg_report in self.root.findall("IPGReport"):
                ipg_id = ipg_report.get("ipg")
                product_acc = ipg_report.get("product_acc")

                product = ipg_report.find("Product")
                product_details = product.attrib if product is not None else {}

                proteins = []
                for protein in ipg_report.findall(".//Protein"):
                    protein_accver = protein.get("accver")
                    protein_details = {k: v for k, v in protein.attrib.items() if k != "accver"}

                    cds_list = []
                    for cds in protein.findall(".//CDS"):
                        cds_list.append(cds.attrib)

                    proteins.append(
                        {
                            "protein_accver": protein_accver,
                            "protein_details": protein_details,
                            "cds_list": cds_list,
                        }
                    )

                parsed_dict[ipg_id] = {
                    "product_acc": product_acc,
                    "product_details": product_details,
                    "proteins": proteins,
                }

        elif mode == "nuc2ass":
            # One entry per LinkSet, keyed by the tuple of query IDs in its IdList.
            for link_set in self.root.findall("LinkSet"):
                db_from = link_set.findtext("DbFrom")
                id_list = [id_tag.text for id_tag in link_set.findall(".//IdList/Id")]

                link_set_db = link_set.find("LinkSetDb")
                if link_set_db is not None:
                    db_to = link_set_db.findtext("DbTo")
                    link_name = link_set_db.findtext("LinkName")
                    linked_ids = [link.findtext("Id") for link in link_set_db.findall(".//Link")]
                else:
                    db_to = link_name = None
                    linked_ids = []

                parsed_dict[tuple(id_list)] = {
                    "db_from": db_from,
                    "db_to": db_to,
                    "link_name": link_name,
                    "linked_ids": linked_ids,
                }

        elif mode == "nucsum":
            # One entry per DocSum, keyed by its Id, mapping Item names to text.
            for doc_sum in self.root.findall("DocSum"):
                doc_id = doc_sum.findtext("Id")
                doc_items = {item.get("Name"): item.text for item in doc_sum.findall("Item")}
                parsed_dict[doc_id] = doc_items

        elif mode == "asssum":
            # One entry per DocumentSummary, keyed by its "uid" attribute.
            for doc_summary in self.root.findall(".//DocumentSummary"):
                uid = doc_summary.get("uid")
                entry = {child.tag: child.text for child in doc_summary}
                parsed_dict[uid] = entry

        return parsed_dict

    def to_dataframe(self, mode: Mode) -> pl.DataFrame:
        """Flatten the parsed XML content into a Polars DataFrame."""

        parsed_dict = self.to_dict(mode)
        flattened_data: list[dict[str, Any]] = []

        if mode == "ipg":
            # One row per CDS (or per protein when it has no CDS entries).
            for ipg_id, details in parsed_dict.items():
                base_info = {"ipg_id": ipg_id, "product_acc": details["product_acc"]}
                base_info.update(details["product_details"])

                for protein in details["proteins"]:
                    protein_info = protein["protein_details"].copy()
                    protein_info["protein_accver"] = protein["protein_accver"]

                    if protein["cds_list"]:
                        for cds in protein["cds_list"]:
                            flattened_data.append({**base_info, **protein_info, **cds})
                    else:
                        flattened_data.append({**base_info, **protein_info})

        elif mode == "nuc2ass":
            # One row per linked ID.
            for ids, details in parsed_dict.items():
                id_list_str = ",".join(ids)
                for linked_id in details["linked_ids"]:
                    flattened_data.append(
                        {
                            "db_from": details["db_from"],
                            "id_list": id_list_str,
                            "db_to": details["db_to"],
                            "link_name": details["link_name"],
                            "linked_id": linked_id,
                        }
                    )

        elif mode == "nucsum":
            # One row per DocSum.
            for doc_id, doc_items in parsed_dict.items():
                row = {"doc_id": doc_id}
                row.update(doc_items)
                flattened_data.append(row)

        elif mode == "asssum":
            # One row per DocumentSummary.
            for uid, details in parsed_dict.items():
                row = {"uid": uid}
                row.update(details)
                flattened_data.append(row)

        return pl.DataFrame(flattened_data)
Mode = typing.Literal['ipg', 'nuc2ass', 'nucsum', 'asssum']
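Judging from the element names each branch of to_dict searches for, "ipg" targets IPGReport records, "nuc2ass" targets elink-style LinkSet records, and "nucsum"/"asssum" target the two esummary layouts (DocSum and DocumentSummary). The alias can be reused to keep caller code type-checked; a minimal, hypothetical helper:

    from hoodini.utils.classes import IPGXMLFile, Mode

    def count_records(path: str, mode: Mode) -> int:
        """Hypothetical helper: number of top-level records parsed for a given mode."""
        return len(IPGXMLFile(path).to_dict(mode))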
class IPGXMLFile:
IPGXMLFile(file_path: str | pathlib.Path)
Parse an XML file from NCBI IPG/eutils responses.
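The constructor only parses the file; the three attributes it sets can be inspected directly. A short sketch ("ipg_report.xml" is a placeholder for a saved E-utilities response):

    from hoodini.utils.classes import IPGXMLFile

    doc = IPGXMLFile("ipg_report.xml")   # placeholder path
    doc.file_path                        # pathlib.Path to the input file
    doc.xml_tree                         # xml.etree.ElementTree.ElementTree
    doc.root.tag                         # root element, e.g. "IPGReportSet"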
def to_dict(self, mode: Literal['ipg', 'nuc2ass', 'nucsum', 'asssum']) -> dict[str, typing.Any]:
Return the parsed XML content as a nested dictionary.
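The shape of the returned dictionary depends on mode: one entry per IPGReport (keyed by the ipg attribute) for "ipg", per LinkSet (keyed by the tuple of query IDs) for "nuc2ass", per DocSum Id for "nucsum", and per DocumentSummary uid for "asssum". A self-contained sketch for the "ipg" branch, using a made-up report trimmed to just the elements this parser reads (real IPG reports carry more fields):

    from pathlib import Path
    from hoodini.utils.classes import IPGXMLFile

    # Toy IPG-like report; structure is a simplified stand-in, not the full NCBI schema.
    xml_text = """<IPGReportSet>
      <IPGReport ipg="12345" product_acc="WP_000000001.1">
        <Product accver="WP_000000001.1" name="hypothetical protein"/>
        <ProteinList>
          <Protein accver="WP_000000001.1" source="RefSeq">
            <CDSList>
              <CDS accver="NZ_CP000000.1" start="100" stop="400" strand="+"/>
            </CDSList>
          </Protein>
        </ProteinList>
      </IPGReport>
    </IPGReportSet>"""

    path = Path("toy_ipg.xml")
    path.write_text(xml_text)

    parsed = IPGXMLFile(path).to_dict(mode="ipg")
    # {'12345': {'product_acc': 'WP_000000001.1',
    #            'product_details': {'accver': 'WP_000000001.1', 'name': 'hypothetical protein'},
    #            'proteins': [{'protein_accver': 'WP_000000001.1',
    #                          'protein_details': {'source': 'RefSeq'},
    #                          'cds_list': [{'accver': 'NZ_CP000000.1', 'start': '100',
    #                                        'stop': '400', 'strand': '+'}]}]}}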
def to_dataframe(self, mode: Literal['ipg', 'nuc2ass', 'nucsum', 'asssum']) -> polars.DataFrame:
Flatten the parsed XML content into a Polars DataFrame.
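Each row of the DataFrame is one flattened record: for "ipg" one row per CDS (or per protein when it has no CDS entries), for "nuc2ass" one row per linked ID, and for the two summary modes one row per document. A self-contained sketch for "nucsum", again with a made-up, trimmed-down esummary-style document:

    from pathlib import Path
    from hoodini.utils.classes import IPGXMLFile

    # Toy esummary-style document with the DocSum/Id/Item layout to_dict("nucsum") expects.
    xml_text = """<eSummaryResult>
      <DocSum>
        <Id>1842179466</Id>
        <Item Name="Caption" Type="String">NZ_CP000000</Item>
        <Item Name="Length" Type="Integer">4641652</Item>
      </DocSum>
    </eSummaryResult>"""

    Path("toy_nucsum.xml").write_text(xml_text)

    df = IPGXMLFile("toy_nucsum.xml").to_dataframe(mode="nucsum")
    # shape: (1, 3) with columns doc_id, Caption, Length;
    # values stay as strings because element text is not cast.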