"""Optimize a video library to AV1 using ab-av1 with VMAF-targeted CRF search.

For every media file found under a root directory the script probes the file
with ffprobe, asks ``ab-av1 crf-search`` for a CRF that reaches a VMAF target,
encodes with ``ab-av1 encode`` when the predicted savings are large enough,
and replaces the source with the encoded file. Results are appended to JSONL
logs.
"""

import os
import sys
import subprocess
import argparse
import json
import re
import shutil
import platform
import time
import signal
import traceback
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
from threading import Lock, get_ident

# --- Configuration ---
DEFAULT_VMAF = 95.0
DEFAULT_PRESET = 6
DEFAULT_WORKERS = 1
DEFAULT_SAMPLES = 4
EXTENSIONS = {".mkv", ".mp4", ".mov", ".avi", ".ts"}
TARGETS = [94.0, 93.0, 92.0, 90.0]
MIN_SAVINGS_PERCENT = 12.0
TARGET_SAVINGS_FOR_ESTIMATE = 15.0

# Hardware encoder quality compensation
# (GPU needs higher VMAF target to match CPU quality)
HW_ENCODER_VMAF_OFFSET = 2.0
HW_ENCODERS = {"av1_amf", "av1_nvenc", "av1_qsv", "av1_vaapi"}

# VMAF target retried when the first achievable target saves too little.
_FALLBACK_VMAF = 93.0

# Final "crf <n> ..." result line printed by `ab-av1 crf-search`.
_CRF_LINE_RE = re.compile(r"\s*crf\s+(\d+(?:\.\d+)?)\b", re.IGNORECASE)
# First percentage on that line, e.g. "(45%)".
_PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)\s*%")

# Global state for resume capability
_processed_files = set()
_lock = Lock()
_shutdown_requested = False
_AB_AV1_HELP_CACHE = {}
_hw_worker_id = None  # Thread ID of the designated hardware worker


def claim_hardware_worker():
    """Attempt to claim hardware worker status for this thread.

    The first thread to call this becomes the hardware worker; later calls
    return True only for that same thread.

    Returns:
        True if the calling thread is (now) the hardware worker.
    """
    global _hw_worker_id
    with _lock:
        thread_id = get_ident()
        if _hw_worker_id is None:
            _hw_worker_id = thread_id
            return True
        return _hw_worker_id == thread_id


def signal_handler(signum, frame):
    """Handle graceful shutdown by setting the module-wide shutdown flag."""
    global _shutdown_requested
    _shutdown_requested = True
    print("\n\n⚠️ Shutdown requested. Finishing current tasks...")


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def check_dependencies():
    """Exit with status 1 if ffmpeg, ffprobe or ab-av1 is not on PATH."""
    missing = [
        tool for tool in ("ffmpeg", "ffprobe", "ab-av1") if not shutil.which(tool)
    ]
    if missing:
        print(f"Error: Missing required tools: {', '.join(missing)}")
        print(
            "Please install FFmpeg and 'ab-av1' (via cargo install ab-av1) before running."
        )
        sys.exit(1)


def is_wsl():
    """Return True when running under Windows Subsystem for Linux."""
    if os.environ.get("WSL_DISTRO_NAME"):
        return True
    try:
        with open("/proc/sys/kernel/osrelease", "r", encoding="utf-8") as f:
            return "microsoft" in f.read().lower()
    except OSError:
        # Missing or unreadable file: not Linux, or not detectable as WSL.
        return False


def platform_label():
    """Return ``platform.system()``, or "Linux (WSL)" when running under WSL."""
    system = platform.system()
    if system == "Linux" and is_wsl():
        return "Linux (WSL)"
    return system


def _ab_av1_help(subcommand):
    """Return the cached ``ab-av1 <subcommand> --help`` text ("" on failure)."""
    cached = _AB_AV1_HELP_CACHE.get(subcommand)
    if cached is not None:
        return cached
    try:
        result = subprocess.run(
            ["ab-av1", subcommand, "--help"],
            capture_output=True,
            text=True,
            check=False,
        )
        help_text = (result.stdout or "") + "\n" + (result.stderr or "")
    except (OSError, ValueError):
        # Tool missing / not executable, or undecodable output.
        help_text = ""
    _AB_AV1_HELP_CACHE[subcommand] = help_text
    return help_text


