# whubbartt 1 Posted May 17
# I got sick and tired of trying to make M3U converters work with my M3U
# provider, so this script will take VOD from your M3U stream and process it
# as STRM data in a designated folder. Enjoy
#!/usr/bin/env python3
"""
M3U to STRM python Converter

Converts IPTV M3U playlist VOD content to STRM files for Emby/Jellyfin.
Be sure to create the 3 directories and specify them below, then set the
script to run on a schedule.

On every run the script compares the current VOD streams with what is in the
output directories and (when CLEAN_SYNC is True) deletes STRM files that are
no longer in the VOD feed from your provider.

This script only works with M3U and not Xtreme Code user/pass.
"""
import os
import re
import urllib.request
import urllib.error

# ============================================================
# CONFIGURATION - Edit these settings
# ============================================================
# Put Your Provider URL Here and delete mine
M3U_URL = "https://tv123.me/iptv/2/yf/VOD"

# Specify the directories as to where you can save your STRMS after processing here
MOVIES_DIR = "/mnt/user/Plexcontent/IPMovies"
TV_DIR = "/mnt/user/Plexcontent/IPTv"
XXX_DIR = "/mnt/user/Plexcontent/IPOther"

MOVIE_GROUPS = ["Movie VOD"]
TV_GROUPS = ["TV VOD"]
XXX_GROUPS = ["XXX VOD"]

# If you don't want XXX groups specify what not to process here
SKIP_GROUPS = ["TRY", "Tested", "ZZZ"]

CLEAN_SYNC = True  # Set True to remove STRM files no longer in M3U
# ============================================================

# Matches a well-formed "#EXTINF:<duration> key="value" ...,<display name>"
# line and captures everything after the comma that ends the attribute list.
_EXTINF_RE = re.compile(
    r'#EXTINF:\s*-?\d+(?:\.\d+)?(?:\s+[\w-]+="[^"]*")*\s*,(.*)$'
)


def sanitize_filename(name):
    """Remove invalid filename characters and collapse whitespace.

    Returns an empty string for names made only of dots ("." / ".."), because
    such a name used as a folder would point outside the library directory.
    """
    name = re.sub(r'[<>:"/\\|?*]', '', name)
    name = re.sub(r'\s+', ' ', name).strip()
    if name and not name.strip('.'):
        return ''
    return name


def parse_episode(name):
    """Extract (show, season, episode) from a title containing SxxEyy.

    Returns (None, None, None) when the title has no SxxEyy tag. Season and
    episode are ints and may legitimately be 0 (specials).
    """
    match = re.search(r'^(.*?)\s*[Ss](\d{1,2})[Ee](\d{1,2})', name)
    if match:
        show = sanitize_filename(match.group(1).strip())
        return show, int(match.group(2)), int(match.group(3))
    return None, None, None


def clean_name(name):
    """Remove provider prefixes like 'TV VOD,' or 'Movie VOD,' and sanitize."""
    for prefix in MOVIE_GROUPS + TV_GROUPS + XXX_GROUPS:
        if name.startswith(prefix + ","):
            name = name[len(prefix) + 1:].strip()
        if name.startswith(prefix + " "):
            name = name[len(prefix):].strip()
    return sanitize_filename(name)


def download_m3u(url):
    """Download the M3U playlist and return it as text, or None on failure."""
    print(f"Downloading M3U from {url}...")
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'VLC/3.0.0'})
        with urllib.request.urlopen(req, timeout=30) as response:
            content = response.read().decode('utf-8', errors='ignore')
        print(f"Downloaded {len(content)} bytes")
        return content
    except Exception as e:
        # Top-level boundary: report any network/HTTP problem and let the
        # caller abort cleanly instead of crashing a scheduled run.
        print(f"ERROR downloading M3U: {e}")
        return None


def _extinf_display_name(line):
    """Return the display name of an #EXTINF line.

    The name is everything after the comma that terminates the attribute
    list, so titles that contain commas ("Monsters, Inc.") stay intact.
    Lines that do not follow the usual attribute layout fall back to the
    text after the last comma.
    """
    match = _EXTINF_RE.match(line)
    if match:
        return match.group(1).strip()
    return line.split(',')[-1].strip() if ',' in line else ''


def parse_m3u(content):
    """Parse M3U text into a list of {'group', 'name', 'url'} dicts."""
    entries = []
    current = {}
    for line in content.splitlines():
        line = line.strip()
        if line.startswith('#EXTINF:'):
            group_match = re.search(r'group-title="([^"]*)"', line)
            name_match = re.search(r'tvg-name="([^"]*)"', line)
            display_name = _extinf_display_name(line)
            current = {
                'group': group_match.group(1) if group_match else '',
                'name': name_match.group(1) if name_match else display_name,
            }
            # An empty tvg-name="" attribute falls back to the display name.
            if not current['name']:
                current['name'] = display_name
        elif line.startswith('http') and current:
            current['url'] = line
            entries.append(current)
            current = {}
    print(f"Parsed {len(entries)} total entries")
    return entries


