fixed locks

This commit is contained in:
bnair
2026-01-03 15:12:18 +01:00
parent ef81365fce
commit b1d7630a10
4 changed files with 94 additions and 22 deletions

View File

@@ -144,7 +144,7 @@ def process_file(filepath, log_dir, log_category, lock_dir):
filename = filepath.name filename = filepath.name
# Check lock (using shared logic now) # Check lock (using shared logic now)
lock_file = common.acquire_lock(lock_dir, filepath) lock_file = common.acquire_lock(lock_dir, filepath, None) # Legacy encoder, no category root
if not lock_file: if not lock_file:
print(f" 🔒 Skipping (locked): {filename}") print(f" 🔒 Skipping (locked): {filename}")
return True return True

View File

@@ -151,10 +151,11 @@ def run_vmaf_check(reference, distorted, status_callback=None):
return -1.0 return -1.0
# --- Core Logic --- # --- Core Logic ---
def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id=0, status_cb=None): def process_file(filepath, log_category, lock_dir, log_dir, encoders, category_root=None, worker_id=0, status_cb=None):
""" """
Process a single file. Process a single file.
status_cb: function(worker_id, filename, status_text, color) status_cb: function(worker_id, filename, status_text, color)
category_root: Root directory (tv_dir or content_dir) for relative path calculation
""" """
av1_enc, hevc_enc, hw_type = encoders av1_enc, hevc_enc, hw_type = encoders
filepath = Path(filepath) filepath = Path(filepath)
@@ -172,7 +173,7 @@ def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id=
return return
# 1. Lock Check (Shared Storage) # 1. Lock Check (Shared Storage)
lock_file = common.acquire_lock(lock_dir, filepath) lock_file = common.acquire_lock(lock_dir, filepath, category_root)
if not lock_file: if not lock_file:
return # Locked or skipped return # Locked or skipped
@@ -354,6 +355,9 @@ def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id=
# Mark as processed to prevent re-encoding in future runs # Mark as processed to prevent re-encoding in future runs
common.mark_processed(log_dir, filepath, chosen_codec, vmaf_score, final_savings) common.mark_processed(log_dir, filepath, chosen_codec, vmaf_score, final_savings)
# Mark lock as completed (keep it for future runs)
common.mark_lock_completed(lock_file)
update("Done", "green") update("Done", "green")
if status_cb: status_cb(worker_id, filename, f"STATS:SAVED:{saved_bytes}", "green") if status_cb: status_cb(worker_id, filename, f"STATS:SAVED:{saved_bytes}", "green")
else: else:
@@ -368,8 +372,10 @@ def process_file(filepath, log_category, lock_dir, log_dir, encoders, worker_id=
except Exception as e: except Exception as e:
update(f"Error: {str(e)[:30]}", "red") update(f"Error: {str(e)[:30]}", "red")
# On error, delete lock so file can be retried
if lock_file and lock_file.exists():
lock_file.unlink()
finally: finally:
if lock_file.exists(): lock_file.unlink()
update("Idle", "dim") update("Idle", "dim")
def main(): def main():
@@ -463,9 +469,15 @@ def main():
# 4. Execute # 4. Execute
print(f"\n🚀 Processing {len(tasks)} files...") print(f"\n🚀 Processing {len(tasks)} files...")
# Build category root map
category_roots = {
"tv_shows": Path(args.tv_dir),
"content": Path(args.content_dir)
}
with ThreadPoolExecutor(max_workers=args.jobs) as executor: with ThreadPoolExecutor(max_workers=args.jobs) as executor:
futures = { futures = {
executor.submit(process_file, f, cat, lock_dir, log_dir, (av1, hevc, hw)): f executor.submit(process_file, f, cat, lock_dir, log_dir, (av1, hevc, hw), category_roots.get(cat)): f
for f, cat in tasks for f, cat in tasks
} }

View File

@@ -220,7 +220,7 @@ class Dashboard:
return layout return layout
# --- Worker Bridge --- # --- Worker Bridge ---
def worker_wrapper(worker_id, file_path, category, lock_dir, log_dir, encoders, dashboard): def worker_wrapper(worker_id, file_path, category, category_root, lock_dir, log_dir, encoders, dashboard):
def status_callback(w_id, fname, msg, color): def status_callback(w_id, fname, msg, color):
# Handle Stats Signal # Handle Stats Signal
if msg.startswith("STATS:"): if msg.startswith("STATS:"):
@@ -252,7 +252,7 @@ def worker_wrapper(worker_id, file_path, category, lock_dir, log_dir, encoders,
dashboard.update_stats("failed") dashboard.update_stats("failed")
try: try:
encoder.process_file(file_path, category, lock_dir, log_dir, encoders, worker_id, status_callback) encoder.process_file(file_path, category, lock_dir, log_dir, encoders, category_root, worker_id, status_callback)
except Exception as e: except Exception as e:
dashboard.add_log(f"Error in worker {worker_id}: {str(e)[:30]}") dashboard.add_log(f"Error in worker {worker_id}: {str(e)[:30]}")
dashboard.update_stats("failed") dashboard.update_stats("failed")
@@ -371,11 +371,18 @@ def main():
threads = [] threads = []
def worker_loop(w_id): def worker_loop(w_id):
# Map category to root path
category_roots = {
"tv_shows": Path(args.tv_dir),
"content": Path(args.content_dir)
}
while not encoder.shutdown_requested: while not encoder.shutdown_requested:
try: try:
item = work_queue.get(timeout=1) item = work_queue.get(timeout=1)
file_path, category = item file_path, category = item
worker_wrapper(w_id, file_path, category, lock_dir, log_dir, encoders, dashboard) category_root = category_roots.get(category)
worker_wrapper(w_id, file_path, category, category_root, lock_dir, log_dir, encoders, dashboard)
work_queue.task_done() work_queue.task_done()
except queue.Empty: except queue.Empty:
# If batch mode and scan is done and queue is empty, exit # If batch mode and scan is done and queue is empty, exit

View File

@@ -20,6 +20,7 @@ DEFAULT_CONFIG = {
"av1_crf": 34, "av1_crf": 34,
"hevc_crf": 28, "hevc_crf": 28,
"temp_dir_windows": r"C:\Users\bnair\Videos\encodes", "temp_dir_windows": r"C:\Users\bnair\Videos\encodes",
"temp_dir_macos": "/Users/bnair/Documents/encodes",
"temp_dir_linux": "~/Videos/encodes" "temp_dir_linux": "~/Videos/encodes"
} }
@@ -91,7 +92,9 @@ def get_temp_dir(args=None):
path = Path(args.temp_dir) path = Path(args.temp_dir)
elif platform.system() == "Windows": elif platform.system() == "Windows":
path = Path(DEFAULT_CONFIG["temp_dir_windows"]) path = Path(DEFAULT_CONFIG["temp_dir_windows"])
else: elif platform.system() == "Darwin": # macOS
path = Path(DEFAULT_CONFIG["temp_dir_macos"])
else: # Linux
path = Path(DEFAULT_CONFIG["temp_dir_linux"]).expanduser() path = Path(DEFAULT_CONFIG["temp_dir_linux"]).expanduser()
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
@@ -226,33 +229,70 @@ def get_video_info(filepath):
return None return None
# --- Locks --- # --- Locks ---
def acquire_lock(lock_dir, filepath): def acquire_lock(lock_dir, filepath, category_root=None):
""" """
Simple file-based lock. Simple file-based lock with completion tracking.
Returns lock_path if acquired, None if failed. Uses relative path from category root for cross-platform compatibility.
Lock lifecycle:
- Created with status="processing" when file starts
- Updated to status="completed" when file finishes successfully
- Deleted only if process is cancelled/interrupted
This way lock files act as "already processed" markers across machines.
Args:
lock_dir: Directory to store lock files
filepath: Full path to the video file
category_root: Root directory (tv_dir or content_dir) to calculate relative path
Returns lock_path if acquired, None if failed/already completed.
""" """
# Use hash of full path (not just filename) for uniqueness # Calculate relative path for consistent hashing across platforms
fhash = hashlib.md5(str(filepath).encode()).hexdigest() if category_root:
try:
rel_path = Path(filepath).relative_to(category_root)
# Use forward slashes for consistency across platforms
hash_key = str(rel_path).replace(os.sep, '/')
except ValueError:
# File not under category_root, use filename
hash_key = Path(filepath).name
else:
hash_key = Path(filepath).name
fhash = hashlib.md5(hash_key.encode()).hexdigest()
lock_file = lock_dir / f"{fhash}.lock" lock_file = lock_dir / f"{fhash}.lock"
if lock_file.exists(): if lock_file.exists():
# Check staleness (24h)
try: try:
if time.time() - lock_file.stat().st_mtime > 86400: # Read lock file to check status
lock_file.unlink() lock_data = json.loads(lock_file.read_text())
else:
# Lock exists and is fresh - file is being processed elsewhere if lock_data.get("status") == "completed":
return None # File already processed successfully - skip it
except:
return None return None
# Check if it's a stale "processing" lock (older than 24h)
if lock_data.get("status") == "processing":
if time.time() - lock_data.get("timestamp", 0) > 86400:
# Stale lock - remove and retry
lock_file.unlink()
else:
# Fresh processing lock - file is being worked on
return None
except:
# Corrupted lock file - remove and retry
lock_file.unlink()
try: try:
# Write hostname and timestamp for debugging # Create new processing lock
import socket import socket
lock_info = { lock_info = {
"file": str(filepath), "file": str(filepath),
"relative_path": hash_key,
"host": socket.gethostname(), "host": socket.gethostname(),
"timestamp": time.time() "timestamp": time.time(),
"status": "processing"
} }
lock_file.write_text(json.dumps(lock_info)) lock_file.write_text(json.dumps(lock_info))
return lock_file return lock_file
@@ -260,6 +300,19 @@ def acquire_lock(lock_dir, filepath):
print(f"[Warning] Failed to acquire lock for {filepath.name}: {e}") print(f"[Warning] Failed to acquire lock for {filepath.name}: {e}")
return None return None
def mark_lock_completed(lock_file):
"""Mark a lock file as completed (keep it for future runs)"""
if not lock_file or not lock_file.exists():
return
try:
lock_data = json.loads(lock_file.read_text())
lock_data["status"] = "completed"
lock_data["completed_at"] = time.time()
lock_file.write_text(json.dumps(lock_data))
except Exception as e:
print(f"[Warning] Failed to mark lock as completed: {e}")
# --- Hardware Detection --- # --- Hardware Detection ---
def detect_hardware_encoder(args=None): def detect_hardware_encoder(args=None):
""" """