def ab_av1_supports(subcommand, flag):
    """Return True if *flag* appears in the help text of the ab-av1 subcommand."""
    return flag in _ab_av1_help(subcommand)


def normalize_hwaccel(value):
    """Normalize the ``--hwaccel`` option.

    Args:
        value: Raw option value or None.

    Returns:
        None when disabled (None, blank, "none", "off", "false", "0"), a
        platform default for "auto" (d3d11va on Windows, videotoolbox on
        macOS, vaapi otherwise), or the stripped value unchanged.
    """
    if value is None:
        return None
    v = value.strip()
    if not v:
        return None
    v_lower = v.lower()
    if v_lower in {"none", "off", "false", "0"}:
        return None
    if v_lower != "auto":
        return v
    system = platform.system()
    if system == "Windows":
        return "d3d11va"
    if system == "Darwin":
        return "videotoolbox"
    return "vaapi"


def get_probe_data(filepath):
    """Get comprehensive video data using ffprobe.

    Returns:
        The parsed ffprobe JSON (streams and format), or None when ffprobe
        cannot be run, exits non-zero, or prints invalid JSON.
    """
    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_streams",
        "-show_format",
        str(filepath),
    ]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(res.stdout)
    except (OSError, subprocess.CalledProcessError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and undecodable output.
        print(f"Error probing {filepath}: {e}")
        return None


def _to_int(value, default=0):
    """Convert an ffprobe field to int, returning *default* when unusable."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _to_float(value, default=0.0):
    """Convert an ffprobe field to float, returning *default* when unusable."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def get_video_stats(data):
    """Extract video statistics from ffprobe output.

    Args:
        data: Parsed ffprobe JSON as returned by :func:`get_probe_data`.

    Returns:
        A dict with ``codec``, ``width``, ``height``, ``bitrate`` (kbit/s),
        ``size`` (bytes) and ``duration`` (seconds), or None when *data* is
        missing, incomplete, or has no video stream.
    """
    if not data or "streams" not in data or "format" not in data:
        return None

    v_stream = next(
        (s for s in data["streams"] if s.get("codec_type") == "video"), None
    )
    if not v_stream:
        return None

    fmt = data["format"]
    size = _to_int(fmt.get("size", 0))
    duration = _to_float(fmt.get("duration", 0))

    # Calculate bitrate from file size and duration
    # (more reliable than ffprobe's bitrate)
    if size > 0 and duration > 0:
        bitrate = int((size * 8) / duration / 1000)
    else:
        # ffprobe names the field "bit_rate"; "bitrate" is kept as a fallback
        # for any caller-supplied data using the old key.
        bitrate = _to_int(fmt.get("bit_rate", fmt.get("bitrate", 0))) // 1000

    return {
        "codec": v_stream.get("codec_name"),
        "width": v_stream.get("width"),
        "height": v_stream.get("height"),
        "bitrate": bitrate,
        "size": size,
        "duration": duration,
    }


def log_result(log_dir, log_name, data):
    """Append *data* plus a ``timestamp`` field to ``<log_dir>/<log_name>.jsonl``.

    The caller's dict is not modified.
    """
    os.makedirs(log_dir, exist_ok=True)
    log_file = Path(log_dir) / f"{log_name}.jsonl"
    record = dict(data)
    record["timestamp"] = datetime.now().isoformat()
    with _lock:
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")


def _stream_command(cmd, description=""):
    """Run *cmd*, echo its merged stdout/stderr, and collect the lines.

    The child is terminated as soon as a shutdown has been requested.

    Returns:
        ``(returncode, lines)`` where *lines* are the echoed output lines with
        trailing whitespace removed.
    """
    print(f" [Running {description}]")
    captured = []
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    ) as process:
        if process.stdout:
            for line in process.stdout:
                if _shutdown_requested:
                    process.terminate()
                    break
                text = line.rstrip()
                captured.append(text)
                print(f" {text}")
        process.wait()
    return process.returncode, captured


def run_command_streaming(cmd, description=""):
    """Run command and stream output in real-time; return its exit code."""
    returncode, _ = _stream_command(cmd, description)
    return returncode