def write_strm(filepath, url):
    """Write a STRM file if it does not exist yet.

    Returns True when a new file was written, False when it already existed
    (existing files are never rewritten).
    """
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    if os.path.exists(filepath):
        return False
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(url)
    return True


def _process_flat_group(entries, groups, base_dir, label):
    """Write '<base_dir>/<name>/<name>.strm' for every entry in *groups*.

    Returns the set of STRM paths that belong to the current playlist.
    """
    count = 0
    skipped = 0
    existing_files = set()
    for entry in entries:
        group = entry.get('group', '')
        # Skip unwanted groups, then keep only the requested ones
        if any(skip in group for skip in SKIP_GROUPS):
            continue
        if not any(wanted in group for wanted in groups):
            continue
        name = clean_name(entry.get('name', 'Unknown'))
        url = entry.get('url', '')
        if not name or not url:
            continue
        strm_file = os.path.join(base_dir, name, f"{name}.strm")
        existing_files.add(strm_file)
        if write_strm(strm_file, url):
            count += 1
        else:
            skipped += 1
    print(f"{label}: {count} new, {skipped} already existed")
    return existing_files


def process_movies(entries):
    """Process movie entries into STRM files; return the current file set."""
    return _process_flat_group(entries, MOVIE_GROUPS, MOVIES_DIR, "Movies")


def process_tv(entries):
    """Process TV series entries into STRM files; return the current file set."""
    count = 0
    skipped = 0
    existing_files = set()
    for entry in entries:
        group = entry.get('group', '')
        if any(skip in group for skip in SKIP_GROUPS):
            continue
        if not any(tg in group for tg in TV_GROUPS):
            continue
        name = clean_name(entry.get('name', 'Unknown'))
        url = entry.get('url', '')
        if not name or not url:
            continue

        show, season, episode = parse_episode(name)
        # Compare against None: season 0 / episode 0 are valid (specials)
        # and must not be mistaken for "could not parse".
        if show and season is not None and episode is not None:
            season_folder = f"Season {season:02d}"
            episode_name = f"{show} S{season:02d}E{episode:02d}"
            strm_file = os.path.join(TV_DIR, show, season_folder, f"{episode_name}.strm")
        else:
            # Can't parse episode info — put in show folder
            strm_file = os.path.join(TV_DIR, name, f"{name}.strm")

        existing_files.add(strm_file)
        if write_strm(strm_file, url):
            count += 1
        else:
            skipped += 1
    print(f"TV Shows: {count} new, {skipped} already existed")
    return existing_files


def process_xxx(entries):
    """Process adult content entries into STRM files; return the current file set."""
    return _process_flat_group(entries, XXX_GROUPS, XXX_DIR, "XXX VOD")


def clean_old_files(directory, current_files):
    """Remove STRM files under *directory* that are not in *current_files*.

    The tree is walked bottom-up so a folder emptied by this run is pruned in
    the same run rather than on the next one.
    """
    removed = 0
    for root, dirs, files in os.walk(directory, topdown=False):
        for file in files:
            if not file.endswith('.strm'):
                continue
            filepath = os.path.join(root, file)
            if filepath in current_files:
                continue
            try:
                os.remove(filepath)
                removed += 1
            except OSError as e:
                print(f"Could not remove {filepath}: {e}")
        # Remove empty directories safely
        for d in dirs:
            dirpath = os.path.join(root, d)
            try:
                if not os.listdir(dirpath):
                    os.rmdir(dirpath)
            except OSError:
                # Best effort: a folder that vanished or is busy stays put.
                pass
    print(f"Removed {removed} old STRM files from {directory}")


def main():
    """Download the playlist, write STRM files, and optionally prune old ones."""
    print("=" * 50)
    print("M3U to STRM Converter")
    print("=" * 50)

    # Create output directories
    for d in [MOVIES_DIR, TV_DIR, XXX_DIR]:
        os.makedirs(d, exist_ok=True)

    content = download_m3u(M3U_URL)
    if not content:
        print("Failed to download M3U — aborting")
        return

    entries = parse_m3u(content)
    if not entries:
        print("No entries found — aborting")
        return

    print("\nProcessing movies...")
    movie_files = process_movies(entries)

    print("\nProcessing TV shows...")
    tv_files = process_tv(entries)

    print("\nProcessing XXX VOD...")
    xxx_files = process_xxx(entries)

    if CLEAN_SYNC:
        print("\nCleaning old files...")
        for directory, files in (
            (MOVIES_DIR, movie_files),
            (TV_DIR, tv_files),
            (XXX_DIR, xxx_files),
        ):
            # A category with zero playlist entries usually means a renamed
            # or skipped group, not an empty catalogue; never wipe a whole
            # library because of it.
            if files:
                clean_old_files(directory, files)
            else:
                print(f"No playlist entries for {directory} - cleanup skipped")

    print("\n" + "=" * 50)
    print("Done!")
    print("=" * 50)


if __name__ == "__main__":
    main()
