import os
import re
import sys
import subprocess
import argparse
import json
import shutil
import platform
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

DEFAULT_VMAF = 95.0
DEFAULT_PRESET = 6
DEFAULT_WORKERS = 1
DEFAULT_SAMPLES = 4
EXTENSIONS = {".mkv", ".mp4", ".mov", ".avi", ".ts"}

# Maps an ab-av1 subcommand name to the captured text of `<subcommand> --help`.
# An empty string is cached when the help text could not be obtained.
_AB_AV1_HELP_CACHE = {}


def check_dependencies():
    """Exit with status 1 unless ffmpeg, ffprobe and ab-av1 are all on PATH."""
    missing = [
        tool for tool in ("ffmpeg", "ffprobe", "ab-av1") if not shutil.which(tool)
    ]
    if missing:
        print(f"Error: Missing required tools: {', '.join(missing)}")
        print(
            "Please install FFmpeg and 'ab-av1' (via cargo install ab-av1) before running."
        )
        sys.exit(1)


def is_wsl():
    """Return True when running under Windows Subsystem for Linux.

    Checks the ``WSL_DISTRO_NAME`` environment variable first, then looks for
    "microsoft" in the kernel release string.
    """
    if os.environ.get("WSL_DISTRO_NAME"):
        return True
    try:
        with open("/proc/sys/kernel/osrelease", "r", encoding="utf-8") as f:
            return "microsoft" in f.read().lower()
    except OSError:
        # Missing or unreadable /proc entry (non-Linux hosts, restricted
        # sandboxes): treat as "not WSL" rather than crashing.
        return False


def platform_label():
    """Return ``platform.system()``, or "Linux (WSL)" when running under WSL."""
    system = platform.system()
    if system == "Linux" and is_wsl():
        return "Linux (WSL)"
    return system


def _ab_av1_help(subcommand):
    """Return the combined stdout/stderr of ``ab-av1 <subcommand> --help``.

    The result is memoized in ``_AB_AV1_HELP_CACHE``. If ab-av1 cannot be run
    or its output cannot be decoded, an empty string is cached and returned so
    that every feature probe reports "unsupported".
    """
    cached = _AB_AV1_HELP_CACHE.get(subcommand)
    if cached is not None:
        return cached
    try:
        result = subprocess.run(
            ["ab-av1", subcommand, "--help"],
            capture_output=True,
            text=True,
            check=False,
        )
        help_text = (result.stdout or "") + "\n" + (result.stderr or "")
    except (OSError, subprocess.SubprocessError, ValueError):
        # Best effort: launch failures (OSError) and undecodable output
        # (UnicodeDecodeError is a ValueError) simply mean "no help available".
        help_text = ""
    _AB_AV1_HELP_CACHE[subcommand] = help_text
    return help_text


def ab_av1_supports(subcommand, flag):
    """Return True if *flag* appears as a standalone option in the help text.

    A plain substring test is not sufficient: "-e" occurs inside the word
    "auto-encode" and "--ac" is a prefix of "--acodec". The flag must therefore
    not be directly preceded or followed by a word character or a hyphen.
    """
    pattern = r"(?<![\w-])" + re.escape(flag) + r"(?![\w-])"
    return re.search(pattern, _ab_av1_help(subcommand)) is not None


def _first_supported_flag(subcommand, candidates):
    """Return the first flag in *candidates* that ab-av1 supports, else None."""
    for flag in candidates:
        if ab_av1_supports(subcommand, flag):
            return flag
    return None


def normalize_hwaccel(value):
    """Resolve the user-supplied ``--hwaccel`` value.

    Returns None for a missing/blank value or one of "none", "off", "false",
    "0" (case-insensitive). "auto" maps to a per-platform default: "d3d11va"
    on Windows, "videotoolbox" on macOS, "vaapi" elsewhere. Any other value is
    returned stripped of surrounding whitespace, with its case preserved.
    """
    if value is None:
        return None
    v = value.strip()
    if not v:
        return None
    v_lower = v.lower()
    if v_lower in {"none", "off", "false", "0"}:
        return None
    if v_lower != "auto":
        return v
    system = platform.system()
    if system == "Windows":
        return "d3d11va"
    if system == "Darwin":
        return "videotoolbox"
    return "vaapi"


