From d1f6731a25e68b2d992853732d66d689a2908bb6 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Fri, 5 Jun 2026 23:06:48 -0400 Subject: [PATCH] [youtubearr] Bump version to 1.20.0 --- plugins/youtubearr/plugin.json | 2 +- plugins/youtubearr/plugin.py | 840 +++++++++++++++++++++++++++------ 2 files changed, 696 insertions(+), 146 deletions(-) diff --git a/plugins/youtubearr/plugin.json b/plugins/youtubearr/plugin.json index b961f63..db49bce 100644 --- a/plugins/youtubearr/plugin.json +++ b/plugins/youtubearr/plugin.json @@ -1,6 +1,6 @@ { "name": "YouTubearr", - "version": "1.19.0", + "version": "1.20.0", "description": "Zero-dependency YouTube livestream plugin with automatic monitoring and configurable numbering", "author": "jeff-gooch", "license": "Unlicense", diff --git a/plugins/youtubearr/plugin.py b/plugins/youtubearr/plugin.py index fd9d9c6..f7e5cce 100644 --- a/plugins/youtubearr/plugin.py +++ b/plugins/youtubearr/plugin.py @@ -19,12 +19,12 @@ from apps.channels.models import Channel, ChannelGroup, ChannelStream, Stream, Logo, ChannelProfile, ChannelProfileMembership from apps.epg.models import EPGData, EPGSource, ProgramData from core.models import StreamProfile -from core.scheduling import create_or_update_periodic_task, delete_periodic_task +from core.scheduling import delete_periodic_task class Plugin: name = "YouTubearr" - version = "1.19.0" + version = "1.20.0" description = "Zero-dependency YouTube livestream plugin with automatic monitoring and configurable numbering" author = "Jeff Gooch" help_url = "https://github.com/jeff-gooch/youtubearr" @@ -154,39 +154,102 @@ class Plugin: }, { "id": "info_webhook", - "label": "Webhook Integration", + "label": "Webhooks", "type": "info", - "description": "Trigger external services (like Jellyfin LiveTV refresh) when channels are added or removed.", + "description": "Trigger external services when channels are added or removed, and send notifications for new streams.", + }, + { + "id": "info_generic_webhooks", + "label": "Generic Webhooks", + "type": "info", + "description": "Configure generic webhook endpoints for media refresh and stream notifications.", + }, + { + "id": "media_refresh_webhook_url", + "label": "Media Refresh Webhook URL", + "type": "string", + "default": "", + "help_text": "URL to POST when channels are added or removed (e.g., Jellyfin, Emby, Plex guide refresh). Sends a structured JSON event body. Leave empty to disable.", + }, + { + "id": "media_refresh_webhook_delay_seconds", + "label": "Media Refresh Webhook Delay (seconds)", + "type": "number", + "default": 5, + "min": 0, + "max": 60, + "help_text": "Delay before sending the media refresh webhook to allow Dispatcharr to finish processing (default: 5 seconds).", + }, + { + "id": "media_refresh_webhook_headers", + "label": "Media Refresh Webhook Headers", + "type": "string", + "default": "", + "help_text": "Optional JSON object of extra request headers (e.g., {\"Authorization\": \"Bearer TOKEN\"}). Leave empty for no extra headers.", + }, + { + "id": "media_refresh_webhook_body_template", + "label": "Media Refresh Webhook Body Template", + "type": "text", + "default": "", + "help_text": "Optional static JSON or text body to send. Leave empty to use the default event payload.", + }, + { + "id": "notification_webhook_url", + "label": "Notification Webhook URL", + "type": "string", + "default": "", + "help_text": "URL to POST when a new stream is added. Sends a generic JSON payload suitable for Telegram bots, Discord, Home Assistant, n8n, or any webhook bridge. Leave empty to disable.", + }, + { + "id": "notification_base_url", + "label": "Notification Base URL", + "type": "string", + "default": "", + "help_text": "Base URL for Dispatcharr stream links in the notification payload (e.g., https://tv.example.com). Used to build stream URLs like {base_url}/proxy/ts/stream/{uuid}.", + }, + { + "id": "notification_webhook_headers", + "label": "Notification Webhook Headers", + "type": "string", + "default": "", + "help_text": "Optional JSON object of extra request headers. Leave empty for no extra headers.", + }, + { + "id": "info_legacy_webhooks", + "label": "Legacy Webhook Aliases", + "type": "info", + "description": "These fields are kept for backward compatibility. Prefer the generic webhook fields above for new setups.", }, { "id": "webhook_url", - "label": "Webhook URL (Jellyfin)", + "label": "Webhook URL (Legacy — media refresh alias, bodyless POST)", "type": "string", "default": "", - "help_text": "URL to POST when channels change (e.g., Jellyfin refresh). Leave empty to disable.", + "help_text": "Legacy media refresh alias. Sends a bodyless POST (original Jellyfin-style). Use 'Media Refresh Webhook URL' above for new setups.", }, { "id": "webhook_delay_seconds", - "label": "Webhook Delay (seconds)", + "label": "Webhook Delay (Legacy — media refresh delay alias)", "type": "number", "default": 5, "min": 0, "max": 60, - "help_text": "Delay before triggering webhook to allow Dispatcharr to finish processing (default: 5 seconds).", + "help_text": "Legacy media refresh delay alias. Use 'Media Refresh Webhook Delay' above for new setups.", }, { "id": "telegram_webhook_url", - "label": "Telegram Notification URL", + "label": "Telegram Notification URL (Legacy — notification alias)", "type": "string", "default": "", - "help_text": "URL to POST for Telegram notifications when new channels are added (e.g., https://example.com/webhook/notify). Leave empty to disable.", + "help_text": "Legacy notification alias. Preserves the original Telegram payload shape for compatibility. Use 'Notification Webhook URL' above for new setups.", }, { "id": "dispatcharr_base_url", - "label": "Dispatcharr Base URL", + "label": "Dispatcharr Base URL (Legacy — notification base URL alias)", "type": "string", "default": "", - "help_text": "Base URL for Dispatcharr stream links in notifications (e.g., https://tv.example.com). Used to build stream URLs like {base_url}/proxy/ts/stream/{uuid}.", + "help_text": "Legacy notification base URL alias. Use 'Notification Base URL' above for new setups.", }, { "id": "info_epg", @@ -274,6 +337,13 @@ class Plugin: "button_label": "Reset All", "button_color": "red", }, + { + "id": "diagnostics", + "label": "Diagnostics", + "description": "Run a non-destructive YouTubearr health check", + "button_label": "Diagnostics", + "button_color": "blue", + }, ] def __init__(self) -> None: @@ -300,6 +370,8 @@ def __init__(self) -> None: # Track video IDs that recently failed metadata extraction to avoid retrying every poll self._extraction_failures: Dict[str, float] = {} # video_id -> unix timestamp of failure + self._legacy_task_cleanup_done = False + # Field defaults self._field_defaults = {field["id"]: field.get("default") for field in self.fields} @@ -333,6 +405,8 @@ def run(self, action: str, params: Dict[str, Any], context: Dict[str, Any]) -> D response = self._handle_cleanup(context) elif action == "reset_all": response = self._handle_reset_all(context) + elif action == "diagnostics": + response = self._handle_diagnostics(context) else: response = {"status": "error", "message": f"Unknown action '{action}'"} @@ -354,8 +428,7 @@ def stop(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: def _handle_status(self, context: Dict[str, Any]) -> Dict[str, Any]: """Return current status""" - # IMPORTANT: Always read fresh settings from DB for auto-restart check - # Context settings may be stale (e.g., from Celery beat health check) + # Always read fresh settings from DB for auto-restart check try: cfg = PluginConfig.objects.get(key=self._plugin_key) settings = dict(cfg.settings or {}) @@ -365,6 +438,10 @@ def _handle_status(self, context: Dict[str, Any]) -> Dict[str, Any]: tracked_streams = settings.get("tracked_streams", {}) monitoring_active = settings.get("monitoring_active", False) + # Clean up the bogus Celery beat task left by older plugin versions (once per instance) + # Must run before any early return, so it fires even when yt-dlp is missing. + self._cleanup_legacy_celery_task() + # Check yt-dlp availability if not self._ytdlp_path: return { @@ -372,47 +449,8 @@ def _handle_status(self, context: Dict[str, Any]) -> Dict[str, Any]: "message": "yt-dlp not found (bundled version may not be working). Check logs.", } - # Auto-restart monitoring if DB says active but no thread is actually running - # This handles container/service restarts AND crashed/hung threads - # IMPORTANT: We can't rely on self._monitor_thread because each Celery worker - # creates a new Plugin instance with _monitor_thread=None. Instead, we check - # the monitoring_heartbeat timestamp to see if a thread is actively polling. - thread_dead = not self._monitor_thread or not self._monitor_thread.is_alive() - - # Check if another thread is actually running by looking at heartbeat - heartbeat_str = settings.get("monitoring_heartbeat") - heartbeat_recent = False - if heartbeat_str: - try: - heartbeat = datetime.fromisoformat(heartbeat_str.replace("Z", "+00:00")) - if isinstance(heartbeat.tzinfo, type(None)): - heartbeat = heartbeat.replace(tzinfo=dt_timezone.utc) - age_seconds = (datetime.now(dt_timezone.utc) - heartbeat).total_seconds() - # Heartbeat threshold must be longer than poll_interval + poll_duration - # Poll cycles can take 5-10 minutes with many channels, so use poll_interval + 10 min buffer - poll_interval_minutes = settings.get("poll_interval_minutes", 15) - heartbeat_threshold = (poll_interval_minutes + 10) * 60 # e.g., 25 minutes for 15-min poll - heartbeat_recent = age_seconds < heartbeat_threshold - if heartbeat_recent: - self._log(f"Monitoring heartbeat is recent ({int(age_seconds)}s ago, threshold={heartbeat_threshold}s), skipping auto-restart") - except (ValueError, TypeError): - pass - - if monitoring_active and thread_dead and not heartbeat_recent: - channels = settings.get("monitored_channels", "").strip() - if channels and self._ytdlp_path: - self._log("Auto-restarting monitoring after service restart") - self._monitoring_active = True - self._monitor_stop_event.clear() - self._monitor_thread = threading.Thread( - target=self._monitoring_loop, - args=(self._plugin_key,), - daemon=True, - name="YouTubearr-Monitor" - ) - self._monitor_thread.start() - # Ensure Celery beat health check is registered (idempotent) - self._register_celery_health_check() + # Self-heal: restart the monitor thread if DB says active but no live thread exists + self._ensure_monitoring_thread(settings) message_parts = [] if monitoring_active: @@ -659,9 +697,7 @@ def _handle_start_monitoring(self, context: Dict[str, Any]) -> Dict[str, Any]: self._monitor_thread.start() self._log("Monitoring started") - - # Register Celery beat health check for auto-recovery - self._register_celery_health_check() + self._cleanup_legacy_celery_task() return { "status": "running", @@ -694,9 +730,7 @@ def _handle_stop_monitoring(self, context: Dict[str, Any]) -> Dict[str, Any]: self._monitor_thread.join(timeout=5.0) self._log("Monitoring stopped") - - # Unregister Celery beat health check - self._unregister_celery_health_check() + self._cleanup_legacy_celery_task() return { "status": "stopped", @@ -869,6 +903,258 @@ def _handle_reset_all(self, context: Dict[str, Any]) -> Dict[str, Any]: self._log_error(f"Reset All failed: {exc}") return {"status": "error", "message": f"Reset failed: {str(exc)}"} + # --- Diagnostics --- + + def _handle_diagnostics(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Run a non-destructive health check and return diagnostics details.""" + settings = context.get("settings", {}) + issues: List[str] = [] # "error:" or "warning:" + details: Dict[str, Any] = {} + + # Plugin identity + details["plugin_version"] = self.version + details["plugin_key"] = self._plugin_key + + # Monitoring state + monitoring_active_db = settings.get("monitoring_active", False) + details["monitoring_active"] = monitoring_active_db + thread_alive = bool(self._monitor_thread and self._monitor_thread.is_alive()) + details["monitor_thread_alive"] = thread_alive + details["last_poll_time"] = settings.get("last_poll_time") or "unknown" + details["monitoring_heartbeat"] = settings.get("monitoring_heartbeat") or "unknown" + + if monitoring_active_db and not thread_alive: + heartbeat_str = settings.get("monitoring_heartbeat") + if heartbeat_str: + try: + hb = datetime.fromisoformat(heartbeat_str.replace("Z", "+00:00")) + if hb.tzinfo is None: + hb = hb.replace(tzinfo=dt_timezone.utc) + if (datetime.now(tz=dt_timezone.utc) - hb).total_seconds() > 600: + issues.append("warning:monitoring active but heartbeat is stale (>10 min)") + except Exception: + issues.append("warning:monitoring active but heartbeat is unparseable") + else: + issues.append("warning:monitoring active but no heartbeat found") + + # Monitored channels / tracked streams + monitored_raw = settings.get("monitored_channels", "").strip() + details["monitored_channel_count"] = ( + len([l for l in monitored_raw.splitlines() if l.strip()]) if monitored_raw else 0 + ) + tracked_streams = settings.get("tracked_streams", {}) + details["tracked_stream_count"] = len(tracked_streams) + + # Extraction failures + failure_count = len(self._extraction_failures) + details["extraction_failure_count"] = failure_count + if failure_count > 0: + try: + ts_list = [float(t) for t in self._extraction_failures.values()] + details["extraction_failure_oldest"] = datetime.fromtimestamp( + min(ts_list), tz=dt_timezone.utc + ).isoformat() + details["extraction_failure_newest"] = datetime.fromtimestamp( + max(ts_list), tz=dt_timezone.utc + ).isoformat() + except Exception: + details["extraction_failure_oldest"] = "unavailable" + details["extraction_failure_newest"] = "unavailable" + + # yt-dlp binary + details["ytdlp_path"] = self._ytdlp_path or "not found" + details["ytdlp_version"] = self._get_ytdlp_version() + if not self._ytdlp_path: + issues.append("error:yt-dlp binary not found") + + # QuickJS binary + details["qjs_path"] = self._qjs_path or "not found" + details["qjs_version"] = self._get_qjs_version() + + # Cookies (configured/present, never expose content) + cookies_raw = settings.get("cookies_content", "") + details["cookies_configured"] = bool(cookies_raw and cookies_raw.strip()) + details["cookies_file_present"] = (self._base_dir / "cookies.txt").exists() + + # Webhooks + media_cfg = self._get_media_refresh_webhook_config(settings) + details["media_refresh_webhook_configured"] = bool(media_cfg.get("url")) + details["media_refresh_webhook_is_legacy"] = media_cfg.get("is_legacy", False) + if settings.get("media_refresh_webhook_headers", "").strip() and not media_cfg.get("headers"): + issues.append("warning:media refresh webhook headers are invalid JSON") + + notif_cfg = self._get_notification_webhook_config(settings) + details["notification_webhook_configured"] = bool(notif_cfg.get("url")) + details["notification_webhook_is_legacy"] = notif_cfg.get("is_legacy", False) + if settings.get("notification_webhook_headers", "").strip() and not notif_cfg.get("headers"): + issues.append("warning:notification webhook headers are invalid JSON") + + # DB counts (best effort — defensive against mocked/unavailable Django) + details["owned_streams"] = self._count_owned_streams() + details["owned_channels"] = self._count_owned_channels() + details["owned_programs"] = self._count_owned_programs() + for key in ("owned_streams", "owned_channels", "owned_programs"): + if isinstance(details.get(key), str) and "unavailable" in details[key]: + issues.append(f"warning:{key} count unavailable") + details["epg_counts"] = self._get_epg_counts(settings) + + # Legacy Celery beat task presence + try: + from django_celery_beat.models import PeriodicTask as _PT + _task_name = f"youtubearr_{self._plugin_key}_health_check" + present = _PT.objects.filter(name=_task_name).exists() + details["legacy_celery_health_check_present"] = present + if present: + issues.append("warning:legacy Celery beat task present — causes unregistered-task spam; will auto-remove on next status/start action") + except Exception as _exc: + details["legacy_celery_health_check_present"] = f"unavailable: {_exc}" + + # Stale stream URLs (live streams whose URL hasn't been refreshed recently) + _url_refresh_interval = settings.get("url_refresh_interval_seconds", 3600) + _stale_threshold = 2 * _url_refresh_interval + _now_utc = datetime.now(dt_timezone.utc) + stale_count = 0 + oldest_stale_age = 0.0 + for _vid, _sd in tracked_streams.items(): + if not _sd.get("is_live"): + continue + _last_str = _sd.get("last_url_refresh") + if not _last_str: + stale_count += 1 + continue + try: + _lr = datetime.fromisoformat(_last_str.replace("Z", "+00:00")) + if _lr.tzinfo is None: + _lr = _lr.replace(tzinfo=dt_timezone.utc) + _age = (_now_utc - _lr).total_seconds() + if _age > _stale_threshold: + stale_count += 1 + oldest_stale_age = max(oldest_stale_age, _age) + except (ValueError, TypeError): + stale_count += 1 + details["stale_tracked_stream_url_count"] = stale_count + if oldest_stale_age: + details["oldest_url_refresh_age_seconds"] = int(oldest_stale_age) + if stale_count > 0: + issues.append(f"warning:{stale_count} live stream URL(s) are stale (monitor may not be refreshing URLs)") + + # Recent log summary + details["log_summary"] = self._get_recent_log_summary() + + # Status + errors = [i for i in issues if i.startswith("error:")] + warnings = [i for i in issues if i.startswith("warning:")] + if errors: + status, message = "error", "YouTubearr diagnostics found errors" + elif warnings: + status, message = "warning", "YouTubearr diagnostics completed with warnings" + else: + status, message = "success", "YouTubearr diagnostics completed: healthy" + + return {"status": status, "message": message, "details": details} + + def _get_ytdlp_version(self) -> str: + """Return yt-dlp version string, or a safe error description.""" + if not self._ytdlp_path: + return "unavailable: yt-dlp not found" + try: + result = subprocess.run( + [self._ytdlp_path, "--version"], + capture_output=True, text=True, timeout=5, + ) + ver = result.stdout.strip() or result.stderr.strip() + return ver if ver else "unknown" + except FileNotFoundError: + return "unavailable: binary not found" + except subprocess.TimeoutExpired: + return "unavailable: timeout" + except Exception as exc: + return f"unavailable: {exc}" + + def _get_qjs_version(self) -> str: + """Return QuickJS version string from --version output (may exit nonzero).""" + if not self._qjs_path: + return "not configured" + try: + result = subprocess.run( + [self._qjs_path, "--version"], + capture_output=True, text=True, timeout=5, + ) + combined = (result.stdout + result.stderr).strip() + m = re.search(r"QuickJS(?:-ng)?\s+version\s+\S+", combined, re.IGNORECASE) + if m: + return m.group(0) + return combined[:80] if combined else "unknown" + except FileNotFoundError: + return "unavailable: binary not found" + except subprocess.TimeoutExpired: + return "unavailable: timeout" + except Exception as exc: + return f"unavailable: {exc}" + + def _count_owned_streams(self) -> Any: + try: + return Stream.objects.filter(custom_properties__owner="youtubearr").count() + except Exception as exc: + return f"unavailable: {type(exc).__name__}" + + def _count_owned_channels(self) -> Any: + try: + return Channel.objects.filter( + streams__custom_properties__owner="youtubearr" + ).distinct().count() + except Exception as exc: + return f"unavailable: {type(exc).__name__}" + + def _count_owned_programs(self) -> Any: + try: + return ProgramData.objects.filter(custom_properties__owner="youtubearr").count() + except Exception as exc: + return f"unavailable: {type(exc).__name__}" + + def _get_epg_counts(self, settings: Dict[str, Any]) -> Dict[str, Any]: + epg_source_name = settings.get("epg_source_name", "YouTube Live").strip() + result: Dict[str, Any] = {} + try: + source = EPGSource.objects.filter(name=epg_source_name).first() + if source: + result["epg_source"] = epg_source_name + result["epg_data_count"] = EPGData.objects.filter(epg_source=source).count() + result["program_count"] = ProgramData.objects.filter(epg__epg_source=source).count() + else: + result["epg_source"] = f"{epg_source_name} (not found)" + result["epg_data_count"] = 0 + result["program_count"] = 0 + except Exception as exc: + result["epg_source"] = f"unavailable: {type(exc).__name__}" + result["epg_data_count"] = f"unavailable: {type(exc).__name__}" + result["program_count"] = f"unavailable: {type(exc).__name__}" + return result + + def _get_recent_log_summary(self) -> Dict[str, Any]: + result: Dict[str, Any] = {"error_count": 0, "recent_errors": [], "status": "ok"} + if not self._log_path.exists(): + result["status"] = "log file not found" + return result + try: + max_bytes = 32 * 1024 + size = self._log_path.stat().st_size + with open(self._log_path, "rb") as f: + if size > max_bytes: + f.seek(size - max_bytes) + raw = f.read(max_bytes) + text = raw.decode("utf-8", errors="replace") + lines = text.splitlines() + if size > max_bytes and lines: + lines = lines[1:] # drop possibly-truncated first line + error_lines = [l for l in lines if "ERROR:" in l] + result["error_count"] = len(error_lines) + result["recent_errors"] = [e[-120:] for e in error_lines[-5:]] + result["lines_scanned"] = len(lines) + except Exception as exc: + result["status"] = f"read failed: {type(exc).__name__}" + return result + # --- YouTube URL Parsing --- def _extract_video_id(self, url: str) -> Optional[str]: @@ -1096,6 +1382,7 @@ def _create_stream_and_channel( cfg = PluginConfig.objects.select_for_update().get(key=self._plugin_key) video_title = metadata.get("title", "YouTube Live") + video_id = metadata.get("video_id", "") stream_url = metadata.get("stream_url", "") thumbnail = metadata.get("thumbnail", "") channel_thumbnail = metadata.get("channel_thumbnail", "") @@ -1111,6 +1398,29 @@ def _create_stream_and_channel( stream_profile_id=self._get_stream_profile_id(settings), ) + # Apply YouTubearr ownership tags to stream custom_properties + try: + _existing = getattr(stream, 'custom_properties', None) + stream.custom_properties = self._merge_youtubearr_custom_properties( + _existing if isinstance(_existing, dict) else {}, + youtube_video_id=video_id, + youtube_channel_id=youtube_channel_id, + stream_url_refreshed_at=timezone.now().isoformat(), + ) + stream.save(update_fields=['custom_properties']) + except Exception: + pass # custom_properties field may not exist on this Dispatcharr version + + # Associate stream with custom M3U account for correct playback routing + try: + m3u_account = self._get_custom_m3u_account() + if m3u_account is not None: + stream.is_custom = True + stream.m3u_account = m3u_account + stream.save(update_fields=['is_custom', 'm3u_account']) + except Exception: + pass # Fields may not exist on this Dispatcharr version + # Get or create channel group group_name = settings.get("channel_group_name", self._channel_group_name) group, _ = ChannelGroup.objects.get_or_create(name=group_name) @@ -1124,14 +1434,9 @@ def _create_stream_and_channel( # Format channel name as: {youtube_channel_name} #{stream_number} numbering_mode = settings.get("channel_numbering_mode", "decimal") if numbering_mode == "decimal": - # Extract stream number from sub-channel (e.g., 93.2 → #2) - decimal_part = channel_number % 1 - if decimal_part > 0: - stream_number = int(round(decimal_part * 10)) - if stream_number == 0: - stream_number = int(round(decimal_part * 100)) # Handle .01, .02, etc. - else: - stream_number = 1 + # Extract stream number using string-safe parsing (e.g., 93.2 → 2, 93.11 → 11). + # Float math (decimal_part * 10) gives wrong results for values >= .10. + stream_number = self._get_subchannel_index(channel_number) else: # Sequential mode: count ACTIVE streams from this YouTube channel + 1. # Only count tracked_streams entries whose channel_id still exists in the DB. @@ -1222,7 +1527,7 @@ def _create_stream_and_channel( # Ensure a single program exists so the guide shows the stream title. now = timezone.now() - ProgramData.objects.update_or_create( + program_obj, _ = ProgramData.objects.update_or_create( epg=epg_data, tvg_id=channel_tvg_id, defaults={ @@ -1232,6 +1537,16 @@ def _create_stream_and_channel( "end_time": now + timedelta(hours=12), }, ) + try: + _existing_pp = getattr(program_obj, 'custom_properties', None) + program_obj.custom_properties = self._merge_youtubearr_custom_properties( + _existing_pp if isinstance(_existing_pp, dict) else {}, + youtube_video_id=video_id, + youtube_channel_id=youtube_channel_id, + ) + program_obj.save(update_fields=['custom_properties']) + except Exception: + pass # custom_properties may not exist on this Dispatcharr version except Exception as epg_exc: self._log(f"Could not assign EPG: {epg_exc}") @@ -2449,6 +2764,14 @@ def _monitoring_loop(self, plugin_key: str) -> None: # This prevents other Celery workers from starting duplicate threads self._persist_settings({"monitoring_heartbeat": timezone.now().isoformat()}) + # Prune stale extraction failures to keep the dict bounded + try: + _pruned = self._prune_extraction_failures() + if _pruned: + self._log(f"Pruned {_pruned} stale extraction failure(s)") + except Exception: + pass + # Poll channels try: added, ended = self._poll_monitored_channels(settings) @@ -2519,6 +2842,100 @@ def _persist_settings(self, updates: Dict[str, Any]) -> None: except PluginConfig.DoesNotExist: self._log_error("Plugin config not found") + # --- Operational Helpers --- + + def _prune_extraction_failures(self, ttl_days: int = 7, now: Optional[float] = None) -> int: + """Remove extraction failure entries older than ttl_days from the in-memory dict. + + Returns count of pruned entries. Malformed timestamps are also pruned. + Future timestamps (members-only entries stored at now+6days) are preserved. + """ + if now is None: + now = time.time() + cutoff = now - ttl_days * 86400 + to_prune = [] + for vid, fail_time in list(self._extraction_failures.items()): + try: + if float(fail_time) < cutoff: + to_prune.append(vid) + except (TypeError, ValueError): + to_prune.append(vid) # Malformed timestamp — prune it + for vid in to_prune: + del self._extraction_failures[vid] + return len(to_prune) + + def _get_subchannel_index(self, channel_number, base_channel=None) -> int: + """Extract subchannel index from channel_number using string parsing, not float math. + + Examples: "90.1" -> 1, "90.11" -> 11, "90.21" -> 21 + Float input is accepted; Python's str() gives the shortest decimal form. + """ + s = str(channel_number) + if '.' in s: + try: + return int(s.split('.', 1)[1]) + except (ValueError, IndexError): + return 1 + return 1 + + def _cache_bust_image_url(self, url, enabled: bool = True, timestamp=None): + """Append or replace a ytarr_ts query parameter on an image URL. + + Returns the original value unchanged when url is blank, a data URI, + a local path, or cache-busting is disabled. + """ + if not url: + return url + if not enabled: + return url + if isinstance(url, str) and url.startswith(('data:', '/', 'file:')): + return url + ts = str(int(timestamp)) if timestamp is not None else str(int(time.time())) + if '?' in url: + base, query = url.split('?', 1) + params = [p for p in query.split('&') if p and not p.startswith('ytarr_ts=')] + params.append(f'ytarr_ts={ts}') + return base + '?' + '&'.join(params) + return f'{url}?ytarr_ts={ts}' + + def _merge_youtubearr_custom_properties(self, existing, **metadata) -> dict: + """Return a new dict with YouTubearr ownership fields merged into existing props. + + Stamps owner='youtubearr' and any additional metadata kwargs. + Does not mutate the input dict. Safe against None or non-dict input. + Only call on Stream/ProgramData — Channel and EPGSource lack custom_properties. + """ + result: dict = {} + if existing: + try: + result = dict(existing) + except (TypeError, ValueError): + pass + result['owner'] = 'youtubearr' + result.update(metadata) + return result + + def _get_custom_m3u_account(self): + """Return the custom/built-in M3UAccount for stream association. + + Uses a lazy import so local unit tests without Dispatcharr installed + never fail at import time. Returns None on any failure. + """ + try: + from apps.m3u.models import M3UAccount # noqa: PLC0415 + try: + return M3UAccount.get_custom_account() + except AttributeError: + account, _ = M3UAccount.objects.get_or_create( + name='custom', + defaults={'is_active': True, 'locked': True, 'max_streams': 0}, + ) + return account + except ImportError: + return None + except Exception: + return None + # --- XMLTV Cache Generation --- def _generate_xmltv_cache(self, settings: Dict[str, Any]) -> None: @@ -2598,58 +3015,145 @@ def escape_xml(s: str) -> str: except Exception as e: self._log_error(f"Failed to generate XMLTV cache: {e}") - # --- Logging --- + # --- Webhook / Notification --- - def _trigger_webhook(self, settings: Dict[str, Any]) -> None: - """Trigger webhook URL when channels change (with configurable delay)""" - # Generate XMLTV cache before triggering webhook so Jellyfin has fresh data - self._generate_xmltv_cache(settings) - webhook_url = settings.get("webhook_url", "").strip() - - if not webhook_url: - return # Webhook disabled - - # Get delay setting (default 5 seconds to let Dispatcharr finish processing) - delay_seconds = settings.get("webhook_delay_seconds", 5) + def _parse_webhook_headers(self, raw: str) -> Dict[str, str]: + """Safely parse a JSON object string into a header dict. Invalid input is logged and ignored.""" + if not raw or not isinstance(raw, str): + return {} + raw = raw.strip() + if not raw: + return {} + try: + parsed = json.loads(raw) + if not isinstance(parsed, dict): + self._log_error("Webhook headers must be a JSON object string; ignoring") + return {} + return {str(k): str(v) for k, v in parsed.items()} + except json.JSONDecodeError as exc: + self._log_error(f"Invalid webhook headers JSON (ignored): {exc}") + return {} + + def _get_media_refresh_webhook_config(self, settings: Dict[str, Any]) -> Dict[str, Any]: + """Resolve media refresh webhook settings. New keys take precedence over legacy ones.""" + new_url = settings.get("media_refresh_webhook_url", "").strip() + legacy_url = settings.get("webhook_url", "").strip() + url = new_url or legacy_url + is_legacy = bool(legacy_url and not new_url) + + raw_delay = settings.get("media_refresh_webhook_delay_seconds", + settings.get("webhook_delay_seconds", 5)) try: - delay_seconds = int(delay_seconds) - if delay_seconds < 0: - delay_seconds = 0 - elif delay_seconds > 60: - delay_seconds = 60 + delay = max(0, min(60, int(raw_delay))) except (TypeError, ValueError): - delay_seconds = 5 + delay = 5 - try: - if delay_seconds > 0: - self._log(f"Waiting {delay_seconds}s before triggering webhook...") - time.sleep(delay_seconds) + return { + "url": url, + "method": settings.get("media_refresh_webhook_method", "POST"), + "delay": delay, + "headers": self._parse_webhook_headers(settings.get("media_refresh_webhook_headers", "")), + "body_template": settings.get("media_refresh_webhook_body_template", ""), + "is_legacy": is_legacy, + } - self._log(f"Triggering webhook: {webhook_url}") - req = urllib.request.Request(webhook_url, method='POST') - req.add_header('Content-Type', 'application/json') + def _get_notification_webhook_config(self, settings: Dict[str, Any]) -> Dict[str, Any]: + """Resolve notification webhook settings. New keys take precedence over legacy ones.""" + new_url = settings.get("notification_webhook_url", "").strip() + legacy_url = settings.get("telegram_webhook_url", "").strip() + url = new_url or legacy_url + is_legacy = bool(legacy_url and not new_url) - with urllib.request.urlopen(req, timeout=10) as response: - status = response.status - if status in [200, 204]: - self._log(f"Webhook triggered successfully (HTTP {status})") + new_base = settings.get("notification_base_url", "").strip().rstrip("/") + legacy_base = settings.get("dispatcharr_base_url", "").strip().rstrip("/") + base_url = new_base or legacy_base + + return { + "url": url, + "method": settings.get("notification_webhook_method", "POST"), + "headers": self._parse_webhook_headers(settings.get("notification_webhook_headers", "")), + "base_url": base_url, + "is_legacy": is_legacy, + } + + def _send_webhook_request(self, url: str, method: str = 'POST', + headers: Optional[Dict[str, str]] = None, + body: Optional[str] = None, timeout: int = 10) -> tuple: + """Send a single webhook HTTP request. Returns (status, response_text). May raise.""" + data = body.encode('utf-8') if isinstance(body, str) else None + req = urllib.request.Request(url, data=data, method=method) + req.add_header('Content-Type', 'application/json') + if headers: + for k, v in headers.items(): + req.add_header(k, str(v)) + with urllib.request.urlopen(req, timeout=timeout) as response: + return response.status, response.read().decode('utf-8', errors='replace') + + def _send_webhook_async(self, kind: str, url: str, method: str, + headers: Optional[Dict[str, str]], body: Optional[str], + delay_seconds: int = 0) -> None: + """Fire a webhook in a short-lived daemon thread. Never blocks the caller.""" + def _worker(): + try: + if delay_seconds > 0: + self._log(f"[{kind}] Waiting {delay_seconds}s before sending webhook...") + time.sleep(delay_seconds) + self._log(f"[{kind}] Sending webhook: {url}") + status, _ = self._send_webhook_request(url, method=method, + headers=headers, body=body, timeout=10) + if status in [200, 201, 204]: + self._log(f"[{kind}] Webhook sent successfully (HTTP {status})") else: - self._log(f"Webhook returned HTTP {status}") - except Exception as exc: - self._log_error(f"Failed to trigger webhook: {exc}") + self._log(f"[{kind}] Webhook returned HTTP {status}") + except Exception as exc: + self._log_error(f"[{kind}] Webhook failed: {exc}") - def _send_telegram_notification(self, settings: Dict[str, Any], video_id: str, metadata: Dict[str, Any], channel_number: int, channel_uuid: str) -> None: - """Send Telegram notification when a new channel is added""" - telegram_url = settings.get("telegram_webhook_url", "").strip() + t = threading.Thread(target=_worker, daemon=True) + t.start() - if not telegram_url: - return # Telegram notifications disabled + def _trigger_webhook(self, settings: Dict[str, Any]) -> None: + """Trigger media refresh webhook when channels change. Returns immediately (non-blocking).""" + # Generate XMLTV cache before triggering webhook so Jellyfin has fresh data + self._generate_xmltv_cache(settings) - try: - # Build the payload for Claudia (use channel UUID for Dispatcharr stream URL) - base_url = settings.get("dispatcharr_base_url", "").strip().rstrip("/") + config = self._get_media_refresh_webhook_config(settings) + url = config["url"] + if not url: + return + + if config["body_template"]: + body = config["body_template"] + elif config["is_legacy"]: + # Legacy Jellyfin-style: bodyless POST + body = None + else: + body = json.dumps({ + "event": "media_refresh_requested", + "plugin": "youtubearr", + "reason": "streams_changed", + "timestamp": datetime.now(dt_timezone.utc).isoformat(), + }) + + self._send_webhook_async( + "media_refresh", url, config["method"], config["headers"], body, + delay_seconds=config["delay"], + ) + + def _send_telegram_notification(self, settings: Dict[str, Any], video_id: str, + metadata: Dict[str, Any], channel_number: int, + channel_uuid: str) -> None: + """Send notification webhook when a new channel is added.""" + config = self._get_notification_webhook_config(settings) + url = config["url"] + if not url: + return + + base_url = config["base_url"] + + if config["is_legacy"]: + # Legacy Telegram payload — preserve exact key shape if not base_url: - self._log("Skipping Telegram notification: dispatcharr_base_url not configured") + self._log("Skipping notification: base URL not configured") return dispatcharr_url = f"{base_url}/proxy/ts/stream/{channel_uuid}" payload = { @@ -2657,50 +3161,96 @@ def _send_telegram_notification(self, settings: Dict[str, Any], video_id: str, m "channel": metadata.get("youtube_channel_name", "YouTube"), "url": dispatcharr_url, "description": f"Added as Dispatcharr Channel #{channel_number}", - "timestamp": datetime.now(dt_timezone.utc).isoformat() + "timestamp": datetime.now(dt_timezone.utc).isoformat(), + } + else: + # Generic notification payload + dispatcharr_url = f"{base_url}/proxy/ts/stream/{channel_uuid}" if base_url else "" + payload = { + "event": "stream_added", + "plugin": "youtubearr", + "video_id": video_id, + "title": metadata.get("title", "YouTube Live Stream"), + "channel_name": metadata.get("youtube_channel_name", "YouTube"), + "channel_number": str(channel_number), + "dispatcharr_channel_uuid": channel_uuid, + "url": dispatcharr_url, + "thumbnail": metadata.get("thumbnail", ""), + "timestamp": datetime.now(dt_timezone.utc).isoformat(), } - self._log(f"Sending Telegram notification for: {metadata.get('title', 'stream')[:60]}...") - - data = json.dumps(payload).encode('utf-8') - req = urllib.request.Request(telegram_url, data=data, method='POST') - req.add_header('Content-Type', 'application/json') - - with urllib.request.urlopen(req, timeout=10) as response: - status = response.status - if status in [200, 201, 204]: - self._log(f"Telegram notification sent successfully (HTTP {status})") - else: - self._log_error(f"Telegram notification failed: HTTP {status} from {telegram_url}") - except Exception as exc: - self._log_error(f"Failed to send Telegram notification: {exc}") + self._log(f"Sending notification for: {metadata.get('title', 'stream')[:60]}...") + self._send_webhook_async( + "notification", url, config["method"], config["headers"], + json.dumps(payload), delay_seconds=0, + ) - # --- Celery Beat Scheduling --- + # --- Monitoring Self-Healing and Legacy Cleanup --- - def _register_celery_health_check(self) -> None: - """Register periodic health check with Celery beat (runs every 5 minutes)""" - try: - task_name = f"youtubearr_{self._plugin_key}_health_check" - create_or_update_periodic_task( - task_name=task_name, - celery_task_path="core.tasks.check_plugin_health", - kwargs={"plugin_key": self._plugin_key}, - cron_expression="*/5 * * * *", # Every 5 minutes - enabled=True, - ) - self._log(f"Registered Celery beat health check: {task_name}") - except Exception as exc: - self._log_error(f"Failed to register Celery health check: {exc}") + def _cleanup_legacy_celery_task(self) -> None: + """Delete the bogus Celery beat task registered by older plugin versions. - def _unregister_celery_health_check(self) -> None: - """Unregister periodic health check from Celery beat""" + Older versions registered a periodic task pointing at a Dispatcharr core task + that does not exist, causing Celery to spam 'Received unregistered task' errors + every 5 minutes. This method is idempotent (runs once per Plugin instance) and + safe to call from any action handler. + """ + if self._legacy_task_cleanup_done: + return + self._legacy_task_cleanup_done = True try: task_name = f"youtubearr_{self._plugin_key}_health_check" deleted = delete_periodic_task(task_name) if deleted: - self._log(f"Unregistered Celery beat health check: {task_name}") + self._log(f"Removed legacy Celery beat task: {task_name}") except Exception as exc: - self._log_error(f"Failed to unregister Celery health check: {exc}") + self._log_error(f"Legacy Celery task cleanup failed: {exc}") + + def _ensure_monitoring_thread(self, settings: Dict[str, Any]) -> bool: + """Restart the monitor thread if DB says active but no live thread is running. + + This is the in-plugin self-healing path: it handles container restarts and + crashed/hung threads without relying on any Celery task. Returns True if a + new thread was started. + """ + if not settings.get("monitoring_active"): + return False + + thread_dead = not self._monitor_thread or not self._monitor_thread.is_alive() + if not thread_dead: + return False + + # Another Celery worker's thread may still be running — check the shared heartbeat + heartbeat_str = settings.get("monitoring_heartbeat") + if heartbeat_str: + try: + heartbeat = datetime.fromisoformat(heartbeat_str.replace("Z", "+00:00")) + if isinstance(heartbeat.tzinfo, type(None)): + heartbeat = heartbeat.replace(tzinfo=dt_timezone.utc) + age_seconds = (datetime.now(dt_timezone.utc) - heartbeat).total_seconds() + poll_interval_minutes = settings.get("poll_interval_minutes", 15) + heartbeat_threshold = (poll_interval_minutes + 10) * 60 + if age_seconds < heartbeat_threshold: + self._log(f"Monitoring heartbeat is recent ({int(age_seconds)}s ago, threshold={heartbeat_threshold}s), skipping auto-restart") + return False + except (ValueError, TypeError): + pass + + channels = settings.get("monitored_channels", "").strip() + if not channels or not self._ytdlp_path: + return False + + self._log("Auto-restarting monitoring after service restart") + self._monitoring_active = True + self._monitor_stop_event.clear() + self._monitor_thread = threading.Thread( + target=self._monitoring_loop, + args=(self._plugin_key,), + daemon=True, + name="YouTubearr-Monitor" + ) + self._monitor_thread.start() + return True def _log(self, message: str) -> None: """Write log message"""