Files
Tafa/session.py
Philipp Ludewig de3ab5d2a6 Initial commit
App works locally. No intention to bring this live.
2026-04-23 16:35:12 -04:00

302 lines
11 KiB
Python

import asyncio
import logging
import re
import shutil
import uuid
from dataclasses import dataclass
from datetime import date
from enum import Enum
from pathlib import Path
from rclone import RcloneError, copy_files, list_files
# Module-level logger shared by everything in this file.
log = logging.getLogger("tafa.session")
# File suffixes treated as photos. Session.run_download compares them against
# the lower-cased suffix of every listed file, so entries must be lower-case
# and include the leading dot.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif"}
# Size of the first slice of each folder, copied synchronously in
# Session.run_download before the session is marked READY.
INITIAL_BATCH = 5 # photos downloaded before showing the viewer
# Slice size taken from FolderQueue.pending on each pass of
# Session._background_downloader.
DOWNLOAD_BATCH = 15 # photos fetched per background iteration
class SessionStatus(str, Enum):
    """Lifecycle state of a Session.

    Mixing in ``str`` makes every member compare equal to its plain string
    value (``SessionStatus.READY == "ready"``).
    """

    # Initial state assigned in Session.__init__.
    DOWNLOADING = "downloading"
    # Set by Session.run_download once at least one folder has photos queued.
    READY = "ready"
    # Set by Session.run_download when listing/copying fails or on any
    # unexpected exception.
    ERROR = "error"
    # Set by Session.run_download when no source yielded a matching photo.
    EMPTY = "empty"
@dataclass
class FolderQueue:
    """Review queue for a single source folder.

    ``files`` holds photos that are already on local disk, ``pending`` holds
    the ones that still have to be fetched, and ``position`` is the cursor of
    the photo currently being reviewed inside ``files``.
    """

    # Source path exactly as it was handed to Session.run_download.
    source_path: str
    # Short label derived from the source path; reported as "folder_name".
    display_name: str
    # Local staging directory that this folder's files are copied into.
    local_dir: Path
    # Downloaded and ready to show.
    files: list[Path]
    # rclone relative paths not yet downloaded.
    pending: list[str]
    # Full count (files + pending) at the time the queue was created.
    total: int
    # Index into ``files`` of the photo currently under review.
    position: int = 0
    # True once no further background fetches will happen for this folder.
    download_complete: bool = False
# First ISO-style YYYY-MM-DD token anywhere in a string.
_DATE_RE = re.compile(r'(\d{4}-\d{2}-\d{2})')


def _filename_in_range(filename: str, start: date, end: date) -> bool:
    """Return True when the first date embedded in *filename* is in range.

    The first ``YYYY-MM-DD`` token found in *filename* is parsed and checked
    against the inclusive interval ``[start, end]``. A name without such a
    token, or whose token is not a real calendar date, is out of range.
    """
    found = _DATE_RE.search(filename)
    if found is None:
        return False
    try:
        stamp = date.fromisoformat(found.group(1))
    except ValueError:
        # Digits matched the pattern but do not form a valid date (e.g. month 13).
        return False
    return start <= stamp <= end
def _sanitize_dirname(path: str) -> str:
    """Flatten *path* into a single directory-name component.

    Every run of colons, forward slashes, backslashes and spaces is replaced
    by one underscore, and underscores at either end of the result are then
    stripped.
    """
    collapsed = re.sub(r"[:/\\ ]+", "_", path)
    return collapsed.strip("_")
def _safe_destination(dest_dir: Path, filename: str) -> Path:
    """Return a path inside *dest_dir* for *filename* that does not exist yet.

    The plain name is tried first. While the candidate exists, ``_1``, ``_2``,
    ... is appended to the stem (before the suffix) until a free name is found.
    Nothing is created on disk; the check is only as fresh as this call.
    """
    name = Path(filename)
    target = dest_dir / filename
    attempt = 0
    while target.exists():
        attempt += 1
        target = dest_dir / f"{name.stem}_{attempt}{name.suffix}"
    return target
