hoodini.utils.downloader
import contextlib
import re
import subprocess
import sys
from pathlib import Path

import requests
from rich.progress import (
    BarColumn,
    Progress,
    TextColumn,
    TimeRemainingColumn,
    TransferSpeedColumn,
)

# aria2c only accepts --max-connection-per-server values in the range 1-16.
_ARIA2C_MAX_CONNECTIONS = 16

_PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)%")
_SIZE_RE = re.compile(r"([\d.]+)\s*([KMGTP]?i?B)/([\d.]+)\s*([KMGTP]?i?B)")
_LINE_SPLIT_RE = re.compile(rb"[\r\n]")
_CONTENT_DISPOSITION_RE = re.compile(
    r"filename(\*)?=(?:UTF-8'')?\"?([^\";]+)\"?", re.IGNORECASE
)
_CONTROL_CHARS_RE = re.compile(r"[\x00-\x1f\x7f]")

_UNIT_BYTES = {
    "B": 1,
    "KB": 1000,
    "MB": 1000**2,
    "GB": 1000**3,
    "TB": 1000**4,
    "PB": 1000**5,
    "KiB": 1024,
    "MiB": 1024**2,
    "GiB": 1024**3,
    "TiB": 1024**4,
    "PiB": 1024**5,
}


def _to_bytes(num_str, unit_str):
    """Convert a number/unit pair such as ("1.5", "MiB") to a byte count."""
    return float(num_str) * _UNIT_BYTES.get(unit_str.strip(), 1)


def _fmt_bytes(n):
    """Format a byte count with binary units, e.g. 1536 -> "1.50 KiB"."""
    for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
        if n < 1024:
            return f"{n:.2f} {unit}"
        n /= 1024
    return f"{n:.2f} PiB"


def _safe_file_name(name):
    """Reduce an externally derived name to a bare, single-line file name."""
    # Control characters (notably newlines) would inject extra lines into the
    # aria2c input file; directory components would escape dest_dir.
    base = Path(_CONTROL_CHARS_RE.sub("", name).strip()).name
    return "" if base in {".", ".."} else base


def _resolve_out_name(url, idx, out_names):
    """Pick the output file name for the idx-th URL."""
    from urllib.parse import unquote, urlparse

    # An explicit, caller-supplied name always wins and is used verbatim.
    if out_names and idx < len(out_names) and out_names[idx]:
        return out_names[idx]

    name = _safe_file_name(unquote(Path(urlparse(url).path).name))
    if not name:
        # Best effort only: any failure of the HEAD request falls through to
        # the generic name below instead of aborting the whole batch.
        try:
            response = requests.head(url, allow_redirects=True, timeout=5)
            header = response.headers.get("content-disposition")
            found = _CONTENT_DISPOSITION_RE.search(header) if header else None
            if found:
                raw_name = found.group(2)
                # The RFC 5987 form (filename*=) is percent-encoded.
                name = _safe_file_name(unquote(raw_name) if found.group(1) else raw_name)
        except Exception:
            name = ""
    return name or f"downloaded_file_{idx}"


def _parse_progress_line(line):
    """Return (downloaded_bytes, total_bytes, percent) parsed from one output line.

    Each element is None when the corresponding information is not present.
    """
    downloaded = total = pct = None
    size_match = _SIZE_RE.search(line)
    if size_match:
        d_num, d_unit, t_num, t_unit = size_match.groups()
        try:
            downloaded = int(_to_bytes(d_num, d_unit))
            total = int(_to_bytes(t_num, t_unit))
        except (ValueError, OverflowError):
            downloaded = total = None
    pct_match = _PERCENT_RE.search(line)
    if pct_match:
        pct = float(pct_match.group(1))
    return downloaded, total, pct


