import os
import sys
import subprocess
import argparse
import json
import shutil
import platform
import time
import signal
import re
import hashlib
import traceback
import urllib.error
import urllib.request
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
from threading import Lock

# --- Configuration ---
DEFAULT_VMAF = 95.0
DEFAULT_PRESET = 6
DEFAULT_WORKERS = 1
DEFAULT_SAMPLES = 4
EXTENSIONS = {".mkv", ".mp4", ".mov", ".avi", ".ts"}
TARGETS = [94.0, 93.0, 92.0, 90.0]
MIN_SAVINGS_PERCENT = 12.0
TARGET_SAVINGS_FOR_ESTIMATE = 15.0

# Global state for resume capability
_processed_files = set()
_lock = Lock()
_shutdown_requested = False
_AB_AV1_HELP_CACHE = {}
# Held by whichever task currently owns the single hardware-decode slot
# (--use-hardware-worker means "at most one hardware job at a time").
_hw_slot = Lock()

# "crf <number>" as printed by `ab-av1 crf-search`; the word boundary keeps
# tokens such as "crf-search" from matching.
_CRF_VALUE_RE = re.compile(r"\bcrf\s+(\d+(?:\.\d+)?)", re.IGNORECASE)
# Predicted output size as a percentage of the source, e.g. "(45%)".
_PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)%")


def signal_handler(signum, frame):
    """Handle graceful shutdown: set the global flag polled by the workers."""
    global _shutdown_requested
    _shutdown_requested = True
    print("\n\n⚠️ Shutdown requested. Finishing current tasks...")


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def check_dependencies():
    """Exit with status 1 unless ffmpeg, ffprobe and ab-av1 are on PATH."""
    missing = []
    for tool in ["ffmpeg", "ffprobe", "ab-av1"]:
        if not shutil.which(tool):
            missing.append(tool)

    if missing:
        print(f"Error: Missing required tools: {', '.join(missing)}")
        print(
            "Please install FFmpeg and 'ab-av1' (via cargo install ab-av1) before running."
        )
        sys.exit(1)


def is_wsl():
    """Return True when running under Windows Subsystem for Linux."""
    if os.environ.get("WSL_DISTRO_NAME"):
        return True
    try:
        with open("/proc/sys/kernel/osrelease", "r", encoding="utf-8") as f:
            return "microsoft" in f.read().lower()
    except OSError:
        # No /proc (macOS/Windows) or unreadable: not WSL.
        return False


def platform_label():
    """Return a human-readable platform name, flagging WSL explicitly."""
    system = platform.system()
    if system == "Linux" and is_wsl():
        return "Linux (WSL)"
    return system


def _ab_av1_help(subcommand):
    """Return (and cache) the combined ``ab-av1 <subcommand> --help`` output.

    Returns an empty string when ab-av1 cannot be executed.
    """
    cached = _AB_AV1_HELP_CACHE.get(subcommand)
    if cached is not None:
        return cached
    try:
        result = subprocess.run(
            ["ab-av1", subcommand, "--help"],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=False,
        )
        help_text = (result.stdout or "") + "\n" + (result.stderr or "")
    except (OSError, ValueError):
        help_text = ""
    _AB_AV1_HELP_CACHE[subcommand] = help_text
    return help_text


def ab_av1_supports(subcommand, flag):
    """Return True if *flag* appears in the help text of the ab-av1 subcommand."""
    return flag in _ab_av1_help(subcommand)


def normalize_hwaccel(value):
    """Normalize the --hwaccel CLI value.

    Returns None for missing/empty/"none"/"off"/"false"/"0", a per-OS default
    for "auto" (d3d11va on Windows, videotoolbox on macOS, vaapi otherwise),
    and the stripped value unchanged for anything else.
    """
    if value is None:
        return None
    v = value.strip()
    if not v:
        return None
    v_lower = v.lower()
    if v_lower in {"none", "off", "false", "0"}:
        return None
    if v_lower != "auto":
        return v

    system = platform.system()
    if system == "Windows":
        return "d3d11va"
    if system == "Darwin":
        return "videotoolbox"
    return "vaapi"