def _encoder_args(subcommand, encoder, use_hw, hwaccel):
    """Build optional ``--encoder`` / ``--enc-input`` args for an ab-av1 call.

    Each flag is only added when the installed ab-av1 lists it in the help
    text of *subcommand*.
    """
    extra = []
    # Add encoder if not default
    if encoder != "svt-av1" and ab_av1_supports(subcommand, "--encoder"):
        extra.extend(["--encoder", encoder])
    # Hardware decode acceleration
    if use_hw and hwaccel and ab_av1_supports(subcommand, "--enc-input"):
        extra.extend(["--enc-input", f"hwaccel={hwaccel}"])
        if hwaccel == "vaapi":
            extra.extend(["--enc-input", "hwaccel_output_format=vaapi"])
    return extra


def _parse_crf_search_output(lines, target_vmaf):
    """Parse the final ``crf <n> ... (<p>%)`` line of crf-search output.

    Lines are scanned from the end; only lines that start with ``crf``
    followed by a number are accepted. A missing percentage is treated as
    100% (no savings).

    Returns:
        ``{"crf", "predicted_percent", "vmaf"}`` or None if no line matches.
    """
    for line in reversed(lines):
        crf_match = _CRF_LINE_RE.match(line)
        if not crf_match:
            continue
        percent_match = _PERCENT_RE.search(line)
        percent = float(percent_match.group(1)) if percent_match else 100.0
        return {
            "crf": float(crf_match.group(1)),
            "predicted_percent": percent,
            "vmaf": target_vmaf,
        }
    return None


def run_crf_search(
    filepath,
    target_vmaf,
    preset,
    temp_dir,
    encoder="svt-av1",
    use_hw=False,
    hwaccel=None,
    samples=DEFAULT_SAMPLES,
):
    """Run CRF search for a specific VMAF target.

    The search is executed once; its streamed output is parsed for the
    result, so the reported CRF belongs to exactly the command that ran.

    Args:
        filepath: Source media file.
        target_vmaf: Requested VMAF; hardware encoders search at
            ``target_vmaf + HW_ENCODER_VMAF_OFFSET``.
        preset: Encoder preset passed to ab-av1.
        temp_dir: Directory ab-av1 uses for sample files.
        encoder: Encoder name; anything other than "svt-av1" is passed on
            via ``--encoder`` when supported.
        use_hw: Whether hardware decode arguments should be added.
        hwaccel: Hardware acceleration name, or None.
        samples: Number of samples for the search.

    Returns:
        ``{"crf", "predicted_percent", "vmaf"}`` (``vmaf`` is the requested,
        un-offset target) or None when the search fails or cannot be parsed.
    """
    is_hw_encoder = encoder in HW_ENCODERS
    # Apply VMAF offset for hardware encoders to match software quality
    effective_vmaf = (
        target_vmaf + HW_ENCODER_VMAF_OFFSET if is_hw_encoder else target_vmaf
    )

    cmd = [
        "ab-av1",
        "crf-search",
        "-i",
        str(filepath),
        "--min-vmaf",
        str(effective_vmaf),
        "--preset",
        str(preset),
        "--max-encoded-percent",
        "100",
        "--temp-dir",
        temp_dir,
        "--samples",
        str(samples),
    ]
    cmd.extend(_encoder_args("crf-search", encoder, use_hw, hwaccel))

    vmaf_label = f"VMAF {effective_vmaf}" if is_hw_encoder else f"VMAF {target_vmaf}"
    print(f" - Searching for CRF to hit {vmaf_label}...")

    returncode, lines = _stream_command(cmd, f"crf-search {vmaf_label}")
    if returncode != 0:
        return None

    result = _parse_crf_search_output(lines, target_vmaf)
    if result is None:
        print(" ! Failed to parse crf-search output: no 'crf <value>' line found")
    return result


