hoodini.utils.extract_neighborhood

  1import contextlib
  2from pathlib import Path
  3
  4import gb_io
  5import orfipy_core
  6import polars as pl
  7import pyrodigal
  8from Bio.Seq import Seq
  9
 10from hoodini.utils.logging_utils import info
 11from hoodini.utils.seq_io import read_fasta
 12
 13
 14def calculate_overlap(start1, end1, start2, end2):
 15    """Calculate overlap percentage between two intervals."""
 16    overlap_start = max(start1, start2)
 17    overlap_end = min(end1, end2)
 18    if overlap_start < overlap_end:
 19        overlap_length = overlap_end - overlap_start
 20        total_length = max(end1 - start1, end2 - start2)
 21        return (overlap_length / total_length) * 100
 22    return 0
 23
 24
 25def process_features(features, record_version):
 26    """Extract feature data from GenBank features."""
 27    feature_data = []
 28    for feature in features:
 29        if feature.type == "CDS":
 30            feature_dict = {
 31                "seqid": record_version,
 32                "source": "GenBank",
 33                "type": "CDS",
 34                "start": int(feature.location.start),
 35                "end": int(feature.location.end),
 36                "strand": "+" if feature.location.strand == 1 else "-",
 37                "phase": ".",
 38            }
 39            # Extract qualifiers
 40            for qual in ["protein_id", "product", "gene"]:
 41                if qual in feature.qualifiers:
 42                    feature_dict[qual] = feature.qualifiers[qual][0]
 43            feature_data.append(feature_dict)
 44    return feature_data
 45
 46
 47def unwrap_attributes(gff_df):
 48    """Unwrap attributes column in GFF DataFrame."""
 49    if "attributes" in gff_df.columns:
 50        # Parse attributes into separate columns
 51        attributes_list = []
 52        for attr in gff_df["attributes"]:
 53            attr_dict = {}
 54            if attr and isinstance(attr, str):
 55                for pair in attr.split(";"):
 56                    if "=" in pair:
 57                        key, value = pair.split("=", 1)
 58                        attr_dict[key.strip()] = value.strip()
 59            attributes_list.append(attr_dict)
 60        attributes_df = pl.DataFrame(attributes_list)
 61        gff_df = pl.concat([gff_df.drop("attributes"), attributes_df], how="horizontal")
 62    return gff_df
 63
 64
 65def extract_neighborhood(
 66    protein_id,
 67    nucleotide_id,
 68    gbf_file,
 69    gff_file,
 70    faa_file,
 71    fna_file,
 72    mode="win_nts",
 73    window=None,
 74    strand=None,
 75    start=None,
 76    end=None,
 77    unique_id=None,
 78    input_type=None,
 79    sorfs=None,
 80):
 81    info(f"✔️\tExtracting neighborhood {unique_id}")
 82    neighborhood = {}
 83    if gbf_file:
 84
 85        record_found = False
 86
 87        if not Path(gbf_file).exists():
 88            return (
 89                None,
 90                None,
 91                unique_id,
 92                "GenBank file not found",
 93            )
 94
 95        try:
 96            record_iter = gb_io.iter(gbf_file)
 97        except Exception:
 98            return None, None, unique_id, "gb_io failed to open GenBank iterator"
 99
