hoodini.utils.downloader

  1import contextlib
  2import re
  3import subprocess
  4import sys
  5from pathlib import Path
  6
  7import requests
  8from rich.progress import (
  9    BarColumn,
 10    Progress,
 11    TextColumn,
 12    TimeRemainingColumn,
 13    TransferSpeedColumn,
 14)
 15
 16
 17def download_with_aria2c(
 18    urls,
 19    dest_dir,
 20    connections=16,
 21    split=16,
 22    show_progress=True,
 23    show_aria2c_output=False,
 24    out_names=None,
 25    num_threads: int = 0,
 26):
 27    """
 28    Download URLs to dest_dir using a single aria2c subprocess with Rich progress.
 29    Returns a list of downloaded file paths.
 30    """
 31    dest_dir = Path(dest_dir)
 32    dest_dir.mkdir(parents=True, exist_ok=True)
 33    results = []
 34
 35    PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)%")
 36    SIZE_RE = re.compile(r"([\d.]+)\s*([KMGTP]?i?B)/([\d.]+)\s*([KMGTP]?i?B)")
 37
 38    UNIT = {
 39        "B": 1,
 40        "KB": 1000,
 41        "MB": 1000**2,
 42        "GB": 1000**3,
 43        "TB": 1000**4,
 44        "PB": 1000**5,
 45        "KiB": 1024,
 46        "MiB": 1024**2,
 47        "GiB": 1024**3,
 48        "TiB": 1024**4,
 49        "PiB": 1024**5,
 50    }
 51
 52    def to_bytes(num_str: str, unit_str: str) -> float:
 53        return float(num_str) * UNIT.get(unit_str.strip(), 1)
 54
 55    def fmt_bytes(n: float) -> str:
 56        for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]:
 57            if n < 1024 or unit == "PiB":
 58                return f"{n:.2f} {unit}"
 59            n /= 1024
 60
 61    from urllib.parse import unquote, urlparse
 62
 63    out_name_list = []
 64    for idx, url in enumerate(urls):
 65        out_name = None
 66        if out_names and idx < len(out_names) and out_names[idx]:
 67            out_name = out_names[idx]
 68        if not out_name:
 69            parsed = urlparse(url)
 70            out_name = unquote(Path(parsed.path).name)
 71            if not out_name:
 72                try:
 73                    r = requests.head(url, allow_redirects=True, timeout=5)
 74                    cd = r.headers.get("content-disposition")
 75                    if cd:
 76                        m = re.search(r"filename\*?=(?:UTF-8'')?\"?([^\";]+)\"?", cd)
 77                        if m:
 78                            out_name = m.group(1)
 79                except Exception:
 80                    out_name = ""
 81        if not out_name:
 82            out_name = f"downloaded_file_{idx}"
 83        out_name_list.append(out_name)
 84
 85    input_lines = []
 86    for url, out_name in zip(urls, out_name_list):
 87        input_lines.append(f"{url}\n  out={out_name}")
 88    from tempfile import NamedTemporaryFile
 89
 90    with NamedTemporaryFile("w", delete=False) as f:
 91        for line in input_lines:
 92            f.write(line + "\n")
 93        input_file = f.name
 94
 95    max_conn = str(num_threads or 16)
 96    cmd = [
 97        "aria2c",
 98        "--summary-interval=1",
 99        "--enable-color=false",
100        "--max-connection-per-server",
101        max_conn,
102        "--split",
103        max_conn,
104        "-k",
105        "1M",
106        "-d",
107        str(dest_dir),
108        "-i",
109        input_file,
110    ]
111
112    try:
113        if show_progress:
114            with Progress(
115                TextColumn("[bold blue]{task.description}"),
116                BarColumn(),
117                TextColumn("[progress.percentage]{task.percentage:>5.1f}%"),
118                TextColumn("• {task.fields[bytes_text]}"),
119                TransferSpeedColumn(),
120                TimeRemainingColumn(),
121                refresh_per_second=10,
122                transient=True,
123            ) as progress:
124                task = progress.add_task("aria2c batch", total=None, bytes_text="…")
125
126            proc = subprocess.Popen(
127                cmd,
128                stdout=subprocess.PIPE,
129                stderr=subprocess.STDOUT,
130                bufsize=0,
131            )
132
133            buf = b""
134            total_bytes = None
135
136            while True:
137                chunk = proc.stdout.read(1024)
138                if not chunk:
139                    if proc.poll() is not None:
140                        break
141                    continue
142
143                buf += chunk
144                parts = re.split(rb"[\r\n]", buf)
145                for part in parts[:-1]:
146                    line = part.decode("utf-8", "ignore")
147                    downloaded, total, pct = None, None, None
148                    m = SIZE_RE.search(line)
149                    if m:
150                        d_num, d_unit, t_num, t_unit = m.groups()
151                        try:
152                            downloaded = to_bytes(d_num, d_unit)
153                            total = to_bytes(t_num, t_unit)
154                        except Exception:
155                            downloaded = total = None
156                    p = PERCENT_RE.search(line)
157                    if p:
158                        try:
159                            pct = float(p.group(1))
160                        except Exception:
161                            pct = None
162
163                    if show_aria2c_output and (
164                        downloaded is not None or total is not None or pct is not None
165                    ):
166                        sys.stdout.write(line + "\n")
167                        sys.stdout.flush()
168
169                    if total is not None and downloaded is not None:
170                        try:
171                            total_bytes = int(total)
172                            downloaded_bytes = int(downloaded)
173                        except Exception:
174                            total_bytes = int(total or 0)
175                            downloaded_bytes = int(downloaded or 0)
176                        progress.update(
177                            task,
178                            total=total_bytes,
179                            completed=downloaded_bytes,
180                            bytes_text=f"{fmt_bytes(downloaded_bytes)} / {fmt_bytes(total_bytes)}",
181                        )
182                    elif pct is not None and total_bytes is None:
183                        progress.update(task, total=100.0, completed=pct, bytes_text=f"{pct:.1f}%")
184
185                buf = parts[-1]
186
187            if total_bytes:
188                progress.update(
189                    task,
190                    total=total_bytes,
191                    completed=total_bytes,
192                    bytes_text=f"{fmt_bytes(total_bytes)} / {fmt_bytes(total_bytes)}",
193                )
194            else:
195                progress.update(task, total=100.0, completed=100.0, bytes_text="100%")
196
197            code = proc.wait()
198            if code != 0:
199                raise SystemExit(f"aria2c exited with non-zero status: {code}")
200        else:
201            subprocess.run(cmd, check=True)
202    finally:
203        with contextlib.suppress(Exception):
204            Path(input_file).unlink()
205
206    for out_name in out_name_list:
207        candidate = dest_dir / out_name
208        if candidate.exists():
209            results.append(str(candidate))
210    return results
# aria2c only accepts 1..16 for --max-connection-per-server.
_ARIA2C_MAX_CONNECTIONS = 16

_PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)%")
_SIZE_RE = re.compile(r"([\d.]+)\s*([KMGTP]?i?B)/([\d.]+)\s*([KMGTP]?i?B)")
_FILENAME_RE = re.compile(r"filename\*?=(?:UTF-8'')?\"?([^\";]+)\"?")

_UNIT = {
    "B": 1,
    "KB": 1000,
    "MB": 1000**2,
    "GB": 1000**3,
    "TB": 1000**4,
    "PB": 1000**5,
    "KiB": 1024,
    "MiB": 1024**2,
    "GiB": 1024**3,
    "TiB": 1024**4,
    "PiB": 1024**5,
}


def _fmt_bytes(n):
    """Render a byte count with a binary unit suffix, e.g. ``1.50 MiB``."""
    units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")
    for unit in units[:-1]:
        if n < 1024:
            return f"{n:.2f} {unit}"
        n /= 1024
    return f"{n:.2f} {units[-1]}"


def _parse_progress_line(line):
    """Return ``(downloaded, total, percent)`` parsed from one output line.

    ``downloaded`` and ``total`` are integer byte counts taken from a
    ``<num><unit>/<num><unit>`` pair; ``percent`` is the first ``<num>%``
    value. Any element that cannot be found is ``None``.
    """
    downloaded = total = percent = None
    size = _SIZE_RE.search(line)
    if size:
        d_num, d_unit, t_num, t_unit = size.groups()
        try:
            downloaded = int(float(d_num) * _UNIT.get(d_unit, 1))
            total = int(float(t_num) * _UNIT.get(t_unit, 1))
        except ValueError:
            # "[\d.]+" also matches things like "1.2.3"; treat as no size info.
            downloaded = total = None
    pct = _PERCENT_RE.search(line)
    if pct:
        percent = float(pct.group(1))
    return downloaded, total, percent