whubbartt 1 Posted May 18 Author Posted May 18 This is my first time writing a converter, and I noticed some issues with the regex processing of series, so this is version 2, along with instructions. It now asks you questions to establish the necessary variables and stores them in a JSON file. It will send you the results by SMS (via your carrier's email-to-text gateway) each time it runs. This is only version 2, so it may not be foolproof yet, but from my testing it works pretty darn well. I've attached the script to this post as well. This is a much smarter script than version 1, and a lot of logic has been added to determine the proper save structure. Enjoy! IPTV VOD converter for Jellyfin, Emby, Plex Linux Setup Guide This script automatically pulls a streaming playlist and turns every video into a tiny placeholder file (.strm) on your server. This allows media servers like Emby or Plex to list the videos in your library and stream them directly without taking up massive amounts of hard drive space. Hardware & Database Friendly Smart Scanning: The script only adds new videos. If a file already exists on your server, the script skips it entirely and never overwrites it. With the default CLEAN_SYNC setting, the only files it deletes are .strm placeholders for videos that are no longer in the playlist. Saves SSD Life: Because it doesn't delete and recreate files, it protects your Solid State Drives (NVMe/SSDs) from unnecessary wear and tear. Protects Emby/Plex: Your watch history, custom posters, and metadata are safe for everything still in the playlist, because the script never deletes and recreates those placeholders. How to Set Up and Run the Script You do not need to edit the script code, copy text, or download any playlist files to make this work. Just follow these steps: Step 1: Put the Script on Your Server Copy the script file (e.g., IPTVconverter.py) onto your server into the folder where you want it to live. The script is already programmed to automatically download the latest streaming list from the web, so you don't need to put any other files in this directory. This is a Python 3 script.
# Run by simply typing: python3 IPTVconverter.py
# After the initial run put it in your scheduler to run once a day.
# I worked hard on this so do Enjoy!
#!/usr/bin/env python3
"""
================================================================================
UNIVERSAL IPTV STRM SYNC ENGINE (FIXED VERIFICATION & 50-ITEM SPLIT SMS BURST)
================================================================================

Downloads an IPTV M3U playlist, writes one .strm placeholder per VOD entry
into Movies / TV / Adult library folders, optionally removes placeholders that
left the playlist, and sends a summary through an email-to-SMS gateway.
Settings are collected once by an interactive wizard and stored as JSON.
"""
import getpass
import json
import os
import re
import smtplib
import ssl
import sys
import urllib.request
from email.mime.text import MIMEText

CONFIG_FILE = "iptv_config.json"

CARRIER_MAP = {
    "verizon": "vtext.com",
    "att": "txt.att.net",
    "tmobile": "tmomail.net",
    "sprint": "messaging.sprintpcs.com",
    "cricket": "sms.cricketwireless.net",
    "boost": "myboostmobile.com"
}

# Substrings (lower-case) used to guess what kind of content a group holds.
_TV_INDICATORS = ['series', 'tv show', 'tv-show', 'season', 's0', 's1', 's2', 'episodes', 'shows']
_XXX_INDICATORS = ['xxx', 'adult', '18+', 'nsfw', 'pink', 'erotic']
# URL fragments that mark guide/EPG resources rather than playable media.
_NON_MEDIA_MARKERS = ['/xmltv', '.xml', '/epg']
# Sentinel paths the wizard stores for libraries the user chose to skip.
_UNUSED_DIRS = ("/tmp/movies_unused", "/tmp/tv_unused", "/tmp/xxx_unused")

# "#EXTINF:<duration> key="value" ...,<display name>"
_EXTINF_RE = re.compile(
    r'#EXTINF:\s*-?\d+(?:\.\d+)?(?:\s+[\w-]+="[^"]*")*\s*,(.*)$'
)
# SxxEyy, optionally followed by more episodes (E01E02, E01-E02, E01-02).
# A continuation must start with a separator or an "E"; otherwise the regex
# engine could split the digits of a plain "E05" into "0" + "5".
_EPISODE_RE = re.compile(r'\bS(\d+)\s*E(\d+)(?:(?:[-_]+E?|E)\d+)*\b', re.I)


def _config_path():
    """Return the config file location.

    A config in the current directory (or an absolute CONFIG_FILE) is used
    as-is; otherwise the file lives next to the script so scheduled runs,
    which start in an arbitrary working directory, find the same settings.
    """
    if os.path.isabs(CONFIG_FILE) or os.path.exists(CONFIG_FILE):
        return CONFIG_FILE
    return os.path.join(os.path.dirname(os.path.abspath(__file__)), CONFIG_FILE)


def confirm_input(value_to_show):
    """Show *value_to_show* and ask y/n until answered; return True for yes."""
    while True:
        print(f" You entered/selected: {value_to_show}")
        choice = input(" Is this correct? (y/n): ").strip().lower()
        if choice in ['y', 'yes']:
            return True
        if choice in ['n', 'no']:
            print(" Let's try that question again...\n")
            return False
        print(" Please type 'y' for yes or 'n' for no.")


def _ask_confirmed(prompt):
    """Prompt until the user confirms the (stripped) value they typed."""
    while True:
        value = input(prompt).strip()
        if confirm_input(value):
            return value


def scan_m3u_groups(url):
    """Stream the playlist and return the group-title values that own media URLs.

    Returns an empty list if the playlist cannot be read.
    """
    print("\n Analyzing playlist architecture and isolating VOD assets...")
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'VLC/3.0.0'})
        # NOTE(review): certificate verification is disabled for the playlist
        # download, as in the original script.
        context = ssl._create_unverified_context()
        detected_groups = set()
        with urllib.request.urlopen(req, context=context, timeout=45) as response:
            current_group = None
            for line in response:
                line = line.decode('utf-8', errors='ignore').strip()
                if line.startswith('#EXTINF:'):
                    group_match = re.search(r'group-title="([^"]*)"', line)
                    current_group = group_match.group(1) if group_match else None
                elif line.startswith('http') and current_group:
                    if not any(marker in line.lower() for marker in _NON_MEDIA_MARKERS):
                        detected_groups.add(current_group)
                    current_group = None
        return list(detected_groups)
    except Exception as e:
        print(f" Could not pre-scan M3U groups: {e}")
        return []