def _run_aria2c_with_progress(cmd, echo_output):
    """Run aria2c, mirroring its progress lines into a Rich progress bar."""
    from collections import deque

    # Keep the tail of the non-progress output so a failure can be explained.
    recent_output = deque(maxlen=10)
    total_bytes = None

    with Progress(
        TextColumn("[bold blue]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>5.1f}%"),
        TextColumn("• {task.fields[bytes_text]}"),
        TransferSpeedColumn(),
        TimeRemainingColumn(),
        refresh_per_second=10,
        transient=True,
    ) as progress:
        task = progress.add_task("aria2c batch", total=None, bytes_text="…")

        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=0,
        )
        try:
            pending = b""
            while True:
                # On this unbuffered, blocking pipe an empty read means EOF.
                chunk = proc.stdout.read(1024)
                if chunk:
                    # aria2c redraws with "\r", so split on both terminators
                    # and keep the trailing partial line for the next read.
                    *raw_lines, pending = _LINE_SPLIT_RE.split(pending + chunk)
                else:
                    raw_lines, pending = [pending], b""

                for raw in raw_lines:
                    line = raw.decode("utf-8", "ignore")
                    downloaded, total, pct = _parse_progress_line(line)

                    if total is None and pct is None:
                        if line.strip():
                            recent_output.append(line.strip())
                        continue

                    if echo_output:
                        sys.stdout.write(line + "\n")
                        sys.stdout.flush()

                    if total is not None:
                        total_bytes = total
                        progress.update(
                            task,
                            total=total,
                            completed=downloaded,
                            bytes_text=f"{_fmt_bytes(downloaded)} / {_fmt_bytes(total)}",
                        )
                    elif total_bytes is None:
                        # Percent-only fallback until a byte total is known.
                        progress.update(task, total=100.0, completed=pct, bytes_text=f"{pct:.1f}%")

                if not chunk:
                    break
            code = proc.wait()
        except BaseException:
            # Do not leave aria2c running if we are interrupted or crash.
            proc.kill()
            proc.wait()
            raise
        finally:
            proc.stdout.close()

        if code != 0:
            message = f"aria2c exited with non-zero status: {code}"
            if recent_output:
                message += "\n" + "\n".join(recent_output)
            raise SystemExit(message)

        # Only a successful run is shown as complete.
        if total_bytes:
            progress.update(
                task,
                total=total_bytes,
                completed=total_bytes,
                bytes_text=f"{_fmt_bytes(total_bytes)} / {_fmt_bytes(total_bytes)}",
            )
        else:
            progress.update(task, total=100.0, completed=100.0, bytes_text="100%")


def download_with_aria2c(
    urls,
    dest_dir,
    connections=16,
    split=16,
    show_progress=True,
    show_aria2c_output=False,
    out_names=None,
    num_threads: int = 0,
):
    """
    Download URLs to dest_dir using a single aria2c subprocess with Rich progress.

    Args:
        urls: Iterable of URLs to fetch; a single string is treated as one URL.
        dest_dir: Target directory, created (with parents) if missing.
        connections: Connections per server when ``num_threads`` is not set;
            clamped to the 1-16 range accepted by aria2c.
        split: Number of pieces per download when ``num_threads`` is not set.
        show_progress: Render a Rich progress bar from aria2c's output;
            otherwise aria2c runs with its output passed through unmodified.
        show_aria2c_output: In progress mode, also echo aria2c's progress lines.
        out_names: Optional per-URL output names; missing or empty entries fall
            back to the URL path, then a Content-Disposition lookup, then
            ``downloaded_file_<index>``.
        num_threads: When non-zero, overrides both ``connections`` and ``split``.

    Returns:
        A list of paths (as strings) of the expected output files that exist
        in dest_dir after the run. An empty ``urls`` returns ``[]`` without
        starting aria2c.

    Raises:
        SystemExit: aria2c exited with a non-zero status in progress mode.
        subprocess.CalledProcessError: same condition with show_progress=False.
    """
    from tempfile import NamedTemporaryFile

    # Materialise once: the URLs are walked twice and may come from a generator.
    url_list = [urls] if isinstance(urls, str) else list(urls)

    dest_dir = Path(dest_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    if not url_list:
        return []

    names = list(out_names) if out_names else []
    out_name_list = [_resolve_out_name(url, idx, names) for idx, url in enumerate(url_list)]

    max_conn = min(max(int(num_threads or connections or 16), 1), _ARIA2C_MAX_CONNECTIONS)
    pieces = max(int(num_threads or split or 16), 1)

    input_handle = NamedTemporaryFile("w", delete=False, encoding="utf-8")
    input_file = input_handle.name
    try:
        with input_handle:
            # aria2c input format: a URL line followed by indented option lines.
            input_handle.writelines(
                f"{url}\n out={name}\n" for url, name in zip(url_list, out_name_list)
            )

        cmd = [
            "aria2c",
            "--summary-interval=1",
            "--enable-color=false",
            "--max-connection-per-server",
            str(max_conn),
            "--split",
            str(pieces),
            "-k",
            "1M",
            "-d",
            str(dest_dir),
            "-i",
            input_file,
        ]

        if show_progress:
            _run_aria2c_with_progress(cmd, show_aria2c_output)
        else:
            subprocess.run(cmd, check=True)
    finally:
        with contextlib.suppress(OSError):
            Path(input_file).unlink()

    candidates = (dest_dir / name for name in out_name_list)
    return [str(path) for path in candidates if path.exists()]
def
download_with_aria2c( urls, dest_dir, connections=16, split=16, show_progress=True, show_aria2c_output=False, out_names=None, num_threads: int = 0):
# aria2c only accepts --max-connection-per-server values in the range 1-16.
_ARIA2C_MAX_CONNECTIONS = 16

_PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)%")
_SIZE_RE = re.compile(r"([\d.]+)\s*([KMGTP]?i?B)/([\d.]+)\s*([KMGTP]?i?B)")
_LINE_SPLIT_RE = re.compile(rb"[\r\n]")
_CONTENT_DISPOSITION_RE = re.compile(
    r"filename(\*)?=(?:UTF-8'')?\"?([^\";]+)\"?", re.IGNORECASE
)
_CONTROL_CHARS_RE = re.compile(r"[\x00-\x1f\x7f]")