def _iter_output_lines(stream):
    """Yield decoded lines from a binary stream, splitting on CR or LF.

    The final fragment is yielded too, even without a trailing separator.
    """
    pending = b""
    while True:
        chunk = stream.read(1024)
        if not chunk:
            # An empty read means EOF: stop instead of polling in a tight loop.
            break
        *complete, pending = re.split(rb"[\r\n]", pending + chunk)
        for raw in complete:
            yield raw.decode("utf-8", "ignore")
    if pending:
        yield pending.decode("utf-8", "ignore")


def _name_from_content_disposition(url):
    """Best-effort file name from a HEAD request's Content-Disposition header.

    Returns ``""`` when the request fails or no file name is advertised.
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=5)
    except requests.RequestException:
        return ""
    header = response.headers.get("content-disposition")
    if not header:
        return ""
    found = _FILENAME_RE.search(header)
    if not found:
        return ""
    # The header is server-controlled: keep only the final path component so
    # a value such as "../../x" cannot place the file outside dest_dir.
    return Path(found.group(1).strip()).name


def _resolve_out_names(urls, out_names):
    """Pick one output file name per URL.

    Order of preference: the caller-supplied entry in ``out_names``, the last
    path component of the URL, the Content-Disposition file name, and finally
    ``downloaded_file_<index>``.
    """
    from urllib.parse import unquote, urlparse

    names = []
    for idx, url in enumerate(urls):
        name = out_names[idx] if out_names and idx < len(out_names) else None
        if not name:
            name = unquote(Path(urlparse(url).path).name)
        if not name:
            name = _name_from_content_disposition(url)
        names.append(name or f"downloaded_file_{idx}")
    return names


def _run_aria2c_with_progress(cmd, echo_output):
    """Run ``cmd`` and mirror its reported progress in a Rich progress bar.

    Raises ``SystemExit`` if the process ends with a non-zero status.
    """
    with Progress(
        TextColumn("[bold blue]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>5.1f}%"),
        TextColumn("• {task.fields[bytes_text]}"),
        TransferSpeedColumn(),
        TimeRemainingColumn(),
        refresh_per_second=10,
        transient=True,
    ) as progress:
        task = progress.add_task("aria2c batch", total=None, bytes_text="…")
        total_bytes = None

        # The whole read loop stays inside the Progress context; leaving the
        # context stops the live display, so updates made afterwards would
        # never be drawn.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=0,
        ) as proc:
            # NOTE(review): every matching line updates the single task, so
            # with several URLs the bar presumably follows whichever figures
            # aria2c printed last - confirm against real aria2c output.
            for line in _iter_output_lines(proc.stdout):
                downloaded, total, pct = _parse_progress_line(line)

                if echo_output and (
                    downloaded is not None or total is not None or pct is not None
                ):
                    sys.stdout.write(line + "\n")
                    sys.stdout.flush()

                if total is not None and downloaded is not None:
                    total_bytes = total
                    progress.update(
                        task,
                        total=total,
                        completed=downloaded,
                        bytes_text=f"{_fmt_bytes(downloaded)} / {_fmt_bytes(total)}",
                    )
                elif pct is not None and total_bytes is None:
                    # Percent-only fallback, used until a byte total is known.
                    progress.update(
                        task, total=100.0, completed=pct, bytes_text=f"{pct:.1f}%"
                    )
            code = proc.wait()

        # Check the exit status before showing the bar as complete.
        if code != 0:
            raise SystemExit(f"aria2c exited with non-zero status: {code}")

        if total_bytes:
            progress.update(
                task,
                total=total_bytes,
                completed=total_bytes,
                bytes_text=f"{_fmt_bytes(total_bytes)} / {_fmt_bytes(total_bytes)}",
            )
        else:
            progress.update(task, total=100.0, completed=100.0, bytes_text="100%")


def download_with_aria2c(
    urls,
    dest_dir,
    connections=16,
    split=16,
    show_progress=True,
    show_aria2c_output=False,
    out_names=None,
    num_threads: int = 0,
):
    """
    Download URLs to dest_dir using a single aria2c subprocess with Rich progress.

    Args:
        urls: Iterable of URLs to fetch; it is materialised once, so
            generators are accepted.
        dest_dir: Target directory, created if missing.
        connections: Value for ``--max-connection-per-server`` when
            ``num_threads`` is 0. Clamped to the range 1..16.
        split: Value for ``--split`` when ``num_threads`` is 0.
        show_progress: If true, parse aria2c's output and show a Rich progress
            bar; otherwise run aria2c with its own console output.
        show_aria2c_output: With ``show_progress``, also echo every aria2c
            line that carried progress information.
        out_names: Optional sequence of output file names aligned with
            ``urls``; missing or empty entries are derived automatically.
        num_threads: When non-zero, overrides both ``connections`` and
            ``split``.

    Returns:
        A list of path strings, one for each resolved output name that exists
        in ``dest_dir`` after the run, in URL order. Empty if ``urls`` is
        empty (aria2c is not started in that case).

    Raises:
        SystemExit: ``show_progress`` is true and aria2c exits non-zero.
        subprocess.CalledProcessError: ``show_progress`` is false and aria2c
            exits non-zero.
        OSError: aria2c cannot be launched (e.g. it is not installed).
    """
    from tempfile import NamedTemporaryFile

    # Materialise once: the URLs are needed for naming and for the input file.
    urls = list(urls)
    dest_dir = Path(dest_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    if not urls:
        return []

    out_name_list = _resolve_out_names(urls, out_names)

    # aria2c input-file format: the URI line, then indented per-URI options.
    with NamedTemporaryFile("w", delete=False) as f:
        f.writelines(
            f"{url}\n  out={name}\n" for url, name in zip(urls, out_name_list)
        )
        input_file = f.name

    max_conn = max(
        1, min(int(num_threads or connections or 16), _ARIA2C_MAX_CONNECTIONS)
    )
    pieces = max(1, int(num_threads or split or 16))
    cmd = [
        "aria2c",
        "--summary-interval=1",
        "--enable-color=false",
        "--max-connection-per-server",
        str(max_conn),
        "--split",
        str(pieces),
        "-k",
        "1M",
        "-d",
        str(dest_dir),
        "-i",
        input_file,
    ]

    try:
        if show_progress:
            _run_aria2c_with_progress(cmd, show_aria2c_output)
        else:
            subprocess.run(cmd, check=True)
    finally:
        # Best-effort removal of the temporary input file.
        with contextlib.suppress(OSError):
            Path(input_file).unlink()

    candidates = (dest_dir / name for name in out_name_list)
    return [str(path) for path in candidates if path.exists()]

Download URLs to dest_dir using a single aria2c subprocess with Rich progress. Returns a list of downloaded file paths.