def auto_classify_groups(detected_groups):
    """Split group names into (movies, tv, xxx) lists by keyword heuristics.

    Adult keywords win over TV keywords; anything unmatched counts as movies.
    """
    movies, tv, xxx = [], [], []
    tv_indicators = _TV_INDICATORS + ['tv']
    for group in detected_groups:
        g_lower = group.lower()
        if any(x in g_lower for x in _XXX_INDICATORS):
            xxx.append(group)
        elif any(t in g_lower for t in tv_indicators):
            tv.append(group)
        else:
            movies.append(group)
    return movies, tv, xxx


def run_wizard():
    """Return the saved configuration, or build it interactively and save it.

    Raises EOFError when a prompt is needed but no terminal input is
    available (for example a scheduled run without a saved configuration).
    """
    config_path = _config_path()
    if os.path.exists(config_path):
        with open(config_path, 'r') as f:
            return json.load(f)

    config = {}
    print("\n==================================================")
    print(" IPTV SYNC SETUP (INTELLIGENT WIZARD) ")
    print("==================================================")

    while True:
        url = input(" Paste your IPTV M3U URL: ").strip()
        if not url.startswith(("http://", "https://")):
            print(" Invalid URL format.\n")
            continue
        if confirm_input(url):
            config['M3U_URL'] = url
            break

    all_groups = scan_m3u_groups(config['M3U_URL'])
    m_groups, t_groups, x_groups = auto_classify_groups(all_groups)
    config['MOVIE_GROUPS'] = m_groups if m_groups else ["Movie VOD"]
    config['TV_GROUPS'] = t_groups if t_groups else ["TV VOD"]
    config['XXX_GROUPS'] = x_groups

    print("\n --- MEDIA STORAGE LOCAL PATHS ---")
    while True:
        path = input(" Movies directory path: ").strip()
        if not path:
            # An empty path would make os.makedirs('') fail further down.
            print(" A movies directory is required.\n")
            continue
        if confirm_input(path):
            config['MOVIES'] = path
            break

    while True:
        path = input(" TV Shows directory path: ").strip()
        if not path:
            config['TV'] = "/tmp/tv_unused"
            config['TV_GROUPS'] = []
            break
        if confirm_input(path):
            config['TV'] = path
            break

    if x_groups:
        while True:
            path = input(" Adult VOD directory path (Or ENTER to skip): ").strip()
            if not path:
                config['XXX'] = "/tmp/xxx_unused"
                config['XXX_GROUPS'] = []
                break
            if confirm_input(path):
                config['XXX'] = path
                break
    else:
        config['XXX'] = "/tmp/xxx_unused"
        config['XXX_GROUPS'] = []

    print("\n --- SMS NOTIFICATIONS (OPTIONAL) ---")
    want_sms = False
    while True:
        phone = input(" Enter your 10-digit phone number (Or ENTER to skip): ").strip()
        if not phone:
            config['PHONE'] = ""
            break
        clean_phone = re.sub(r'\D', '', phone)
        if len(clean_phone) != 10:
            print(" Invalid entry.\n")
            continue
        if confirm_input(clean_phone):
            config['PHONE'] = clean_phone
            want_sms = True
            break

    if want_sms:
        while True:
            carrier = input(" Enter your mobile carrier name: ").strip().lower()
            if carrier not in CARRIER_MAP:
                print(" Unsupported carrier profile.\n")
                continue
            if confirm_input(carrier.upper()):
                config['CARRIER_NAME'] = carrier
                config['CARRIER_DOMAIN'] = CARRIER_MAP[carrier]
                break

        print("\n --- OUTGOING SMTP MAIL ROUTER ---")
        config['SMTP_SERVER'] = _ask_confirmed(" Outgoing SMTP Server Host: ")

        while True:
            print("\n [SMTP SECURITY NOTICE]")
            smtp_port = input(" Enter secure SMTP Port [Suggested: 587]: ").strip()
            try:
                port_val = int(smtp_port)
            except ValueError:
                print(" Numerical integers only.")
                continue
            if confirm_input(port_val):
                config['SMTP_PORT'] = port_val
                break

        config['EMAIL_USER'] = _ask_confirmed(" Sender Email Address Username: ")

        while True:
            # getpass keeps the password off the screen and out of scrollback.
            email_pass = getpass.getpass(" Email Account Password: ").strip()
            if confirm_input("********"):
                config['EMAIL_PASS'] = email_pass
                break

    config['SKIP_GROUPS'] = ["TRY", "Tested", "ZZZ"]
    config['CLEAN_SYNC'] = True

    print("\n🛠 Initializing storage paths...")
    for key in ['MOVIES', 'TV', 'XXX']:
        if config[key] not in ["/tmp/tv_unused", "/tmp/xxx_unused"]:
            os.makedirs(config[key], exist_ok=True)
            print(f" Target Ready: {config[key]}")

    # The file holds the SMTP password in plain text, so create it readable
    # by the owner only.
    fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(config, f, indent=4)
    return config


