#!/usr/bin/env python3
# Forum post by whubbartt (posted 2 hours ago):
#   I got sick and tired of trying to make M3U converters work with my M3U
#   provider, so this script takes the VOD entries from your M3U stream and
#   writes them as STRM files in designated folders. Enjoy.
"""
M3U to STRM Python converter.

Converts IPTV M3U playlist VOD content to STRM files for Emby/Jellyfin.

Be sure to create the 3 directories and specify them below, then set the
script to run on a schedule. On every run the script compares the current VOD
streams with what is in the directories and deletes STRM files that are no
longer in the VOD feed from your provider.

This script only works with an M3U URL, not Xtream Codes user/pass logins.
"""

import http.client
import os
import re
import urllib.error
import urllib.request

# ============================================================
# CONFIGURATION - Edit these settings
# ============================================================
# Put your provider URL here and delete mine
M3U_URL = "https://tv123.me/iptv/2/yf/VOD"

# Specify the directories where the STRM files are saved after processing
MOVIES_DIR = "/mnt/user/Plexcontent/IPMovies"
TV_DIR = "/mnt/user/Plexcontent/IPTv"
XXX_DIR = "/mnt/user/Plexcontent/IPOther"

MOVIE_GROUPS = ["Movie VOD"]
TV_GROUPS = ["TV VOD"]
XXX_GROUPS = ["XXX VOD"]

# If you don't want XXX groups, specify what not to process here
SKIP_GROUPS = ["TRY", "Tested", "ZZZ"]

CLEAN_SYNC = True  # Set True to remove STRM files no longer in the M3U
# ============================================================

# Patterns are compiled once because they run for every playlist entry.
_INVALID_CHARS_RE = re.compile(r'[<>:"/\\|?*]')
_WHITESPACE_RE = re.compile(r'\s+')
_EPISODE_RE = re.compile(r'^(.*?)\s*[Ss](\d{1,2})[Ee](\d{1,2})')
_GROUP_RE = re.compile(r'group-title="([^"]*)"')
_TVG_NAME_RE = re.compile(r'tvg-name="([^"]*)"')
# Title = everything after the first comma that is not inside a quoted
# attribute value, so titles that themselves contain commas stay intact.
_EXTINF_TITLE_RE = re.compile(r'^#EXTINF:[^",]*(?:"[^"]*"[^",]*)*,(.*)$')


def sanitize_filename(name):
    """Return *name* with invalid filename characters removed.

    Runs of whitespace are collapsed to a single space and the result is
    stripped. The special directory names "." and ".." yield an empty string
    so a playlist title can never point outside the output directory.
    """
    name = _INVALID_CHARS_RE.sub('', name)
    name = _WHITESPACE_RE.sub(' ', name).strip()
    if name in ('.', '..'):
        return ''
    return name


def parse_episode(name):
    """Extract show name, season and episode from a title.

    Returns:
        ``(show, season, episode)`` when *name* contains an ``S01E01`` style
        marker (season and episode as ints), otherwise ``(None, None, None)``.
    """
    match = _EPISODE_RE.search(name)
    if not match:
        return None, None, None
    show = sanitize_filename(match.group(1).strip())
    return show, int(match.group(2)), int(match.group(3))


def clean_name(name):
    """Remove provider prefixes like 'TV VOD,' or 'Movie VOD ' and sanitize."""
    for prefix in MOVIE_GROUPS + TV_GROUPS + XXX_GROUPS:
        if name.startswith(prefix + ","):
            name = name[len(prefix) + 1:].strip()
        if name.startswith(prefix + " "):
            name = name[len(prefix):].strip()
    return sanitize_filename(name)


def download_m3u(url):
    """Download the M3U playlist at *url*.

    Returns:
        The playlist decoded as UTF-8 text (undecodable bytes are dropped),
        or ``None`` when the download fails.
    """
    print(f"Downloading M3U from {url}...")
    req_headers = {'User-Agent': 'VLC/3.0.0'}
    try:
        req = urllib.request.Request(url, headers=req_headers)
        with urllib.request.urlopen(req, timeout=30) as response:
            raw = response.read()
    except (urllib.error.URLError, OSError, ValueError,
            http.client.HTTPException) as e:
        # URLError/OSError: network, HTTP status and timeout failures;
        # ValueError: malformed URL; HTTPException: truncated responses.
        print(f"ERROR downloading M3U: {e}")
        return None
    print(f"Downloaded {len(raw)} bytes")
    return raw.decode('utf-8', errors='ignore')


def _extinf_title(line):
    """Return the display title of an ``#EXTINF`` line ('' if it has none)."""
    match = _EXTINF_TITLE_RE.match(line)
    if match:
        return match.group(1).strip()
    # Malformed attributes (e.g. unbalanced quotes): fall back to the text
    # after the last comma.
    return line.split(',')[-1].strip() if ',' in line else ''


def parse_m3u(content):
    """Parse M3U text into a list of entries.

    Each entry is a dict with the keys ``group`` (the ``group-title``
    attribute, '' if absent), ``name`` (``tvg-name`` when present and
    non-empty, otherwise the display title) and ``url`` (the first ``http``
    line following the ``#EXTINF`` line).
    """
    entries = []
    current = {}
    for line in content.splitlines():
        line = line.strip()
        if line.startswith('#EXTINF:'):
            group_match = _GROUP_RE.search(line)
            name_match = _TVG_NAME_RE.search(line)
            tvg_name = name_match.group(1) if name_match else ''
            current = {
                'group': group_match.group(1) if group_match else '',
                'name': tvg_name or _extinf_title(line),
            }
        elif line.startswith('http') and current:
            current['url'] = line
            entries.append(current)
            current = {}
    print(f"Parsed {len(entries)} total entries")
    return entries