class Session:
def __init__(self, staging_base: Path, destination: Path):
self.id = uuid.uuid4().hex
self.status = SessionStatus.DOWNLOADING
self.error_message: str | None = None
self.queues: list[FolderQueue] = []
self.current_folder_idx = 0
self.staging_dir = staging_base / self.id
self.destination = destination
self.download_progress: dict = {"copied": 0, "total": 0, "current_folder": ""}
self._processing = False
log.info("Session created [%s]", self.id[:8])
def _find_active_queue(self) -> FolderQueue | None:
while self.current_folder_idx < len(self.queues):
q = self.queues[self.current_folder_idx]
if q.position < len(q.files):
return q
if not q.pending:
self.current_folder_idx += 1
continue
return None
return None
def _is_loading(self) -> bool:
idx = self.current_folder_idx
if idx >= len(self.queues):
return False
q = self.queues[idx]
return q.position >= len(q.files) and bool(q.pending)
def current_photo(self) -> tuple[FolderQueue, Path] | None:
q = self._find_active_queue()
if q is None:
return None
return (q, q.files[q.position])
def photo_at_offset(self, offset: int) -> Path | None:
q = self._find_active_queue()
if q is None:
return None
idx = q.position + offset
if idx >= len(q.files):
return None
return q.files[idx]
def photo_metadata(self) -> dict:
result = self.current_photo()
if result is None:
if self._is_loading():
return {"session_done": False, "loading": True}
return {"session_done": True}
q, file_path = result
return {
"filename": file_path.name,
"folder_name": q.display_name,
"folder_index": self.current_folder_idx,
"folder_count": len(self.queues),
"photo_index": q.position,
"photo_total": q.total,
"already_accepted": (self.destination / file_path.name).exists(),
"session_done": False,
"loading": False,
}
def _advance(self) -> None:
q = self._find_active_queue()
if q:
q.position += 1
def action_accept(self) -> None:
result = self.current_photo()
if result is None:
return
_, file_path = result
if (self.destination / file_path.name).exists():
log.info("Accept skipped — already in destination: %s", file_path.name)
file_path.unlink(missing_ok=True)
else:
self.destination.mkdir(parents=True, exist_ok=True)
dest = _safe_destination(self.destination, file_path.name)
try:
shutil.move(str(file_path), str(dest))
log.info("Accepted: %s%s", file_path.name, dest)
except FileNotFoundError:
log.warning("Accept skipped — file already gone: %s", file_path.name)
self._advance()
def action_ignore(self) -> None:
result = self.current_photo()
if result is None:
return
_, file_path = result
file_path.unlink(missing_ok=True)
log.info("Ignored: %s", file_path.name)
self._advance()
def action_remind(self) -> None:
result = self.current_photo()
if result is None:
return
q, file_path = result
file = q.files.pop(q.position)
insert_at = min(q.position + 10, len(q.files))
q.files.insert(insert_at, file)
log.info("Remind: %s → back in queue at +%d", file_path.name, insert_at - q.position)
def cleanup(self) -> None:
if self.staging_dir.exists():
shutil.rmtree(self.staging_dir, ignore_errors=True)
log.info("Staging dir removed: %s", self.staging_dir)
async def _background_downloader(self) -> None:
log.info("Background downloader started")
for q in self.queues:
while q.pending:
batch = q.pending[:DOWNLOAD_BATCH]
q.pending = q.pending[DOWNLOAD_BATCH:]
log.info(
"Background fetch: %d files for %s (%d remaining)",
len(batch), q.display_name, len(q.pending),
)
try:
await copy_files(q.source_path, batch, q.local_dir)
new_files = sorted(
q.local_dir / p for p in batch if (q.local_dir / p).exists()
)
q.files.extend(new_files)
log.info(
"Background fetch done: +%d files for %s (%d total ready)",
len(new_files), q.display_name, len(q.files),
)
except RcloneError as e:
log.warning("Background fetch failed for %s: %s", q.display_name, e)
q.download_complete = True
log.info("Background downloader finished — all files fetched")
async def run_download(self, sources: list[str], date_start: date, date_end: date) -> None:
log.info(
"Starting download for %d source(s), date range %s%s",
len(sources), date_start, date_end,
)
try:
for source_path in sources:
parts = source_path.rstrip("/").split("/")
display_name = parts[-1] or source_path.split(":")[-1]
local_dir = self.staging_dir / _sanitize_dirname(source_path)
self.download_progress = {
"phase": "scanning",
"copied": 0,
"total": 0,
"current_folder": source_path,
}
try:
all_files = await list_files(source_path)
except RcloneError as e:
self.status = SessionStatus.ERROR
self.error_message = f"Failed to list {source_path}: {e.stderr or str(e)}"
log.error("List failed for %s: %s", source_path, e)
return
filtered = [
f for f in all_files
if Path(f).suffix.lower() in IMAGE_EXTENSIONS
and _filename_in_range(f, date_start, date_end)
]
log.info(
"%s: %d total files, %d match date range",
display_name, len(all_files), len(filtered),
)
if not filtered:
log.info("%s: no matching files, skipping", display_name)
self.queues.append(FolderQueue(
source_path=source_path,
display_name=display_name,
local_dir=local_dir,
files=[],
pending=[],
total=0,
download_complete=True,
))
continue
initial = filtered[:INITIAL_BATCH]
remaining = filtered[INITIAL_BATCH:]
self.download_progress["phase"] = "downloading"
self.download_progress["total"] = len(filtered)
def make_cb():
def cb(copied: int, total: int) -> None:
self.download_progress["copied"] = copied
return cb
log.info("Downloading initial batch of %d for %s", len(initial), display_name)
try:
await copy_files(source_path, initial, local_dir, make_cb())
except RcloneError as e:
self.status = SessionStatus.ERROR
self.error_message = f"Failed to copy from {source_path}: {e.stderr or str(e)}"
log.error("Copy failed for %s: %s", source_path, e)
return
initial_files = sorted(local_dir / p for p in initial if (local_dir / p).exists())
self.queues.append(FolderQueue(
source_path=source_path,
display_name=display_name,
local_dir=local_dir,
files=initial_files,
pending=remaining,
total=len(filtered),
download_complete=not remaining,
))
log.info(
"%s ready: %d initial photos, %d queued for background download",
display_name, len(initial_files), len(remaining),
)
if not self.queues or all(q.total == 0 for q in self.queues):
self.status = SessionStatus.EMPTY
log.info("Session empty — no photos matched")
else:
self.status = SessionStatus.READY
log.info("Session ready — viewer can start")
asyncio.create_task(self._background_downloader())
except Exception as e:
self.status = SessionStatus.ERROR
self.error_message = str(e)
log.exception("Unexpected error in run_download")