# -*- coding: utf-8 -*- import os import sys import json import urllib.request import zipfile import subprocess import shutil # Variables globales para importación dinámica requests = None yt_dlp = None def install_and_import(): global requests, yt_dlp # 1. Intentar importar o instalar 'requests' try: import requests as req requests = req except ImportError: print("Instalando dependencia 'requests' en tu equipo...") try: subprocess.check_call([sys.executable, "-m", "pip", "install", "requests"]) import requests as req requests = req print(" [OK] Dependencia 'requests' instalada con éxito.") except Exception as e: print(f"\n[!] Error crítico: No se pudo instalar 'requests' automáticamente.") print(f" Detalle del error: {e}") print("\n Intenta abrir una consola de comandos (cmd) y escribe: pip install requests") input("\nPresiona Enter para salir...") sys.exit(1) # 2. Intentar importar o instalar 'yt-dlp' try: import yt_dlp as ytdl yt_dlp = ytdl except ImportError: print("Instalando dependencia 'yt-dlp' en tu equipo...") try: subprocess.check_call([sys.executable, "-m", "pip", "install", "yt-dlp"]) import yt_dlp as ytdl yt_dlp = ytdl print(" [OK] Dependencia 'yt-dlp' instalada con éxito.") except Exception as e: print(f"\n[!] Error crítico: No se pudo instalar 'yt-dlp' automáticamente.") print(f" Detalle del error: {e}") print("\n Intenta abrir una consola de comandos (cmd) y escribe: pip install yt-dlp") input("\nPresiona Enter para salir...") sys.exit(1) CONFIG_FILE = "downloader_config.json" FFMPEG_URL = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-essentials.zip" FFMPEG_EXE = "ffmpeg.exe" def clear_console(): os.system('cls' if os.name == 'nt' else 'clear') def check_ffmpeg(): """Verifica si ffmpeg está disponible en el sistema o en la carpeta actual.""" if os.path.exists(FFMPEG_EXE): return True if shutil.which("ffmpeg"): return True print("\n--- FFmpeg no encontrado ---") print("FFmpeg es necesario para convertir los audios de YouTube a formato MP3.") choice = input("¿Deseas descargar automáticamente una versión estable de FFmpeg para Windows? (s/n): ").strip().lower() if choice == 's': try: print("Descargando FFmpeg (aprox. 85MB)... Por favor espera.") zip_path = "ffmpeg.zip" # Usar requests para descargar de forma robusta con barra de progreso simple o mensaje headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"} r = requests.get(FFMPEG_URL, headers=headers, stream=True) r.raise_for_status() with open(zip_path, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): if chunk: f.write(chunk) print("Extrayendo archivos...") with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall("temp_ffmpeg") # Buscar ffmpeg.exe y ffprobe.exe de manera recursiva for root, dirs, files in os.walk("temp_ffmpeg"): for file in files: if file.lower() in ["ffmpeg.exe", "ffprobe.exe"]: shutil.move(os.path.join(root, file), ".") # Limpiar carpetas temporales try: if os.path.exists("temp_ffmpeg"): shutil.rmtree("temp_ffmpeg") if os.path.exists(zip_path): os.remove(zip_path) except: pass print("¡FFmpeg instalado con éxito en el directorio actual!") return True except Exception as e: print(f"Error al descargar o extraer FFmpeg: {e}") print("Por favor, descarga ffmpeg.exe manualmente y colócalo en esta misma carpeta.") input("\nPresiona Enter para salir...") sys.exit(1) else: print("No se puede continuar sin FFmpeg.") input("\nPresiona Enter para salir...") sys.exit(1) def load_config(): if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) except: pass return {"server_url": "", "api_key": ""} def save_config(config): with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(config, f, indent=4, ensure_ascii=False) def test_connection(url, key): url = url.rstrip('/') try: res = requests.get(f"{url}/api/songs", headers={"X-API-Key": key}, timeout=10) if res.status_code == 200: return True elif res.status_code == 401: print("[-] Error: Contraseña de administrador (API Key) incorrecta.") else: print(f"[-] Error: El servidor respondió con código {res.status_code}.") except Exception as e: print(f"[-] Error al conectar con {url}: {e}") return False def setup_config(): config = load_config() print("=== Configuración de RadioZone Downloader ===") while True: server_url = input(f"URL de tu Radio (ej. https://radiozone.xyz) [{config.get('server_url')}]: ").strip() if not server_url and config.get("server_url"): server_url = config.get("server_url") api_key = input(f"Contraseña de Administrador (API Key) [Oculta/Dejar en blanco para mantener]: ").strip() if not api_key and config.get("api_key"): api_key = config.get("api_key") if not server_url or not api_key: print("Ambos campos son requeridos. Inténtalo de nuevo.\n") continue print("\nVerificando conexión con el servidor...") if test_connection(server_url, api_key): config["server_url"] = server_url config["api_key"] = api_key save_config(config) print("[+] ¡Conexión exitosa y configuración guardada!\n") input("Presiona Enter para continuar...") break else: retry = input("La verificación falló. ¿Deseas reconfigurar? (s/n): ").strip().lower() if retry != 's': break return config def upload_file(file_path, config): server_url = config["server_url"].rstrip('/') api_key = config["api_key"] filename = os.path.basename(file_path) print(f" [+] Subiendo {filename} a la radio...") try: with open(file_path, 'rb') as f: files = {'song': (filename, f, 'audio/mpeg')} headers = {'X-API-Key': api_key} res = requests.post(f"{server_url}/api/upload", files=files, headers=headers) if res.status_code == 201: data = res.json() print(f" [OK] ¡Subido con éxito! Guardado como: {data.get('title')} - {data.get('artist')}") return True else: try: err_msg = res.json().get('error', 'Error desconocido') except: err_msg = res.text print(f" [ERROR] No se pudo subir el archivo: {err_msg}") except Exception as e: print(f" [ERROR] Error en la petición de subida: {e}") return False def download_and_upload(ydl_opts, config, max_to_upload=100): # Carpeta de destino final (Downloads/mp3sun) dest_dir = os.path.abspath(os.path.expanduser(r"~\Downloads\mp3sun")) if not os.path.exists(dest_dir): try: os.makedirs(dest_dir) except: dest_dir = "mp3sun" if not os.path.exists(dest_dir): os.makedirs(dest_dir) ydl_opts['outtmpl'] = os.path.join(dest_dir, '%(title)s.%(ext)s') print(f"\n[+] Las canciones se guardarán localmente en: {dest_dir}") print("[+] Analizando consulta y obteniendo enlaces de YouTube...") try: # 1. Obtener metadatos sin descargar para procesar uno a uno ydl_opts_meta = dict(ydl_opts) ydl_opts_meta['extract_flat'] = True # Extrae rápido sin descargar with yt_dlp.YoutubeDL(ydl_opts_meta) as ydl: playlist_info = ydl.extract_info(ydl_opts['url_or_search'], download=False) # Determinar si es una lista de videos o uno solo if playlist_info is None: print("No se encontraron resultados para la búsqueda o el enlace.") input("\nPresiona Enter para volver...") return videos = [] if 'entries' in playlist_info: videos = list(playlist_info['entries']) else: videos = [playlist_info] total_videos = len(videos) if total_videos == 0: print("No se encontraron videos disponibles para descargar.") input("\nPresiona Enter para volver...") return print(f"[+] Se encontraron {total_videos} videos en total.") print(f"[+] Se descargarán un máximo de {max_to_upload} canciones individuales (menores a 5 min).\n") success_count = 0 processed_count = 0 for idx, video in enumerate(videos): if max_to_upload is not None and success_count >= max_to_upload: print(f"\n[+] Límite alcanzado: Se subieron con éxito {success_count} canciones.") break if not video: continue title = video.get('title', f"Video_{idx+1}") duration = video.get('duration') # Omitir videos/mixes de más de 5 minutos (300 segundos) desde metadatos if duration and duration > 300: print(f"[{idx+1}/{total_videos}] [-] Saltando video largo ({int(duration)//60} min): {title}") continue processed_count += 1 print(f"-------------------------------------------------------------") print(f"[{idx+1}/{total_videos}] Descargando: {title}") print(f"-------------------------------------------------------------") # Listar archivos antes de descargar files_before = set(os.listdir(dest_dir)) # Descargar canción individual ydl_opts_single = dict(ydl_opts) if 'logger' in ydl_opts_single: del ydl_opts_single['logger'] # Filtro nativo de duración para yt-dlp como doble seguridad def filter_duration(info_dict, *, incomplete): duration_val = info_dict.get('duration') if duration_val and duration_val > 300: return 'El video dura más de 5 minutos (300s)' return None ydl_opts_single['match_filter'] = filter_duration # Descargar try: url = video.get('url') or video.get('webpage_url') if not url: video_id = video.get('id') if video_id: url = f"https://www.youtube.com/watch?v={video_id}" else: print(" [ERROR] No se pudo determinar la URL del video.") continue with yt_dlp.YoutubeDL(ydl_opts_single) as ydl_s: ydl_s.download([url]) except Exception as download_err: print(f" [ERROR/FILTRADO] No se descargó el video: {download_err}") print(" [+] Continuando con el siguiente...") continue # Listar archivos después de descargar para encontrar el nuevo archivo files_after = set(os.listdir(dest_dir)) new_files = list(files_after - files_before) # Buscar el archivo MP3 entre los archivos nuevos mp3_file = None for f in new_files: if f.lower().endswith(".mp3"): mp3_file = os.path.join(dest_dir, f) break if not mp3_file: # Fallback: si por alguna razón no detecta el cambio, buscar el mp3 modificado más recientemente all_mp3s = [os.path.join(dest_dir, x) for x in os.listdir(dest_dir) if x.lower().endswith(".mp3")] if all_mp3s: mp3_file = max(all_mp3s, key=os.path.getmtime) if not mp3_file: print(" [ERROR] No se encontró el archivo MP3 convertido.") continue # Subir archivo try: if upload_file(mp3_file, config): success_count += 1 else: print(" [INFO] Descargado localmente pero falló la subida al hosting. Continuando...") except Exception as upload_err: print(f" [ERROR] Error al intentar subir el archivo: {upload_err}") print(" [+] Continuando con el resto de descargas...") print(f"\n=============================================================") print(f" PROCESO COMPLETADO: {success_count} de {processed_count} canciones procesadas subidas con éxito") print(f" Las canciones descargadas están en: {dest_dir}") print(f"=============================================================") except Exception as e: print(f"Ocurrió un error inesperado en el proceso: {e}") input("\nPresiona Enter para volver al menú...") def main(): clear_console() print("=============================================") print(" RadioZone - YouTube to MP3 Downloader ") print("=============================================\n") check_ffmpeg() config = load_config() if not config.get("server_url") or not config.get("api_key"): print("[-] Falta configurar la conexión con tu radio.") config = setup_config() if not config.get("server_url") or not config.get("api_key"): print("Configuración incompleta. Saliendo...") sys.exit(1) while True: clear_console() print("=============================================") print(" RadioZone - YouTube to MP3 Downloader ") print("=============================================") print(f" Servidor activo: {config['server_url']}") print("=============================================") print(" 1. Descargar enlace de YouTube (Video o Playlist)") print(" 2. Buscar y descargar canciones (Ej. Rock en Español)") print(" 3. Cambiar Configuración (URL / Contraseña)") print(" 4. Salir") print("=============================================") opc = input("Selecciona una opción (1-4): ").strip() ydl_opts = { 'format': 'bestaudio/best', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '320', }], 'prefer_ffmpeg': True, 'keepvideo': False, } if os.path.exists(FFMPEG_EXE): ydl_opts['ffmpeg_location'] = "." if opc == '1': url = input("\nIntroduce la URL del video o lista de reproducción: ").strip() if url: ydl_opts['url_or_search'] = url download_and_upload(ydl_opts, config, max_to_upload=100) elif opc == '2': query = input("\nIntroduce lo que quieres buscar (ej. Rock en español 80s): ").strip() if query: try: limit = int(input("¿Cuántas canciones deseas descargar? (1-100): ").strip()) if limit < 1: limit = 5 elif limit > 100: limit = 100 except: limit = 5 # Buscamos más resultados (hasta el doble, máx 150) para compensar # aquellos videos que tengan una duración mayor a 5 min y sean saltados search_limit = min(limit * 2, 150) ydl_opts['url_or_search'] = f"ytsearch{search_limit}:{query}" ydl_opts['noplaylist'] = True download_and_upload(ydl_opts, config, max_to_upload=limit) elif opc == '3': config = setup_config() elif opc == '4': print("\n¡Gracias por usar RadioZone Downloader! Saliendo...") break else: input("\nOpción no válida. Presiona Enter para intentar de nuevo...") if __name__ == "__main__": try: install_and_import() main() except KeyboardInterrupt: print("\n\nPrograma cancelado por el usuario. Saliendo...") except Exception as e: import traceback print("\n=======================================================") print(" [!] OCURRIÓ UN ERROR CRÍTICO EN LA EJECUCIÓN") print("=======================================================") print(traceback.format_exc()) print("=======================================================") input("\nPresiona Enter para salir y cerrar la ventana...")