def find_target_savings_params(
    filepath,
    start_vmaf,
    preset,
    temp_dir,
    encoder="svt-av1",
    use_hw=False,
    hwaccel=None,
    samples=DEFAULT_SAMPLES,
):
    """Find VMAF target that achieves minimum savings.

    Tries every entry of ``TARGETS`` that is <= *start_vmaf*, in order, and
    stops at the first whose predicted savings reach
    ``TARGET_SAVINGS_FOR_ESTIMATE``.

    Returns:
        A dict with ``target_vmaf``, ``crf``, ``savings``, ``quality_drop``
        and ``found=True``; or None when no target qualifies or a shutdown
        was requested.
    """
    print(f"\n --- Finding VMAF for {TARGET_SAVINGS_FOR_ESTIMATE}% savings ---")

    test_targets = [t for t in TARGETS if t <= start_vmaf]

    for i, target in enumerate(test_targets):
        if _shutdown_requested:
            return None

        print(
            f" Testing VMAF {target} for {TARGET_SAVINGS_FOR_ESTIMATE}% target... "
            f"(test {i + 1}/{len(test_targets)})"
        )
        result = run_crf_search(
            filepath, target, preset, temp_dir, encoder, use_hw, hwaccel, samples
        )

        if not result:
            print(f" ✗ Could not achieve VMAF {target}")
            continue

        predicted_savings = 100.0 - result["predicted_percent"]
        quality_drop = start_vmaf - target
        print(
            f" ✓ VMAF {target}: CRF {result['crf']}, "
            f"Savings {predicted_savings:.1f}%, Drop -{quality_drop:.0f}"
        )

        if predicted_savings >= TARGET_SAVINGS_FOR_ESTIMATE:
            print(f"\n ✅ FOUND {TARGET_SAVINGS_FOR_ESTIMATE}%+ SAVINGS:")
            print(f" Target VMAF: {target} (quality drop: -{quality_drop:.0f})")
            print(f" CRF: {result['crf']}")
            print(f" Predicted savings: {predicted_savings:.1f}%")
            return {
                "target_vmaf": target,
                "crf": result["crf"],
                "savings": predicted_savings,
                "quality_drop": quality_drop,
                "found": True,
            }

    print(f"\n 📝 COULD NOT ACHIEVE {TARGET_SAVINGS_FOR_ESTIMATE}% SAVINGS")
    print(f" Tried VMAF targets: {test_targets}")
    return None


def run_encode(
    filepath, output_path, crf, preset, encoder="svt-av1", use_hw=False, hwaccel=None
):
    """Run full encoding with real-time output streaming.

    Returns:
        The exit code of ``ab-av1 encode`` (0 on success).
    """
    cmd = [
        "ab-av1",
        "encode",
        "--input",
        str(filepath),
        "--output",
        str(output_path),
        "--crf",
        str(crf),
        "--preset",
        str(preset),
        "--acodec",
        "copy",
    ]
    cmd.extend(_encoder_args("encode", encoder, use_hw, hwaccel))
    return run_command_streaming(cmd, f"encoding (CRF {crf})")


def provide_recommendations(
    stats_before, hit_vmaf, predicted_savings, target_result=None
):
    """Provide recommendations based on analysis results.

    Args:
        stats_before: Source stats from :func:`get_video_stats`.
        hit_vmaf: Accepted for interface compatibility; not used here.
        predicted_savings: Accepted for interface compatibility; not used.
        target_result: Result of :func:`find_target_savings_params`, or None.
    """
    print(f"\n --- Recommendations for {stats_before['codec']} content ---")

    if target_result and target_result["found"]:
        print(f" 📊 TO HIT {TARGET_SAVINGS_FOR_ESTIMATE}% SAVINGS:")
        print(
            f" → Target VMAF: {target_result['target_vmaf']} "
            f"(drop: -{target_result['quality_drop']:.0f})"
        )
        print(f" → CRF: {target_result['crf']}")
        print(f" → Predicted: {target_result['savings']:.1f}% savings")
        print(" → Trade-off: Quality reduction for space savings")
        print()

    if stats_before["bitrate"] < 2000:
        print(f" → Source bitrate is low ({stats_before['bitrate']}k)")
        print(" → AV1 gains minimal on highly compressed sources")
        print(" → Recommendation: SKIP - Source already optimized")
        return

    if stats_before["codec"] in ["hevc", "h265", "vp9"]:
        print(f" → Source already uses modern codec ({stats_before['codec']})")
        print(" → AV1 gains minimal on already-compressed content")
        print(" → Recommendation: SKIP - Codec already efficient")
        return

    if target_result and not target_result["found"]:
        print(
            f" → Could not achieve {TARGET_SAVINGS_FOR_ESTIMATE}% even at lowest VMAF"
        )
        print(" → Content may not compress well with AV1")
        print(" → Recommendation: SKIP - Review manually")


