Jump to content

Convert your M3u to STRM easy config


Recommended Posts

whubbartt
Posted

I got sick and tired of trying to make M3U converters work with my M3U provider, so I wrote this script. It takes
the VOD entries from your M3U playlist and writes them as STRM files in designated folders. Enjoy!


#!/usr/bin/env python3
"""
M3U to STRM Python converter.
Converts IPTV M3U playlist VOD content to STRM files for Emby/Jellyfin.
Be sure to create the 3 output directories and specify them below, then set the script to run on a schedule.
On each run the script compares the current VOD streams with what is already in the directories
and (when CLEAN_SYNC is True) deletes any STRM file that is no longer in the VOD feed from your provider.
This script only works with an M3U URL, not with Xtream Codes username/password logins.
"""

import os
import re
import urllib.request
import urllib.error

# ============================================================
# CONFIGURATION - Edit these settings
# ============================================================

# URL of your provider's M3U playlist - replace the example below with your own
M3U_URL = "https://tv123.me/iptv/2/yf/VOD"

# Output directories where the generated STRM files are saved
MOVIES_DIR = "/mnt/user/Plexcontent/IPMovies"
TV_DIR = "/mnt/user/Plexcontent/IPTv"
XXX_DIR = "/mnt/user/Plexcontent/IPOther"

# group-title values that route an entry to each output directory.
# Matching is by substring: an entry qualifies when any string listed here
# occurs inside its group-title.
MOVIE_GROUPS = ["Movie VOD"]
TV_GROUPS = ["TV VOD"]
XXX_GROUPS = ["XXX VOD"]

# Entries whose group-title contains any of these strings are never processed
# (also a substring match). To leave out the XXX groups, list them here.
SKIP_GROUPS = ["TRY", "Tested", "ZZZ"]

CLEAN_SYNC = True  # Set True to remove STRM files that are no longer in the M3U

# ============================================================

def sanitize_filename(name):
    """Return *name* with filesystem-hostile characters stripped.

    The characters ``< > : " / \\ | ? *`` are deleted outright, every run of
    whitespace collapses to a single space, and leading/trailing whitespace
    is trimmed.
    """
    forbidden = str.maketrans('', '', '<>:"/\\|?*')
    stripped = name.translate(forbidden)
    collapsed = re.sub(r'\s+', ' ', stripped)
    return collapsed.strip()

def parse_episode(name):
    """Split an episode title into ``(show, season, episode)``.

    Looks for the first ``SxxEyy`` marker (either letter case) in *name*.
    The text before the marker is the show name, passed through
    ``sanitize_filename``; season and episode are returned as ints.

    Returns:
        ``(show, season, episode)`` when a marker is found. ``show`` may be
        an empty string if the title starts with the marker, and season or
        episode may legitimately be 0. ``(None, None, None)`` when *name*
        contains no marker.
    """
    # The episode number is matched with \d+ rather than \d{1,2}: a two-digit
    # cap silently truncated "E100" to episode 10, so distinct episodes of
    # long-running shows collapsed onto the same file name.
    match = re.search(r'^(.*?)\s*[Ss](\d{1,2})[Ee](\d+)', name)
    if match is None:
        return None, None, None
    show = sanitize_filename(match.group(1).strip())
    return show, int(match.group(2)), int(match.group(3))

def clean_name(name):
    """Strip leading provider group prefixes from *name* and sanitize it.

    For every group name configured in MOVIE_GROUPS, TV_GROUPS and
    XXX_GROUPS, a leading ``"<group>,"`` or ``"<group> "`` is removed
    (e.g. ``"TV VOD,Show S01E01"`` -> ``"Show S01E01"``). The result is then
    passed through ``sanitize_filename``.

    Returns:
        The cleaned name; may be an empty string.
    """
    for prefix in MOVIE_GROUPS + TV_GROUPS + XXX_GROUPS:
        if name.startswith(prefix + ","):
            # Drop the prefix together with its trailing comma.
            name = name[len(prefix) + 1:].strip()
        if name.startswith(prefix + " "):
            # The separating space is removed by strip().
            name = name[len(prefix):].strip()
    return sanitize_filename(name)

def download_m3u(url):
    """Download the M3U playlist at *url* and return it as text.

    The request is sent with a ``VLC/3.0.0`` User-Agent header and a
    30 second timeout. The response body is decoded as UTF-8; bytes that
    cannot be decoded are dropped.

    Returns:
        The playlist text, or None if the download failed for any reason
        (the error is printed rather than raised so the caller can abort
        cleanly).
    """
    print(f"Downloading M3U from {url}...")
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'VLC/3.0.0'})
        with urllib.request.urlopen(req, timeout=30) as response:
            raw = response.read()
    except Exception as e:  # script-level boundary: report and signal failure
        print(f"ERROR downloading M3U: {e}")
        return None
    # Report the size of the raw payload: the decoded string's length is a
    # character count, not a byte count.
    print(f"Downloaded {len(raw)} bytes")
    return raw.decode('utf-8', errors='ignore')