def get_probe_data(filepath):
    """Get comprehensive video data using ffprobe.

    Returns the parsed ``-show_streams -show_format`` JSON, or None on any
    failure (the error is printed).
    """
    try:
        cmd = [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_streams",
            "-show_format",
            str(filepath),
        ]
        # ffprobe emits UTF-8 JSON regardless of the console locale.
        res = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=True,
        )
        return json.loads(res.stdout)
    except Exception as e:
        print(f"Error probing {filepath}: {e}")
        return None


def _as_float(value):
    """Convert an ffprobe field (usually a numeric string) to float; 0.0 if unusable."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def get_video_stats(data):
    """Extract video statistics from ffprobe output.

    Uses the first video stream that is not embedded cover art
    (``disposition.attached_pic``). Returns None when *data* is missing
    ``streams``/``format`` or has no such stream; otherwise a dict with
    ``codec``, ``width``, ``height``, ``bitrate`` (kbit/s), ``size`` (bytes)
    and ``duration`` (seconds).
    """
    if not data or "streams" not in data or "format" not in data:
        return None

    v_stream = next(
        (
            s
            for s in data["streams"]
            if s.get("codec_type") == "video"
            and not (s.get("disposition") or {}).get("attached_pic")
        ),
        None,
    )
    if not v_stream:
        return None

    fmt = data["format"]
    size = int(_as_float(fmt.get("size", 0)))
    duration = _as_float(fmt.get("duration", 0))

    # Calculate bitrate from file size and duration (more reliable than ffprobe's bitrate)
    if size > 0 and duration > 0:
        bitrate = int((size * 8) / duration / 1000)
    else:
        # ffprobe names this field "bit_rate" (bits per second).
        bitrate = int(_as_float(fmt.get("bit_rate", 0))) // 1000

    return {
        "codec": v_stream.get("codec_name"),
        "width": v_stream.get("width"),
        "height": v_stream.get("height"),
        "bitrate": bitrate,
        "size": size,
        "duration": duration,
    }


def log_result(log_dir, log_name, data):
    """Append *data* plus a timestamp as one JSON line to ``<log_dir>/<log_name>.jsonl``.

    The caller's dict is not modified. Writes are serialized with the module
    lock so concurrent workers do not interleave lines.
    """
    os.makedirs(log_dir, exist_ok=True)
    log_file = Path(log_dir) / f"{log_name}.jsonl"
    record = dict(data)
    record["timestamp"] = datetime.now().isoformat()
    line = json.dumps(record) + "\n"

    with _lock:
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(line)


def run_command_streaming(cmd, description="", output_lines=None):
    """Run *cmd*, echoing its merged stdout/stderr line by line.

    Args:
        cmd: Argument list for the child process.
        description: Label printed before the command starts.
        output_lines: Optional list; every output line (right-stripped) is
            appended to it so callers can parse the output without running
            the command a second time.

    Returns:
        The child's exit code. When shutdown is requested the child is
        terminated (and killed if it ignores SIGTERM for 30 s).
    """
    print(f" [Running {description}]")
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",
        bufsize=1,
    )

    terminated = False
    try:
        if process.stdout:
            for line in process.stdout:
                if _shutdown_requested:
                    process.terminate()
                    terminated = True
                    break
                line = line.rstrip()
                if output_lines is not None:
                    output_lines.append(line)
                print(f" {line}")
    finally:
        if process.stdout:
            process.stdout.close()

    try:
        process.wait(timeout=30 if terminated else None)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()
    return process.returncode


def parse_crf_search_output(lines):
    """Extract ``(crf, predicted_percent)`` from ``ab-av1 crf-search`` output lines.

    Lines are scanned newest first. A line mentioning "predicted" (the final
    result line) is preferred; otherwise the newest line containing
    ``crf <number>`` is used. The percentage defaults to 100.0 when the line
    carries none. Returns None when no line contains a CRF value.
    """
    cleaned = [line.strip() for line in lines if line and line.strip()]
    for want_prediction in (True, False):
        for line in reversed(cleaned):
            if want_prediction and "predicted" not in line.lower():
                continue
            crf_match = _CRF_VALUE_RE.search(line)
            if not crf_match:
                continue
            pct_match = _PERCENT_RE.search(line)
            percent = float(pct_match.group(1)) if pct_match else 100.0
            return float(crf_match.group(1)), percent
    return None


def _hwaccel_args(subcommand, use_hw, hwaccel):
    """Return ``--enc-input`` arguments enabling hardware *decoding*, or [].

    ``hwaccel_output_format`` is deliberately not set: ab-av1's default
    encoder (SVT-AV1) is a software encoder and cannot take GPU surfaces, so
    ffmpeg must download decoded frames to system memory.
    """
    if not (use_hw and hwaccel):
        return []
    if not ab_av1_supports(subcommand, "--enc-input"):
        return []
    return ["--enc-input", f"hwaccel={hwaccel}"]


def run_crf_search(
    filepath,
    target_vmaf,
    preset,
    temp_dir,
    use_hw=False,
    hwaccel=None,
    samples=DEFAULT_SAMPLES,
    cache=None,
):
    """Run CRF search for a specific VMAF target.

    Args:
        filepath: Source video.
        target_vmaf: Minimum VMAF the chosen CRF must reach.
        preset: SVT-AV1 preset.
        temp_dir: Scratch directory passed to ab-av1.
        use_hw, hwaccel: Enable hardware decoding with the given hwaccel.
        samples: Number of samples ab-av1 encodes per probe.
        cache: Optional dict keyed by float(target_vmaf); previously computed
            results (including failures) are reused instead of re-running
            the expensive search.

    Returns:
        ``{"crf", "predicted_percent", "vmaf"}`` on success, else None
        (also None immediately when shutdown has been requested).
    """
    key = float(target_vmaf)
    if cache is not None and key in cache:
        print(f" - Reusing earlier CRF search for VMAF {target_vmaf}")
        return cache[key]
    if _shutdown_requested:
        return None

    cmd = [
        "ab-av1",
        "crf-search",
        "-i",
        str(filepath),
        "--min-vmaf",
        str(target_vmaf),
        "--preset",
        str(preset),
        "--max-encoded-percent",
        "100",
        "--temp-dir",
        str(temp_dir),
        "--samples",
        str(samples),
    ]
    cmd.extend(_hwaccel_args("crf-search", use_hw, hwaccel))

    print(f" - Searching for CRF to hit VMAF {target_vmaf}...")
    output = []
    returncode = run_command_streaming(
        cmd, f"crf-search VMAF {target_vmaf}", output_lines=output
    )

    result = None
    if returncode == 0:
        parsed = parse_crf_search_output(output)
        if parsed is None:
            print(" ! Failed to parse crf-search output")
        else:
            crf_val, percent = parsed
            result = {
                "crf": crf_val,
                "predicted_percent": percent,
                "vmaf": target_vmaf,
            }

    # An interrupted search says nothing about the file; don't memoize it.
    if cache is not None and not _shutdown_requested:
        cache[key] = result
    return result


def find_target_savings_params(
    filepath,
    start_vmaf,
    preset,
    temp_dir,
    use_hw=False,
    hwaccel=None,
    samples=DEFAULT_SAMPLES,
    cache=None,
):
    """Find the highest VMAF target (<= start_vmaf) reaching TARGET_SAVINGS_FOR_ESTIMATE.

    Tries each entry of TARGETS not above *start_vmaf*, in order. Returns a
    dict with ``target_vmaf``, ``crf``, ``savings``, ``quality_drop`` and
    ``found=True``, or None when no target reaches the goal or on shutdown.
    *samples* and *cache* are forwarded to run_crf_search.
    """
    print(f"\n --- Finding VMAF for {TARGET_SAVINGS_FOR_ESTIMATE}% savings ---")

    test_targets = [t for t in TARGETS if t <= start_vmaf]

    for i, target in enumerate(test_targets):
        if _shutdown_requested:
            return None

        print(
            f" Testing VMAF {target} for {TARGET_SAVINGS_FOR_ESTIMATE}% target... (test {i + 1}/{len(test_targets)})"
        )
        result = run_crf_search(
            filepath, target, preset, temp_dir, use_hw, hwaccel,
            samples=samples, cache=cache,
        )
        if not result:
            print(f" ✗ Could not achieve VMAF {target}")
            continue

        predicted_savings = 100.0 - result["predicted_percent"]
        quality_drop = start_vmaf - target

        print(
            f" ✓ VMAF {target}: CRF {result['crf']}, Savings {predicted_savings:.1f}%, Drop -{quality_drop:.0f}"
        )

        if predicted_savings >= TARGET_SAVINGS_FOR_ESTIMATE:
            print(f"\n ✅ FOUND {TARGET_SAVINGS_FOR_ESTIMATE}%+ SAVINGS:")
            print(f" Target VMAF: {target} (quality drop: -{quality_drop:.0f})")
            print(f" CRF: {result['crf']}")
            print(f" Predicted savings: {predicted_savings:.1f}%")

            return {
                "target_vmaf": target,
                "crf": result["crf"],
                "savings": predicted_savings,
                "quality_drop": quality_drop,
                "found": True,
            }

    print(f"\n 📝 COULD NOT ACHIEVE {TARGET_SAVINGS_FOR_ESTIMATE}% SAVINGS")
    print(f" Tried VMAF targets: {test_targets}")
    return None


def run_encode(filepath, output_path, crf, preset, use_hw=False, hwaccel=None):
    """Run full encoding with real-time output streaming; return the exit code.

    Audio is stream-copied. Hardware decoding is added only when *use_hw*
    and *hwaccel* are set and ab-av1 supports ``--enc-input``.
    """
    cmd = [
        "ab-av1",
        "encode",
        "--input",
        str(filepath),
        "--output",
        str(output_path),
        "--crf",
        str(crf),
        "--preset",
        str(preset),
        "--acodec",
        "copy",
    ]
    cmd.extend(_hwaccel_args("encode", use_hw, hwaccel))

    return run_command_streaming(cmd, f"encoding (CRF {crf})")


def provide_recommendations(
    stats_before, hit_vmaf, predicted_savings, target_result=None
):
    """Print recommendations based on analysis results.

    *target_result* is a find_target_savings_params result dict; pass
    ``{"found": False}`` to report that no target reached the estimate goal.
    *hit_vmaf* and *predicted_savings* are accepted for interface
    compatibility but not used.
    """
    print(f"\n --- Recommendations for {stats_before['codec']} content ---")

    if target_result and target_result["found"]:
        print(f" 📊 TO HIT {TARGET_SAVINGS_FOR_ESTIMATE}% SAVINGS:")
        print(
            f" → Target VMAF: {target_result['target_vmaf']} (drop: -{target_result['quality_drop']:.0f})"
        )
        print(f" → CRF: {target_result['crf']}")
        print(f" → Predicted: {target_result['savings']:.1f}% savings")
        print(f" → Trade-off: Quality reduction for space savings")
        print()

    if stats_before["bitrate"] < 2000:
        print(f" → Source bitrate is low ({stats_before['bitrate']}k)")
        print(f" → AV1 gains minimal on highly compressed sources")
        print(f" → Recommendation: SKIP - Source already optimized")
        return

    if stats_before["codec"] in ["hevc", "h265", "vp9"]:
        print(f" → Source already uses modern codec ({stats_before['codec']})")
        print(f" → AV1 gains minimal on already-compressed content")
        print(f" → Recommendation: SKIP - Codec already efficient")
        return

    if target_result and not target_result["found"]:
        print(
            f" → Could not achieve {TARGET_SAVINGS_FOR_ESTIMATE}% even at lowest VMAF"
        )
        print(f" → Content may not compress well with AV1")
        print(f" → Recommendation: SKIP - Review manually")


def refresh_plex(plex_url, plex_token, section_id="1"):
    """Ask Plex to refresh a library section (default section 1); best effort.

    Uses urllib instead of shelling out to curl, so the token is not exposed
    in the process list and HTTP/network failures are reported.
    """
    if not plex_url or not plex_token:
        return

    print("\n📡 Refreshing Plex library...")
    url = f"{plex_url.rstrip('/')}/library/sections/{section_id}/refresh"
    request = urllib.request.Request(
        url, headers={"X-Plex-Token": plex_token}, method="GET"
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            response.read()
        print(" ✓ Plex refresh triggered")
    except (OSError, ValueError) as e:
        # URLError/HTTPError are OSError subclasses; ValueError covers a bad URL.
        print(f" ⚠️ Failed to refresh Plex: {e}")


def final_output_path(filepath):
    """Return where the encoded output for *filepath* should be stored.

    The encoder always writes Matroska, so any non-``.mkv`` source
    (mp4/mov/avi/ts) is replaced by a sibling ``.mkv`` file; an existing
    ``.mkv`` (any letter case) keeps its own path.
    """
    filepath = Path(filepath)
    if filepath.suffix.lower() == ".mkv":
        return filepath
    return filepath.with_suffix(".mkv")


def process_file(
    filepath,
    log_dir,
    log_name,
    preset,
    use_hw=False,
    hwaccel=None,
    samples=DEFAULT_SAMPLES,
):
    """Process a single video file with intelligent VMAF targeting.

    Steps: claim a lock file next to the log dir (coordination between
    workers/machines sharing it), probe the source, search for a CRF hitting
    VMAF 94 (falling back to 93/92/90), escalate to VMAF 93 when predicted
    savings are below MIN_SAVINGS_PERCENT, encode, check the output is
    actually smaller, and replace the source with the Matroska result.

    Returns:
        True on success or a deliberate skip; False on failure or shutdown.
    """
    filepath = Path(filepath)
    if _shutdown_requested:
        return False

    lock_file = Path(log_dir).parent / ".lock" / f"{filepath.name}.lock"
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    # O_CREAT | O_EXCL makes check-and-create one atomic step, so two
    # workers (or machines) cannot both claim the same file.
    try:
        lock_fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        print(f"Skipping (Locked): {filepath.name}")
        return True
    with os.fdopen(lock_fd, "w", encoding="utf-8") as lock_handle:
        lock_handle.write(
            f"{platform.node()} pid={os.getpid()} {datetime.now().isoformat()}\n"
        )

    temp_output = None
    try:
        probe_data = get_probe_data(filepath)
        if not probe_data:
            print(f"Skipping (Invalid probe data): {filepath.name}")
            return True

        stats_before = get_video_stats(probe_data)
        if not stats_before or stats_before["codec"] == "av1":
            print(f"Skipping (Already AV1 or invalid): {filepath.name}")
            return True

        # Mark as processed
        file_key = str(filepath)
        with _lock:
            if file_key in _processed_files:
                return True
            _processed_files.add(file_key)

        print(f"\n--- Processing: {filepath.name} ---")
        print(
            f" Source: {stats_before['codec']} @ {stats_before['bitrate']}k, {stats_before['size'] / (1024**3):.2f} GB"
        )

        if _shutdown_requested:
            return False

        temp_dir = Path(log_dir).parent / "tmp"
        temp_dir.mkdir(parents=True, exist_ok=True)

        # crf-search is expensive; memoize per file so a target needed by the
        # fallback, the escalation and the estimate (e.g. VMAF 93) runs once.
        search_cache = {}

        def search(target):
            return run_crf_search(
                filepath, target, preset, str(temp_dir), use_hw, hwaccel,
                samples=samples, cache=search_cache,
            )

        # Step 1: Try VMAF 94, falling back to lower targets.
        print(f"\n [Step 1] Testing VMAF 94...")
        first = search(94.0)
        if not first:
            print(f" !! Could not hit VMAF 94")
            for fallback in (93.0, 92.0, 90.0):
                first = search(fallback)
                if first:
                    break

        # Interrupted searches must not be logged as failed searches.
        if _shutdown_requested:
            return False

        if not first:
            print(f" !! Failed all VMAF targets ({TARGETS}) for {filepath.name}")
            log_result(
                log_dir,
                "failed_searches",
                {
                    "file": str(filepath),
                    "status": "all_targets_failed",
                    "targets": TARGETS,
                },
            )
            provide_recommendations(stats_before, None, 0)
            return False

        # The target actually achieved (94 unless a fallback was needed).
        first_vmaf = first["vmaf"]
        first_savings = 100.0 - first["predicted_percent"]

        if first_savings >= MIN_SAVINGS_PERCENT:
            print(
                f"\n ✅ VMAF {first_vmaf:g} gives {first_savings:.1f}% savings (≥{MIN_SAVINGS_PERCENT}%)"
            )
            print(f" → Proceeding with VMAF {first_vmaf:g}, CRF {first['crf']}")
            encode_params = {
                "crf": first["crf"],
                "vmaf": first_vmaf,
                "predicted_percent": first["predicted_percent"],
            }
        else:
            print(
                f"\n ⚠️ VMAF {first_vmaf:g} gives {first_savings:.1f}% savings (<{MIN_SAVINGS_PERCENT}%)"
            )
            search_result_93 = search(93.0)
            if _shutdown_requested:
                return False

            if not search_result_93:
                print(f" !! Could not achieve VMAF 93")
                log_result(
                    log_dir,
                    "failed_searches",
                    {"file": str(filepath), "status": "vmaf_93_failed"},
                )
                return False

            predicted_savings_93 = 100.0 - search_result_93["predicted_percent"]
            if predicted_savings_93 < MIN_SAVINGS_PERCENT:
                print(
                    f" ⚠️ VMAF 93 gives {predicted_savings_93:.1f}% savings (also <{MIN_SAVINGS_PERCENT}%)"
                )
                print(
                    f" → Finding VMAF for {TARGET_SAVINGS_FOR_ESTIMATE}% savings..."
                )
                target_result = find_target_savings_params(
                    filepath, 93.0, preset, str(temp_dir), use_hw, hwaccel,
                    samples=samples, cache=search_cache,
                )
                if _shutdown_requested:
                    return False

                # None means "no target reached the goal"; pass an explicit
                # marker so the "could not achieve" advice is printed.
                provide_recommendations(
                    stats_before,
                    93.0,
                    predicted_savings_93,
                    target_result or {"found": False},
                )

                log_result(
                    log_dir,
                    "low_savings_skips",
                    {
                        "file": str(filepath),
                        "vmaf_94": first_vmaf,
                        "savings_94": first_savings,
                        "vmaf_93": 93.0,
                        "savings_93": predicted_savings_93,
                        "target_for_15_percent": target_result,
                        "recommendations": "logged_for_review",
                    },
                )
                return True

            print(
                f" ✅ VMAF 93 gives {predicted_savings_93:.1f}% savings (≥{MIN_SAVINGS_PERCENT}%)"
            )
            print(f" → Proceeding with VMAF 93, CRF {search_result_93['crf']}")
            encode_params = {
                "crf": search_result_93["crf"],
                "vmaf": 93.0,
                "predicted_percent": search_result_93["predicted_percent"],
            }

        # Step 2: encode. A short hash of the full path keeps temp names
        # unique when files in different folders share a stem.
        path_tag = hashlib.sha1(os.fsencode(filepath)).hexdigest()[:8]
        temp_output = temp_dir / f"{filepath.stem}.{path_tag}.av1_temp.mkv"
        if temp_output.exists():
            temp_output.unlink()

        start_time = time.time()
        res = run_encode(
            filepath, temp_output, encode_params["crf"], preset, use_hw, hwaccel
        )

        if res != 0:
            if _shutdown_requested:
                print(f"\n !! Encode interrupted by shutdown")
                return False
            print(f"\n !! Encode failed with code {res}")
            log_result(
                log_dir,
                "failed_encodes",
                {"file": str(filepath), "status": "encode_failed", "returncode": res},
            )
            return False

        encode_duration = time.time() - start_time
        print(f" ✓ Encode completed in {encode_duration:.1f}s")

        stats_after = get_video_stats(get_probe_data(temp_output))
        if not stats_after:
            print(f" !! Failed to probe encoded file")
            log_result(
                log_dir,
                "failed_encodes",
                {"file": str(filepath), "status": "probe_after_failed"},
            )
            return False

        # Fall back to on-disk sizes when ffprobe reports none (avoids / 0).
        before_size = stats_before["size"] or filepath.stat().st_size
        after_size = stats_after["size"] or temp_output.stat().st_size
        actual_savings = (
            (1 - (after_size / before_size)) * 100 if before_size > 0 else 0.0
        )

        print(f"\n Results:")
        print(
            f" - Before: {before_size / (1024**3):.2f} GB @ {stats_before['bitrate']}k"
        )
        print(
            f" - After: {after_size / (1024**3):.2f} GB @ {stats_after['bitrate']}k"
        )
        print(f" - Savings: {actual_savings:.2f}%")

        # Never replace a source with a lossy re-encode that is not smaller.
        if actual_savings <= 0:
            print(" !! Encoded file is not smaller than the source; keeping original")
            log_result(
                log_dir,
                "low_savings_skips",
                {
                    "file": str(filepath),
                    "status": "no_actual_savings",
                    "vmaf": encode_params["vmaf"],
                    "crf": encode_params["crf"],
                    "savings": actual_savings,
                },
            )
            return True

        final_path = final_output_path(filepath)
        if final_path != filepath and final_path.exists():
            # A different file already owns the .mkv name; don't destroy it.
            print(f" !! Refusing to overwrite existing file: {final_path.name}")
            log_result(
                log_dir,
                "failed_encodes",
                {
                    "file": str(filepath),
                    "status": "output_exists",
                    "output": str(final_path),
                },
            )
            return False

        # Copy into the destination directory first (the temp dir may be on
        # another filesystem), then atomically rename over the target, so a
        # failed copy can never leave a truncated file in place of the source.
        staging = final_path.with_name(f".{final_path.stem}.{path_tag}.av1_partial")
        try:
            shutil.move(str(temp_output), str(staging))
            os.replace(staging, final_path)
        finally:
            if staging.exists():
                staging.unlink()
        if final_path != filepath:
            filepath.unlink()

        print(f" ✓ Successfully optimized: {final_path.name}")

        log_result(
            log_dir,
            log_name,
            {
                "file": str(final_path),
                "status": "success",
                "vmaf": encode_params["vmaf"],
                "crf": encode_params["crf"],
                "before": stats_before,
                "after": stats_after,
                "duration": encode_duration,
                "savings": actual_savings,
            },
        )

        return True

    finally:
        # Remove any leftover temp output and release the lock.
        if temp_output is not None and temp_output.exists():
            try:
                temp_output.unlink()
            except OSError:
                pass
        try:
            lock_file.unlink()
        except FileNotFoundError:
            pass


def _process_safely(file_path, log_dir, log_name, preset, want_hw, hwaccel, samples):
    """Run process_file for one file, turning unexpected exceptions into a failure.

    When *want_hw* is true, hardware decoding is used only if this task can
    take the single hardware slot; otherwise the file is processed on the
    CPU, so at most one hardware job runs at any time.
    """
    use_hw = bool(want_hw) and _hw_slot.acquire(blocking=False)
    try:
        return process_file(
            file_path, log_dir, log_name, preset, use_hw, hwaccel, samples=samples
        )
    except Exception as e:
        print(f" !! Unexpected error: {e}")
        traceback.print_exc()
        return False
    finally:
        if use_hw:
            _hw_slot.release()


def scan_library(root, exclude_dirs=None):
    """Scan library for video files, excluding certain directories.

    Returns paths whose suffix (case-insensitive) is in EXTENSIONS, skipping
    directories named in *exclude_dirs* and files whose stem contains "_av1".
    """
    exclude_dirs = exclude_dirs or []
    files = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Skip excluded directories
        dirnames[:] = [d for d in dirnames if d not in exclude_dirs]

        for filename in filenames:
            if Path(filename).suffix.lower() not in EXTENSIONS:
                continue
            full_path = Path(dirpath) / filename
            if "_av1" in full_path.stem:
                continue
            files.append(full_path)
    return files


def main():
    """CLI entry point: scan a library and optimize each file to AV1."""
    parser = argparse.ArgumentParser(
        description="Optimize video library to AV1 using VMAF targeting."
    )
    parser.add_argument("directory", help="Root directory to scan")
    parser.add_argument(
        "--vmaf",
        type=float,
        default=DEFAULT_VMAF,
        help=f"Target VMAF score (default: {DEFAULT_VMAF})",
    )
    parser.add_argument(
        "--preset",
        type=int,
        default=DEFAULT_PRESET,
        help=f"SVT-AV1 Preset (default: {DEFAULT_PRESET})",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=DEFAULT_WORKERS,
        help=f"Concurrent files to process (default: {DEFAULT_WORKERS})",
    )
    parser.add_argument(
        "--samples",
        type=int,
        default=DEFAULT_SAMPLES,
        help=f"Samples to use for CRF search if supported (default: {DEFAULT_SAMPLES})",
    )
    parser.add_argument(
        "--hwaccel",
        default=None,
        help=(
            "Hardware acceleration for decode. "
            "Examples: auto, vaapi, d3d11va, videotoolbox. Use 'none' to disable."
        ),
    )
    parser.add_argument(
        "--use-hardware-worker",
        action="store_true",
        help="Use 1 hardware encoding worker + rest CPU workers (requires --hwaccel)",
    )
    parser.add_argument(
        "--plex-url",
        default=None,
        help="Plex server URL (e.g., http://localhost:32400)",
    )
    parser.add_argument(
        "--plex-token",
        default=None,
        help="Plex auth token",
    )
    parser.add_argument(
        "--log-dir",
        default="/opt/Optmiser/logs",
        help="Log directory (default: /opt/Optmiser/logs)",
    )
    args = parser.parse_args()

    if args.workers < 1:
        print("Error: --workers must be >= 1")
        sys.exit(2)
    if args.samples < 1:
        print("Error: --samples must be >= 1")
        sys.exit(2)

    check_dependencies()

    root = Path(args.directory)
    if not root.exists():
        print(f"Directory not found: {root}")
        sys.exit(1)

    hwaccel = normalize_hwaccel(args.hwaccel)
    # Hardware decoding is only ever used through the single hardware slot.
    use_hw_primary = args.use_hardware_worker and hwaccel is not None

    print(f"Platform: {platform_label()}")
    print(f"Scanning library: {root}")
    print(f"VMAF targets: {TARGETS}")
    print(f"Minimum savings: {MIN_SAVINGS_PERCENT}%")
    print(f"Estimate target: {TARGET_SAVINGS_FOR_ESTIMATE}%")
    print(f"Encoder Preset: {args.preset}")
    print(f"Workers: {args.workers}")
    if hwaccel:
        print(f"HWAccel: {hwaccel}")
    if use_hw_primary:
        print(f"Hardware worker: 1 HW + {args.workers - 1} CPU")
    elif args.use_hardware_worker:
        print("Warning: --use-hardware-worker has no effect without --hwaccel")
    elif hwaccel:
        print("Note: --hwaccel is only applied together with --use-hardware-worker")
    if args.plex_url:
        print(f"Plex refresh: Enabled")
    print("-" * 60)

    # Determine log name based on directory (Path.parts works on every OS).
    root_parts = [part.lower() for part in root.parts]
    if "content" in root_parts:
        log_name = "content"
        exclude_dirs = []
    else:
        log_name = "tv_movies"
        exclude_dirs = ["content"]

    print(f"Log file: {log_name}.jsonl")

    files = scan_library(root, exclude_dirs)
    if not files:
        print("No media files found.")
        return

    print(f"Found {len(files)} files to process")
    print("-" * 60)

    processed_count = 0
    success_count = 0
    fail_count = 0

    def task(file_path):
        return _process_safely(
            file_path,
            args.log_dir,
            log_name,
            args.preset,
            use_hw_primary,
            hwaccel,
            args.samples,
        )

    if args.workers == 1:
        # Single thread - just process files
        for file_path in files:
            if _shutdown_requested:
                break
            processed_count += 1
            if task(file_path):
                success_count += 1
            else:
                fail_count += 1
    else:
        # Multi-threaded processing
        with ThreadPoolExecutor(max_workers=args.workers) as executor:
            futures = [executor.submit(task, file_path) for file_path in files]
            cancelled = False
            for future in as_completed(futures):
                if _shutdown_requested and not cancelled:
                    # Drop queued files; running ones stop their child
                    # process at its next line of output.
                    for pending in futures:
                        pending.cancel()
                    cancelled = True
                if future.cancelled():
                    continue
                processed_count += 1
                if future.result():
                    success_count += 1
                else:
                    fail_count += 1

    print("\n" + "=" * 60)
    print(f"SUMMARY: {root}")
    print(f" Processed: {processed_count} files")
    print(f" Success/Skip: {success_count}")
    print(f" Failed: {fail_count}")
    print("=" * 60)

    # Refresh Plex on completion
    if success_count > 0:
        refresh_plex(args.plex_url, args.plex_token)


if __name__ == "__main__":
    main()