def clean_title_casing(title):
    """Normalise a title for use as a folder/file name.

    Drops bracketed text and characters that are invalid in file names,
    collapses whitespace and applies title casing with a few fixed
    exceptions. Returns '' for titles made only of dots, since such a name
    used as a folder would point outside the library directory.
    """
    title = re.sub(r'[\[\(\{][^\]\}\)]*[\]\}\)]', '', title)
    title = re.sub(r'[<>:"/\\|?*]', '', title)
    title = re.sub(r'\s+', ' ', title).strip()
    title = re.sub(r'\bL\s+a\b', 'LA', title, flags=re.I)

    minor_words = {'a', 'an', 'the', 'and', 'but', 'or', 'for', 'on', 'at', 'to',
                   'by', 'of', 'with', 'in', 'vs', 'from'}
    caps_words = {'au', 'us', 'uk', 'fbi', 'abc', 'dc', 'tv', 'la', 'swat', 'ncis',
                  'csi', 'sec', 'snl'}

    final_words = []
    for i, word in enumerate(title.split()):
        lw = word.lower()
        if lw == 'x':
            final_words.append('X')
        elif lw in caps_words:
            final_words.append(word.upper())
        elif lw in minor_words and i > 0:
            # Minor words stay lower-case except as the first word.
            final_words.append(lw)
        else:
            final_words.append(word.capitalize())

    result = re.sub(r'\s*-\s*$', '', " ".join(final_words).strip())
    if result and not result.strip('.'):
        return ''
    return result


def strip_group_prefixes(name, cfg):
    """Remove a leading '<group>,' or '<group> ' provider prefix from *name*."""
    all_prefixes = cfg.get('MOVIE_GROUPS', []) + cfg.get('TV_GROUPS', []) + cfg.get('XXX_GROUPS', [])
    for prefix in all_prefixes:
        if name.startswith(prefix + ","):
            name = name[len(prefix) + 1:].strip()
        elif name.startswith(prefix + " "):
            name = name[len(prefix):].strip()
    return name