100        if nucleotide_id:
101            record_found = False
102            for record in record_iter:
103                record_version = getattr(record, "version", None)
104                if record_version and nucleotide_id in record_version:
105                    record_found = True
106                    break
107        if record_found:
108            feature_data = process_features(record.features, record_version)
109            feature_data = pl.DataFrame(feature_data)
110            if "attributes" in feature_data.columns:
111                attributes_df = pl.DataFrame(feature_data["attributes"].to_list())
112                attributes_df.drop(["protein_id"])
113                feature_data = pl.concat(
114                    [feature_data.drop(["attributes"]), attributes_df], how="horizontal"
115                )
116                feature_data = feature_data.rename({"translation": "sequence"})
117            else:
118                return None, None, unique_id, "GenBank file is not annotated"
119        else:
120            return None, None, unique_id, "GenBank record not found"
121
122        if input_type == "protein":
123
124            if "protein_id" in feature_data.columns and not (start and end):
125                start = feature_data[feature_data["protein_id"] == protein_id]["start"].iloc[0]
126                end = feature_data[feature_data["protein_id"] == protein_id]["end"].iloc[0]
127                strand = feature_data[feature_data["protein_id"] == protein_id]["strand"].iloc[0]
128            start, end = int(start), int(end)
129
130            if mode == "win_nts":
131                start_win = start - window
132                end_win = end + window
133                start_win = max(start_win, 0)
134                end_win = min(end_win, len(record.sequence))
135                subgff = feature_data.query("start>=@start_win & end<=@end_win")
136
137            elif mode == "win_ngen":
138                subgff = feature_data.reset_index(drop=True)
139                prot_index = subgff[subgff["protein_id"] == protein_id].index.to_list()[0]
140                subgff = subgff[prot_index - window : prot_index + window]
141                start_win = subgff["start"].min()
142                end_win = subgff["end"].max()
143
144        elif input_type == "nucleotide":
145
146            if nucleotide_id and (start and end) and window:
147                start = int(start)
148                end = int(end)
149                if mode == "win_nts":
150                    start_win = start - window
151                    end_win = end + window
152                    if not strand:
153                        strand = "+" if end > start else "-"
154                    subgff = feature_data.query("start>=@start_win & end<=@end_win")
155
156                elif mode == "win_ngen":
157                    subgff = feature_data.reset_index(drop=True)
158                    start_index = subgff[subgff["start"] >= start].index.to_list()[0]
159                    end_index = subgff[subgff["end"] <= end].index.to_list()[-1]
160                    subgff = subgff[start_index - window : end_index + window]
161                    start_win = subgff["start"].min()
162                    end_win = subgff["end"].max()
163                    if not strand:
164                        strand = "+" if end > start else "-"
165
166            elif not window and nucleotide_id and (start and end):
167                start, end = int(start), int(end)
168                if not strand:
169                    strand = "-" if end < start else "+"
170                subgff = feature_data.query(
171                    "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end"
172                )
173                start_win = subgff["start"].min()
174                end_win = subgff["end"].max()
175
176            elif not (start and end):
177                start = end = 0
178                subgff = feature_data
179                start_win = subgff["start"].min()
180                end_win = subgff["end"].max()
181                strand = "+"
182
183        start_win = max(start_win, 0)
184        end_win = min(end_win, len(record.sequence))
185        subgff["id"] = subgff["protein_id"]
186        header = [
187            "seqid",
188            "source",
189            "type",
190            "start",
191            "end",
192            "score",
193            "strand",
194            "phase",
195            "protein_id",
196            "id",
197            "sequence",
198        ]
199        if "product" in subgff.columns:
200            header.append("product")
201        else:
202            subgff["product"] = None
203        subgff = subgff[header]
204        neighborhood = {
205            "seqid": record_version,
206            "start_target": start,
207            "end_target": end,
208            "start_win": start_win,
209            "end_win": end_win,
210            "strand_win": strand,
211            "sequence": record.sequence[start_win:end_win].decode("utf-8"),
212            "unique_id": unique_id,
213        }
214        if sorfs:
215            orf_finder = pyrodigal.GeneFinder(meta=True, min_gene=10, max_overlap=9)
216            new_genes = []
217
218            for i, pred in enumerate(
219                orf_finder.find_genes(record.sequence[start_win:end_win].decode("utf-8"))
220            ):
221                overlap_flag = False
222                for row in subgff.iter_rows(named=True):
223                    overlap_percentage = calculate_overlap(
224                        row["start"], row["end"], pred.begin + start_win, pred.end + start_win
225                    )
226                    if overlap_percentage > 10:
227                        overlap_flag = True
228                        break
229                if not overlap_flag:
230                    new_genes.append(
231                        {
232                            "seqid": nucleotide_id,
233                            "source": "pyrodigal",
234                            "type": "CDS",
235                            "start": pred.begin + start_win,
236                            "end": pred.end + start_win,
237                            "score": pred.score,
238                            "strand": "-" if pred.strand == "-1" else "+",
239                            "phase": ".",
240                            "protein_id": f"sORF_{unique_id}_{i}",
241                            "id": f"sORF_{unique_id}_{i}",
242                            "sequence": pred.translate(),
243                        }
244                    )
245
246            if new_genes:
247                new_genes_df = pl.DataFrame(new_genes)
248                subgff = pl.concat([subgff, new_genes_df], how="vertical")
249
250            new_genes = []
251            seq = record.sequence[start_win:end_win].decode("utf-8").upper()
252            for i, (start, stop, strand, _description) in enumerate(
253                orfipy_core.orfs(seq, minlen=100, maxlen=1000, partial3=False, between_stops=False)
254            ):
255                overlap_flag = False
256                for row in subgff.iter_rows(named=True):
257                    overlap_percentage = calculate_overlap(
258                        row["start"], row["end"], start + start_win, stop + start_win
259                    )
260                    if overlap_percentage > 0:
261                        overlap_flag = True
262                        break
263
264                if not overlap_flag:
265                    orf_sequence = Seq(record.sequence[start_win:end_win][start:stop])
266                    if strand == "-":
267                        orf_sequence = orf_sequence.reverse_complement()
268                    protein_sequence = orf_sequence.translate(table=11, to_stop=True)
269
270                    new_genes.append(
271                        {
272                            "seqid": nucleotide_id,
273                            "source": "orfipy",
274                            "type": "CDS",
275                            "start": start + start_win,
276                            "end": stop + start_win,
277                            "score": ".",
278                            "strand": "-" if strand == "-" else "+",
279                            "phase": ".",
280                            "protein_id": f"sORF_orfipy_{unique_id}_{i}",
281                            "id": f"sORF_orfipy_{unique_id}_{i}",
282                            "sequence": protein_sequence,
283                        }
284                    )
285
286            if new_genes:
287                new_genes_df = pl.DataFrame(new_genes)
288                subgff = pl.concat([subgff, new_genes_df], how="vertical")
289
290        neighborhood = pl.DataFrame(neighborhood, index=[0])
291
292    elif gff_file and faa_file:
293        gff_header = [
294            "seqid",
295            "source",
296            "type",
297            "start",
298            "end",
299            "score",
300            "strand",
301            "phase",
302            "attributes",
303        ]
304        if not Path(gff_file).exists():
305            return (
306                None,
307                None,
308                unique_id,
309                "GFF file not found",
310            )
311        if not Path(faa_file).exists():
312            return None, None, unique_id, "FAA file not found"
313
314        try:
315            gff = pl.read_csv(
316                filepath_or_buffer=gff_file,
317                separator="\t",
318                comment="#",
319                names=gff_header,
320                engine="c",
321            )
322        except Exception:
323            return None, None, unique_id, "Failed to read GFF file"
324        try:
325            faa_df = read_fasta(faa_file)
326        except Exception:
327            return None, None, unique_id, "Failed to read FAA file"
328
329        if input_type == "protein":
330            if protein_id and window:
331                query = f"={protein_id}"
332                start = gff[gff["attributes"].str.contains(query)]["start"].to_list()
333                if not start:
334                    return None, None, unique_id, "Protein not found in GFF file"
335                else:
336                    start = start[0]
337                end = gff[gff["attributes"].str.contains(query)]["end"].to_list()[0]
338                strand = gff[gff["attributes"].str.contains(query)]["strand"].to_list()[0]
339                nucleotide_id = gff[gff["attributes"].str.contains(query)]["seqid"].to_list()[0]
340
341                if mode == "win_nts":
342                    start_win = start - window
343                    end_win = end + window
344                    start_win = max(start_win, 0)
345                    gff_nuc = gff.query("seqid == @nucleotide_id")
346                    end_win = min(end_win, gff_nuc["end"].max())
347                    subgff = gff.query(
348                        "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win"
349                    )
350                elif mode == "win_ngen":
351                    subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index(
352                        drop=True
353                    )
354                    prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0]
355                    subgff = subgff[prot_index - window : prot_index + window]
356                    start_win = subgff["start"].min()
357                    end_win = subgff["end"].max()
358
359                if strand == "-":
360                    pass
361
362        elif input_type == "nucleotide":
363            if nucleotide_id and (start and end) and window:
364                start, end = int(start), int(end)
365                if not strand:
366                    strand = "-" if end < start else "+"
367
368                if mode == "win_nts":
369                    start_win = start - window
370                    end_win = end + window
371                    start_win = max(start_win, 0)
372                    subgff = gff.query(
373                        "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win"
374                    )
375                elif mode == "win_ngen":
376                    subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index(
377                        drop=True
378                    )
379                    prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0]
380                    subgff = subgff[prot_index - window : prot_index + window]
381                    start_win = subgff["start"].min()
382                    end_win = subgff["end"].max()
383
384                if strand == "-":
385                    pass
386
387            elif not window and nucleotide_id and (start and end):
388                start, end = int(start), int(end)
389                if not strand:
390                    strand = "-" if end < start else "+"
391                subgff = gff.query(
392                    "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end"
393                )
394                start_win = start
395                end_win = end
396                if strand == "-":
397                    pass
398
399            elif (
400                not window
401                and nucleotide_id
402                and not (start and end)
403                or window
404                and nucleotide_id
405                and not (start and end)
406            ):
407                start = end = 0
408                subgff = gff.query("seqid == @nucleotide_id & type =='CDS'")
409                start_win = 0
410                end_win = subgff["end"].max()
411                strand = "+"
412
413            else:
414                return (
415                    None,
416                    None,
417                    unique_id,
418                    "Invalid usage of parameters",
419                )
420
421        subgff = unwrap_attributes(subgff)
422        key_join = "protein_id" if "protein_id" in subgff.columns else "ID"
423
424        subgff = subgff.join(
425            faa_df[["id", "sequence"]], left_on=key_join, right_on="id", how="left"
426        )
427
428        if fna_file:
429            fna_df = read_fasta(fna_file)
430            nucleotide_id = str(nucleotide_id)
431            faa_df["id"] = faa_df["id"].astype(str)
432            if nucleotide_id in fna_df["id"].to_list():
433                sequence = fna_df[fna_df["id"] == nucleotide_id]["sequence"].to_list()[0]
434                end_win = end + window
435                end_win = min(end_win, len(sequence))
436                if sorfs:
437                    orf_finder = pyrodigal.GeneFinder(meta=True)
438                    new_genes = []
439
440                    for i, pred in enumerate(orf_finder.find_genes(sequence.encode())):
441                        overlap_flag = False
442                        for row in subgff.iter_rows(named=True):
443                            overlap_percentage = calculate_overlap(
444                                row["start"], row["end"], pred.begin, pred.end
445                            )
446                            if overlap_percentage > 5:
447                                overlap_flag = True
448                                break
449
450                        if not overlap_flag:
451                            new_genes.append(
452                                {
453                                    "seqid": nucleotide_id,
454                                    "source": "pyrodigal",
455                                    "type": "CDS",
456                                    "start": pred.begin + start_win,
457                                    "end": pred.end + start_win,
458                                    "score": pred.score,
459                                    "strand": "-" if pred.strand == "-1" else "+",
460                                    "phase": ".",
461                                    key_join: f"{key_join}=sORF_{unique_id}_{i}",
462                                    "sequence": pred.translate(),
463                                }
464                            )
465
466                    if new_genes:
467                        new_genes_df = pl.DataFrame(new_genes)
468                        subgff = pl.concat([subgff, new_genes_df], how="vertical")
469        else:
470            sequence = None
471        neighborhood = {
472            "seqid": nucleotide_id,
473            "start_target": start,
474            "end_target": end,
475            "start_win": start_win,
476            "end_win": end_win,
477            "strand_win": strand,
478            "sequence": sequence[start_win:end_win],
479            "unique_id": unique_id,
480        }
481        neighborhood = pl.DataFrame(neighborhood, index=[0])
482
483    subgff["target_prot"] = protein_id
484    subgff["target_nuc"] = nucleotide_id
485    subgff["unique_id"] = str(unique_id)
486
487    try:
488        if isinstance(subgff, pl.DataFrame):
489            if "id" not in subgff.columns:
490                for cand in ("protein_id", "ID", "gene_id"):
491                    if cand in subgff.columns:
492                        subgff["id"] = subgff[cand]
493                        break
494
495            for redundant in ("ID", "gene_id"):
496                if redundant in subgff.columns and redundant != "id":
497                    with contextlib.suppress(Exception):
498                        subgff.drop([redundant])
499
500            if "id" in subgff.columns:
501                with contextlib.suppress(Exception):
502                    subgff["id"] = subgff["id"].astype(str)
503    except Exception:
504        pass
505
506    if "product" not in subgff.columns:
507        subgff["product"] = None
508
509    return subgff, neighborhood, unique_id
def calculate_overlap(start1, end1, start2, end2):
15def calculate_overlap(start1, end1, start2, end2):
16    """Calculate overlap percentage between two intervals."""
17    overlap_start = max(start1, start2)
18    overlap_end = min(end1, end2)
19    if overlap_start < overlap_end:
20        overlap_length = overlap_end - overlap_start
21        total_length = max(end1 - start1, end2 - start2)
22        return (overlap_length / total_length) * 100
23    return 0