def refresh_plex(plex_url, plex_token):
    """Refresh Plex library after encoding completion.

    Does nothing unless both URL and token are given. Library section 1 is
    refreshed via curl; a failing curl is reported instead of being ignored.
    """
    if not plex_url or not plex_token:
        return

    print("\n📡 Refreshing Plex library...")
    cmd = [
        "curl",
        "--fail",  # make HTTP errors show up as a non-zero exit code
        "-X",
        "GET",
        f"{plex_url}/library/sections/1/refresh",
        "-H",
        f"X-Plex-Token: {plex_token}",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, check=False)
    except OSError as e:
        print(f" ⚠️ Failed to refresh Plex: {e}")
        return

    if result.returncode == 0:
        print(" ✓ Plex refresh triggered")
    else:
        print(f" ⚠️ Failed to refresh Plex: curl exited with {result.returncode}")


def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        Path(path).unlink()
    except FileNotFoundError:
        pass


def _select_encode_params(
    filepath,
    stats_before,
    log_dir,
    preset,
    temp_dir,
    encoder,
    use_hw,
    hwaccel,
    samples,
):
    """Decide which CRF to encode with, or why the file is not encoded.

    Returns:
        ``(params, outcome)``. *params* is the chosen search result dict
        (``crf``, ``vmaf``, ``predicted_percent``) or None; when it is None,
        *outcome* is the value ``process_file`` should return.
    """
    search_args = (preset, str(temp_dir), encoder, use_hw, hwaccel, samples)

    # Step 1: first achievable target, highest quality first.
    first = None
    for target in TARGETS:
        if _shutdown_requested:
            return None, False
        print(f"\n [Step 1] Testing VMAF {target:g}...")
        first = run_crf_search(filepath, target, *search_args)
        if first:
            break
        print(f" !! Could not hit VMAF {target:g}")

    if not first:
        if _shutdown_requested:
            # The search was interrupted, not failed: don't log a failure.
            return None, False
        print(f" !! Failed all VMAF targets ({TARGETS}) for {filepath.name}")
        log_result(
            log_dir,
            "failed_searches",
            {
                "file": str(filepath),
                "status": "all_targets_failed",
                "targets": TARGETS,
            },
        )
        provide_recommendations(stats_before, None, 0)
        return None, False

    # Report the target that was actually reached, not always 94.
    first_vmaf = first["vmaf"]
    first_savings = 100.0 - first["predicted_percent"]

    if first_savings >= MIN_SAVINGS_PERCENT:
        print(
            f"\n ✅ VMAF {first_vmaf:g} gives {first_savings:.1f}% savings "
            f"(≥{MIN_SAVINGS_PERCENT}%)"
        )
        print(f" → Proceeding with VMAF {first_vmaf:g}, CRF {first['crf']}")
        return dict(first), True

    print(
        f"\n ⚠️ VMAF {first_vmaf:g} gives {first_savings:.1f}% savings "
        f"(<{MIN_SAVINGS_PERCENT}%)"
    )

    # Step 2: retry at the fallback target, reusing what step 1 already
    # established instead of repeating an identical search.
    if first_vmaf > _FALLBACK_VMAF:
        second = run_crf_search(filepath, _FALLBACK_VMAF, *search_args)
    elif first_vmaf == _FALLBACK_VMAF:
        second = first
    else:
        second = None  # the fallback target already failed in step 1

    if not second:
        print(f" !! Could not achieve VMAF {_FALLBACK_VMAF:g}")
        log_result(
            log_dir,
            "failed_searches",
            {"file": str(filepath), "status": "vmaf_93_failed"},
        )
        return None, False

    second_savings = 100.0 - second["predicted_percent"]
    if second_savings >= MIN_SAVINGS_PERCENT:
        print(
            f" ✅ VMAF {_FALLBACK_VMAF:g} gives {second_savings:.1f}% savings "
            f"(≥{MIN_SAVINGS_PERCENT}%)"
        )
        print(f" → Proceeding with VMAF {_FALLBACK_VMAF:g}, CRF {second['crf']}")
        return dict(second), True

    print(
        f" ⚠️ VMAF {_FALLBACK_VMAF:g} gives {second_savings:.1f}% savings "
        f"(also <{MIN_SAVINGS_PERCENT}%)"
    )
    print(f" → Finding VMAF for {TARGET_SAVINGS_FOR_ESTIMATE}% savings...")
    target_result = find_target_savings_params(
        filepath, _FALLBACK_VMAF, *search_args
    )
    provide_recommendations(
        stats_before, _FALLBACK_VMAF, second_savings, target_result
    )
    # Key names are kept for compatibility with existing logs; "vmaf_94"
    # holds the target actually reached in step 1.
    log_result(
        log_dir,
        "low_savings_skips",
        {
            "file": str(filepath),
            "vmaf_94": first_vmaf,
            "savings_94": first_savings,
            "vmaf_93": _FALLBACK_VMAF,
            "savings_93": second_savings,
            "target_for_15_percent": target_result,
            "recommendations": "logged_for_review",
        },
    )
    return None, True