def _extinf_title(line):
    """Return the title portion of an ``#EXTINF`` line.

    The title is whatever follows the first comma that is not inside a
    double-quoted attribute value, so commas that belong to the title itself
    are preserved. If no such comma exists (e.g. unbalanced quotes), the text
    after the last comma is used; with no comma at all the result is ''.
    """
    in_quotes = False
    for pos, ch in enumerate(line):
        if ch == '"':
            in_quotes = not in_quotes
        elif ch == ',' and not in_quotes:
            return line[pos + 1:].strip()
    return line.rpartition(',')[2].strip() if ',' in line else ''


def parse_m3u(content):
    """Parse M3U playlist text into a list of entry dicts.

    Each ``#EXTINF:`` line starts a new entry; the next line beginning with
    ``http`` completes it. Lines starting with ``http`` that are not preceded
    by an ``#EXTINF:`` line are ignored.

    Args:
        content: The playlist text.

    Returns:
        A list of dicts with the keys ``'group'`` (the ``group-title``
        attribute, '' if absent), ``'name'`` (the ``tvg-name`` attribute, or
        the display title after the attributes when ``tvg-name`` is missing
        or empty) and ``'url'``.
    """
    entries = []
    current = {}

    for line in content.splitlines():
        line = line.strip()
        if line.startswith('#EXTINF:'):
            group_match = re.search(r'group-title="([^"]*)"', line)
            name_match = re.search(r'tvg-name="([^"]*)"', line)
            tvg_name = name_match.group(1) if name_match else ''
            current = {
                'group': group_match.group(1) if group_match else '',
                # An empty tvg-name falls back to the display title.
                'name': tvg_name or _extinf_title(line),
            }
        elif line.startswith('http') and current:
            current['url'] = line
            entries.append(current)
            # Reset so a stray URL line cannot reuse this entry's metadata.
            current = {}

    print(f"Parsed {len(entries)} total entries")
    return entries

def write_strm(filepath, url):
    """Create or refresh the STRM file at *filepath* so it contains *url*.

    Missing parent directories are created. An existing file is left alone
    only when it already holds exactly *url*; if the provider's URL for the
    title has changed, the file is rewritten so the stale link is not kept
    forever.

    Returns:
        True if the file was written (newly created or updated), False if it
        already existed with the same URL.
    """
    parent = os.path.dirname(filepath)
    if parent:  # a bare file name has no directory part; makedirs('') raises
        os.makedirs(parent, exist_ok=True)

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            if f.read() == url:
                return False
    except (FileNotFoundError, UnicodeDecodeError):
        # Not there yet, or not readable as text: (re)write it below.
        pass

    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(url)
    return True

def process_movies(entries):
    """Write a STRM file for every movie entry in *entries*.

    An entry is a movie when its group contains one of MOVIE_GROUPS and none
    of SKIP_GROUPS (substring match). Entries whose cleaned name or URL is
    empty are ignored. Each movie is stored as
    ``MOVIES_DIR/<name>/<name>.strm``.

    Args:
        entries: Entry dicts with 'group', 'name' and 'url' keys.

    Returns:
        The set of STRM paths that correspond to movies in *entries*,
        whether they were written now or already existed.
    """
    count = 0
    skipped = 0
    existing_files = set()

    for entry in entries:
        group = entry.get('group', '')

        # Skip unwanted groups
        if any(skip in group for skip in SKIP_GROUPS):
            continue

        # Only process movie groups
        if not any(mg in group for mg in MOVIE_GROUPS):
            continue

        name = clean_name(entry.get('name', 'Unknown'))
        if not name:
            continue

        url = entry.get('url', '')
        if not url:
            continue

        movie_folder = os.path.join(MOVIES_DIR, name)
        strm_file = os.path.join(movie_folder, f"{name}.strm")
        existing_files.add(strm_file)

        if write_strm(strm_file, url):
            count += 1
        else:
            skipped += 1

    print(f"Movies: {count} new, {skipped} already existed")
    return existing_files

def process_tv(entries):
    """Write a STRM file for every TV entry in *entries*.

    An entry is TV content when its group contains one of TV_GROUPS and none
    of SKIP_GROUPS (substring match). Entries whose cleaned name or URL is
    empty are ignored. Titles that ``parse_episode`` can split are stored as
    ``TV_DIR/<show>/Season NN/<show> SNNENN.strm``; anything else goes to
    ``TV_DIR/<name>/<name>.strm``.

    Args:
        entries: Entry dicts with 'group', 'name' and 'url' keys.

    Returns:
        The set of STRM paths that correspond to TV entries in *entries*,
        whether they were written now or already existed.
    """
    count = 0
    skipped = 0
    existing_files = set()

    for entry in entries:
        group = entry.get('group', '')

        # Skip unwanted groups
        if any(skip in group for skip in SKIP_GROUPS):
            continue

        # Only process TV groups
        if not any(tg in group for tg in TV_GROUPS):
            continue

        name = clean_name(entry.get('name', 'Unknown'))
        if not name:
            continue

        url = entry.get('url', '')
        if not url:
            continue

        # Try to parse as episode
        show, season, episode = parse_episode(name)

        # Compare against None explicitly: season 0 (specials) and episode 0
        # are valid numbers but falsy, and a plain truthiness test pushed
        # them into the unparsed fallback layout.
        if show and season is not None and episode is not None:
            # Proper TV episode structure
            season_folder = f"Season {season:02d}"
            episode_name = f"{show} S{season:02d}E{episode:02d}"
            strm_file = os.path.join(TV_DIR, show, season_folder, f"{episode_name}.strm")
        else:
            # Can't parse episode info — put in show folder
            strm_file = os.path.join(TV_DIR, name, f"{name}.strm")

        existing_files.add(strm_file)

        if write_strm(strm_file, url):
            count += 1
        else:
            skipped += 1

    print(f"TV Shows: {count} new, {skipped} already existed")
    return existing_files