def parse_tv_meta(raw_name, cfg):
    """Derive (show name, season, episode) strings from a TV entry title.

    Patterns are tried in order: air date, bare year, SxxEyy (any case,
    including multi-episode tags), "Season N"/"Episode N" words, a lone
    episode tag, a trailing number, and finally a S01E01 default. Season and
    episode are zero-padded to two digits; dated entries use the year as the
    season and MMDD as the episode.
    """
    raw_name = re.sub(r'\b(repack|proper|internal|unrated|extended)\b', '', raw_name, flags=re.I)
    raw_name = strip_group_prefixes(raw_name, cfg)

    # Daily shows: "Show 2024.01.15"
    date_match = re.search(
        r'\b(19\d{2}|20\d{2})[\s._](0[1-9]|1[0-2])[\s._](0[1-9]|[12]\d|3[01])\b', raw_name)
    if date_match:
        year, month, day = date_match.group(1), date_match.group(2), date_match.group(3)
        # Cut at the date itself: splitting on r'\b<year>\b' finds nothing
        # for "2024_01_15" because "_" is a word character.
        show_clean = clean_title_casing(raw_name[:date_match.start()])
        return f"{show_clean} ({year})", year, f"{month}{day}"

    # "Show 2020" with no season/episode hints after the year
    year_match = re.search(r'\b(19\d{2}|20\d{2})\b', raw_name)
    if year_match:
        year = year_match.group(1)
        if not re.search(r'[sS]' + year, raw_name):
            parts = re.split(r'\b' + year + r'\b', raw_name, maxsplit=1)
            show_clean = clean_title_casing(parts[0])
            # Effectively "no letter s or e after the year".
            if not any(ep_tag in parts[1].lower() for ep_tag in ['s', 'e', 'season', 'episode']):
                return f"{show_clean} ({year})", "01", "01"

    # "Show S01E05", "show s01e05", "Show S01E01E02", "Show S01E01-E03"
    episode_match = _EPISODE_RE.search(raw_name)
    if episode_match:
        s_num = episode_match.group(1).zfill(2)
        e_num = episode_match.group(2).zfill(2)
        return clean_title_casing(raw_name[:episode_match.start()]), s_num, e_num

    # "Show Season 2 Episode 5" / "Show S2"
    season_tag_match = re.search(r'\bS(?:eason)?\s*(\d+)\b', raw_name, re.I)
    if season_tag_match:
        s_num = season_tag_match.group(1).zfill(2)
        scrubbed = re.split(r'\bS(?:eason)?\s*\d+', raw_name, flags=re.I)[0]
        ep_match = re.search(r'\bE(?:pisode)?\s*(\d+)\b', raw_name, re.I)
        e_num = ep_match.group(1).zfill(2) if ep_match else "01"
        return clean_title_casing(scrubbed), s_num, e_num

    # "Show - E05" / "Show Episode 5"
    ep_tag_match = re.search(r'(?:[-_\s]E(?:pisode)?)\s*[-_]?(\d+)\b', raw_name, re.I)
    if ep_tag_match:
        e_num = ep_tag_match.group(1).zfill(2)
        scrubbed = re.split(r'(?:[-_\s]E(?:pisode)?)\s*[-_]?\d+', raw_name, flags=re.I)[0]
        return clean_title_casing(scrubbed), "01", e_num

    # "Show 12 1080p" / "Show - 07"
    num_match = re.search(
        r'[-_\s](\d{1,3})\b(?=\s*(?:720p|1080p|2160p|4k|hevc|x264|x265|web|hdtv|h264|h265|proper|repack|\(|\[|$))',
        raw_name, re.I)
    if num_match:
        e_num = num_match.group(1).zfill(2)
        scrubbed = raw_name.split(num_match.group(0))[0]
        return clean_title_casing(scrubbed), "01", e_num

    scrubbed = re.sub(r'\b(720p|1080p|2160p|4k|hevc|x264|x265|h264|h265|web|hdtv).*$', '',
                      raw_name, flags=re.I)
    return clean_title_casing(scrubbed), "01", "01"


def download_m3u(url):
    """Download the playlist and return it as text, or None on failure."""
    print(f"Downloading M3U from {url}...")
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'VLC/3.0.0'})
        # NOTE(review): certificate verification is disabled for the playlist
        # download, as in the original script.
        context = ssl._create_unverified_context()
        with urllib.request.urlopen(req, context=context, timeout=45) as response:
            content = response.read().decode('utf-8', errors='ignore')
        print(f"Successfully downloaded {len(content):,} bytes")
        return content
    except Exception as e:
        print(f"ERROR downloading M3U: {e}")
        return None


def _extinf_display_name(line):
    """Return the display name of an #EXTINF line.

    The name is everything after the comma that ends the attribute list, so
    titles containing commas stay intact; unusual lines fall back to the text
    after the last comma.
    """
    match = _EXTINF_RE.match(line)
    if match:
        return match.group(1).strip()
    return line.split(',')[-1].strip() if ',' in line else ''


def parse_m3u(content):
    """Parse playlist text into a list of {'group', 'name', 'url'} dicts.

    URLs that look like guide/EPG resources are dropped.
    """
    entries = []
    current = {}
    for line in content.splitlines():
        line = line.strip()
        if line.startswith('#EXTINF:'):
            group_match = re.search(r'group-title="([^"]*)"', line)
            name_match = re.search(r'tvg-name="([^"]*)"', line)
            display_name = _extinf_display_name(line)
            tvg_name = name_match.group(1) if name_match else ''
            current = {
                'group': group_match.group(1) if group_match else '',
                # Prefer tvg-name; an absent or empty one uses the display name.
                'name': tvg_name or display_name,
            }
        elif line.startswith('http') and current:
            if not any(marker in line.lower() for marker in _NON_MEDIA_MARKERS):
                current['url'] = line
                entries.append(current)
            current = {}
    print(f"Isolated {len(entries):,} total VOD media entries from playlist.")
    return entries


def write_strm(filepath, url):
    """Write a STRM file if missing; return True only when a file was created."""
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    if os.path.exists(filepath):
        return False
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(url)
    return True


def process_movies(entries, cfg):
    """Write movie STRM files.

    Returns (active file set, new count, skipped count, new titles).
    """
    count, skipped = 0, 0
    active_files = set()
    newly_added_titles = []
    skip_grps = cfg.get('SKIP_GROUPS', [])
    mov_grps = cfg.get('MOVIE_GROUPS', [])

    for entry in entries:
        group = entry.get('group', '')
        g_lower = group.lower()
        if any(skip.lower() in g_lower for skip in skip_grps):
            continue
        if mov_grps:
            if group not in mov_grps:
                continue
        elif any(x in g_lower for x in _XXX_INDICATORS) or any(t in g_lower for t in _TV_INDICATORS):
            # No explicit movie groups: everything not TV/adult is a movie.
            continue

        raw_name = entry.get('name', 'Unknown')
        name = clean_title_casing(strip_group_prefixes(raw_name, cfg))
        url = entry.get('url', '')
        if not name or not url:
            continue

        strm_file = os.path.join(cfg['MOVIES'], name, f"{name}.strm")
        active_files.add(strm_file)
        if write_strm(strm_file, url):
            count += 1
            newly_added_titles.append(name)
        else:
            skipped += 1

    print(f"Movies: {count} deployed, {skipped} verified current.")
    return active_files, count, skipped, newly_added_titles


