import asyncio
|
|
import logging
|
|
import re
|
|
import shutil
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from datetime import date
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
|
|
from rclone import RcloneError, copy_files, list_files
|
|
|
|
log = logging.getLogger("tafa.session")

# Lower-cased file extensions treated as photos when filtering rclone listings.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif"}

INITIAL_BATCH = 5  # photos downloaded before showing the viewer
DOWNLOAD_BATCH = 15  # photos fetched per background iteration
|
|
|
|
|
|
class SessionStatus(str, Enum):
    """Lifecycle state of a review session.

    Subclasses ``str`` so values compare/serialize as plain strings
    (e.g. straight into JSON responses).
    """

    DOWNLOADING = "downloading"  # listing/initial batch still in flight
    READY = "ready"              # at least one folder staged; viewer can start
    ERROR = "error"              # listing or copy failed; see Session.error_message
    EMPTY = "empty"              # no files matched the extension/date filters
|
|
|
|
|
|
@dataclass
class FolderQueue:
    """Per-source-folder download queue plus the viewer's cursor into it."""

    source_path: str   # rclone source path this queue was built from
    display_name: str  # human-readable folder label for the UI
    local_dir: Path    # staging directory files are copied into
    files: list[Path]  # downloaded and ready to show
    pending: list[str]  # rclone relative paths not yet downloaded
    total: int  # full count (files + pending)
    position: int = 0  # index into `files` of the photo currently shown
    download_complete: bool = False  # True once `pending` has been drained