def process_xxx(entries):
    """Write a STRM file for every adult-content entry in *entries*.

    An entry qualifies when its group contains one of XXX_GROUPS and none of
    SKIP_GROUPS (substring match). Entries whose cleaned name or URL is empty
    are ignored. Each title is stored as ``XXX_DIR/<name>/<name>.strm``.

    Args:
        entries: Entry dicts with 'group', 'name' and 'url' keys.

    Returns:
        The set of STRM paths that correspond to qualifying entries in
        *entries*, whether they were written now or already existed.
    """
    count = 0
    skipped = 0
    existing_files = set()

    for entry in entries:
        group = entry.get('group', '')

        # Skip unwanted groups
        if any(skip in group for skip in SKIP_GROUPS):
            continue

        # Only process XXX groups
        if not any(xg in group for xg in XXX_GROUPS):
            continue

        name = clean_name(entry.get('name', 'Unknown'))
        if not name:
            continue

        url = entry.get('url', '')
        if not url:
            continue

        xxx_folder = os.path.join(XXX_DIR, name)
        strm_file = os.path.join(xxx_folder, f"{name}.strm")
        existing_files.add(strm_file)

        if write_strm(strm_file, url):
            count += 1
        else:
            skipped += 1

    print(f"XXX VOD: {count} new, {skipped} already existed")
    return existing_files

def clean_old_files(directory, current_files):
    """Delete STRM files under *directory* that are not in *current_files*.

    Only files ending in ``.strm`` are considered; other files are never
    touched. Sub-directories left empty are removed as well. *directory*
    itself is never removed.

    Args:
        directory: Root folder to scan recursively.
        current_files: Collection of STRM paths to keep, built the same way
            as the paths produced by walking *directory*.
    """
    removed = 0
    # Walk bottom-up so a folder is checked for emptiness only after its own
    # files and sub-folders have been cleaned. A top-down walk inspects each
    # sub-folder before its stale files are deleted, leaving emptied folders
    # behind until a later run.
    for root, dirs, files in os.walk(directory, topdown=False):
        for file in files:
            if not file.endswith('.strm'):
                continue
            filepath = os.path.join(root, file)
            if filepath in current_files:
                continue
            try:
                os.remove(filepath)
            except OSError as e:
                # Best effort: report the failure and keep going.
                print(f"WARNING: could not remove {filepath}: {e}")
            else:
                removed += 1

        # Remove empty directories safely
        for d in dirs:
            dirpath = os.path.join(root, d)
            try:
                if not os.listdir(dirpath):
                    os.rmdir(dirpath)
            except OSError:
                # Folder vanished or is not accessible; leaving it is harmless.
                pass
    print(f"Removed {removed} old STRM files from {directory}")

def main():
    """Run one full sync: download, parse, write STRM files, then clean up.

    Creates the three output directories if needed, downloads and parses the
    playlist at M3U_URL, writes STRM files for movies, TV shows and XXX VOD,
    and, when CLEAN_SYNC is true, deletes STRM files that are no longer in
    the playlist. Returns early, without writing or deleting anything, if the
    download fails or the playlist yields no entries.
    """
    print("=" * 50)
    print("M3U to STRM Converter")
    print("=" * 50)

    # Create output directories
    for d in [MOVIES_DIR, TV_DIR, XXX_DIR]:
        os.makedirs(d, exist_ok=True)

    # Download M3U
    content = download_m3u(M3U_URL)
    if not content:
        print("Failed to download M3U — aborting")
        return

    # Parse M3U
    entries = parse_m3u(content)
    if not entries:
        print("No entries found — aborting")
        return

    # Process movies
    print("\nProcessing movies...")
    movie_files = process_movies(entries)

    # Process TV shows
    print("\nProcessing TV shows...")
    tv_files = process_tv(entries)

    # Process XXX VOD
    print("\nProcessing XXX VOD...")
    xxx_files = process_xxx(entries)

    # Clean old files if enabled
    if CLEAN_SYNC:
        print("\nCleaning old files...")
        clean_old_files(MOVIES_DIR, movie_files)
        clean_old_files(TV_DIR, tv_files)
        clean_old_files(XXX_DIR, xxx_files)

    print("\n" + "=" * 50)
    print("Done!")
    print("=" * 50)


if __name__ == "__main__":
    main()

Posted

Thanks for sharing.

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now
×
×
  • Create New...