def process_tv(entries, cfg):
    """Write TV STRM files as '<show>/Season NN/<show> - SNNENN.strm'.

    Returns (active file set, new count, skipped count, new titles).
    """
    count, skipped = 0, 0
    active_files = set()
    newly_added_titles = []
    skip_grps = cfg.get('SKIP_GROUPS', [])
    tv_grps = cfg.get('TV_GROUPS', [])
    if cfg.get('TV') == "/tmp/tv_unused":
        return active_files, 0, 0, newly_added_titles

    for entry in entries:
        group = entry.get('group', '')
        g_lower = group.lower()
        if any(skip.lower() in g_lower for skip in skip_grps):
            continue
        if tv_grps:
            if group not in tv_grps:
                continue
        elif not any(t in g_lower for t in _TV_INDICATORS):
            continue

        raw_name = entry.get('name', 'Unknown')
        url = entry.get('url', '')
        if not raw_name or not url:
            continue

        show_base, s_num, e_num = parse_tv_meta(raw_name, cfg)
        if not show_base:
            # A title that is only an episode tag has no usable folder name.
            continue
        season_folder = f"Season {s_num}"
        episode_name = f"{show_base} - S{s_num}E{e_num}"
        strm_file = os.path.join(cfg['TV'], show_base, season_folder, f"{episode_name}.strm")
        active_files.add(strm_file)
        if write_strm(strm_file, url):
            count += 1
            newly_added_titles.append(episode_name)
        else:
            skipped += 1

    print(f"TV Shows: {count} deployed, {skipped} verified current.")
    return active_files, count, skipped, newly_added_titles


def process_xxx(entries, cfg):
    """Write adult VOD STRM files.

    Returns (active file set, new count, skipped count, new titles).
    """
    count, skipped = 0, 0
    active_files = set()
    newly_added_titles = []
    skip_grps = cfg.get('SKIP_GROUPS', [])
    xxx_grps = cfg.get('XXX_GROUPS', [])
    if cfg.get('XXX') == "/tmp/xxx_unused":
        return active_files, 0, 0, newly_added_titles

    for entry in entries:
        group = entry.get('group', '')
        g_lower = group.lower()
        if any(skip.lower() in g_lower for skip in skip_grps):
            continue
        if xxx_grps:
            if group not in xxx_grps:
                continue
        elif not any(x in g_lower for x in _XXX_INDICATORS):
            continue

        raw_name = entry.get('name', 'Unknown')
        name = clean_title_casing(strip_group_prefixes(raw_name, cfg))
        url = entry.get('url', '')
        if not name or not url:
            continue

        strm_file = os.path.join(cfg['XXX'], name, f"{name}.strm")
        active_files.add(strm_file)
        if write_strm(strm_file, url):
            count += 1
            newly_added_titles.append(name)
        else:
            skipped += 1

    print(f"XXX VOD: {count} deployed, {skipped} verified current.")
    return active_files, count, skipped, newly_added_titles


def clean_old_files(directory, current_files):
    """Delete STRM files under *directory* not in *current_files*.

    Walks bottom-up so folders emptied by this run are pruned in the same
    run. Returns the number of files removed; missing or "unused" sentinel
    directories are ignored.
    """
    removed = 0
    if not os.path.exists(directory) or directory in _UNUSED_DIRS:
        return removed
    for root, dirs, files in os.walk(directory, topdown=False):
        for file in files:
            if not file.endswith('.strm'):
                continue
            filepath = os.path.join(root, file)
            if filepath in current_files:
                continue
            try:
                os.remove(filepath)
                removed += 1
            except OSError as e:
                print(f" Could not remove {filepath}: {e}")
        for d in dirs:
            dirpath = os.path.join(root, d)
            try:
                if not os.listdir(dirpath):
                    os.rmdir(dirpath)
            except OSError:
                # Best effort: a folder that vanished or is busy stays put.
                pass
    return removed


def send_sms_raw(cfg, subject, body_text):
    """Send *body_text* to the configured phone via the carrier's email gateway.

    Does nothing when no phone or carrier domain is configured. Failures are
    reported on stdout and never raised.
    """
    if not cfg.get('PHONE') or not cfg.get('CARRIER_DOMAIN'):
        return
    recipient = f"{cfg['PHONE']}@{cfg['CARRIER_DOMAIN']}"
    try:
        msg = MIMEText(body_text, 'plain', 'utf-8')
        msg['From'] = f"Tower Sync <{cfg.get('EMAIL_USER', '')}>"
        msg['To'] = recipient
        msg['Subject'] = subject

        port = int(cfg.get('SMTP_PORT', 25))
        server_host = cfg.get('SMTP_SERVER', '')
        # NOTE(review): TLS certificates are not verified here, so the login
        # below can be intercepted by a man-in-the-middle. Kept as in the
        # original for compatibility with self-signed mail servers; switch to
        # ssl.create_default_context() if your server has a valid certificate.
        context = ssl._create_unverified_context()
        if port == 465:
            server = smtplib.SMTP_SSL(server_host, port, context=context, timeout=20)
        else:
            server = smtplib.SMTP(server_host, port, timeout=20)
        # The context manager issues QUIT and closes the socket even when
        # login or sending fails.
        with server:
            if port == 587:
                server.starttls(context=context)
            if cfg.get('EMAIL_USER') and cfg.get('EMAIL_PASS'):
                server.login(cfg['EMAIL_USER'], cfg['EMAIL_PASS'])
            server.send_message(msg)
    except Exception as e:
        print(f" SMS Send Failed [{subject}]: {e}")


def _clean_library(directory, active_files):
    """Run clean_old_files unless the playlist produced nothing for this library."""
    if not active_files:
        # Zero entries usually means a renamed or skipped group, not an empty
        # catalogue; never wipe a whole library because of it.
        if directory and directory not in _UNUSED_DIRS and os.path.isdir(directory):
            print(f" No playlist entries for {directory} - cleanup skipped")
        return 0
    return clean_old_files(directory, active_files)


def main():
    """Load settings, sync all three libraries, clean up and send the reports."""
    print("=" * 60)
    print(" UNIVERSAL IPTV AUTOMATED SYNCHRONIZATION ENGINE ")
    print("=" * 60)

    try:
        cfg = run_wizard()
    except EOFError:
        # Happens when a scheduler starts the script before the wizard has
        # ever been completed in a terminal.
        print(f"No saved configuration ({CONFIG_FILE}) and no terminal to ask "
              f"questions - run the script once interactively first.")
        return

    m3u_content = download_m3u(cfg.get('M3U_URL', ''))
    if not m3u_content:
        return
    entries = parse_m3u(m3u_content)
    if not entries:
        return

    print("\nProcessing Movies Library Section...")
    movie_files, m_new, m_skip, m_added_names = process_movies(entries, cfg)
    print("\nProcessing TV Library Section...")
    tv_files, t_new, t_skip, t_added_names = process_tv(entries, cfg)
    print("\nProcessing Adult Library Section...")
    xxx_files, x_new, x_skip, x_added_names = process_xxx(entries, cfg)

    m_rem, t_rem, x_rem = 0, 0, 0
    if cfg.get('CLEAN_SYNC', True):
        print("\nExecuting Mirror Synchronization Cleanups...")
        m_rem = _clean_library(cfg.get('MOVIES', ''), movie_files)
        t_rem = _clean_library(cfg.get('TV', ''), tv_files)
        x_rem = _clean_library(cfg.get('XXX', ''), xxx_files)
        print(f"Removed old STRM files: Movies: -{m_rem} | TV: -{t_rem} | XXX: -{x_rem}")

    # --- BURST 1: Send Core Metric Numbers Immediately ---
    core_report = (
        f"Tower Sync Complete\n"
        f"Movies: +{m_new}/-{m_rem} (Total: {len(movie_files)})\n"
        f"TV: +{t_new}/-{t_rem} (Total: {len(tv_files)})\n"
        f"XXX: +{x_new}/-{x_rem} (Total: {len(xxx_files)})\n"
        f"Playlist entries: {len(entries):,}"
    )
    sms_enabled = bool(cfg.get('PHONE'))
    if sms_enabled:
        print(f"\nDispatching main metrics to {cfg['PHONE']}...")
    send_sms_raw(cfg, "Tower Stats", core_report)

    # --- BURST 2: Send Title Additions Limited to Top 50 to Avoid Carrier Filters ---
    sections = (
        ("\n MOVIES:", m_added_names),
        ("\n TV:", t_added_names),
        ("\n XXX:", x_added_names),
    )
    if any(names for _, names in sections):
        added_lines = ["New Additions Preview:"]
        for header, names in sections:
            if not names:
                continue
            added_lines.append(header)
            added_lines.extend(f"• {name}" for name in names[:50])
            if len(names) > 50:
                added_lines.append(f"...and {len(names)-50} more")
        preview_report = "\n".join(added_lines)
        if sms_enabled:
            print("Dispatching true item addition preview text...")
        send_sms_raw(cfg, "Tower Details", preview_report)

    print("\n" + "=" * 60)
    print(" SYNC PIPELINE COMPLETE! ")
    print("=" * 60)


if __name__ == "__main__":
    main()

# Attachment: IPTVconverter.py
whubbartt 1 Posted May 18 Author Posted May 18 It would be best to just download IPTVconverter.py from my previous post, because it looks like emojis were inserted into the code when it was pasted here.
Recommended Posts
Create an account or sign in to comment
You need to be a member in order to leave a comment
Create an account
Sign up for a new account in our community. It's easy!
Register a new account | Sign in
Already have an account? Sign in here.
Sign In Now