def write_strm(filepath, url):
    """Write *url* to the STRM file *filepath*, creating parent folders.

    An existing file is left alone when it already holds *url*; it is
    rewritten when the URL changed, so stale stream links get refreshed.

    Returns:
        ``True`` if the file was created or updated, ``False`` if it was
        already up to date.
    """
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            if f.read().strip() == url:
                return False
    except (OSError, ValueError):
        # Missing or unreadable/undecodable file: just (re)write it below.
        pass
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(url)
    return True


def _in_groups(group, names):
    """Return True if any of *names* occurs as a substring of *group*."""
    return any(name in group for name in names)


def _wanted_entries(entries, groups):
    """Yield ``(clean name, url)`` for entries in *groups* that are not skipped."""
    for entry in entries:
        group = entry.get('group', '')
        if _in_groups(group, SKIP_GROUPS) or not _in_groups(group, groups):
            continue
        name = clean_name(entry.get('name', 'Unknown'))
        url = entry.get('url', '')
        if name and url:
            yield name, url


def _write_all(targets, label):
    """Write every ``(strm path, url)`` pair and return the set of paths.

    When several playlist entries map to the same path, the first one wins
    and the rest are counted as skipped, so a run never overwrites a file it
    has just written.
    """
    count = 0
    skipped = 0
    seen = set()
    for strm_file, url in targets:
        if strm_file in seen:
            skipped += 1
            continue
        seen.add(strm_file)
        if write_strm(strm_file, url):
            count += 1
        else:
            skipped += 1
    print(f"{label}: {count} new or updated, "
          f"{skipped} unchanged or duplicate")
    return seen


def _flat_strm_path(base_dir, name):
    """Return ``<base_dir>/<name>/<name>.strm``."""
    return os.path.join(base_dir, name, f"{name}.strm")


def _tv_strm_path(name):
    """Return the STRM path for a TV title, using a season folder if possible."""
    show, season, episode = parse_episode(name)
    # Compare against None: season 0 (specials) and episode 0 are valid.
    if show and season is not None and episode is not None:
        season_folder = f"Season {season:02d}"
        episode_name = f"{show} S{season:02d}E{episode:02d}"
        return os.path.join(TV_DIR, show, season_folder,
                            f"{episode_name}.strm")
    # Can't parse episode info - put it in a folder named after the title.
    return _flat_strm_path(TV_DIR, name)


def process_movies(entries):
    """Write STRM files for movie entries; return the set of their paths."""
    targets = ((_flat_strm_path(MOVIES_DIR, name), url)
               for name, url in _wanted_entries(entries, MOVIE_GROUPS))
    return _write_all(targets, "Movies")


def process_tv(entries):
    """Write STRM files for TV entries; return the set of their paths."""
    targets = ((_tv_strm_path(name), url)
               for name, url in _wanted_entries(entries, TV_GROUPS))
    return _write_all(targets, "TV Shows")


def process_xxx(entries):
    """Write STRM files for adult entries; return the set of their paths."""
    targets = ((_flat_strm_path(XXX_DIR, name), url)
               for name, url in _wanted_entries(entries, XXX_GROUPS))
    return _write_all(targets, "XXX VOD")


def clean_old_files(directory, current_files):
    """Remove STRM files under *directory* that are not in *current_files*.

    Folders left empty are removed as well; *directory* itself is kept.
    Files that cannot be deleted are reported and otherwise ignored.
    """
    keep = {os.path.normpath(path) for path in current_files}
    top = os.path.normpath(directory)
    removed = 0
    # Bottom-up, so a folder is examined only after its contents were
    # cleaned and can be removed in the same run.
    for root, _dirs, files in os.walk(directory, topdown=False):
        for file in files:
            if not file.endswith('.strm'):
                continue
            filepath = os.path.join(root, file)
            if os.path.normpath(filepath) in keep:
                continue
            try:
                os.remove(filepath)
            except OSError as e:
                print(f"WARNING: could not remove {filepath}: {e}")
            else:
                removed += 1
        if os.path.normpath(root) == top:
            continue
        try:
            if not os.listdir(root):
                os.rmdir(root)
        except OSError as e:
            print(f"WARNING: could not remove folder {root}: {e}")
    print(f"Removed {removed} old STRM files from {directory}")


def main():
    """Download the playlist, write the STRM files and optionally clean up."""
    print("=" * 50)
    print("M3U to STRM Converter")
    print("=" * 50)

    # Create output directories
    for d in [MOVIES_DIR, TV_DIR, XXX_DIR]:
        os.makedirs(d, exist_ok=True)

    # Download M3U
    content = download_m3u(M3U_URL)
    if not content:
        print("Failed to download M3U — aborting")
        return

    # Parse M3U
    entries = parse_m3u(content)
    if not entries:
        print("No entries found — aborting")
        return

    print("\nProcessing movies...")
    movie_files = process_movies(entries)

    print("\nProcessing TV shows...")
    tv_files = process_tv(entries)

    print("\nProcessing XXX VOD...")
    xxx_files = process_xxx(entries)

    # Clean old files if enabled
    if CLEAN_SYNC:
        print("\nCleaning old files...")
        clean_old_files(MOVIES_DIR, movie_files)
        clean_old_files(TV_DIR, tv_files)
        clean_old_files(XXX_DIR, xxx_files)

    print("\n" + "=" * 50)
    print("Done!")
    print("=" * 50)


if __name__ == "__main__":
    main()
Recommended Posts
Create an account or sign in to comment
You need to be a member in order to leave a comment
Create an account
Sign up for a new account in our community. It's easy!
Register a new account | Sign in
Already have an account? Sign in here.
Sign In Now