"""Download and cache Blender binaries for stub generation.""" import platform import re import tarfile import urllib.request import zipfile from pathlib import Path BLENDER_RELEASE_URL = "https://download.blender.org/release" DOWNLOADS_DIR = Path(__file__).parent / "downloads" def get_platform_suffix() -> str: """Get the platform-specific filename suffix for Blender downloads.""" system = platform.system() machine = platform.machine() if system == "Linux" and machine == "x86_64": return "linux-x64" if system == "Darwin": if machine == "arm64": return "macos-arm64" return "macos-x64" if system == "Windows": return "windows-x64" msg = f"Unsupported platform: {system} {machine}" raise RuntimeError(msg) def get_archive_extension() -> str: """Get the archive extension for the current platform.""" if platform.system() == "Windows": return ".zip" return ".tar.xz" def find_latest_patch(major_minor: str) -> str: """Find the latest patch version for a given major.minor version. Parses the HTML index at https://download.blender.org/release/Blender{X.Y}/ to find the highest patch version. """ url = f"{BLENDER_RELEASE_URL}/Blender{major_minor}/" suffix = get_platform_suffix() ext = get_archive_extension() pattern = ( rf"blender-({re.escape(major_minor)}\.\d+)-{re.escape(suffix)}{re.escape(ext)}" ) req = urllib.request.Request(url, headers={"User-Agent": "blender-python-stubs"}) with urllib.request.urlopen(req) as response: html = response.read().decode() versions = re.findall(pattern, html) if not versions: msg = f"No Blender {major_minor} releases found for {suffix} at {url}" raise RuntimeError(msg) # Sort by patch version and return the latest versions.sort(key=lambda v: [int(x) for x in v.split(".")]) return versions[-1] def get_download_url(version: str) -> str: """Build the download URL for a specific Blender version.""" major_minor = ".".join(version.split(".")[:2]) suffix = get_platform_suffix() ext = get_archive_extension() filename = f"blender-{version}-{suffix}{ext}" return f"{BLENDER_RELEASE_URL}/Blender{major_minor}/{filename}" def get_extracted_dir_name(version: str) -> str: """Get the expected directory name after extraction.""" suffix = get_platform_suffix() return f"blender-{version}-{suffix}" def _expected_executable_path(extracted_dir: Path) -> Path: """Return the Blender executable path within an extracted archive directory.""" system = platform.system() if system == "Windows": return extracted_dir / "blender.exe" if system == "Darwin": return extracted_dir / "Blender.app" / "Contents" / "MacOS" / "Blender" return extracted_dir / "blender" def _is_safe_extract_path(base_dir: Path, member_path: str) -> bool: """Check if an archive member path is safe to extract under base_dir.""" base_resolved = base_dir.resolve() target_resolved = (base_dir / member_path).resolve() try: target_resolved.relative_to(base_resolved) except ValueError: return False return True def extract_tar_archive(archive_path: Path, destination: Path) -> None: """Extract a .tar.xz archive with path traversal protection.""" with tarfile.open(archive_path, "r:xz") as tar: members = tar.getmembers() total_members = len(members) for i, member in enumerate(members): if not _is_safe_extract_path(destination, member.name): msg = f"Unsafe path in archive: {member.name}" raise RuntimeError(msg) tar.extract(member, path=destination) if total_members: pct = (i + 1) * 100 // total_members print( f"\r [{pct:3d}%] {i + 1}/{total_members} files", end="", flush=True ) print() def extract_zip_archive(archive_path: Path, destination: Path) -> None: """Extract a .zip archive with path traversal protection.""" with zipfile.ZipFile(archive_path, "r") as zf: members = zf.infolist() total_members = len(members) for i, member in enumerate(members): if not _is_safe_extract_path(destination, member.filename): msg = f"Unsafe path in archive: {member.filename}" raise RuntimeError(msg) zf.extract(member, path=destination) if total_members: pct = (i + 1) * 100 // total_members print( f"\r [{pct:3d}%] {i + 1}/{total_members} files", end="", flush=True ) print() def _extract_cached_patch(name: str, major_minor: str, suffix: str) -> int | None: """Extract patch number from a cached directory name if it matches major.minor.""" match = re.fullmatch( rf"blender-{re.escape(major_minor)}\.(\d+)-{re.escape(suffix)}", name, ) if match is None: return None return int(match.group(1)) def download_blender(version: str) -> Path: """Download and extract a Blender version. Returns path to the extracted directory.""" DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True) dir_name = get_extracted_dir_name(version) extracted_dir = DOWNLOADS_DIR / dir_name if extracted_dir.exists(): print(f" Already cached: {extracted_dir}") return extracted_dir url = get_download_url(version) suffix = get_platform_suffix() ext = get_archive_extension() archive_name = f"blender-{version}-{suffix}{ext}" archive_path = DOWNLOADS_DIR / archive_name print(f" Downloading {url}...") req = urllib.request.Request(url, headers={"User-Agent": "blender-python-stubs"}) with urllib.request.urlopen(req) as response, archive_path.open("wb") as f: total = int(response.headers.get("Content-Length", 0)) downloaded = 0 chunk_size = 1024 * 1024 # 1MB while True: chunk = response.read(chunk_size) if not chunk: break f.write(chunk) downloaded += len(chunk) if total: pct = downloaded * 100 // total mb_done = downloaded / (1024 * 1024) mb_total = total / (1024 * 1024) print( f"\r [{pct:3d}%] {mb_done:.0f}/{mb_total:.0f} MB", end="", flush=True, ) if total: print() print(f" Extracting {archive_name}...") if ext == ".zip": extract_zip_archive(archive_path, DOWNLOADS_DIR) else: extract_tar_archive(archive_path, DOWNLOADS_DIR) # Remove the archive to save space archive_path.unlink() if not extracted_dir.exists(): msg = f"Expected directory {extracted_dir} not found after extraction" raise RuntimeError(msg) return extracted_dir def get_blender_executable(major_minor: str) -> Path: """Get the path to a Blender executable, downloading if needed. Returns the path to the blender binary. """ # Check if already cached DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True) suffix = get_platform_suffix() cached_candidates: list[tuple[int, Path]] = [] for entry in DOWNLOADS_DIR.iterdir(): if not entry.is_dir(): continue patch = _extract_cached_patch(entry.name, major_minor, suffix) if patch is not None: cached_candidates.append((patch, entry)) for _patch, entry in sorted(cached_candidates, reverse=True): executable = _expected_executable_path(entry) if executable.exists(): print(f" Using cached: {executable}") return executable # Download the latest patch version print(f"Resolving latest Blender {major_minor} patch version...") version = find_latest_patch(major_minor) print(f" Latest: {version}") extracted_dir = download_blender(version) executable = _expected_executable_path(extracted_dir) if not executable.exists(): msg = f"Blender executable not found at {executable}" raise RuntimeError(msg) return executable