"""Photo-review session: stage photos from rclone remotes, let a viewer
accept / ignore / postpone each one, and move accepted files into a
destination folder.

Download strategy: a small initial batch per folder is fetched up front so
the viewer can start quickly; the remainder streams in via a background
asyncio task in ``DOWNLOAD_BATCH``-sized chunks.
"""

import asyncio
import logging
import re
import shutil
import uuid
from dataclasses import dataclass
from datetime import date
from enum import Enum
from pathlib import Path

from rclone import RcloneError, copy_files, list_files

log = logging.getLogger("tafa.session")

IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif"}
INITIAL_BATCH = 5     # photos downloaded before showing the viewer
DOWNLOAD_BATCH = 15   # photos fetched per background iteration


class SessionStatus(str, Enum):
    """Lifecycle states of a :class:`Session` (str mixin for easy JSON use)."""

    DOWNLOADING = "downloading"
    READY = "ready"
    ERROR = "error"
    EMPTY = "empty"


@dataclass
class FolderQueue:
    """Per-source-folder photo queue.

    ``files`` grows over time as the background downloader lands batches;
    ``position`` is the viewer's cursor into ``files``.
    """

    source_path: str       # rclone remote path this queue was built from
    display_name: str      # human-readable folder label for the viewer
    local_dir: Path        # staging directory the files are copied into
    files: list[Path]      # downloaded and ready to show
    pending: list[str]     # rclone relative paths not yet downloaded
    total: int             # full count (files + pending)
    position: int = 0      # viewer cursor into `files`
    download_complete: bool = False  # set by the background downloader


_DATE_RE = re.compile(r'(\d{4}-\d{2}-\d{2})')


def _filename_in_range(filename: str, start: date, end: date) -> bool:
    """Return True if *filename* embeds an ISO date within [start, end].

    Filenames without a parseable ``YYYY-MM-DD`` substring are excluded.
    """
    m = _DATE_RE.search(filename)
    if not m:
        return False
    try:
        return start <= date.fromisoformat(m.group(1)) <= end
    except ValueError:
        # Matched the shape but not a real date (e.g. 2024-13-40).
        return False


def _sanitize_dirname(path: str) -> str:
    """Collapse rclone path separators/colons/spaces into a safe dir name."""
    return re.sub(r"[:/\\ ]+", "_", path).strip("_")


def _safe_destination(dest_dir: Path, filename: str) -> Path:
    """Return a non-clobbering path in *dest_dir* for *filename*.

    Appends ``_1``, ``_2``, ... before the suffix until the name is free.
    """
    stem = Path(filename).stem
    suffix = Path(filename).suffix
    candidate = dest_dir / filename
    n = 1
    while candidate.exists():
        candidate = dest_dir / f"{stem}_{n}{suffix}"
        n += 1
    return candidate