def _install_encoded_file(filepath, temp_output):
    """Replace *filepath* with *temp_output* and return the final path.

    ``.mp4`` sources end up as ``.mkv``; every other source keeps its name.
    The encoded file is first moved next to the destination and then swapped
    in with ``os.replace`` so an interrupted cross-filesystem copy cannot
    leave the library with a truncated file in place of the original.

    NOTE(review): the encoder output is written with an .mkv name, yet only
    .mp4 sources are renamed; .mov/.avi/.ts keep their extension — confirm
    this is intended.
    """
    final_path = filepath
    if filepath.suffix.lower() == ".mp4":
        final_path = filepath.with_suffix(".mkv")

    staging = final_path.with_name(f".{final_path.name}.part")
    try:
        shutil.move(str(temp_output), str(staging))
        os.replace(staging, final_path)
    except Exception:
        _remove_if_exists(staging)
        raise

    if final_path != filepath:
        # The .mp4 original has been superseded by the new .mkv.
        _remove_if_exists(filepath)
    return final_path


def process_file(
    filepath,
    log_dir,
    log_name,
    preset,
    hw_encoder="svt-av1",
    use_hw_mode=False,
    hwaccel=None,
    samples=DEFAULT_SAMPLES,
):
    """Process a single video file with intelligent VMAF targeting.

    Args:
        filepath: Media file to process.
        log_dir: Directory for JSONL logs; its parent holds the ``.lock``
            and ``tmp`` directories.
        log_name: Base name of the success log.
        preset: Encoder preset.
        hw_encoder: Encoder requested on the command line.
        use_hw_mode: Whether one worker may claim the hardware encoder.
        hwaccel: Normalized hardware acceleration name, or None.
        samples: Number of samples for CRF searches.

    Returns:
        True when the file was encoded or deliberately skipped, False on
        failure or when a shutdown interrupted processing.
    """
    # Determine if THIS worker should use hardware encoder
    use_hw = False
    if use_hw_mode and hwaccel and hw_encoder in HW_ENCODERS:
        use_hw = claim_hardware_worker()

    # HW worker uses hardware encoder; CPU workers use svt-av1
    encoder = hw_encoder if use_hw else "svt-av1"

    filepath = Path(filepath)
    lock_file = Path(log_dir).parent / ".lock" / f"{filepath.name}.lock"

    # Lock file (multi-machine coordination). Creating it with
    # exist_ok=False is a single atomic step, so two workers cannot both
    # pass an "exists?" check and then both claim the file.
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    try:
        lock_file.touch(exist_ok=False)
    except FileExistsError:
        print(f"Skipping (Locked): {filepath.name}")
        return True

    try:
        probe_data = get_probe_data(filepath)
        if not probe_data:
            print(f"Skipping (Invalid probe data): {filepath.name}")
            return True

        stats_before = get_video_stats(probe_data)
        if not stats_before or stats_before["codec"] == "av1":
            print(f"Skipping (Already AV1 or invalid): {filepath.name}")
            return True

        # Mark as processed
        file_key = str(filepath)
        with _lock:
            if file_key in _processed_files:
                return True
            _processed_files.add(file_key)

        print(f"\n--- Processing: {filepath.name} ---")
        print(
            f" Source: {stats_before['codec']} @ {stats_before['bitrate']}k, "
            f"{stats_before['size'] / (1024**3):.2f} GB"
        )

        if _shutdown_requested:
            return False

        temp_dir = Path(log_dir).parent / "tmp"
        temp_dir.mkdir(parents=True, exist_ok=True)

        encode_params, outcome = _select_encode_params(
            filepath,
            stats_before,
            log_dir,
            preset,
            temp_dir,
            encoder,
            use_hw,
            hwaccel,
            samples,
        )
        if encode_params is None:
            return outcome

        temp_output = temp_dir / f"{filepath.stem}.av1_temp.mkv"
        _remove_if_exists(temp_output)

        start_time = time.time()
        res = run_encode(
            filepath,
            temp_output,
            encode_params["crf"],
            preset,
            encoder,
            use_hw,
            hwaccel,
        )

        if res != 0:
            print(f"\n !! Encode failed with code {res}")
            _remove_if_exists(temp_output)
            log_result(
                log_dir,
                "failed_encodes",
                {"file": str(filepath), "status": "encode_failed", "returncode": res},
            )
            return False

        encode_duration = time.time() - start_time
        print(f" ✓ Encode completed in {encode_duration:.1f}s")

        probe_after = get_probe_data(temp_output)
        stats_after = get_video_stats(probe_after)
        if not stats_after:
            print(" !! Failed to probe encoded file")
            _remove_if_exists(temp_output)
            return False

        # ffprobe may report no size; avoid dividing by zero.
        if stats_before["size"] > 0:
            actual_savings = (1 - (stats_after["size"] / stats_before["size"])) * 100
        else:
            actual_savings = 0.0

        print("\n Results:")
        print(
            f" - Before: {stats_before['size'] / (1024**3):.2f} GB "
            f"@ {stats_before['bitrate']}k"
        )
        print(
            f" - After: {stats_after['size'] / (1024**3):.2f} GB "
            f"@ {stats_after['bitrate']}k"
        )
        print(f" - Savings: {actual_savings:.2f}%")

        final_path = _install_encoded_file(filepath, temp_output)
        print(f" ✓ Successfully optimized: {final_path.name}")

        log_result(
            log_dir,
            log_name,
            {
                "file": str(final_path),
                "status": "success",
                "vmaf": encode_params["vmaf"],
                "crf": encode_params["crf"],
                "before": stats_before,
                "after": stats_after,
                "duration": encode_duration,
                "savings": actual_savings,
            },
        )
        return True

    finally:
        # Remove lock file
        _remove_if_exists(lock_file)