|
|
|
|
|
|
_DATE_RE = re.compile(r'(\d{4}-\d{2}-\d{2})')
|
|
|
|
def _filename_in_range(filename: str, start: date, end: date) -> bool:
|
|
m = _DATE_RE.search(filename)
|
|
if not m:
|
|
return False
|
|
try:
|
|
return start <= date.fromisoformat(m.group(1)) <= end
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def _sanitize_dirname(path: str) -> str:
|
|
return re.sub(r"[:/\\ ]+", "_", path).strip("_")
|
|
|
|
|
|
def _safe_destination(dest_dir: Path, filename: str) -> Path:
|
|
stem = Path(filename).stem
|
|
suffix = Path(filename).suffix
|
|
candidate = dest_dir / filename
|
|
n = 1
|
|
while candidate.exists():
|
|
candidate = dest_dir / f"{stem}_{n}{suffix}"
|
|
n += 1
|
|
return candidate
|
|
|
|
|
|
class Session:
    """One photo-triage session: stages photos from rclone sources into a
    local scratch directory, then lets a viewer accept/ignore/defer them.

    Lifecycle: construct → ``await run_download(...)`` → viewer polls
    ``photo_metadata()`` and calls the ``action_*`` methods → ``cleanup()``
    removes the staging directory.
    """

    def __init__(self, staging_base: Path, destination: Path):
        """Create an empty session staged under ``staging_base/<session id>``.

        Args:
            staging_base: parent directory for this session's scratch space.
            destination: directory accepted photos are moved into.
        """
        self.id = uuid.uuid4().hex
        self.status = SessionStatus.DOWNLOADING
        self.error_message: str | None = None
        self.queues: list[FolderQueue] = []
        self.current_folder_idx = 0
        self.staging_dir = staging_base / self.id
        self.destination = destination
        # "phase" is set by run_download; seed it here so readers never hit
        # a missing key before the download starts.
        self.download_progress: dict = {
            "phase": "idle",
            "copied": 0,
            "total": 0,
            "current_folder": "",
        }
        # NOTE(review): _processing is never read or written elsewhere in
        # this file — presumably a re-entrancy flag for callers; confirm.
        self._processing = False
        # Strong reference to the background downloader. asyncio keeps only
        # weak references to tasks, so without this the task created in
        # run_download() could be garbage-collected mid-download.
        self._downloader_task: asyncio.Task | None = None
        log.info("Session created [%s]", self.id[:8])

    def _find_active_queue(self) -> FolderQueue | None:
        """Return the queue holding the current photo, skipping exhausted folders.

        Advances ``current_folder_idx`` past folders that are fully consumed.
        Returns None when the current folder's photos are still downloading,
        or when every folder is finished.
        """
        while self.current_folder_idx < len(self.queues):
            q = self.queues[self.current_folder_idx]
            if q.position < len(q.files):
                return q
            if not q.pending:
                # Folder fully consumed — move on to the next one.
                self.current_folder_idx += 1
                continue
            # Photos remain but are not downloaded yet: caller should wait.
            return None
        return None

    def _is_loading(self) -> bool:
        """True when the viewer is blocked waiting on background downloads."""
        idx = self.current_folder_idx
        if idx >= len(self.queues):
            return False
        q = self.queues[idx]
        return q.position >= len(q.files) and bool(q.pending)

    def current_photo(self) -> tuple[FolderQueue, Path] | None:
        """Return ``(queue, path)`` for the photo under the cursor, or None."""
        q = self._find_active_queue()
        if q is None:
            return None
        return (q, q.files[q.position])

    def photo_at_offset(self, offset: int) -> Path | None:
        """Return the photo *offset* positions from the cursor, or None.

        Fix: a negative resulting index previously fell through the
        ``idx >= len`` check and wrapped via Python's negative indexing,
        silently returning a photo from the *end* of the queue; it now
        returns None for any out-of-range index.
        """
        q = self._find_active_queue()
        if q is None:
            return None
        idx = q.position + offset
        if idx < 0 or idx >= len(q.files):
            return None
        return q.files[idx]

    def photo_metadata(self) -> dict:
        """Describe the current photo for the viewer UI.

        Returns ``{"session_done": True}`` when everything is reviewed,
        ``{"session_done": False, "loading": True}`` while waiting on the
        background downloader, otherwise the filename, folder/photo indices,
        and whether an identically-named file already exists in destination.
        """
        result = self.current_photo()
        if result is None:
            if self._is_loading():
                return {"session_done": False, "loading": True}
            return {"session_done": True}
        q, file_path = result
        return {
            "filename": file_path.name,
            "folder_name": q.display_name,
            "folder_index": self.current_folder_idx,
            "folder_count": len(self.queues),
            "photo_index": q.position,
            "photo_total": q.total,
            "already_accepted": (self.destination / file_path.name).exists(),
            "session_done": False,
            "loading": False,
        }

    def _advance(self) -> None:
        """Move the cursor one photo forward, if a current photo exists."""
        q = self._find_active_queue()
        if q:
            q.position += 1

    def action_accept(self) -> None:
        """Move the current photo into the destination, then advance.

        A file whose name already exists in the destination is treated as a
        duplicate and deleted from staging rather than renamed.
        """
        result = self.current_photo()
        if result is None:
            return
        _, file_path = result
        if (self.destination / file_path.name).exists():
            log.info("Accept skipped — already in destination: %s", file_path.name)
            file_path.unlink(missing_ok=True)
        else:
            self.destination.mkdir(parents=True, exist_ok=True)
            dest = _safe_destination(self.destination, file_path.name)
            try:
                shutil.move(str(file_path), str(dest))
                log.info("Accepted: %s → %s", file_path.name, dest)
            except FileNotFoundError:
                # Staged file vanished (e.g. cleanup raced us) — not fatal.
                log.warning("Accept skipped — file already gone: %s", file_path.name)
        self._advance()

    def action_ignore(self) -> None:
        """Delete the current photo from staging, then advance."""
        result = self.current_photo()
        if result is None:
            return
        _, file_path = result
        file_path.unlink(missing_ok=True)
        log.info("Ignored: %s", file_path.name)
        self._advance()

    def action_remind(self) -> None:
        """Defer the current photo by re-inserting it ~10 positions later.

        The cursor is deliberately not advanced: popping the current entry
        makes the next photo slide into the current position.
        """
        result = self.current_photo()
        if result is None:
            return
        q, file_path = result
        file = q.files.pop(q.position)
        insert_at = min(q.position + 10, len(q.files))
        q.files.insert(insert_at, file)
        log.info("Remind: %s → back in queue at +%d", file_path.name, insert_at - q.position)

    def cleanup(self) -> None:
        """Remove this session's staging directory (best effort)."""
        if self.staging_dir.exists():
            shutil.rmtree(self.staging_dir, ignore_errors=True)
            log.info("Staging dir removed: %s", self.staging_dir)

    async def _background_downloader(self) -> None:
        """Drain every queue's ``pending`` list in DOWNLOAD_BATCH chunks.

        A failed batch is logged and dropped (best-effort); the folder is
        still marked complete so the viewer never waits forever.
        """
        log.info("Background downloader started")
        for q in self.queues:
            while q.pending:
                batch = q.pending[:DOWNLOAD_BATCH]
                q.pending = q.pending[DOWNLOAD_BATCH:]
                log.info(
                    "Background fetch: %d files for %s (%d remaining)",
                    len(batch), q.display_name, len(q.pending),
                )
                try:
                    await copy_files(q.source_path, batch, q.local_dir)
                    # Only list files that actually landed on disk.
                    new_files = sorted(
                        q.local_dir / p for p in batch if (q.local_dir / p).exists()
                    )
                    q.files.extend(new_files)
                    log.info(
                        "Background fetch done: +%d files for %s (%d total ready)",
                        len(new_files), q.display_name, len(q.files),
                    )
                except RcloneError as e:
                    log.warning("Background fetch failed for %s: %s", q.display_name, e)
            q.download_complete = True
        log.info("Background downloader finished — all files fetched")

    async def run_download(self, sources: list[str], date_start: date, date_end: date) -> None:
        """List, filter, and stage photos from every rclone source.

        Downloads an INITIAL_BATCH per folder so the viewer can start
        quickly, then hands the remainder to a background task. Sets
        ``self.status`` to READY, EMPTY, or ERROR accordingly.

        Args:
            sources: rclone source paths.
            date_start: inclusive lower bound for the filename date.
            date_end: inclusive upper bound for the filename date.
        """
        log.info(
            "Starting download for %d source(s), date range %s → %s",
            len(sources), date_start, date_end,
        )
        try:
            for source_path in sources:
                parts = source_path.rstrip("/").split("/")
                # Fall back to the remote name for bare "remote:" sources.
                display_name = parts[-1] or source_path.split(":")[-1]
                local_dir = self.staging_dir / _sanitize_dirname(source_path)

                self.download_progress = {
                    "phase": "scanning",
                    "copied": 0,
                    "total": 0,
                    "current_folder": source_path,
                }

                try:
                    all_files = await list_files(source_path)
                except RcloneError as e:
                    self.status = SessionStatus.ERROR
                    self.error_message = f"Failed to list {source_path}: {e.stderr or str(e)}"
                    log.error("List failed for %s: %s", source_path, e)
                    return

                filtered = [
                    f for f in all_files
                    if Path(f).suffix.lower() in IMAGE_EXTENSIONS
                    and _filename_in_range(f, date_start, date_end)
                ]

                log.info(
                    "%s: %d total files, %d match date range",
                    display_name, len(all_files), len(filtered),
                )

                if not filtered:
                    # Record an empty, already-complete queue so folder
                    # counts shown in the UI stay consistent.
                    log.info("%s: no matching files, skipping", display_name)
                    self.queues.append(FolderQueue(
                        source_path=source_path,
                        display_name=display_name,
                        local_dir=local_dir,
                        files=[],
                        pending=[],
                        total=0,
                        download_complete=True,
                    ))
                    continue

                initial = filtered[:INITIAL_BATCH]
                remaining = filtered[INITIAL_BATCH:]

                self.download_progress["phase"] = "downloading"
                self.download_progress["total"] = len(filtered)

                def make_cb():
                    # Fresh closure per folder, reporting into the shared dict.
                    def cb(copied: int, total: int) -> None:
                        self.download_progress["copied"] = copied
                    return cb

                log.info("Downloading initial batch of %d for %s", len(initial), display_name)
                try:
                    await copy_files(source_path, initial, local_dir, make_cb())
                except RcloneError as e:
                    self.status = SessionStatus.ERROR
                    self.error_message = f"Failed to copy from {source_path}: {e.stderr or str(e)}"
                    log.error("Copy failed for %s: %s", source_path, e)
                    return

                initial_files = sorted(local_dir / p for p in initial if (local_dir / p).exists())

                self.queues.append(FolderQueue(
                    source_path=source_path,
                    display_name=display_name,
                    local_dir=local_dir,
                    files=initial_files,
                    pending=remaining,
                    total=len(filtered),
                    download_complete=not remaining,
                ))
                log.info(
                    "%s ready: %d initial photos, %d queued for background download",
                    display_name, len(initial_files), len(remaining),
                )

            if not self.queues or all(q.total == 0 for q in self.queues):
                self.status = SessionStatus.EMPTY
                log.info("Session empty — no photos matched")
            else:
                self.status = SessionStatus.READY
                log.info("Session ready — viewer can start")
                # Fix: keep a strong reference — asyncio.create_task holds
                # only a weak ref, so a bare task can be GC'd mid-run.
                self._downloader_task = asyncio.create_task(self._background_downloader())

        except Exception as e:
            self.status = SessionStatus.ERROR
            self.error_message = str(e)
            log.exception("Unexpected error in run_download")