_UNIT_BYTES = {
    "B": 1,
    "KB": 1000,
    "MB": 1000**2,
    "GB": 1000**3,
    "TB": 1000**4,
    "PB": 1000**5,
    "KiB": 1024,
    "MiB": 1024**2,
    "GiB": 1024**3,
    "TiB": 1024**4,
    "PiB": 1024**5,
}


def _to_bytes(num_str, unit_str):
    """Convert a number/unit pair such as ("1.5", "MiB") to a byte count."""
    return float(num_str) * _UNIT_BYTES.get(unit_str.strip(), 1)


def _fmt_bytes(n):
    """Format a byte count with binary units, e.g. 1536 -> "1.50 KiB"."""
    for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
        if n < 1024:
            return f"{n:.2f} {unit}"
        n /= 1024
    return f"{n:.2f} PiB"


def _safe_file_name(name):
    """Reduce an externally derived name to a bare, single-line file name."""
    # Control characters (notably newlines) would inject extra lines into the
    # aria2c input file; directory components would escape dest_dir.
    base = Path(_CONTROL_CHARS_RE.sub("", name).strip()).name
    return "" if base in {".", ".."} else base


def _resolve_out_name(url, idx, out_names):
    """Pick the output file name for the idx-th URL."""
    from urllib.parse import unquote, urlparse

    # An explicit, caller-supplied name always wins and is used verbatim.
    if out_names and idx < len(out_names) and out_names[idx]:
        return out_names[idx]

    name = _safe_file_name(unquote(Path(urlparse(url).path).name))
    if not name:
        # Best effort only: any failure of the HEAD request falls through to
        # the generic name below instead of aborting the whole batch.
        try:
            response = requests.head(url, allow_redirects=True, timeout=5)
            header = response.headers.get("content-disposition")
            found = _CONTENT_DISPOSITION_RE.search(header) if header else None
            if found:
                raw_name = found.group(2)
                # The RFC 5987 form (filename*=) is percent-encoded.
                name = _safe_file_name(unquote(raw_name) if found.group(1) else raw_name)
        except Exception:
            name = ""
    return name or f"downloaded_file_{idx}"


def _parse_progress_line(line):
    """Return (downloaded_bytes, total_bytes, percent) parsed from one output line.

    Each element is None when the corresponding information is not present.
    """
    downloaded = total = pct = None
    size_match = _SIZE_RE.search(line)
    if size_match:
        d_num, d_unit, t_num, t_unit = size_match.groups()
        try:
            downloaded = int(_to_bytes(d_num, d_unit))
            total = int(_to_bytes(t_num, t_unit))
        except (ValueError, OverflowError):
            downloaded = total = None
    pct_match = _PERCENT_RE.search(line)
    if pct_match:
        pct = float(pct_match.group(1))
    return downloaded, total, pct


