Files
Tafa/rclone.py
Philipp Ludewig de3ab5d2a6 Initial commit
App works locally. No intention to bring this live.
2026-04-23 16:35:12 -04:00

94 lines
2.9 KiB
Python

import asyncio
import logging
import os
import re
import tempfile
from pathlib import Path
from typing import Callable
# Module-level logger, registered under the name "tafa.rclone".
log = logging.getLogger("tafa.rclone")
class RcloneError(Exception):
    """Exception carrying a message plus separately stored stderr text.

    Only ``message`` is forwarded to ``Exception``, so ``str(exc)`` and
    ``exc.args`` contain the message alone; the stderr text is available
    on the ``stderr`` attribute (empty string when not supplied).
    """

    def __init__(self, message: str, stderr: str = ""):
        super().__init__(message)
        # Stored as a plain attribute rather than in ``args`` so that the
        # rendered exception text stays short.
        self.stderr = stderr
async def check_rclone() -> bool:
    """Report whether ``rclone version`` can be run successfully.

    Returns:
        True if the command starts and exits with status 0. False if it
        exits with a non-zero status or cannot be started at all (for
        example because no ``rclone`` executable is found).
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "rclone", "version",
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except OSError:
        # FileNotFoundError / PermissionError are OSError subclasses: the
        # binary is missing or not executable, so the answer is simply "no".
        return False
    # Process.wait() resolves to the exit status.
    return await proc.wait() == 0
async def list_files(remote_path: str, timeout: int = 120) -> list[str]:
    """List files under *remote_path* by running ``rclone lsf``.

    The command is invoked with ``--files-only --recursive --fast-list``
    and its standard output is split into lines.

    Args:
        remote_path: Location handed to ``rclone lsf``.
        timeout: Maximum number of seconds to wait for the whole command.

    Returns:
        The non-blank lines of rclone's standard output, in output order.

    Raises:
        RcloneError: If the command does not finish within *timeout*
            seconds, or exits with a non-zero status (``stderr`` then holds
            the decoded standard-error output).
    """
    log.info("Listing files: %s", remote_path)
    proc = await asyncio.create_subprocess_exec(
        "rclone", "lsf", remote_path,
        "--files-only", "--recursive", "--fast-list",
        "--contimeout", "30s", "--timeout", "120s",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise RcloneError(f"rclone lsf timed out for {remote_path}") from exc
    finally:
        # Runs on timeout, task cancellation, or any other error: never
        # leave the child process running behind us.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await proc.wait()

    if proc.returncode != 0:
        # stderr is diagnostic text only; tolerate undecodable bytes so the
        # real failure is not masked by a UnicodeDecodeError.
        raise RcloneError(
            f"rclone lsf failed for {remote_path}",
            stderr.decode(errors="replace"),
        )

    # stdout is decoded strictly on purpose: silently replacing bytes would
    # corrupt the returned file names.
    files = [line for line in stdout.decode().splitlines() if line.strip()]
    log.info("Listed %d files from %s", len(files), remote_path)
    return files
async def copy_files(
    remote_path: str,
    filenames: list[str],
    local_dir: Path,
    progress_callback: Callable[[int, int], None] | None = None,
) -> None:
    """Copy the named files from *remote_path* into *local_dir* via rclone.

    The file names are written, one per line, to a temporary list file
    that is passed to ``rclone copy --files-from``. The temporary file is
    removed afterwards, and the rclone process is killed if this coroutine
    exits before the process has finished.

    Args:
        remote_path: Source location handed to ``rclone copy``.
        filenames: Names to copy. When empty, *local_dir* is still created
            but rclone is not started.
        local_dir: Destination directory; created (with parents) if missing.
        progress_callback: Optional callable invoked with the two integers
            captured from each stderr line matching ``Transferred: N / M``.

    Raises:
        RcloneError: If rclone exits with a non-zero status; ``stderr``
            holds the collected standard-error lines.
    """
    log.info("Copying %d files from %s to %s", len(filenames), remote_path, local_dir)
    local_dir.mkdir(parents=True, exist_ok=True)
    if not filenames:
        log.info("No files requested from %s; skipping rclone copy", remote_path)
        return

    # Compiled once instead of being looked up for every stderr line.
    progress_re = re.compile(r"Transferred:\s+(\d+)\s*/\s*(\d+)")
    tmp_path: str | None = None
    proc = None
    try:
        # Explicit UTF-8 so non-ASCII names do not depend on the locale.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".txt", delete=False, encoding="utf-8"
        ) as f:
            tmp_path = f.name
            f.write("\n".join(filenames))

        # NOTE(review): confirm that this flag combination really makes
        # rclone print "Transferred: N / M" lines on stderr; if it does not,
        # progress_callback is never invoked.
        proc = await asyncio.create_subprocess_exec(
            "rclone", "copy",
            "--files-from", tmp_path,
            remote_path, str(local_dir),
            "--stats", "1s", "--stats-one-line",
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        stderr_lines: list[str] = []
        async for line_bytes in proc.stderr:
            # Diagnostic text only: never fail the copy on undecodable bytes.
            line = line_bytes.decode(errors="replace").strip()
            stderr_lines.append(line)
            if progress_callback:
                m = progress_re.search(line)
                if m:
                    progress_callback(int(m.group(1)), int(m.group(2)))
        await proc.wait()
        if proc.returncode != 0:
            raise RcloneError(
                f"rclone copy failed for {remote_path}",
                "\n".join(stderr_lines),
            )
        log.info("Copy complete: %d files from %s", len(filenames), remote_path)
    finally:
        # Reached with a live process when the callback raised or the task
        # was cancelled; do not leave rclone running in the background.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await proc.wait()
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass