diff --git a/src/smart_encoder.py b/src/smart_encoder.py index 7ab3a81..13769f8 100644 --- a/src/smart_encoder.py +++ b/src/smart_encoder.py @@ -144,7 +144,7 @@ def process_file(filepath, log_dir, log_category, lock_dir): filename = filepath.name # Check lock (using shared logic now) - lock_file = common.acquire_lock(lock_dir, filepath) + lock_file = common.acquire_lock(lock_dir, filepath, None) # Legacy encoder, no category root if not lock_file: print(f" šŸ”’ Skipping (locked): {filename}") return True diff --git a/src/smart_gpu_encoder.py b/src/smart_gpu_encoder.py index 026ef9a..c04a180 100644 --- a/src/smart_gpu_encoder.py +++ b/src/smart_gpu_encoder.py @@ -151,10 +151,11 @@ def run_vmaf_check(reference, distorted, status_callback=None): return -1.0 # --- Core Logic --- -def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id=0, status_cb=None): +def process_file(filepath, log_category, lock_dir, log_dir, encoders, category_root=None, worker_id=0, status_cb=None): """ Process a single file. status_cb: function(worker_id, filename, status_text, color) + category_root: Root directory (tv_dir or content_dir) for relative path calculation """ av1_enc, hevc_enc, hw_type = encoders filepath = Path(filepath) @@ -172,7 +173,7 @@ def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id= return # 1. 
Lock Check (Shared Storage) - lock_file = common.acquire_lock(lock_dir, filepath) + lock_file = common.acquire_lock(lock_dir, filepath, category_root) if not lock_file: return # Locked or skipped @@ -354,6 +355,9 @@ def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id= # Mark as processed to prevent re-encoding in future runs common.mark_processed(log_dir, filepath, chosen_codec, vmaf_score, final_savings) + # Mark lock as completed (keep it for future runs) + common.mark_lock_completed(lock_file) + update("Done", "green") if status_cb: status_cb(worker_id, filename, f"STATS:SAVED:{saved_bytes}", "green") else: @@ -368,8 +372,10 @@ def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id= except Exception as e: update(f"Error: {str(e)[:30]}", "red") + # On error, delete lock so file can be retried + if lock_file and lock_file.exists(): + lock_file.unlink() finally: - if lock_file.exists(): lock_file.unlink() update("Idle", "dim") def main(): @@ -463,9 +469,15 @@ def main(): # 4. 
Execute print(f"\nšŸš€ Processing {len(tasks)} files...") + # Build category root map + category_roots = { + "tv_shows": Path(args.tv_dir), + "content": Path(args.content_dir) + } + with ThreadPoolExecutor(max_workers=args.jobs) as executor: futures = { - executor.submit(process_file, f, cat, lock_dir, log_dir, (av1, hevc, hw)): f + executor.submit(process_file, f, cat, lock_dir, log_dir, (av1, hevc, hw), category_roots.get(cat)): f for f, cat in tasks } diff --git a/src/smart_monitor.py b/src/smart_monitor.py index 5043bc7..bcdae01 100644 --- a/src/smart_monitor.py +++ b/src/smart_monitor.py @@ -220,7 +220,7 @@ class Dashboard: return layout # --- Worker Bridge --- -def worker_wrapper(worker_id, file_path, category, lock_dir, log_dir, encoders, dashboard): +def worker_wrapper(worker_id, file_path, category, category_root, lock_dir, log_dir, encoders, dashboard): def status_callback(w_id, fname, msg, color): # Handle Stats Signal if msg.startswith("STATS:"): @@ -252,7 +252,7 @@ def worker_wrapper(worker_id, file_path, category, lock_dir, log_dir, encoders, dashboard.update_stats("failed") try: - encoder.process_file(file_path, category, lock_dir, log_dir, encoders, worker_id, status_callback) + encoder.process_file(file_path, category, lock_dir, log_dir, encoders, category_root, worker_id, status_callback) except Exception as e: dashboard.add_log(f"Error in worker {worker_id}: {str(e)[:30]}") dashboard.update_stats("failed") @@ -371,11 +371,18 @@ def main(): threads = [] def worker_loop(w_id): + # Map category to root path + category_roots = { + "tv_shows": Path(args.tv_dir), + "content": Path(args.content_dir) + } + while not encoder.shutdown_requested: try: item = work_queue.get(timeout=1) file_path, category = item - worker_wrapper(w_id, file_path, category, lock_dir, log_dir, encoders, dashboard) + category_root = category_roots.get(category) + worker_wrapper(w_id, file_path, category, category_root, lock_dir, log_dir, encoders, dashboard) 
work_queue.task_done() except queue.Empty: # If batch mode and scan is done and queue is empty, exit diff --git a/src/vmaf_common.py b/src/vmaf_common.py index b7cf4bb..4fb7264 100644 --- a/src/vmaf_common.py +++ b/src/vmaf_common.py @@ -20,6 +20,7 @@ DEFAULT_CONFIG = { "av1_crf": 34, "hevc_crf": 28, "temp_dir_windows": r"C:\Users\bnair\Videos\encodes", + "temp_dir_macos": "/Users/bnair/Documents/encodes", "temp_dir_linux": "~/Videos/encodes" } @@ -91,7 +92,9 @@ def get_temp_dir(args=None): path = Path(args.temp_dir) elif platform.system() == "Windows": path = Path(DEFAULT_CONFIG["temp_dir_windows"]) - else: + elif platform.system() == "Darwin": # macOS + path = Path(DEFAULT_CONFIG["temp_dir_macos"]) + else: # Linux path = Path(DEFAULT_CONFIG["temp_dir_linux"]).expanduser() path.mkdir(parents=True, exist_ok=True) @@ -226,33 +229,70 @@ def get_video_info(filepath): return None # --- Locks --- -def acquire_lock(lock_dir, filepath): +def acquire_lock(lock_dir, filepath, category_root=None): """ - Simple file-based lock. - Returns lock_path if acquired, None if failed. + Simple file-based lock with completion tracking. + Uses relative path from category root for cross-platform compatibility. + + Lock lifecycle: + - Created with status="processing" when file starts + - Updated to status="completed" when file finishes successfully + - Deleted only if process is cancelled/interrupted + + This way lock files act as "already processed" markers across machines. + + Args: + lock_dir: Directory to store lock files + filepath: Full path to the video file + category_root: Root directory (tv_dir or content_dir) to calculate relative path + + Returns lock_path if acquired, None if failed/already completed. 
""" - # Use hash of full path (not just filename) for uniqueness - fhash = hashlib.md5(str(filepath).encode()).hexdigest() + # Calculate relative path for consistent hashing across platforms + if category_root: + try: + rel_path = Path(filepath).relative_to(category_root) + # Use forward slashes for consistency across platforms + hash_key = str(rel_path).replace(os.sep, '/') + except ValueError: + # File not under category_root, use filename + hash_key = Path(filepath).name + else: + hash_key = Path(filepath).name + + fhash = hashlib.md5(hash_key.encode()).hexdigest() lock_file = lock_dir / f"{fhash}.lock" if lock_file.exists(): - # Check staleness (24h) try: - if time.time() - lock_file.stat().st_mtime > 86400: - lock_file.unlink() - else: - # Lock exists and is fresh - file is being processed elsewhere + # Read lock file to check status + lock_data = json.loads(lock_file.read_text()) + + if lock_data.get("status") == "completed": + # File already processed successfully - skip it return None + + # Check if it's a stale "processing" lock (older than 24h) + if lock_data.get("status") == "processing": + if time.time() - lock_data.get("timestamp", 0) > 86400: + # Stale lock - remove and retry + lock_file.unlink() + else: + # Fresh processing lock - file is being worked on + return None except: - return None + # Corrupted lock file - remove and retry + lock_file.unlink() try: - # Write hostname and timestamp for debugging + # Create new processing lock import socket lock_info = { "file": str(filepath), + "relative_path": hash_key, "host": socket.gethostname(), - "timestamp": time.time() + "timestamp": time.time(), + "status": "processing" } lock_file.write_text(json.dumps(lock_info)) return lock_file @@ -260,6 +300,19 @@ def acquire_lock(lock_dir, filepath): print(f"[Warning] Failed to acquire lock for {filepath.name}: {e}") return None +def mark_lock_completed(lock_file): + """Mark a lock file as completed (keep it for future runs)""" + if not lock_file or not 
lock_file.exists(): + return + + try: + lock_data = json.loads(lock_file.read_text()) + lock_data["status"] = "completed" + lock_data["completed_at"] = time.time() + lock_file.write_text(json.dumps(lock_data)) + except Exception as e: + print(f"[Warning] Failed to mark lock as completed: {e}") + # --- Hardware Detection --- def detect_hardware_encoder(args=None): """