94 lines
2.9 KiB
Python
94 lines
2.9 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
log = logging.getLogger("tafa.rclone")
|
|
|
|
|
|
class RcloneError(Exception):
|
|
def __init__(self, message: str, stderr: str = ""):
|
|
self.stderr = stderr
|
|
super().__init__(message)
|
|
|
|
|
|
async def check_rclone() -> bool:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"rclone", "version",
|
|
stdout=asyncio.subprocess.DEVNULL,
|
|
stderr=asyncio.subprocess.DEVNULL,
|
|
)
|
|
await proc.wait()
|
|
return proc.returncode == 0
|
|
|
|
|
|
async def list_files(remote_path: str, timeout: int = 120) -> list[str]:
|
|
log.info("Listing files: %s", remote_path)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"rclone", "lsf", remote_path,
|
|
"--files-only", "--recursive", "--fast-list",
|
|
"--contimeout", "30s", "--timeout", "120s",
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
try:
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
proc.kill()
|
|
await proc.wait()
|
|
raise RcloneError(f"rclone lsf timed out for {remote_path}")
|
|
if proc.returncode != 0:
|
|
raise RcloneError(f"rclone lsf failed for {remote_path}", stderr.decode())
|
|
files = [line for line in stdout.decode().splitlines() if line.strip()]
|
|
log.info("Listed %d files from %s", len(files), remote_path)
|
|
return files
|
|
|
|
|
|
async def copy_files(
|
|
remote_path: str,
|
|
filenames: list[str],
|
|
local_dir: Path,
|
|
progress_callback: Callable[[int, int], None] | None = None,
|
|
) -> None:
|
|
log.info("Copying %d files from %s → %s", len(filenames), remote_path, local_dir)
|
|
local_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
|
|
f.write("\n".join(filenames))
|
|
tmp_path = f.name
|
|
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"rclone", "copy",
|
|
"--files-from", tmp_path,
|
|
remote_path, str(local_dir),
|
|
"--stats", "1s", "--stats-one-line",
|
|
stdout=asyncio.subprocess.DEVNULL,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
stderr_lines: list[str] = []
|
|
async for line_bytes in proc.stderr:
|
|
line = line_bytes.decode().strip()
|
|
stderr_lines.append(line)
|
|
if progress_callback:
|
|
m = re.search(r"Transferred:\s+(\d+)\s*/\s*(\d+)", line)
|
|
if m:
|
|
progress_callback(int(m.group(1)), int(m.group(2)))
|
|
|
|
await proc.wait()
|
|
if proc.returncode != 0:
|
|
raise RcloneError(
|
|
f"rclone copy failed for {remote_path}",
|
|
"\n".join(stderr_lines),
|
|
)
|
|
log.info("Copy complete: %d files from %s", len(filenames), remote_path)
|
|
finally:
|
|
try:
|
|
os.unlink(tmp_path)
|
|
except OSError:
|
|
pass
|