def get_video_info(filepath):
    """Probe the first video stream of *filepath* with ffprobe.

    Returns a dict ``{"codec": str, "is_hdr": bool}``, or None when the file
    has no video stream or probing fails (the error is printed). HDR is
    inferred from a PQ ("smpte2084") or HLG ("arib-std-b67") transfer function.
    """
    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_streams",
        "-select_streams",
        "v:0",
        filepath,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error probing {filepath}: {e}")
        return None
    streams = data.get("streams") or []
    if not streams:
        return None
    stream = streams[0]
    codec = stream.get("codec_name", "unknown")
    color_transfer = stream.get("color_transfer", "unknown")
    is_hdr = color_transfer in ("smpte2084", "arib-std-b67")
    return {"codec": codec, "is_hdr": is_hdr}


def build_ab_av1_command(input_path, output_path, args):
    """Build the ``ab-av1 auto-encode`` argument list for one file.

    *args* is the parsed CLI namespace; this function reads ``vmaf``,
    ``preset``, ``encoder``, ``samples``, ``thorough``, ``hwaccel`` and
    ``hwaccel_output_format``. Optional flags are added only when the installed
    ab-av1 advertises them in its help output; otherwise a warning is printed
    and the option is skipped.
    """
    cmd = [
        "ab-av1",
        "auto-encode",
        "-i",
        str(input_path),
        "-o",
        str(output_path),
        "--min-vmaf",
        str(args.vmaf),
        "--preset",
        str(args.preset),
    ]

    if args.encoder:
        flag = _first_supported_flag("auto-encode", ("--encoder", "-e"))
        if flag is not None:
            cmd.extend([flag, args.encoder])
        else:
            print("Warning: This ab-av1 version does not support --encoder; ignoring.")

    if args.samples is not None:
        flag = _first_supported_flag("auto-encode", ("--samples", "--sample-count"))
        if flag is not None:
            cmd.extend([flag, str(args.samples)])
        else:
            print("Warning: This ab-av1 version does not support --samples; ignoring.")

    if args.thorough:
        if ab_av1_supports("auto-encode", "--thorough"):
            cmd.append("--thorough")
        else:
            print("Warning: This ab-av1 version does not support --thorough; ignoring.")

    hwaccel = normalize_hwaccel(args.hwaccel)
    if hwaccel is not None:
        if ab_av1_supports("auto-encode", "--enc-input"):
            cmd.extend(["--enc-input", f"hwaccel={hwaccel}"])
            hwaccel_output_format = args.hwaccel_output_format
            # VAAPI decode defaults to keeping frames in the VAAPI format
            # unless the user overrides it explicitly.
            if hwaccel_output_format is None and hwaccel == "vaapi":
                hwaccel_output_format = "vaapi"
            if hwaccel_output_format is not None:
                cmd.extend(
                    ["--enc-input", f"hwaccel_output_format={hwaccel_output_format}"]
                )
        else:
            print(
                "Warning: This ab-av1 version does not support --enc-input; ignoring --hwaccel."
            )

    # Audio is passed through untouched whenever the tool allows it.
    flag = _first_supported_flag("auto-encode", ("--acodec", "--ac"))
    if flag is not None:
        cmd.extend([flag, "copy"])
    else:
        print(
            "Warning: This ab-av1 version does not support --acodec/--ac; leaving audio defaults."
        )
    return cmd


