import asyncio import logging import os import re import tempfile from pathlib import Path from typing import Callable log = logging.getLogger("tafa.rclone") class RcloneError(Exception): def __init__(self, message: str, stderr: str = ""): self.stderr = stderr super().__init__(message) async def check_rclone() -> bool: proc = await asyncio.create_subprocess_exec( "rclone", "version", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) await proc.wait() return proc.returncode == 0 async def list_files(remote_path: str, timeout: int = 120) -> list[str]: log.info("Listing files: %s", remote_path) proc = await asyncio.create_subprocess_exec( "rclone", "lsf", remote_path, "--files-only", "--recursive", "--fast-list", "--contimeout", "30s", "--timeout", "120s", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() raise RcloneError(f"rclone lsf timed out for {remote_path}") if proc.returncode != 0: raise RcloneError(f"rclone lsf failed for {remote_path}", stderr.decode()) files = [line for line in stdout.decode().splitlines() if line.strip()] log.info("Listed %d files from %s", len(files), remote_path) return files async def copy_files( remote_path: str, filenames: list[str], local_dir: Path, progress_callback: Callable[[int, int], None] | None = None, ) -> None: log.info("Copying %d files from %s → %s", len(filenames), remote_path, local_dir) local_dir.mkdir(parents=True, exist_ok=True) with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: f.write("\n".join(filenames)) tmp_path = f.name try: proc = await asyncio.create_subprocess_exec( "rclone", "copy", "--files-from", tmp_path, remote_path, str(local_dir), "--stats", "1s", "--stats-one-line", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE, ) stderr_lines: list[str] = [] async for line_bytes in proc.stderr: line = line_bytes.decode().strip() stderr_lines.append(line) if progress_callback: m = re.search(r"Transferred:\s+(\d+)\s*/\s*(\d+)", line) if m: progress_callback(int(m.group(1)), int(m.group(2))) await proc.wait() if proc.returncode != 0: raise RcloneError( f"rclone copy failed for {remote_path}", "\n".join(stderr_lines), ) log.info("Copy complete: %d files from %s", len(filenames), remote_path) finally: try: os.unlink(tmp_path) except OSError: pass