Calculate overlap percentage between two intervals.

def process_features(features, record_version):
26def process_features(features, record_version):
27    """Extract feature data from GenBank features."""
28    feature_data = []
29    for feature in features:
30        if feature.type == "CDS":
31            feature_dict = {
32                "seqid": record_version,
33                "source": "GenBank",
34                "type": "CDS",
35                "start": int(feature.location.start),
36                "end": int(feature.location.end),
37                "strand": "+" if feature.location.strand == 1 else "-",
38                "phase": ".",
39            }
40            # Extract qualifiers
41            for qual in ["protein_id", "product", "gene"]:
42                if qual in feature.qualifiers:
43                    feature_dict[qual] = feature.qualifiers[qual][0]
44            feature_data.append(feature_dict)
45    return feature_data

Extract feature data from GenBank features.

def unwrap_attributes(gff_df):
48def unwrap_attributes(gff_df):
49    """Unwrap attributes column in GFF DataFrame."""
50    if "attributes" in gff_df.columns:
51        # Parse attributes into separate columns
52        attributes_list = []
53        for attr in gff_df["attributes"]:
54            attr_dict = {}
55            if attr and isinstance(attr, str):
56                for pair in attr.split(";"):
57                    if "=" in pair:
58                        key, value = pair.split("=", 1)
59                        attr_dict[key.strip()] = value.strip()
60            attributes_list.append(attr_dict)
61        attributes_df = pl.DataFrame(attributes_list)
62        gff_df = pl.concat([gff_df.drop("attributes"), attributes_df], how="horizontal")
63    return gff_df