class Session:
    """One photo-review session: staging area, folder queues, and actions."""

    def __init__(self, staging_base: Path, destination: Path):
        self.id = uuid.uuid4().hex
        self.status = SessionStatus.DOWNLOADING
        self.error_message: str | None = None
        self.queues: list[FolderQueue] = []
        self.current_folder_idx = 0
        self.staging_dir = staging_base / self.id
        self.destination = destination
        self.download_progress: dict = {"copied": 0, "total": 0, "current_folder": ""}
        self._processing = False
        # Strong reference to the background downloader task: the event loop
        # only keeps a weak reference, so without this the task could be
        # garbage-collected mid-download.
        self._bg_task: asyncio.Task | None = None
        log.info("Session created [%s]", self.id[:8])

    def _find_active_queue(self) -> FolderQueue | None:
        """Return the queue holding the next viewable photo.

        Advances ``current_folder_idx`` past finished folders.  Returns
        None either when the current folder is still downloading (caller
        should show a loading state) or when every folder is exhausted.
        """
        while self.current_folder_idx < len(self.queues):
            q = self.queues[self.current_folder_idx]
            if q.position < len(q.files):
                return q
            # Only move past this folder once the downloader has *finished*
            # it.  Checking `pending` alone is wrong: the downloader pops a
            # batch off `pending` before awaiting the copy, so an in-flight
            # final batch would look like an exhausted folder and its
            # photos would be skipped.
            if q.download_complete:
                self.current_folder_idx += 1
                continue
            return None
        return None

    def _is_loading(self) -> bool:
        """True when the current folder has no photo ready but more are
        still on their way (mirrors the gate in ``_find_active_queue``)."""
        idx = self.current_folder_idx
        if idx >= len(self.queues):
            return False
        q = self.queues[idx]
        return q.position >= len(q.files) and not q.download_complete

    def current_photo(self) -> tuple[FolderQueue, Path] | None:
        """Return (queue, path) of the photo under the cursor, or None."""
        q = self._find_active_queue()
        if q is None:
            return None
        return (q, q.files[q.position])

    def photo_at_offset(self, offset: int) -> Path | None:
        """Peek at the photo *offset* positions from the cursor (may be
        negative).  Returns None when the index falls outside the files
        downloaded so far."""
        q = self._find_active_queue()
        if q is None:
            return None
        idx = q.position + offset
        # Guard both ends: a negative index would silently wrap to the tail.
        if not 0 <= idx < len(q.files):
            return None
        return q.files[idx]

    def photo_metadata(self) -> dict:
        """Describe the current photo (or the loading/done state) for the UI."""
        result = self.current_photo()
        if result is None:
            if self._is_loading():
                return {"session_done": False, "loading": True}
            return {"session_done": True}
        q, file_path = result
        return {
            "filename": file_path.name,
            "folder_name": q.display_name,
            "folder_index": self.current_folder_idx,
            "folder_count": len(self.queues),
            "photo_index": q.position,
            "photo_total": q.total,
            # A file with the same name already in the destination means a
            # previous session (or folder) accepted it.
            "already_accepted": (self.destination / file_path.name).exists(),
            "session_done": False,
            "loading": False,
        }

    def _advance(self) -> None:
        """Move the cursor one photo forward in the active queue."""
        q = self._find_active_queue()
        if q:
            q.position += 1

    def action_accept(self) -> None:
        """Move the current photo into the destination folder and advance."""
        result = self.current_photo()
        if result is None:
            return
        _, file_path = result
        if (self.destination / file_path.name).exists():
            # Duplicate of something already accepted — drop the staged copy.
            log.info("Accept skipped — already in destination: %s", file_path.name)
            file_path.unlink(missing_ok=True)
        else:
            self.destination.mkdir(parents=True, exist_ok=True)
            dest = _safe_destination(self.destination, file_path.name)
            try:
                shutil.move(str(file_path), str(dest))
                log.info("Accepted: %s → %s", file_path.name, dest)
            except FileNotFoundError:
                log.warning("Accept skipped — file already gone: %s", file_path.name)
        self._advance()

    def action_ignore(self) -> None:
        """Delete the current photo from staging and advance."""
        result = self.current_photo()
        if result is None:
            return
        _, file_path = result
        file_path.unlink(missing_ok=True)
        log.info("Ignored: %s", file_path.name)
        self._advance()

    def action_remind(self) -> None:
        """Push the current photo ~10 positions back in the queue to decide
        later.  The cursor does not advance: the next photo slides into
        view.  (If nothing else is queued, the same photo reappears.)"""
        result = self.current_photo()
        if result is None:
            return
        q, file_path = result
        file = q.files.pop(q.position)
        insert_at = min(q.position + 10, len(q.files))
        q.files.insert(insert_at, file)
        log.info("Remind: %s → back in queue at +%d", file_path.name, insert_at - q.position)

    def cleanup(self) -> None:
        """Remove this session's staging directory (best-effort)."""
        if self.staging_dir.exists():
            shutil.rmtree(self.staging_dir, ignore_errors=True)
            log.info("Staging dir removed: %s", self.staging_dir)

    async def _background_downloader(self) -> None:
        """Stream the remaining files of every folder, batch by batch.

        Runs as a fire-and-forget task after the initial batches are in.
        A failed batch is logged and skipped; its photos never arrive, so
        the folder total is reduced accordingly.
        """
        log.info("Background downloader started")
        for q in self.queues:
            while q.pending:
                batch = q.pending[:DOWNLOAD_BATCH]
                q.pending = q.pending[DOWNLOAD_BATCH:]
                log.info(
                    "Background fetch: %d files for %s (%d remaining)",
                    len(batch), q.display_name, len(q.pending),
                )
                try:
                    await copy_files(q.source_path, batch, q.local_dir)
                    new_files = sorted(
                        q.local_dir / p for p in batch if (q.local_dir / p).exists()
                    )
                    q.files.extend(new_files)
                    log.info(
                        "Background fetch done: +%d files for %s (%d total ready)",
                        len(new_files), q.display_name, len(q.files),
                    )
                except RcloneError as e:
                    log.warning("Background fetch failed for %s: %s", q.display_name, e)
                    # Keep the advertised count honest: these photos will
                    # never arrive, so don't let photo_total overcount.
                    q.total -= len(batch)
            q.download_complete = True
        log.info("Background downloader finished — all files fetched")

    async def run_download(self, sources: list[str], date_start: date, date_end: date) -> None:
        """List each source, filter to images in the date range, download the
        initial batches, then hand the rest to the background downloader.

        Sets ``status``/``error_message`` instead of raising; updates
        ``download_progress`` for UI polling while copying.
        """
        log.info(
            "Starting download for %d source(s), date range %s → %s",
            len(sources), date_start, date_end,
        )
        try:
            for source_path in sources:
                parts = source_path.rstrip("/").split("/")
                # Remote roots like "gdrive:" have no path component; fall
                # back to the remote name.
                display_name = parts[-1] or source_path.split(":")[-1]
                local_dir = self.staging_dir / _sanitize_dirname(source_path)
                self.download_progress = {
                    "phase": "scanning",
                    "copied": 0,
                    "total": 0,
                    "current_folder": source_path,
                }
                try:
                    all_files = await list_files(source_path)
                except RcloneError as e:
                    self.status = SessionStatus.ERROR
                    self.error_message = f"Failed to list {source_path}: {e.stderr or str(e)}"
                    log.error("List failed for %s: %s", source_path, e)
                    return

                filtered = [
                    f for f in all_files
                    if Path(f).suffix.lower() in IMAGE_EXTENSIONS
                    and _filename_in_range(f, date_start, date_end)
                ]
                log.info(
                    "%s: %d total files, %d match date range",
                    display_name, len(all_files), len(filtered),
                )
                if not filtered:
                    log.info("%s: no matching files, skipping", display_name)
                    # Keep an empty, already-complete queue so folder counts
                    # in the UI stay aligned with the selected sources.
                    self.queues.append(FolderQueue(
                        source_path=source_path,
                        display_name=display_name,
                        local_dir=local_dir,
                        files=[],
                        pending=[],
                        total=0,
                        download_complete=True,
                    ))
                    continue

                initial = filtered[:INITIAL_BATCH]
                remaining = filtered[INITIAL_BATCH:]
                self.download_progress["phase"] = "downloading"
                self.download_progress["total"] = len(filtered)

                def progress_cb(copied: int, total: int) -> None:
                    # rclone progress hook; only `copied` drives the UI.
                    self.download_progress["copied"] = copied

                log.info("Downloading initial batch of %d for %s", len(initial), display_name)
                try:
                    await copy_files(source_path, initial, local_dir, progress_cb)
                except RcloneError as e:
                    self.status = SessionStatus.ERROR
                    self.error_message = f"Failed to copy from {source_path}: {e.stderr or str(e)}"
                    log.error("Copy failed for %s: %s", source_path, e)
                    return

                initial_files = sorted(local_dir / p for p in initial if (local_dir / p).exists())
                self.queues.append(FolderQueue(
                    source_path=source_path,
                    display_name=display_name,
                    local_dir=local_dir,
                    files=initial_files,
                    pending=remaining,
                    total=len(filtered),
                    download_complete=not remaining,
                ))
                log.info(
                    "%s ready: %d initial photos, %d queued for background download",
                    display_name, len(initial_files), len(remaining),
                )

            if not self.queues or all(q.total == 0 for q in self.queues):
                self.status = SessionStatus.EMPTY
                log.info("Session empty — no photos matched")
            else:
                self.status = SessionStatus.READY
                log.info("Session ready — viewer can start")
                # Keep a reference so the task isn't garbage-collected.
                self._bg_task = asyncio.create_task(self._background_downloader())
        except Exception as e:
            self.status = SessionStatus.ERROR
            self.error_message = str(e)
            log.exception("Unexpected error in run_download")