Files
VMAFOptimiser/src/smart_gpu_encoder.py
2026-01-03 11:57:26 +01:00

505 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Smart GPU Encoder (Windows/AMD/NVIDIA Optimized)
------------------------------------------------
Refactored to use vmaf_common.py
"""
import os
import sys
import subprocess
import json
import shutil
import time
import argparse
import signal
import platform
import re
import threading
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
# --- Import Common Module ---
import vmaf_common as common
# --- Configuration ---
# Thresholds and the default job count are read from the shared defaults in vmaf_common.
TARGET_VMAF_MIN = common.DEFAULT_CONFIG["target_vmaf"]  # lowest sample VMAF that process_file accepts
MIN_SAVINGS_PERCENT = common.DEFAULT_CONFIG["min_savings"]  # lowest sample size reduction (percent) that process_file accepts
MAX_JOBS = common.DEFAULT_CONFIG["gpu_jobs"]  # default value of the --jobs option
DEFAULT_AV1_QP = 32  # quantiser used for the first AV1 sample test
DEFAULT_HEVC_QP = 28  # quantiser used for the HEVC fallback test
SAMPLE_DURATION = 60  # value passed to ffmpeg -t when cutting the test sample
SAMPLE_START_TIME = 300  # value passed to ffmpeg -ss; process_file centres the sample instead when the input is shorter
TEMP_DIR = None # Assigned in main() from common.get_temp_dir(); process_file builds temp paths from it
# Tools (Local override if needed, else used from common checks)
FFMPEG_BIN = "ffmpeg"  # executable name used for every ffmpeg invocation in this module
AB_AV1_BIN = "ab-av1"  # executable name of the ab-av1 tool
# Global state
shutdown_requested = False  # set to True by handle_sigint; polled by run_process, process_file and main
active_processes = set()  # Popen objects currently running, so handle_sigint can stop them
upload_lock = threading.Lock()  # held by process_file while it swaps the encoded file in for the original
proc_lock = threading.Lock()  # guards active_processes
debug_mode = False  # set by --debug; makes run_process echo child output
def handle_sigint(signum, frame):
    """Signal handler for Ctrl+C: stop tracked child processes and exit.

    Sets the module-level ``shutdown_requested`` flag, asks every process in
    ``active_processes`` to terminate, force-kills the ones still alive after
    one shared grace period, and exits with status 1.

    Args:
        signum: Signal number passed by the ``signal`` module (unused).
        frame: Interrupted stack frame passed by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, with exit code 1.
    """
    global shutdown_requested
    print("\n\n[!] CRITICAL: Shutdown requested (Ctrl+C).")
    print(" Killing active encoder processes...")
    shutdown_requested = True
    with proc_lock:
        procs = list(active_processes)
        # Ask everything to stop first so all children share a single grace
        # period instead of costing 0.1 s each.
        for proc in procs:
            try:
                proc.terminate()
            except OSError:
                # Already exited or cannot be signalled; nothing left to stop.
                pass
        if procs:
            time.sleep(0.1)
        for proc in procs:
            try:
                if proc.poll() is None:
                    proc.kill()
            except OSError:
                # Best-effort cleanup: the process vanished between poll and kill.
                pass
    print(" Cleanup complete. Exiting.")
    sys.exit(1)
# Register handle_sigint for SIGINT (Ctrl+C); this runs when the module is loaded.
signal.signal(signal.SIGINT, handle_sigint)
# --- Hardware Detection ---
def detect_hardware_encoder():
    """Probe ``ffmpeg -encoders`` for a hardware AV1/HEVC encoder pair.

    Vendors are tried in a fixed order (AMD, NVIDIA, Intel, Apple). The first
    vendor for which ffmpeg lists at least one of the two encoders wins.

    Returns:
        A ``(av1_encoder, hevc_encoder, hw_type)`` tuple. Either encoder name
        is ``None`` when ffmpeg does not list it. ``(None, None, "cpu")`` is
        returned when no vendor matches or when the probe itself fails.
    """
    # (hw_type, AV1 encoder name, HEVC encoder name), in priority order.
    vendor_table = (
        ("amf", "av1_amf", "hevc_amf"),                            # AMD
        ("nvenc", "av1_nvenc", "hevc_nvenc"),                      # NVIDIA
        ("qsv", "av1_qsv", "hevc_qsv"),                            # Intel
        ("videotoolbox", "av1_videotoolbox", "hevc_videotoolbox"),  # Apple
    )
    try:
        probe = subprocess.run(
            [FFMPEG_BIN, "-hide_banner", "-encoders"],
            capture_output=True,
            text=True,
        )
        listing = probe.stdout
        for hw_type, av1_name, hevc_name in vendor_table:
            av1_enc = av1_name if av1_name in listing else None
            hevc_enc = hevc_name if hevc_name in listing else None
            if av1_enc or hevc_enc:
                return av1_enc, hevc_enc, hw_type
        return None, None, "cpu"
    except Exception as e:
        print(f"[Warning] HW Detection failed: {e}")
        return None, None, "cpu"
def get_encoder_args(codec, encoder, qp):
    """Return the ffmpeg video-encoder arguments for a specific encoder.

    The vendor is recognised by substring (``amf``, ``nvenc``, ``qsv``,
    ``videotoolbox``); the software encoders ``libsvtav1`` and ``libx265``
    are matched by exact name.

    Args:
        codec: Codec family label supplied by callers ("av1"/"hevc"). It is
            not consulted here; the encoder name alone selects the arguments.
        encoder: ffmpeg encoder name, or a falsy value for "no encoder".
        qp: Quantiser / quality level to request.

    Returns:
        A list of ffmpeg arguments starting with ``-c:v``, or an empty list
        when ``encoder`` is falsy or not recognised.
    """
    if not encoder:
        return []

    qp_text = str(qp)

    # AMD AMF
    if "amf" in encoder:
        return [
            "-c:v", encoder, "-usage", "transcoding",
            "-rc", "cqp", "-qp_i", qp_text, "-qp_p", qp_text, "-qp_b", qp_text,
            "-quality", "quality",
        ]

    # NVIDIA NVENC
    if "nvenc" in encoder:
        return ["-c:v", encoder, "-rc", "constqp", "-qp", qp_text, "-preset", "p6", "-spatial-aq", "1"]

    # Intel QSV
    if "qsv" in encoder:
        return ["-c:v", encoder, "-global_quality", qp_text, "-look_ahead", "1"]

    # Apple VideoToolbox
    if "videotoolbox" in encoder:
        # Map the QP onto an inverted quality scale (higher = better).
        # NOTE(review): assumes -q:v expects 1..100 here; clamping keeps large
        # or negative QPs from producing an out-of-range value such as "-20".
        q = max(1, min(100, int(100 - (qp * 2))))
        return ["-c:v", encoder, "-q:v", str(q)]

    # Software Fallback
    if encoder == "libsvtav1":
        # CRF 20-35 range usually good
        return ["-c:v", "libsvtav1", "-crf", qp_text, "-preset", "6", "-g", "240"]
    if encoder == "libx265":
        return ["-c:v", "libx265", "-crf", qp_text, "-preset", "medium"]

    return []
# --- Helpers ---
def run_process(cmd, description="", status_callback=None):
    """Run a process with real-time output and clean shutdown tracking.

    The child is registered in ``active_processes`` while it runs so that
    ``handle_sigint`` can stop it, and it is always unregistered again, even
    when reading its output or a callback raises.

    Args:
        cmd: Argument list passed to ``subprocess.Popen``.
        description: Text sent to ``status_callback`` before the process starts.
        status_callback: Optional callable taking one string. Receives
            ``description``, then every output line containing ``frame=``,
            ``size=`` or ``time=``, and an ``"Error: ..."`` message on failure.

    Returns:
        True when the process exits with status 0. False when shutdown was
        already requested, the process fails, or an exception occurs.
    """
    if shutdown_requested:
        return False
    if status_callback:
        status_callback(description)

    proc = None
    try:
        # Windows: Hide console window
        cflags = 0x08000000 if platform.system() == 'Windows' else 0
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            # Tool output may contain bytes that are invalid in the locale
            # encoding (e.g. file names); never let that abort a running encode.
            errors="replace",
            bufsize=1,
            creationflags=cflags,
        )
        with proc_lock:
            active_processes.add(proc)

        if proc.stdout:
            for line in proc.stdout:
                if shutdown_requested:
                    proc.terminate()
                    break
                line = line.strip()
                if not line:
                    continue
                if debug_mode:
                    print(f" [Debug] {line}")
                if status_callback and ("frame=" in line or "size=" in line or "time=" in line):
                    status_callback(line)

        proc.wait()
        return proc.returncode == 0
    except Exception as e:
        if status_callback:
            status_callback(f"Error: {e}")
        return False
    finally:
        if proc is not None:
            # Only still running if the read loop or a callback raised: the
            # caller will treat this as a failure, so do not leave an orphaned
            # encoder writing to files the caller is about to delete.
            if proc.poll() is None:
                try:
                    proc.kill()
                    proc.wait()
                except OSError:
                    pass
            if proc.stdout:
                proc.stdout.close()
            with proc_lock:
                active_processes.discard(proc)
def run_vmaf_check(reference, distorted, status_callback=None):
    """Run ``ab-av1 vmaf`` and return the reported score.

    A bundled ``bin/ab-av1.exe`` next to this file is preferred when it
    exists; otherwise ``ab-av1`` is resolved from PATH.

    Args:
        reference: Path of the reference video.
        distorted: Path of the encoded video to score.
        status_callback: Optional callable taking one string; called once with
            ``"Calculating VMAF..."`` before the tool starts.

    Returns:
        The score as a float. ``0.0`` when the tool ran but no score could be
        found in its output, ``-1.0`` when the tool could not be run or
        exited with an error.
    """
    ab_exe = "ab-av1"
    # Check if bundled exists
    bundled = Path(__file__).parent / "bin" / "ab-av1.exe"
    if bundled.exists():
        ab_exe = str(bundled)

    cmd = [ab_exe, "vmaf", "--reference", str(reference), "--distorted", str(distorted)]
    if status_callback:
        status_callback("Calculating VMAF...")

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (OSError, ValueError, subprocess.SubprocessError):
        # Missing binary, non-zero exit, or undecodable output.
        return -1.0

    for line in result.stdout.splitlines():
        line = line.strip()
        # Capture a well-formed number only, so trailing punctuation such as
        # "VMAF 95.5." cannot turn a valid score into a parse failure.
        match = re.search(r"VMAF\s+(\d+(?:\.\d+)?)", line)
        if match:
            return float(match.group(1))
        # Otherwise accept a line that is just a number in the 0..100 range.
        try:
            val = float(line)
        except ValueError:
            continue
        if 0 <= val <= 100:
            return val
    return 0.0
# --- Core Logic ---
def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id=0, status_cb=None):
    """
    Process a single file.

    Takes the shared-storage lock for ``filepath``, test-encodes a short
    sample with AV1 (HEVC as fallback) and, when a codec reaches
    ``TARGET_VMAF_MIN`` with at least ``MIN_SAVINGS_PERCENT`` savings, encodes
    the whole file and swaps it in place of the original.

    Args:
        filepath: Source video path (str or Path).
        log_category: Base name of the ``.jsonl`` log receiving the success record.
        lock_dir: Directory handed to ``common.acquire_lock``.
        log_dir: Directory handed to ``common.log_event``.
        encoders: ``(av1_encoder, hevc_encoder, hw_type)``; a falsy encoder
            disables the corresponding test.
        worker_id: Identifier used in status output and temp file names.
        status_cb: function(worker_id, filename, status_text, color)

    Returns:
        None. Progress and outcome are reported through ``status_cb`` (or
        stdout when it is not given) and the JSONL logs. Unexpected exceptions
        are reported as an ``"Error: ..."`` status instead of propagating.
    """
    av1_enc, hevc_enc, _hw_type = encoders
    filepath = Path(filepath)
    filename = filepath.name

    def update(msg, color="white"):
        if status_cb:
            status_cb(worker_id, filename, msg, color)
        else:
            print(f"[{worker_id}] {msg}")

    if shutdown_requested:
        return

    # 1. Lock Check (Shared Storage)
    lock_file = common.acquire_lock(lock_dir, filepath)
    if not lock_file:
        return  # Locked or skipped

    try:
        update("Analyzing...", "blue")

        # 2. Analyze Source
        info = common.get_video_info(filepath)
        if not info:
            update("Metadata Error", "red")
            return
        if info["codec"] == "av1":
            update("Already AV1 (Skipping)", "green")
            time.sleep(1)  # keep the message visible before "Idle" replaces it
            return

        # 3. Create Samples
        # worker_id alone is not unique (callers may leave it at the default),
        # and two files in different folders can share a stem, so the thread id
        # is added to keep concurrent jobs from overwriting each other's files.
        job_tag = f"{worker_id}_{threading.get_ident()}"
        sample_ref = TEMP_DIR / f"{filepath.stem}_{job_tag}_ref.mkv"
        sample_enc = TEMP_DIR / f"{filepath.stem}_{job_tag}_enc.mkv"

        chosen_codec = None
        chosen_qp = 0
        vmaf_score = 0
        savings = 0

        try:
            sample_start = SAMPLE_START_TIME
            if info["duration"] < (SAMPLE_START_TIME + SAMPLE_DURATION):
                # Short input: take the sample from the middle instead.
                sample_start = max(0, (info["duration"] / 2) - (SAMPLE_DURATION / 2))

            update("Extracting Ref", "magenta")
            cmd_ref = [
                FFMPEG_BIN, "-y", "-hide_banner", "-loglevel", "error",
                "-ss", str(sample_start), "-t", str(SAMPLE_DURATION),
                "-i", str(filepath), "-c", "copy", "-map", "0:v:0",
                str(sample_ref),
            ]
            if not run_process(cmd_ref):
                update("Extract Ref Failed", "red")
                return

            # TEST 1: AV1
            if av1_enc:
                update(f"Testing AV1 QP{DEFAULT_AV1_QP}", "yellow")
                enc_args = get_encoder_args("av1", av1_enc, DEFAULT_AV1_QP)
                if run_process(_sample_encode_cmd(enc_args, sample_ref, sample_enc)):
                    update("Calculating VMAF", "cyan")
                    vmaf_score, savings = _measure_sample(sample_ref, sample_enc)
                else:
                    update("AV1 Test Failed", "red")

            # Decision Logic
            if vmaf_score >= TARGET_VMAF_MIN and savings >= MIN_SAVINGS_PERCENT:
                chosen_codec = "av1"
                chosen_qp = DEFAULT_AV1_QP
                update(f"AV1 Good (VMAF {vmaf_score:.1f})", "green")

                # Smart Optimization: plenty of quality headroom, so try a
                # higher QP and keep it only if it still passes and saves more.
                if vmaf_score > 97.0:
                    update(f"Optimizing (High Quality {vmaf_score:.1f})", "yellow")
                    new_qp = DEFAULT_AV1_QP + 4
                    args_opt = get_encoder_args("av1", av1_enc, new_qp)
                    if run_process(_sample_encode_cmd(args_opt, sample_ref, sample_enc)):
                        vmaf_opt, sav_opt = _measure_sample(sample_ref, sample_enc)
                        if vmaf_opt >= TARGET_VMAF_MIN and sav_opt > savings:
                            update(f"Opt Accepted (+{sav_opt - savings:.1f}%)", "green")
                            chosen_qp = new_qp
                            vmaf_score = vmaf_opt
                            savings = sav_opt
            else:
                update("Testing HEVC Fallback", "magenta")
                if info["codec"] != "hevc" and hevc_enc:
                    hevc_args = get_encoder_args("hevc", hevc_enc, DEFAULT_HEVC_QP)
                    if run_process(_sample_encode_cmd(hevc_args, sample_ref, sample_enc)):
                        vmaf_score, savings = _measure_sample(sample_ref, sample_enc)
                        if vmaf_score >= TARGET_VMAF_MIN and savings >= MIN_SAVINGS_PERCENT:
                            update(f"HEVC Accepted (VMAF {vmaf_score:.1f})", "green")
                            chosen_codec = "hevc"
                            chosen_qp = DEFAULT_HEVC_QP
                        else:
                            update("HEVC Rejected", "red")
                            common.log_event(log_dir, "rejected.jsonl", {"file": str(filepath), "status": "rejected", "vmaf": vmaf_score})
                    else:
                        # A failed encode must not be scored: sample_enc may be
                        # missing or still hold the earlier AV1 test output.
                        update("HEVC Test Failed", "red")
                else:
                    update("Skipping HEVC", "yellow")
        finally:
            # Cleanup Samples on every path, including early returns and errors.
            sample_ref.unlink(missing_ok=True)
            sample_enc.unlink(missing_ok=True)

        # 4. Full Encode
        if not chosen_codec:
            return

        update(f"Encoding {chosen_codec.upper()} (QP {chosen_qp})", "green")
        output_file = TEMP_DIR / f"{filepath.stem}_{job_tag}.{chosen_codec}.mkv"
        final_encoder = av1_enc if chosen_codec == "av1" else hevc_enc
        final_args = get_encoder_args(chosen_codec, final_encoder, chosen_qp)
        cmd_full = [
            FFMPEG_BIN, "-y", "-hide_banner", "-loglevel", "info", "-stats",
            "-i", str(filepath),
            *final_args,
            "-c:a", "copy", "-c:s", "copy", "-map", "0",
            str(output_file),
        ]

        def prog_cb(msg):
            # Turns an ffmpeg stats line into "Encoding <codec> | <pct>% | <speed>".
            if "frame=" not in msg:
                return
            try:
                if "time=" in msg:
                    t_str = msg.split("time=")[1].split(" ")[0]
                    h, m, s = map(float, t_str.split(':'))
                    cur_sec = h*3600 + m*60 + s
                    percent = (cur_sec / info["duration"]) * 100
                else:
                    percent = 0.0
                speed = "1x"
                if "speed=" in msg:
                    speed = msg.split("speed=")[1].split("x")[0] + "x"
                update(f"Encoding {chosen_codec} | {percent:.1f}% | {speed}", "green")
            except Exception:
                # Progress display is best-effort: an unparsable stats line
                # (e.g. "time=N/A") must never abort a running encode.
                pass

        try:
            encoded_ok = run_process(
                cmd_full,
                f"Full Encode ({chosen_codec.upper()} QP {chosen_qp})",
                status_callback=prog_cb,
            )
            if not encoded_ok:
                update("Encode Failed", "red")
                return

            final_info = common.get_video_info(output_file)
            if not final_info:
                final_info = {"size": output_file.stat().st_size}
            final_size = final_info["size"]
            final_savings = _savings_percent(info["size"], final_size)
            saved_bytes = info["size"] - final_size
            update(f"Uploading (Saved {final_savings:.1f}%)", "blue")

            # UPLOAD (Serialized)
            with upload_lock:
                update("Uploading...", "blue")
                # NOTE(review): the encoded .mkv is written back under the
                # original file name, so a .mp4 source keeps its .mp4 extension.
                backup_path = filepath.with_suffix(f"{filepath.suffix}.original")
                try:
                    shutil.move(str(filepath), str(backup_path))
                    shutil.copy2(str(output_file), str(filepath))
                    # Verify integrity
                    if Path(filepath).stat().st_size == final_size:
                        output_file.unlink()
                        backup_path.unlink()
                        # Refresh metadata for accuracy
                        final_info_verified = common.get_video_info(filepath)
                        common.log_event(log_dir, f"{log_category}.jsonl", {
                            "file": str(filepath),
                            "status": "success",
                            "codec": chosen_codec,
                            "vmaf": vmaf_score,
                            "savings": final_savings,
                            "original_metadata": info,
                            "encoded_metadata": final_info_verified or final_info
                        })
                        update("Done", "green")
                        if status_cb:
                            status_cb(worker_id, filename, f"STATS:SAVED:{saved_bytes}", "green")
                    else:
                        update("Upload Failed (Size Mismatch)", "red")
                        shutil.move(str(backup_path), str(filepath))
                except Exception as e:
                    update(f"Move Error: {str(e)[:20]}", "red")
                    if backup_path.exists():
                        shutil.move(str(backup_path), str(filepath))
        finally:
            # Never leave the (potentially very large) temp encode behind,
            # whether the encode failed or the upload was rolled back.
            output_file.unlink(missing_ok=True)
    except Exception as e:
        update(f"Error: {str(e)[:30]}", "red")
    finally:
        if lock_file.exists():
            lock_file.unlink()
        update("Idle", "dim")


def _savings_percent(original_size, new_size):
    """Return how much smaller ``new_size`` is than ``original_size``, in percent (0 if the original is empty)."""
    if original_size <= 0:
        return 0
    return (1 - (new_size / original_size)) * 100


def _sample_encode_cmd(enc_args, sample_ref, sample_enc):
    """Build the ffmpeg command that re-encodes the reference sample without audio."""
    return [
        FFMPEG_BIN, "-y", "-hide_banner", "-loglevel", "error",
        "-i", str(sample_ref), *enc_args, "-an", str(sample_enc),
    ]


def _measure_sample(sample_ref, sample_enc):
    """Return ``(vmaf, savings_percent)`` for an encoded sample compared with its reference."""
    vmaf = run_vmaf_check(sample_ref, sample_enc)
    savings = _savings_percent(sample_ref.stat().st_size, sample_enc.stat().st_size)
    return vmaf, savings
def main():
    """Command-line entry point: scan the libraries and encode files in parallel.

    Parses the CLI options, resolves lock/log/temp directories and the
    available encoders through ``vmaf_common``, collects ``.mkv``/``.mp4``
    files from the TV and content directories (largest first within each),
    and runs ``process_file`` for every file on a thread pool.

    NOTE(review): ``--skip-until`` is parsed but not read anywhere in this
    function; confirm whether ``vmaf_common`` consumes it or it is unfinished.
    """
    global debug_mode, TEMP_DIR

    parser = argparse.ArgumentParser()
    parser.add_argument("--tv-dir", default=common.DEFAULT_CONFIG["tv_dir"])
    parser.add_argument("--content-dir", default=common.DEFAULT_CONFIG["content_dir"])
    parser.add_argument("--jobs", type=int, default=MAX_JOBS)
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--skip-until", help="Skip all files alphabetically until this filename substring is found")
    parser.add_argument("--cpu-only", action="store_true", help="Force software encoding (CPU only)")
    parser.add_argument("--temp-dir", help="Override local temp directory")
    args = parser.parse_args()

    if args.debug:
        debug_mode = True
        print("[Debug Mode Enabled]")

    # ThreadPoolExecutor rejects max_workers <= 0; fall back to a single job.
    jobs = max(1, args.jobs)

    # 0. Check Dependencies
    common.check_dependencies()

    # 1. Setup Directories
    lock_dir, log_dir = common.get_base_paths(args)
    TEMP_DIR = common.get_temp_dir(args)

    # 2. Detect Hardware
    av1, hevc, hw = common.detect_hardware_encoder(args)

    print("="*60)
    print(f" SMART ENCODER | Hardware: {hw.upper()} | Jobs: {jobs}")
    if hw == "cpu":
        print(" [!] Fallback to CPU Software Encoding (Slow)")
        if av1:
            print(f" AV1: {av1} (libsvtav1)")
        if hevc:
            print(f" HEVC: {hevc} (libx265)")
    else:
        print(f" AV1: {av1} | HEVC: {hevc}")
    print(f" Locks: {lock_dir}")
    print("="*60)

    # 3. Scan & Queue
    tasks = []
    tasks.extend(_scan_library(Path(args.tv_dir), "tv_shows", "TV"))
    tasks.extend(_scan_library(Path(args.content_dir), "content", "Content"))
    if not tasks:
        print("No files found.")
        return

    # 4. Execute
    print(f"\n🚀 Processing {len(tasks)} files...")
    with ThreadPoolExecutor(max_workers=jobs) as executor:
        futures = {
            executor.submit(process_file, f, cat, lock_dir, log_dir, (av1, hevc, hw)): f
            for f, cat in tasks
        }
        for future in as_completed(futures):
            if shutdown_requested:
                # Drop queued work; jobs already running notice the flag themselves.
                executor.shutdown(wait=False, cancel_futures=True)
                break
            try:
                future.result()
            except Exception as e:
                print(f"Worker Error: {e}")


def _size_or_zero(path):
    """Return the file size in bytes, or 0 if the file cannot be stat'ed (e.g. it vanished mid-scan)."""
    try:
        return path.stat().st_size
    except OSError:
        return 0


def _scan_library(root, category, label):
    """Return ``(path, category)`` tasks for every .mkv/.mp4 under ``root``, largest file first."""
    if not root.exists():
        return []
    print(f"Scanning {label}: {root}")
    files = list(root.rglob("*.mkv")) + list(root.rglob("*.mp4"))
    files.sort(key=_size_or_zero, reverse=True)
    return [(f, category) for f in files]


if __name__ == "__main__":
    main()