def process_file(filepath, args):
    """Encode one file to ``<stem>_av1<suffix>`` next to the source.

    The file is skipped when the output already exists, when it cannot be
    probed, or when it is already AV1. If ab-av1 exits non-zero, any partial
    output file is removed.
    """
    input_path = Path(filepath)
    output_path = input_path.with_stem(input_path.stem + "_av1")
    if output_path.exists():
        print(f"Skipping (Output exists): {input_path.name}")
        return

    info = get_video_info(str(input_path))
    if not info:
        return
    if info["codec"] == "av1":
        print(f"Skipping (Already AV1): {input_path.name}")
        return

    print(f"\nProcessing: {input_path.name}")
    print(f" Source Codec: {info['codec']}")
    print(f" HDR: {info['is_hdr']}")

    cmd = build_ab_av1_command(input_path, output_path, args)
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError:
        print(f"Failed to encode: {input_path.name}")
        # Do not leave a truncated output behind; it would make the next run
        # skip this file as "Output exists".
        output_path.unlink(missing_ok=True)
    else:
        print(f"Success! Encoded: {output_path.name}")


def scan_library(root):
    """Return the paths of all candidate media files beneath *root*.

    A file qualifies when its suffix (case-insensitive) is in ``EXTENSIONS``
    and its stem does not contain "_av1" (used to skip previous outputs).
    """
    files = []
    for dirpath, _, filenames in os.walk(root):
        for filename in filenames:
            if Path(filename).suffix.lower() not in EXTENSIONS:
                continue
            full_path = Path(dirpath) / filename
            if "_av1" in full_path.stem:
                continue
            files.append(full_path)
    return files


def main():
    """Parse CLI arguments, scan the library and encode every candidate file."""
    parser = argparse.ArgumentParser(
        description="Optimize video library to AV1 using VMAF targeting."
    )
    parser.add_argument("directory", help="Root directory to scan")
    parser.add_argument(
        "--vmaf",
        type=float,
        default=DEFAULT_VMAF,
        help=f"Target VMAF score (default: {DEFAULT_VMAF})",
    )
    parser.add_argument(
        "--preset",
        type=int,
        default=DEFAULT_PRESET,
        help=f"SVT-AV1 Preset (default: {DEFAULT_PRESET})",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=DEFAULT_WORKERS,
        help=f"Concurrent files to process (default: {DEFAULT_WORKERS})",
    )
    parser.add_argument(
        "--samples",
        type=int,
        default=DEFAULT_SAMPLES,
        help=f"Samples to use for CRF search if supported (default: {DEFAULT_SAMPLES})",
    )
    parser.add_argument(
        "--thorough",
        action="store_true",
        help="Use ab-av1 thorough mode if supported (slower, more accurate)",
    )
    parser.add_argument(
        "--encoder",
        default="svt-av1",
        help="ab-av1 encoder (default: svt-av1). For AMD AV1 on Windows try: av1_amf",
    )
    parser.add_argument(
        "--hwaccel",
        default=None,
        help=(
            "Hardware acceleration for decode (passed via ab-av1 --enc-input if supported). "
            "Examples: auto, vaapi, d3d11va, videotoolbox. Use 'none' to disable."
        ),
    )
    parser.add_argument(
        "--hwaccel-output-format",
        default=None,
        help="Optional hwaccel_output_format override (e.g., vaapi)",
    )
    args = parser.parse_args()

    if args.workers < 1:
        print("Error: --workers must be >= 1")
        sys.exit(2)

    check_dependencies()

    root = Path(args.directory)
    # is_dir() rather than exists(): a regular file is not a scannable library
    # and would otherwise be reported as "No media files found."
    if not root.is_dir():
        print(f"Directory not found: {root}")
        sys.exit(1)

    print(f"Platform: {platform_label()}")
    print(f"Scanning library: {root}")
    print(f"Target VMAF: {args.vmaf}")
    print(f"Encoder Preset: {args.preset}")
    print(f"Workers: {args.workers}")
    print(f"Samples: {args.samples}")
    print(f"Encoder: {args.encoder}")
    if args.hwaccel is not None:
        print(f"HWAccel: {args.hwaccel}")
    print("-" * 50)

    files = scan_library(root)
    if not files:
        print("No media files found.")
        return

    if args.workers == 1:
        # Serial path: no executor overhead, output stays in file order.
        for file_path in files:
            process_file(file_path, args)
        return

    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = [
            executor.submit(process_file, file_path, args) for file_path in files
        ]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                # Top-level boundary: one failing file must not abort the rest.
                print(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()