Unwrap attributes column in GFF DataFrame.

def extract_neighborhood( protein_id, nucleotide_id, gbf_file, gff_file, faa_file, fna_file, mode='win_nts', window=None, strand=None, start=None, end=None, unique_id=None, input_type=None, sorfs=None):
 66def extract_neighborhood(
 67    protein_id,
 68    nucleotide_id,
 69    gbf_file,
 70    gff_file,
 71    faa_file,
 72    fna_file,
 73    mode="win_nts",
 74    window=None,
 75    strand=None,
 76    start=None,
 77    end=None,
 78    unique_id=None,
 79    input_type=None,
 80    sorfs=None,
 81):
 82    info(f"✔️\tExtracting neighborhood {unique_id}")
 83    neighborhood = {}
 84    if gbf_file:
 85
 86        record_found = False
 87
 88        if not Path(gbf_file).exists():
 89            return (
 90                None,
 91                None,
 92                unique_id,
 93                "GenBank file not found",
 94            )
 95
 96        try:
 97            record_iter = gb_io.iter(gbf_file)
 98        except Exception:
 99            return None, None, unique_id, "gb_io failed to open GenBank iterator"
100
101        if nucleotide_id:
102            record_found = False
103            for record in record_iter:
104                record_version = getattr(record, "version", None)
105                if record_version and nucleotide_id in record_version:
106                    record_found = True
107                    break
108        if record_found:
109            feature_data = process_features(record.features, record_version)
110            feature_data = pl.DataFrame(feature_data)
111            if "attributes" in feature_data.columns:
112                attributes_df = pl.DataFrame(feature_data["attributes"].to_list())
113                attributes_df.drop(["protein_id"])
114                feature_data = pl.concat(
115                    [feature_data.drop(["attributes"]), attributes_df], how="horizontal"
116                )
117                feature_data = feature_data.rename({"translation": "sequence"})
118            else:
119                return None, None, unique_id, "GenBank file is not annotated"
120        else:
121            return None, None, unique_id, "GenBank record not found"
122
123        if input_type == "protein":
124
125            if "protein_id" in feature_data.columns and not (start and end):
126                start = feature_data[feature_data["protein_id"] == protein_id]["start"].iloc[0]
127                end = feature_data[feature_data["protein_id"] == protein_id]["end"].iloc[0]
128                strand = feature_data[feature_data["protein_id"] == protein_id]["strand"].iloc[0]
129            start, end = int(start), int(end)
130
131            if mode == "win_nts":
132                start_win = start - window
133                end_win = end + window
134                start_win = max(start_win, 0)
135                end_win = min(end_win, len(record.sequence))
136                subgff = feature_data.query("start>=@start_win & end<=@end_win")
137
138            elif mode == "win_ngen":
139                subgff = feature_data.reset_index(drop=True)
140                prot_index = subgff[subgff["protein_id"] == protein_id].index.to_list()[0]
141                subgff = subgff[prot_index - window : prot_index + window]
142                start_win = subgff["start"].min()
143                end_win = subgff["end"].max()
144
145        elif input_type == "nucleotide":
146
147            if nucleotide_id and (start and end) and window:
148                start = int(start)
149                end = int(end)
150                if mode == "win_nts":
151                    start_win = start - window
152                    end_win = end + window
153                    if not strand:
154                        strand = "+" if end > start else "-"
155                    subgff = feature_data.query("start>=@start_win & end<=@end_win")
156
157                elif mode == "win_ngen":
158                    subgff = feature_data.reset_index(drop=True)
159                    start_index = subgff[subgff["start"] >= start].index.to_list()[0]
160                    end_index = subgff[subgff["end"] <= end].index.to_list()[-1]
161                    subgff = subgff[start_index - window : end_index + window]
162                    start_win = subgff["start"].min()
163                    end_win = subgff["end"].max()
164                    if not strand:
165                        strand = "+" if end > start else "-"
166
167            elif not window and nucleotide_id and (start and end):
168                start, end = int(start), int(end)
169                if not strand:
170                    strand = "-" if end < start else "+"
171                subgff = feature_data.query(
172                    "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end"
173                )
174                start_win = subgff["start"].min()
175                end_win = subgff["end"].max()
176
177            elif not (start and end):
178                start = end = 0
179                subgff = feature_data
180                start_win = subgff["start"].min()
181                end_win = subgff["end"].max()
182                strand = "+"
183
184        start_win = max(start_win, 0)
185        end_win = min(end_win, len(record.sequence))
186        subgff["id"] = subgff["protein_id"]
187        header = [
188            "seqid",
189            "source",
190            "type",
191            "start",
192            "end",
193            "score",
194            "strand",
195            "phase",
196            "protein_id",
197            "id",
198            "sequence",
199        ]
200        if "product" in subgff.columns:
201            header.append("product")
202        else:
203            subgff["product"] = None
204        subgff = subgff[header]
205        neighborhood = {
206            "seqid": record_version,
207            "start_target": start,
208            "end_target": end,
209            "start_win": start_win,
210            "end_win": end_win,
211            "strand_win": strand,
212            "sequence": record.sequence[start_win:end_win].decode("utf-8"),
213            "unique_id": unique_id,
214        }
215        if sorfs:
216            orf_finder = pyrodigal.GeneFinder(meta=True, min_gene=10, max_overlap=9)
217            new_genes = []
218
219            for i, pred in enumerate(
220                orf_finder.find_genes(record.sequence[start_win:end_win].decode("utf-8"))
221            ):
222                overlap_flag = False
223                for row in subgff.iter_rows(named=True):
224                    overlap_percentage = calculate_overlap(
225                        row["start"], row["end"], pred.begin + start_win, pred.end + start_win
226                    )
227                    if overlap_percentage > 10:
228                        overlap_flag = True
229                        break
230                if not overlap_flag:
231                    new_genes.append(
232                        {
233                            "seqid": nucleotide_id,
234                            "source": "pyrodigal",
235                            "type": "CDS",
236                            "start": pred.begin + start_win,
237                            "end": pred.end + start_win,
238                            "score": pred.score,
239                            "strand": "-" if pred.strand == "-1" else "+",
240                            "phase": ".",
241                            "protein_id": f"sORF_{unique_id}_{i}",
242                            "id": f"sORF_{unique_id}_{i}",
243                            "sequence": pred.translate(),
244                        }
245                    )
246
247            if new_genes:
248                new_genes_df = pl.DataFrame(new_genes)
249                subgff = pl.concat([subgff, new_genes_df], how="vertical")
250
251            new_genes = []
252            seq = record.sequence[start_win:end_win].decode("utf-8").upper()
253            for i, (start, stop, strand, _description) in enumerate(
254                orfipy_core.orfs(seq, minlen=100, maxlen=1000, partial3=False, between_stops=False)
255            ):
256                overlap_flag = False
257                for row in subgff.iter_rows(named=True):
258                    overlap_percentage = calculate_overlap(
259                        row["start"], row["end"], start + start_win, stop + start_win
260                    )
261                    if overlap_percentage > 0:
262                        overlap_flag = True
263                        break
264
265                if not overlap_flag:
266                    orf_sequence = Seq(record.sequence[start_win:end_win][start:stop])
267                    if strand == "-":
268                        orf_sequence = orf_sequence.reverse_complement()
269                    protein_sequence = orf_sequence.translate(table=11, to_stop=True)
270
271                    new_genes.append(
272                        {
273                            "seqid": nucleotide_id,
274                            "source": "orfipy",
275                            "type": "CDS",
276                            "start": start + start_win,
277                            "end": stop + start_win,
278                            "score": ".",
279                            "strand": "-" if strand == "-" else "+",
280                            "phase": ".",
281                            "protein_id": f"sORF_orfipy_{unique_id}_{i}",
282                            "id": f"sORF_orfipy_{unique_id}_{i}",
283                            "sequence": protein_sequence,
284                        }
285                    )
286
287            if new_genes:
288                new_genes_df = pl.DataFrame(new_genes)
289                subgff = pl.concat([subgff, new_genes_df], how="vertical")
290
291        neighborhood = pl.DataFrame(neighborhood, index=[0])
292
293    elif gff_file and faa_file:
294        gff_header = [
295            "seqid",
296            "source",
297            "type",
298            "start",
299            "end",
300            "score",
301            "strand",
302            "phase",
303            "attributes",
304        ]
305        if not Path(gff_file).exists():
306            return (
307                None,
308                None,
309                unique_id,
310                "GFF file not found",
311            )
312        if not Path(faa_file).exists():
313            return None, None, unique_id, "FAA file not found"
314
315        try:
316            gff = pl.read_csv(
317                filepath_or_buffer=gff_file,
318                separator="\t",
319                comment="#",
320                names=gff_header,
321                engine="c",
322            )
323        except Exception:
324            return None, None, unique_id, "Failed to read GFF file"
325        try:
326            faa_df = read_fasta(faa_file)
327        except Exception:
328            return None, None, unique_id, "Failed to read FAA file"
329
330        if input_type == "protein":
331            if protein_id and window:
332                query = f"={protein_id}"
333                start = gff[gff["attributes"].str.contains(query)]["start"].to_list()
334                if not start:
335                    return None, None, unique_id, "Protein not found in GFF file"
336                else:
337                    start = start[0]
338                end = gff[gff["attributes"].str.contains(query)]["end"].to_list()[0]
339                strand = gff[gff["attributes"].str.contains(query)]["strand"].to_list()[0]
340                nucleotide_id = gff[gff["attributes"].str.contains(query)]["seqid"].to_list()[0]
341
342                if mode == "win_nts":
343                    start_win = start - window
344                    end_win = end + window
345                    start_win = max(start_win, 0)
346                    gff_nuc = gff.query("seqid == @nucleotide_id")
347                    end_win = min(end_win, gff_nuc["end"].max())
348                    subgff = gff.query(
349                        "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win"
350                    )
351                elif mode == "win_ngen":
352                    subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index(
353                        drop=True
354                    )
355                    prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0]
356                    subgff = subgff[prot_index - window : prot_index + window]
357                    start_win = subgff["start"].min()
358                    end_win = subgff["end"].max()
359
360                if strand == "-":
361                    pass
362
363        elif input_type == "nucleotide":
364            if nucleotide_id and (start and end) and window:
365                start, end = int(start), int(end)
366                if not strand:
367                    strand = "-" if end < start else "+"
368
369                if mode == "win_nts":
370                    start_win = start - window
371                    end_win = end + window
372                    start_win = max(start_win, 0)
373                    subgff = gff.query(
374                        "seqid == @nucleotide_id & type =='CDS' & start>=@start_win & end<=@end_win"
375                    )
376                elif mode == "win_ngen":
377                    subgff = gff.query("seqid == @nucleotide_id & type =='CDS'").reset_index(
378                        drop=True
379                    )
380                    prot_index = subgff[subgff["attributes"].str.contains(query)].index.to_list()[0]
381                    subgff = subgff[prot_index - window : prot_index + window]
382                    start_win = subgff["start"].min()
383                    end_win = subgff["end"].max()
384
385                if strand == "-":
386                    pass
387
388            elif not window and nucleotide_id and (start and end):
389                start, end = int(start), int(end)
390                if not strand:
391                    strand = "-" if end < start else "+"
392                subgff = gff.query(
393                    "seqid == @nucleotide_id & type =='CDS' & start>=@start & end<=@end"
394                )
395                start_win = start
396                end_win = end
397                if strand == "-":
398                    pass
399
400            elif (
401                not window
402                and nucleotide_id
403                and not (start and end)
404                or window
405                and nucleotide_id
406                and not (start and end)
407            ):
408                start = end = 0
409                subgff = gff.query("seqid == @nucleotide_id & type =='CDS'")
410                start_win = 0
411                end_win = subgff["end"].max()
412                strand = "+"
413
414            else:
415                return (
416                    None,
417                    None,
418                    unique_id,
419                    "Invalid usage of parameters",
420                )
421
422        subgff = unwrap_attributes(subgff)
423        key_join = "protein_id" if "protein_id" in subgff.columns else "ID"
424
425        subgff = subgff.join(
426            faa_df[["id", "sequence"]], left_on=key_join, right_on="id", how="left"
427        )
428
429        if fna_file:
430            fna_df = read_fasta(fna_file)
431            nucleotide_id = str(nucleotide_id)
432            faa_df["id"] = faa_df["id"].astype(str)
433            if nucleotide_id in fna_df["id"].to_list():
434                sequence = fna_df[fna_df["id"] == nucleotide_id]["sequence"].to_list()[0]
435                end_win = end + window
436                end_win = min(end_win, len(sequence))
437                if sorfs:
438                    orf_finder = pyrodigal.GeneFinder(meta=True)
439                    new_genes = []
440
441                    for i, pred in enumerate(orf_finder.find_genes(sequence.encode())):
442                        overlap_flag = False
443                        for row in subgff.iter_rows(named=True):
444                            overlap_percentage = calculate_overlap(
445                                row["start"], row["end"], pred.begin, pred.end
446                            )
447                            if overlap_percentage > 5:
448                                overlap_flag = True
449                                break
450
451                        if not overlap_flag:
452                            new_genes.append(
453                                {
454                                    "seqid": nucleotide_id,
455                                    "source": "pyrodigal",
456                                    "type": "CDS",
457                                    "start": pred.begin + start_win,
458                                    "end": pred.end + start_win,
459                                    "score": pred.score,
460                                    "strand": "-" if pred.strand == "-1" else "+",
461                                    "phase": ".",
462                                    key_join: f"{key_join}=sORF_{unique_id}_{i}",
463                                    "sequence": pred.translate(),
464                                }
465                            )
466
467                    if new_genes:
468                        new_genes_df = pl.DataFrame(new_genes)
469                        subgff = pl.concat([subgff, new_genes_df], how="vertical")
470        else:
471            sequence = None
472        neighborhood = {
473            "seqid": nucleotide_id,
474            "start_target": start,
475            "end_target": end,
476            "start_win": start_win,
477            "end_win": end_win,
478            "strand_win": strand,
479            "sequence": sequence[start_win:end_win],
480            "unique_id": unique_id,
481        }
482        neighborhood = pl.DataFrame(neighborhood, index=[0])
483
484    subgff["target_prot"] = protein_id
485    subgff["target_nuc"] = nucleotide_id
486    subgff["unique_id"] = str(unique_id)
487
488    try:
489        if isinstance(subgff, pl.DataFrame):
490            if "id" not in subgff.columns:
491                for cand in ("protein_id", "ID", "gene_id"):
492                    if cand in subgff.columns:
493                        subgff["id"] = subgff[cand]
494                        break
495
496            for redundant in ("ID", "gene_id"):
497                if redundant in subgff.columns and redundant != "id":
498                    with contextlib.suppress(Exception):
499                        subgff.drop([redundant])
500
501            if "id" in subgff.columns:
502                with contextlib.suppress(Exception):
503                    subgff["id"] = subgff["id"].astype(str)
504    except Exception:
505        pass
506
507    if "product" not in subgff.columns:
508        subgff["product"] = None
509
510    return subgff, neighborhood, unique_id