hoodini.utils.extract_neighborhood
1import contextlib 2from pathlib import Path 3 4import gb_io 5import orfipy_core 6import polars as pl 7import pyrodigal 8from Bio.Seq import Seq 9 10from hoodini.utils.logging_utils import info 11from hoodini.utils.seq_io import read_fasta 12 13 14def calculate_overlap(start1, end1, start2, end2): 15 """Calculate overlap percentage between two intervals.""" 16 overlap_start = max(start1, start2) 17 overlap_end = min(end1, end2) 18 if overlap_start < overlap_end: 19 overlap_length = overlap_end - overlap_start 20 total_length = max(end1 - start1, end2 - start2) 21 return (overlap_length / total_length) * 100 22 return 0 23 24 25def process_features(features, record_version): 26 """Extract feature data from GenBank features.""" 27 feature_data = [] 28 for feature in features: 29 if feature.type == "CDS": 30 feature_dict = { 31 "seqid": record_version, 32 "source": "GenBank", 33 "type": "CDS", 34 "start": int(feature.location.start), 35 "end": int(feature.location.end), 36 "strand": "+" if feature.location.strand == 1 else "-", 37 "phase": ".", 38 } 39 # Extract qualifiers 40 for qual in ["protein_id", "product", "gene"]: 41 if qual in feature.qualifiers: 42 feature_dict[qual] = feature.qualifiers[qual][0] 43 feature_data.append(feature_dict) 44 return feature_data 45 46 47def unwrap_attributes(gff_df): 48 """Unwrap attributes column in GFF DataFrame.""" 49 if "attributes" in gff_df.columns: 50 # Parse attributes into separate columns 51 attributes_list = [] 52 for attr in gff_df["attributes"]: 53 attr_dict = {} 54 if attr and isinstance(attr, str): 55 for pair in attr.split(";"): 56 if "=" in pair: 57 key, value = pair.split("=", 1) 58 attr_dict[key.strip()] = value.strip() 59 attributes_list.append(attr_dict) 60 attributes_df = pl.DataFrame(attributes_list) 61 gff_df = pl.concat([gff_df.drop("attributes"), attributes_df], how="horizontal") 62 return gff_df 63 64 65def extract_neighborhood( 66 protein_id, 67 nucleotide_id, 68 gbf_file, 69 gff_file, 70 faa_file, 71 fna_file, 72 mode="win_nts", 73 window=None, 74 strand=None, 75 start=None, 76 end=None, 77 unique_id=None, 78 input_type=None, 79 sorfs=None, 80): 81 info(f"✔️\tExtracting neighborhood {unique_id}") 82 neighborhood = {} 83 if gbf_file: 84 85 record_found = False 86 87 if not Path(gbf_file).exists(): 88 return ( 89 None, 90 None, 91 unique_id, 92 "GenBank file not found", 93 ) 94 95 try: 96 record_iter = gb_io.iter(gbf_file) 97 except Exception: 98 return None, None, unique_id, "gb_io failed to open GenBank iterator" 99 100 if nucleotide_id: 101 record_found = False 102 for record in record_iter: 103 record_version = getattr(record, "version", None) 104 if record_version and nucleotide_id in record_version: 105 record_found = True 106 break 107 if record_found: 108 feature_data = process_features(record.features, record_version) 109 feature_data = pl.DataFrame(feature_data) 110 if "attributes" in feature_data.columns: 111 attributes_df = pl.DataFrame(feature_data["attributes"].to_list()) 112 attributes_df.drop(["protein_id"]) 113 feature_data = pl.concat( 114 [feature_data.drop(["attributes"]), attributes_df], how="horizontal" 115 ) 116 feature_data = feature_data.rename({"translation": "sequence"}) 117 else: 118 return None, None, unique_id, "GenBank file is not annotated" 119 else: 120 return None, None, unique_id, "GenBank record not found" 121 122 if input_type == "protein": 123 124 if "protein_id" in feature_data.columns and not (start and end): 125 start = feature_data[feature_data["protein_id"] == protein_id]["start"].iloc[0] 126 end = feature_data[feature_data["protein_id"] == protein_id]["end"].iloc[0] 127 strand = feature_data[feature_data["protein_id"] == protein_id]["strand"].iloc[0] 128 start, end = int(start), int(end) 129 130 if mode == "win_nts": 131 start_win = start - window 132 end_win = end + window 133 start_win = max(start_win, 0) 134 end_win = min(end_win, len(record.sequence)) 135 subgff = feature_data.query("start>=@start_win & end<=@end_win") 136 137 elif mode == "win_ngen": 138 subgff = feature_data.reset_index(drop=True) 139 prot_index = subgff[subgff["protein_id"] == protein_id].index.to_list()[0] 140 subgff = subgff[prot_index - window : prot_index + window] 141 start_win = subgff["start"].min() 142 end_win = subgff["end"].max() 143 144 elif input_type == "nucleotide": 145 146 if nucleotide_id and (start and end) and window: 147 start = int(start) 148 end = int(end) 149 if mode == "win_nts": 150 start_win = start - window 151 end_win = end + window 152 if not strand: 153 strand = "+" if end > start else "-" 154 subgff = feature_data.query("start>=@start_win & end<=@end_win") 155 156 elif mode == "win_ngen": 157 subgff = feature_data.reset_index(drop=True) 158 start_index = subgff[subgff["start"] >= start].index.to_list()[0] 159 end_index = subgff[subgff["end"] <= end].index.to_list()[-1] 160 subgff = subgff[start_index - window : end_index + window] 161 start_win = subgff["start"].min() 162 end_win = subgff["end"].max() 163 if not strand: 164 strand = "+" if end > start else "-" 165 166 elif not window and nucleotide_id and (start and end): 167 start, end = int(start), int(end) 168 if not strand: 169 strand = "-" if end < start else "+" 170 subgff = feature_data.query( 171 "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end" 172 ) 173 start_win = subgff["start"].min() 174 end_win = subgff["end"].max() 175 176 elif not (start and end): 177 start = end = 0 178 subgff = feature_data 179 start_win = subgff["start"].min() 180 end_win = subgff["end"].max() 181 strand = "+" 182 183 start_win = max(start_win, 0) 184 end_win = min(end_win, len(record.sequence)) 185 subgff["id"] = subgff["protein_id"] 186 header = [ 187 "seqid", 188 "source", 189 "type", 190 "start", 191 "end", 192 "score", 193 "strand", 194 "phase", 195 "protein_id", 196 "id", 197 "sequence", 198 ] 199 if "product" in subgff.columns: 200 header.append("product") 201 else: 202 subgff["product"] = None 203 subgff = subgff[header] 204 neighborhood = { 205 "seqid": record_version, 206 "start_target": start, 207 "end_target": end, 208 "start_win": start_win, 209 "end_win": end_win, 210 "strand_win": strand, 211 "sequence": record.sequence[start_win:end_win].decode("utf-8"), 212 "unique_id": unique_id, 213 } 214 if sorfs: 215 orf_finder = pyrodigal.GeneFinder(meta=True, min_gene=10, max_overlap=9) 216 new_genes = [] 217 218 for i, pred in enumerate( 219 orf_finder.find_genes(record.sequence[start_win:end_win].decode("utf-8")) 220 ): 221 overlap_flag = False 222 for row in subgff.iter_rows(named=True): 223 overlap_percentage = calculate_overlap( 224 row["start"], row["end"], pred.begin + start_win, pred.end + start_win 225 ) 226 if overlap_percentage > 10: 227 overlap_flag = True 228 break 229 if not overlap_flag: 230 new_genes.append( 231 { 232 "seqid": nucleotide_id, 233 "source": "pyrodigal", 234 "type": "CDS", 235 "start": pred.begin + start_win, 236 "end": pred.end + start_win, 237 "score": pred.score, 238 "strand": "-" if pred.strand == "-1" else "+", 239 "phase": ".", 240 "protein_id": f"sORF_{unique_id}_{i}", 241 "id": f"sORF_{unique_id}_{i}", 242 "sequence": pred.translate(), 243 } 244 ) 245 246 if new_genes: 247 new_genes_df = pl.DataFrame(new_genes) 248 subgff = pl.concat([subgff, new_genes_df], how="vertical") 249 250 new_genes = [] 251 seq = record.sequence[start_win:end_win].decode("utf-8").upper() 252 for i, (start, stop, strand, _description) in enumerate( 253 orfipy_core.orfs(seq, minlen=100, maxlen=1000, partial3=False, between_stops=False) 254 ): 255 overlap_flag = False 256 for row in subgff.iter_rows(named=True): 257 overlap_percentage = calculate_overlap( 258 row["start"], row["end"], start + start_win, stop + start_win 259 ) 260 if overlap_percentage > 0: 261 overlap_flag = True 262 break 263 264 if not overlap_flag: 265 orf_sequence = Seq(record.sequence[start_win:end_win][start:stop]) 266 if strand == "-": 267 orf_sequence = orf_sequence.reverse_complement() 268 protein_sequence = orf_sequence.translate(table=11, to_stop=True) 269 270 new_genes.append( 271 { 272 "seqid": nucleotide_id, 273 "source": "orfipy", 274 "type": "CDS", 275 "start": start + start_win, 276 "end": stop + start_win, 277 "score": ".", 278 "strand": "-" if strand == "-" else "+", 279 "phase": ".", 280 "protein_id": f"sORF_orfipy_{unique_id}_{i}", 281 "id": f"sORF_orfipy_{unique_id}_{i}", 282 "sequence": protein_sequence, 283 } 284 ) 285 286 if new_genes: 287 new_genes_df = pl.DataFrame(new_genes) 288 subgff = pl.concat([subgff, new_genes_df], how="vertical") 289 290 neighborhood = pl.DataFrame(neighborhood, index=[0]) 291 292 elif gff_file and faa_file: 293 gff_header = [ 294 "seqid", 295 "source", 296 "type", 297 "start", 298 "end", 299 "score", 300 "strand", 301 "phase", 302 "attributes", 303 ] 304 if not Path(gff_file).exists(): 305 return ( 306 None, 307 None, 308 unique_id, 309 "GFF file not found", 310 ) 311 if not Path(faa_file).exists(): 312 return None, None, unique_id, "FAA file not found" 313 314 try: 315 gff = pl.read_csv( 316 filepath_or_buffer=gff_file, 317 separator="\t", 318 comment="#", 319 names=gff_header, 320 engine="c", 321 ) 322 except Exception: 323 return None, None, unique_id, "Failed to read GFF file" 324 try: 325 faa_df = read_fasta(faa_file) 326 except Exception: 327 return None, None, unique_id, "Failed to read FAA file" 328 329 if input_type == "protein": 330 if protein_id and window: 331 query = f"={protein_id}" 332 start = gff[gff["attributes"].str.contains(query)]["start"].to_list() 333 if not start: 334 return None, None, unique_id, "Protein not found in GFF file" 335 else: 336 start = start[0] 337 end = gff[gff["attributes"].str.contains(query)]["end"].to_list()[0] 338 strand = gff[gff["attributes"].str.contains(query)]["strand"].to_list()[0] 339 nucleotide_id = gff[gff["attributes"].str.contains(query)]["seqid"].to_list()[0] 340 341 if mode == "win_nts": 342 start_win = start - window 343 end_win = end + window 344 start_win = max(start_win, 0) 345 gff_nuc = gff.query("seqid == @nucleotide_id") 346 end_win = min(end_win, gff_nuc["end"].max()) 347 subgff = gff.query( 348 "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win" 349 ) 350 elif mode == "win_ngen": 351 subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index( 352 drop=True 353 ) 354 prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0] 355 subgff = subgff[prot_index - window : prot_index + window] 356 start_win = subgff["start"].min() 357 end_win = subgff["end"].max() 358 359 if strand == "-": 360 pass 361 362 elif input_type == "nucleotide": 363 if nucleotide_id and (start and end) and window: 364 start, end = int(start), int(end) 365 if not strand: 366 strand = "-" if end < start else "+" 367 368 if mode == "win_nts": 369 start_win = start - window 370 end_win = end + window 371 start_win = max(start_win, 0) 372 subgff = gff.query( 373 "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win" 374 ) 375 elif mode == "win_ngen": 376 subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index( 377 drop=True 378 ) 379 prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0] 380 subgff = subgff[prot_index - window : prot_index + window] 381 start_win = subgff["start"].min() 382 end_win = subgff["end"].max() 383 384 if strand == "-": 385 pass 386 387 elif not window and nucleotide_id and (start and end): 388 start, end = int(start), int(end) 389 if not strand: 390 strand = "-" if end < start else "+" 391 subgff = gff.query( 392 "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end" 393 ) 394 start_win = start 395 end_win = end 396 if strand == "-": 397 pass 398 399 elif ( 400 not window 401 and nucleotide_id 402 and not (start and end) 403 or window 404 and nucleotide_id 405 and not (start and end) 406 ): 407 start = end = 0 408 subgff = gff.query("seqid == @nucleotide_id & type =='CDS'") 409 start_win = 0 410 end_win = subgff["end"].max() 411 strand = "+" 412 413 else: 414 return ( 415 None, 416 None, 417 unique_id, 418 "Invalid usage of parameters", 419 ) 420 421 subgff = unwrap_attributes(subgff) 422 key_join = "protein_id" if "protein_id" in subgff.columns else "ID" 423 424 subgff = subgff.join( 425 faa_df[["id", "sequence"]], left_on=key_join, right_on="id", how="left" 426 ) 427 428 if fna_file: 429 fna_df = read_fasta(fna_file) 430 nucleotide_id = str(nucleotide_id) 431 faa_df["id"] = faa_df["id"].astype(str) 432 if nucleotide_id in fna_df["id"].to_list(): 433 sequence = fna_df[fna_df["id"] == nucleotide_id]["sequence"].to_list()[0] 434 end_win = end + window 435 end_win = min(end_win, len(sequence)) 436 if sorfs: 437 orf_finder = pyrodigal.GeneFinder(meta=True) 438 new_genes = [] 439 440 for i, pred in enumerate(orf_finder.find_genes(sequence.encode())): 441 overlap_flag = False 442 for row in subgff.iter_rows(named=True): 443 overlap_percentage = calculate_overlap( 444 row["start"], row["end"], pred.begin, pred.end 445 ) 446 if overlap_percentage > 5: 447 overlap_flag = True 448 break 449 450 if not overlap_flag: 451 new_genes.append( 452 { 453 "seqid": nucleotide_id, 454 "source": "pyrodigal", 455 "type": "CDS", 456 "start": pred.begin + start_win, 457 "end": pred.end + start_win, 458 "score": pred.score, 459 "strand": "-" if pred.strand == "-1" else "+", 460 "phase": ".", 461 key_join: f"{key_join}=sORF_{unique_id}_{i}", 462 "sequence": pred.translate(), 463 } 464 ) 465 466 if new_genes: 467 new_genes_df = pl.DataFrame(new_genes) 468 subgff = pl.concat([subgff, new_genes_df], how="vertical") 469 else: 470 sequence = None 471 neighborhood = { 472 "seqid": nucleotide_id, 473 "start_target": start, 474 "end_target": end, 475 "start_win": start_win, 476 "end_win": end_win, 477 "strand_win": strand, 478 "sequence": sequence[start_win:end_win], 479 "unique_id": unique_id, 480 } 481 neighborhood = pl.DataFrame(neighborhood, index=[0]) 482 483 subgff["target_prot"] = protein_id 484 subgff["target_nuc"] = nucleotide_id 485 subgff["unique_id"] = str(unique_id) 486 487 try: 488 if isinstance(subgff, pl.DataFrame): 489 if "id" not in subgff.columns: 490 for cand in ("protein_id", "ID", "gene_id"): 491 if cand in subgff.columns: 492 subgff["id"] = subgff[cand] 493 break 494 495 for redundant in ("ID", "gene_id"): 496 if redundant in subgff.columns and redundant != "id": 497 with contextlib.suppress(Exception): 498 subgff.drop([redundant]) 499 500 if "id" in subgff.columns: 501 with contextlib.suppress(Exception): 502 subgff["id"] = subgff["id"].astype(str) 503 except Exception: 504 pass 505 506 if "product" not in subgff.columns: 507 subgff["product"] = None 508 509 return subgff, neighborhood, unique_id
def
calculate_overlap(start1, end1, start2, end2):
15def calculate_overlap(start1, end1, start2, end2): 16 """Calculate overlap percentage between two intervals.""" 17 overlap_start = max(start1, start2) 18 overlap_end = min(end1, end2) 19 if overlap_start < overlap_end: 20 overlap_length = overlap_end - overlap_start 21 total_length = max(end1 - start1, end2 - start2) 22 return (overlap_length / total_length) * 100 23 return 0
Calculate overlap percentage between two intervals.
def
process_features(features, record_version):
26def process_features(features, record_version): 27 """Extract feature data from GenBank features.""" 28 feature_data = [] 29 for feature in features: 30 if feature.type == "CDS": 31 feature_dict = { 32 "seqid": record_version, 33 "source": "GenBank", 34 "type": "CDS", 35 "start": int(feature.location.start), 36 "end": int(feature.location.end), 37 "strand": "+" if feature.location.strand == 1 else "-", 38 "phase": ".", 39 } 40 # Extract qualifiers 41 for qual in ["protein_id", "product", "gene"]: 42 if qual in feature.qualifiers: 43 feature_dict[qual] = feature.qualifiers[qual][0] 44 feature_data.append(feature_dict) 45 return feature_data
Extract feature data from GenBank features.
def
unwrap_attributes(gff_df):
48def unwrap_attributes(gff_df): 49 """Unwrap attributes column in GFF DataFrame.""" 50 if "attributes" in gff_df.columns: 51 # Parse attributes into separate columns 52 attributes_list = [] 53 for attr in gff_df["attributes"]: 54 attr_dict = {} 55 if attr and isinstance(attr, str): 56 for pair in attr.split(";"): 57 if "=" in pair: 58 key, value = pair.split("=", 1) 59 attr_dict[key.strip()] = value.strip() 60 attributes_list.append(attr_dict) 61 attributes_df = pl.DataFrame(attributes_list) 62 gff_df = pl.concat([gff_df.drop("attributes"), attributes_df], how="horizontal") 63 return gff_df
Unwrap attributes column in GFF DataFrame.
def
extract_neighborhood( protein_id, nucleotide_id, gbf_file, gff_file, faa_file, fna_file, mode='win_nts', window=None, strand=None, start=None, end=None, unique_id=None, input_type=None, sorfs=None):
66def extract_neighborhood( 67 protein_id, 68 nucleotide_id, 69 gbf_file, 70 gff_file, 71 faa_file, 72 fna_file, 73 mode="win_nts", 74 window=None, 75 strand=None, 76 start=None, 77 end=None, 78 unique_id=None, 79 input_type=None, 80 sorfs=None, 81): 82 info(f"✔️\tExtracting neighborhood {unique_id}") 83 neighborhood = {} 84 if gbf_file: 85 86 record_found = False 87 88 if not Path(gbf_file).exists(): 89 return ( 90 None, 91 None, 92 unique_id, 93 "GenBank file not found", 94 ) 95 96 try: 97 record_iter = gb_io.iter(gbf_file) 98 except Exception: 99 return None, None, unique_id, "gb_io failed to open GenBank iterator" 100 101 if nucleotide_id: 102 record_found = False 103 for record in record_iter: 104 record_version = getattr(record, "version", None) 105 if record_version and nucleotide_id in record_version: 106 record_found = True 107 break 108 if record_found: 109 feature_data = process_features(record.features, record_version) 110 feature_data = pl.DataFrame(feature_data) 111 if "attributes" in feature_data.columns: 112 attributes_df = pl.DataFrame(feature_data["attributes"].to_list()) 113 attributes_df.drop(["protein_id"]) 114 feature_data = pl.concat( 115 [feature_data.drop(["attributes"]), attributes_df], how="horizontal" 116 ) 117 feature_data = feature_data.rename({"translation": "sequence"}) 118 else: 119 return None, None, unique_id, "GenBank file is not annotated" 120 else: 121 return None, None, unique_id, "GenBank record not found" 122 123 if input_type == "protein": 124 125 if "protein_id" in feature_data.columns and not (start and end): 126 start = feature_data[feature_data["protein_id"] == protein_id]["start"].iloc[0] 127 end = feature_data[feature_data["protein_id"] == protein_id]["end"].iloc[0] 128 strand = feature_data[feature_data["protein_id"] == protein_id]["strand"].iloc[0] 129 start, end = int(start), int(end) 130 131 if mode == "win_nts": 132 start_win = start - window 133 end_win = end + window 134 start_win = max(start_win, 0) 135 end_win = min(end_win, len(record.sequence)) 136 subgff = feature_data.query("start>=@start_win & end<=@end_win") 137 138 elif mode == "win_ngen": 139 subgff = feature_data.reset_index(drop=True) 140 prot_index = subgff[subgff["protein_id"] == protein_id].index.to_list()[0] 141 subgff = subgff[prot_index - window : prot_index + window] 142 start_win = subgff["start"].min() 143 end_win = subgff["end"].max() 144 145 elif input_type == "nucleotide": 146 147 if nucleotide_id and (start and end) and window: 148 start = int(start) 149 end = int(end) 150 if mode == "win_nts": 151 start_win = start - window 152 end_win = end + window 153 if not strand: 154 strand = "+" if end > start else "-" 155 subgff = feature_data.query("start>=@start_win & end<=@end_win") 156 157 elif mode == "win_ngen": 158 subgff = feature_data.reset_index(drop=True) 159 start_index = subgff[subgff["start"] >= start].index.to_list()[0] 160 end_index = subgff[subgff["end"] <= end].index.to_list()[-1] 161 subgff = subgff[start_index - window : end_index + window] 162 start_win = subgff["start"].min() 163 end_win = subgff["end"].max() 164 if not strand: 165 strand = "+" if end > start else "-" 166 167 elif not window and nucleotide_id and (start and end): 168 start, end = int(start), int(end) 169 if not strand: 170 strand = "-" if end < start else "+" 171 subgff = feature_data.query( 172 "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end" 173 ) 174 start_win = subgff["start"].min() 175 end_win = subgff["end"].max() 176 177 elif not (start and end): 178 start = end = 0 179 subgff = feature_data 180 start_win = subgff["start"].min() 181 end_win = subgff["end"].max() 182 strand = "+" 183 184 start_win = max(start_win, 0) 185 end_win = min(end_win, len(record.sequence)) 186 subgff["id"] = subgff["protein_id"] 187 header = [ 188 "seqid", 189 "source", 190 "type", 191 "start", 192 "end", 193 "score", 194 "strand", 195 "phase", 196 "protein_id", 197 "id", 198 "sequence", 199 ] 200 if "product" in subgff.columns: 201 header.append("product") 202 else: 203 subgff["product"] = None 204 subgff = subgff[header] 205 neighborhood = { 206 "seqid": record_version, 207 "start_target": start, 208 "end_target": end, 209 "start_win": start_win, 210 "end_win": end_win, 211 "strand_win": strand, 212 "sequence": record.sequence[start_win:end_win].decode("utf-8"), 213 "unique_id": unique_id, 214 } 215 if sorfs: 216 orf_finder = pyrodigal.GeneFinder(meta=True, min_gene=10, max_overlap=9) 217 new_genes = [] 218 219 for i, pred in enumerate( 220 orf_finder.find_genes(record.sequence[start_win:end_win].decode("utf-8")) 221 ): 222 overlap_flag = False 223 for row in subgff.iter_rows(named=True): 224 overlap_percentage = calculate_overlap( 225 row["start"], row["end"], pred.begin + start_win, pred.end + start_win 226 ) 227 if overlap_percentage > 10: 228 overlap_flag = True 229 break 230 if not overlap_flag: 231 new_genes.append( 232 { 233 "seqid": nucleotide_id, 234 "source": "pyrodigal", 235 "type": "CDS", 236 "start": pred.begin + start_win, 237 "end": pred.end + start_win, 238 "score": pred.score, 239 "strand": "-" if pred.strand == "-1" else "+", 240 "phase": ".", 241 "protein_id": f"sORF_{unique_id}_{i}", 242 "id": f"sORF_{unique_id}_{i}", 243 "sequence": pred.translate(), 244 } 245 ) 246 247 if new_genes: 248 new_genes_df = pl.DataFrame(new_genes) 249 subgff = pl.concat([subgff, new_genes_df], how="vertical") 250 251 new_genes = [] 252 seq = record.sequence[start_win:end_win].decode("utf-8").upper() 253 for i, (start, stop, strand, _description) in enumerate( 254 orfipy_core.orfs(seq, minlen=100, maxlen=1000, partial3=False, between_stops=False) 255 ): 256 overlap_flag = False 257 for row in subgff.iter_rows(named=True): 258 overlap_percentage = calculate_overlap( 259 row["start"], row["end"], start + start_win, stop + start_win 260 ) 261 if overlap_percentage > 0: 262 overlap_flag = True 263 break 264 265 if not overlap_flag: 266 orf_sequence = Seq(record.sequence[start_win:end_win][start:stop]) 267 if strand == "-": 268 orf_sequence = orf_sequence.reverse_complement() 269 protein_sequence = orf_sequence.translate(table=11, to_stop=True) 270 271 new_genes.append( 272 { 273 "seqid": nucleotide_id, 274 "source": "orfipy", 275 "type": "CDS", 276 "start": start + start_win, 277 "end": stop + start_win, 278 "score": ".", 279 "strand": "-" if strand == "-" else "+", 280 "phase": ".", 281 "protein_id": f"sORF_orfipy_{unique_id}_{i}", 282 "id": f"sORF_orfipy_{unique_id}_{i}", 283 "sequence": protein_sequence, 284 } 285 ) 286 287 if new_genes: 288 new_genes_df = pl.DataFrame(new_genes) 289 subgff = pl.concat([subgff, new_genes_df], how="vertical") 290 291 neighborhood = pl.DataFrame(neighborhood, index=[0]) 292 293 elif gff_file and faa_file: 294 gff_header = [ 295 "seqid", 296 "source", 297 "type", 298 "start", 299 "end", 300 "score", 301 "strand", 302 "phase", 303 "attributes", 304 ] 305 if not Path(gff_file).exists(): 306 return ( 307 None, 308 None, 309 unique_id, 310 "GFF file not found", 311 ) 312 if not Path(faa_file).exists(): 313 return None, None, unique_id, "FAA file not found" 314 315 try: 316 gff = pl.read_csv( 317 filepath_or_buffer=gff_file, 318 separator="\t", 319 comment="#", 320 names=gff_header, 321 engine="c", 322 ) 323 except Exception: 324 return None, None, unique_id, "Failed to read GFF file" 325 try: 326 faa_df = read_fasta(faa_file) 327 except Exception: 328 return None, None, unique_id, "Failed to read FAA file" 329 330 if input_type == "protein": 331 if protein_id and window: 332 query = f"={protein_id}" 333 start = gff[gff["attributes"].str.contains(query)]["start"].to_list() 334 if not start: 335 return None, None, unique_id, "Protein not found in GFF file" 336 else: 337 start = start[0] 338 end = gff[gff["attributes"].str.contains(query)]["end"].to_list()[0] 339 strand = gff[gff["attributes"].str.contains(query)]["strand"].to_list()[0] 340 nucleotide_id = gff[gff["attributes"].str.contains(query)]["seqid"].to_list()[0] 341 342 if mode == "win_nts": 343 start_win = start - window 344 end_win = end + window 345 start_win = max(start_win, 0) 346 gff_nuc = gff.query("seqid == @nucleotide_id") 347 end_win = min(end_win, gff_nuc["end"].max()) 348 subgff = gff.query( 349 "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win" 350 ) 351 elif mode == "win_ngen": 352 subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index( 353 drop=True 354 ) 355 prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0] 356 subgff = subgff[prot_index - window : prot_index + window] 357 start_win = subgff["start"].min() 358 end_win = subgff["end"].max() 359 360 if strand == "-": 361 pass 362 363 elif input_type == "nucleotide": 364 if nucleotide_id and (start and end) and window: 365 start, end = int(start), int(end) 366 if not strand: 367 strand = "-" if end < start else "+" 368 369 if mode == "win_nts": 370 start_win = start - window 371 end_win = end + window 372 start_win = max(start_win, 0) 373 subgff = gff.query( 374 "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win" 375 ) 376 elif mode == "win_ngen": 377 subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index( 378 drop=True 379 ) 380 prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0] 381 subgff = subgff[prot_index - window : prot_index + window] 382 start_win = subgff["start"].min() 383 end_win = subgff["end"].max() 384 385 if strand == "-": 386 pass 387 388 elif not window and nucleotide_id and (start and end): 389 start, end = int(start), int(end) 390 if not strand: 391 strand = "-" if end < start else "+" 392 subgff = gff.query( 393 "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end" 394 ) 395 start_win = start 396 end_win = end 397 if strand == "-": 398 pass 399 400 elif ( 401 not window 402 and nucleotide_id 403 and not (start and end) 404 or window 405 and nucleotide_id 406 and not (start and end) 407 ): 408 start = end = 0 409 subgff = gff.query("seqid == @nucleotide_id & type =='CDS'") 410 start_win = 0 411 end_win = subgff["end"].max() 412 strand = "+" 413 414 else: 415 return ( 416 None, 417 None, 418 unique_id, 419 "Invalid usage of parameters", 420 ) 421 422 subgff = unwrap_attributes(subgff) 423 key_join = "protein_id" if "protein_id" in subgff.columns else "ID" 424 425 subgff = subgff.join( 426 faa_df[["id", "sequence"]], left_on=key_join, right_on="id", how="left" 427 ) 428 429 if fna_file: 430 fna_df = read_fasta(fna_file) 431 nucleotide_id = str(nucleotide_id) 432 faa_df["id"] = faa_df["id"].astype(str) 433 if nucleotide_id in fna_df["id"].to_list(): 434 sequence = fna_df[fna_df["id"] == nucleotide_id]["sequence"].to_list()[0] 435 end_win = end + window 436 end_win = min(end_win, len(sequence)) 437 if sorfs: 438 orf_finder = pyrodigal.GeneFinder(meta=True) 439 new_genes = [] 440 441 for i, pred in enumerate(orf_finder.find_genes(sequence.encode())): 442 overlap_flag = False 443 for row in subgff.iter_rows(named=True): 444 overlap_percentage = calculate_overlap( 445 row["start"], row["end"], pred.begin, pred.end 446 ) 447 if overlap_percentage > 5: 448 overlap_flag = True 449 break 450 451 if not overlap_flag: 452 new_genes.append( 453 { 454 "seqid": nucleotide_id, 455 "source": "pyrodigal", 456 "type": "CDS", 457 "start": pred.begin + start_win, 458 "end": pred.end + start_win, 459 "score": pred.score, 460 "strand": "-" if pred.strand == "-1" else "+", 461 "phase": ".", 462 key_join: f"{key_join}=sORF_{unique_id}_{i}", 463 "sequence": pred.translate(), 464 } 465 ) 466 467 if new_genes: 468 new_genes_df = pl.DataFrame(new_genes) 469 subgff = pl.concat([subgff, new_genes_df], how="vertical") 470 else: 471 sequence = None 472 neighborhood = { 473 "seqid": nucleotide_id, 474 "start_target": start, 475 "end_target": end, 476 "start_win": start_win, 477 "end_win": end_win, 478 "strand_win": strand, 479 "sequence": sequence[start_win:end_win], 480 "unique_id": unique_id, 481 } 482 neighborhood = pl.DataFrame(neighborhood, index=[0]) 483 484 subgff["target_prot"] = protein_id 485 subgff["target_nuc"] = nucleotide_id 486 subgff["unique_id"] = str(unique_id) 487 488 try: 489 if isinstance(subgff, pl.DataFrame): 490 if "id" not in subgff.columns: 491 for cand in ("protein_id", "ID", "gene_id"): 492 if cand in subgff.columns: 493 subgff["id"] = subgff[cand] 494 break 495 496 for redundant in ("ID", "gene_id"): 497 if redundant in subgff.columns and redundant != "id": 498 with contextlib.suppress(Exception): 499 subgff.drop([redundant]) 500 501 if "id" in subgff.columns: 502 with contextlib.suppress(Exception): 503 subgff["id"] = subgff["id"].astype(str) 504 except Exception: 505 pass 506 507 if "product" not in subgff.columns: 508 subgff["product"] = None 509 510 return subgff, neighborhood, unique_id