def _run_aria2c_with_progress(cmd, echo_output):
    """Run aria2c, mirroring its progress lines into a Rich progress bar."""
    from collections import deque

    # Keep the tail of the non-progress output so a failure can be explained.
    recent_output = deque(maxlen=10)
    total_bytes = None

    with Progress(
        TextColumn("[bold blue]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>5.1f}%"),
        TextColumn("• {task.fields[bytes_text]}"),
        TransferSpeedColumn(),
        TimeRemainingColumn(),
        refresh_per_second=10,
        transient=True,
    ) as progress:
        task = progress.add_task("aria2c batch", total=None, bytes_text="…")

        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=0,
        )
        try:
            pending = b""
            while True:
                # On this unbuffered, blocking pipe an empty read means EOF.
                chunk = proc.stdout.read(1024)
                if chunk:
                    # aria2c redraws with "\r", so split on both terminators
                    # and keep the trailing partial line for the next read.
                    *raw_lines, pending = _LINE_SPLIT_RE.split(pending + chunk)
                else:
                    raw_lines, pending = [pending], b""

                for raw in raw_lines:
                    line = raw.decode("utf-8", "ignore")
                    downloaded, total, pct = _parse_progress_line(line)

                    if total is None and pct is None:
                        if line.strip():
                            recent_output.append(line.strip())
                        continue

                    if echo_output:
                        sys.stdout.write(line + "\n")
                        sys.stdout.flush()

                    if total is not None:
                        total_bytes = total
                        progress.update(
                            task,
                            total=total,
                            completed=downloaded,
                            bytes_text=f"{_fmt_bytes(downloaded)} / {_fmt_bytes(total)}",
                        )
                    elif total_bytes is None:
                        # Percent-only fallback until a byte total is known.
                        progress.update(task, total=100.0, completed=pct, bytes_text=f"{pct:.1f}%")

                if not chunk:
                    break
            code = proc.wait()
        except BaseException:
            # Do not leave aria2c running if we are interrupted or crash.
            proc.kill()
            proc.wait()
            raise
        finally:
            proc.stdout.close()

        if code != 0:
            message = f"aria2c exited with non-zero status: {code}"
            if recent_output:
                message += "\n" + "\n".join(recent_output)
            raise SystemExit(message)

        # Only a successful run is shown as complete.
        if total_bytes:
            progress.update(
                task,
                total=total_bytes,
                completed=total_bytes,
                bytes_text=f"{_fmt_bytes(total_bytes)} / {_fmt_bytes(total_bytes)}",
            )
        else:
            progress.update(task, total=100.0, completed=100.0, bytes_text="100%")


def download_with_aria2c(
    urls,
    dest_dir,
    connections=16,
    split=16,
    show_progress=True,
    show_aria2c_output=False,
    out_names=None,
    num_threads: int = 0,
):
    """
    Download URLs to dest_dir using a single aria2c subprocess with Rich progress.

    Args:
        urls: Iterable of URLs to fetch; a single string is treated as one URL.
        dest_dir: Target directory, created (with parents) if missing.
        connections: Connections per server when ``num_threads`` is not set;
            clamped to the 1-16 range accepted by aria2c.
        split: Number of pieces per download when ``num_threads`` is not set.
        show_progress: Render a Rich progress bar from aria2c's output;
            otherwise aria2c runs with its output passed through unmodified.
        show_aria2c_output: In progress mode, also echo aria2c's progress lines.
        out_names: Optional per-URL output names; missing or empty entries fall
            back to the URL path, then a Content-Disposition lookup, then
            ``downloaded_file_<index>``.
        num_threads: When non-zero, overrides both ``connections`` and ``split``.

    Returns:
        A list of paths (as strings) of the expected output files that exist
        in dest_dir after the run. An empty ``urls`` returns ``[]`` without
        starting aria2c.

    Raises:
        SystemExit: aria2c exited with a non-zero status in progress mode.
        subprocess.CalledProcessError: same condition with show_progress=False.
    """
    from tempfile import NamedTemporaryFile

    # Materialise once: the URLs are walked twice and may come from a generator.
    url_list = [urls] if isinstance(urls, str) else list(urls)

    dest_dir = Path(dest_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    if not url_list:
        return []

    names = list(out_names) if out_names else []
    out_name_list = [_resolve_out_name(url, idx, names) for idx, url in enumerate(url_list)]

    max_conn = min(max(int(num_threads or connections or 16), 1), _ARIA2C_MAX_CONNECTIONS)
    pieces = max(int(num_threads or split or 16), 1)

    input_handle = NamedTemporaryFile("w", delete=False, encoding="utf-8")
    input_file = input_handle.name
    try:
        with input_handle:
            # aria2c input format: a URL line followed by indented option lines.
            input_handle.writelines(
                f"{url}\n out={name}\n" for url, name in zip(url_list, out_name_list)
            )

        cmd = [
            "aria2c",
            "--summary-interval=1",
            "--enable-color=false",
            "--max-connection-per-server",
            str(max_conn),
            "--split",
            str(pieces),
            "-k",
            "1M",
            "-d",
            str(dest_dir),
            "-i",
            input_file,
        ]

        if show_progress:
            _run_aria2c_with_progress(cmd, show_aria2c_output)
        else:
            subprocess.run(cmd, check=True)
    finally:
        with contextlib.suppress(OSError):
            Path(input_file).unlink()

    candidates = (dest_dir / name for name in out_name_list)
    return [str(path) for path in candidates if path.exists()]
Download URLs to dest_dir using a single aria2c subprocess with Rich progress. Returns a list of downloaded file paths.