def scan_library(root, exclude_dirs=None):
    """Scan library for video files, excluding certain directories.

    Args:
        root: Directory to walk.
        exclude_dirs: Directory names (not paths) that are not descended into.

    Returns:
        A list of ``Path`` objects whose suffix is in ``EXTENSIONS`` and
        whose stem does not contain "_av1".
    """
    excluded = set(exclude_dirs or [])
    files = []

    for dirpath, dirnames, filenames in os.walk(root):
        # Skip excluded directories (in-place so os.walk honours it)
        dirnames[:] = [d for d in dirnames if d not in excluded]

        for filename in filenames:
            if Path(filename).suffix.lower() not in EXTENSIONS:
                continue

            full_path = Path(dirpath) / filename
            if "_av1" in full_path.stem:
                continue

            files.append(full_path)

    return files


def main():
    """Parse the command line, scan the library and process every file."""
    parser = argparse.ArgumentParser(
        description="Optimize video library to AV1 using VMAF targeting."
    )
    parser.add_argument("directory", help="Root directory to scan")
    # NOTE: --vmaf is accepted, but target selection below is driven by the
    # TARGETS list; the parsed value is not consulted.
    parser.add_argument(
        "--vmaf",
        type=float,
        default=DEFAULT_VMAF,
        help=f"Target VMAF score (default: {DEFAULT_VMAF})",
    )
    parser.add_argument(
        "--preset",
        type=int,
        default=DEFAULT_PRESET,
        help=f"SVT-AV1 Preset (default: {DEFAULT_PRESET})",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=DEFAULT_WORKERS,
        help=f"Concurrent files to process (default: {DEFAULT_WORKERS})",
    )
    parser.add_argument(
        "--samples",
        type=int,
        default=DEFAULT_SAMPLES,
        help=f"Samples to use for CRF search if supported (default: {DEFAULT_SAMPLES})",
    )
    parser.add_argument(
        "--hwaccel",
        default=None,
        help=(
            "Hardware acceleration for decode. "
            "Examples: auto, vaapi, d3d11va, videotoolbox. Use 'none' to disable."
        ),
    )
    parser.add_argument(
        "--encoder",
        default="svt-av1",
        help=(
            "Video encoder to use. Default: svt-av1 (CPU). "
            "Hardware encoders: av1_amf (AMD), av1_nvenc (NVIDIA), av1_qsv (Intel)."
        ),
    )
    parser.add_argument(
        "--use-hardware-worker",
        action="store_true",
        help="Use 1 hardware encoding worker + rest CPU workers (requires --encoder with HW encoder)",
    )
    parser.add_argument(
        "--plex-url",
        default=None,
        help="Plex server URL (e.g., http://localhost:32400)",
    )
    parser.add_argument(
        "--plex-token",
        default=None,
        help="Plex auth token",
    )
    parser.add_argument(
        "--log-dir",
        default="/opt/Optmiser/logs",
        help="Log directory (default: /opt/Optmiser/logs)",
    )

    args = parser.parse_args()

    if args.workers < 1:
        print("Error: --workers must be >= 1")
        sys.exit(2)

    check_dependencies()

    root = Path(args.directory)
    if not root.exists():
        print(f"Directory not found: {root}")
        sys.exit(1)

    hwaccel = normalize_hwaccel(args.hwaccel)

    print(f"Platform: {platform_label()}")
    print(f"Scanning library: {root}")
    print(f"VMAF targets: {TARGETS}")
    print(f"Minimum savings: {MIN_SAVINGS_PERCENT}%")
    print(f"Estimate target: {TARGET_SAVINGS_FOR_ESTIMATE}%")
    print(f"Encoder Preset: {args.preset}")
    print(f"Workers: {args.workers}")
    if hwaccel:
        print(f"HWAccel: {hwaccel}")
    if args.use_hardware_worker:
        print(f"Hardware worker: 1 HW + {args.workers - 1} CPU")
    if args.plex_url:
        print("Plex refresh: Enabled")
    print("-" * 60)

    # Determine log name based on directory. Path.parts is used instead of
    # splitting on "/" so the check also works with Windows separators.
    root_parts = {part.lower() for part in root.parts}
    if "content" in root_parts:
        log_name = "content"
        exclude_dirs = []
    else:
        log_name = "tv_movies"
        exclude_dirs = ["content"]

    print(f"Log file: {log_name}.jsonl")

    files = scan_library(root, exclude_dirs)
    if not files:
        print("No media files found.")
        return

    print(f"Found {len(files)} files to process")
    print("-" * 60)

    processed_count = 0
    success_count = 0
    fail_count = 0

    # Hardware worker configuration
    # HW worker uses specified encoder; CPU workers use svt-av1
    hw_encoder = args.encoder if args.encoder in HW_ENCODERS else None
    use_hw_primary = args.use_hardware_worker and hw_encoder is not None

    job_args = (
        args.log_dir,
        log_name,
        args.preset,
        args.encoder,
        use_hw_primary,
        hwaccel,
        args.samples,
    )

    if args.workers == 1:
        # Single thread - just process files
        for file_path in files:
            if _shutdown_requested:
                break
            processed_count += 1
            if process_file(file_path, *job_args):
                success_count += 1
            else:
                fail_count += 1
    else:
        # Multi-threaded processing
        with ThreadPoolExecutor(max_workers=args.workers) as executor:
            futures = []
            for file_path in files:
                if _shutdown_requested:
                    break
                # All workers try to claim HW; only the first thread succeeds
                # and will use HW for ALL its tasks
                futures.append(executor.submit(process_file, file_path, *job_args))

            for future in as_completed(futures):
                if _shutdown_requested:
                    # Drop work that has not started; running tasks notice
                    # the shutdown flag themselves.
                    for pending in futures:
                        pending.cancel()
                    break

                processed_count += 1
                try:
                    if future.result():
                        success_count += 1
                    else:
                        fail_count += 1
                except Exception as e:
                    # Boundary handler: one bad file must not stop the batch.
                    print(f" !! Unexpected error: {e}")
                    traceback.print_exc()
                    fail_count += 1

    print("\n" + "=" * 60)
    print(f"SUMMARY: {root}")
    print(f" Processed: {processed_count} files")
    print(f" Success/Skip: {success_count}")
    print(f" Failed: {fail_count}")
    print("=" * 60)

    # Refresh Plex on completion
    if success_count > 0:
        refresh_plex(args.plex_url, args.plex_token)


if __name__ == "__main__":
    main()