Jump to content

Recommended Posts

whubbartt
Posted

I got sick and tired of trying to make M#U converters work with my M3u provider so this script will take
VOD from your M3u stream and process it as strm data in a designated folder. Enjoy


#!/usr/bin/env python3
"""
M3U to STRM python Converter
Converts IPTV M3U playlist VOD content to STRM files for Emby/Jellyfin
Be sure to create the 3 directories and specify below. set to run on a schedule
This python script will process compare current VOD streams to what is in the directory
and delete file from directory if it's no longer in the VOD feed from your provider.
This script only works with M3U and not Xtreme Code user pass
"""

import os
import re
import urllib.request
import urllib.error

# ============================================================
# CONFIGURATION - Edit these settings
# ============================================================

# Put Your Provider URL Here and delete mine
M3U_URL = "https://tv123.me/iptv/2/yf/VOD"

# Specify the directories as to where you can save your STRMS after processing here
MOVIES_DIR = "/mnt/user/Plexcontent/IPMovies"
TV_DIR = "/mnt/user/Plexcontent/IPTv"
XXX_DIR = "/mnt/user/Plexcontent/IPOther"

MOVIE_GROUPS = ["Movie VOD"]
TV_GROUPS = ["TV VOD"]
XXX_GROUPS = ["XXX VOD"]

# If you don't want XXX groups specify what not to process here
SKIP_GROUPS = ["TRY", "Tested", "ZZZ"]

CLEAN_SYNC = True  # Fixed Typo: Set True to remove STRM files no longer in M3U

# ============================================================

def sanitize_filename(name):
   """Remove invalid filename characters"""
   name = re.sub(r'[<>:"/\\|?*]', '', name)
   name = re.sub(r'\s+', ' ', name).strip()
   return name

def parse_episode(name):
   """Extract show name, season, episode from title"""
   # Match S01E01 pattern
   match = re.search(r'^(.*?)\s*[Ss](\d{1,2})[Ee](\d{1,2})', name)
   if match:
       show = sanitize_filename(match.group(1).strip())
       season = int(match.group(2))
       episode = int(match.group(3))
       return show, season, episode
   return None, None, None

def clean_name(name):
   """Remove provider prefixes like 'TV VOD,' or 'Movie VOD,'"""
   for prefix in MOVIE_GROUPS + TV_GROUPS + XXX_GROUPS:
       if name.startswith(prefix + ","https://cloudstatic.net/pic/smilies/wink.gif:
           name = name[len(prefix) + 1:].strip()
       if name.startswith(prefix + " "https://cloudstatic.net/pic/smilies/wink.gif:
           name = name[len(prefix):].strip()
   return sanitize_filename(name)

def download_m3u(url):
   """Download M3U playlist"""
   print(f"Downloading M3U from {url}..."https://cloudstatic.net/pic/smilies/wink.gif
   try:
       req = urllib.request.Request(url, headers={'User-Agent': 'VLC/3.0.0'})
       with urllib.request.urlopen(req, timeout=30) as response:
           content = response.read().decode('utf-8', errors='ignore')
       print(f"Downloaded {len(content)} bytes"https://cloudstatic.net/pic/smilies/wink.gif
       return content
   except Exception as e:
       print(f"ERROR downloading M3U: {e}"https://cloudstatic.net/pic/smilies/wink.gif
       return None

def parse_m3u(content):
   """Parse M3U content into list of entries"""
   entries = []
   lines = content.splitlines()
   current = {}
   
   for line in lines:
       line = line.strip()
       if line.startswith('#EXTINF:'):
           current = {}
           # Extract group-title
           group_match = re.search(r'group-title="([^"]*)"', line)
           current['group'] = group_match.group(1) if group_match else ''
           # Extract tvg-name
           name_match = re.search(r'tvg-name="([^"]*)"', line)
           # Get display name (after the last comma)
           display_name = line.split(',')[-1].strip() if ',' in line else ''
           current['name'] = name_match.group(1) if name_match else display_name
           if not current['name']:
               current['name'] = display_name
       elif line.startswith('http') and current:
           current['url'] = line
           entries.append(current)
           current = {}
   
   print(f"Parsed {len(entries)} total entries"https://cloudstatic.net/pic/smilies/wink.gif
   return entries

def write_strm(filepath, url):
   """Write a STRM file"""
   os.makedirs(os.path.dirname(filepath), exist_ok=True)
   if not os.path.exists(filepath):
       with open(filepath, 'w', encoding='utf-8') as f:
           f.write(url)
       return True
   return False

def process_movies(entries):
   """Process movie entries into STRM files"""
   count = 0
   skipped = 0
   existing_files = set()
   
   for entry in entries:
       group = entry.get('group', '')
       
       # Skip unwanted groups
       if any(skip in group for skip in SKIP_GROUPS):
           continue
           
       # Only process movie groups
       if not any(mg in group for mg in MOVIE_GROUPS):
           continue
       
       name = clean_name(entry.get('name', 'Unknown'))
       if not name:
           continue
           
       url = entry.get('url', '')
       if not url:
           continue
       
       movie_folder = os.path.join(MOVIES_DIR, name)
       strm_file = os.path.join(movie_folder, f"{name}.strm"https://cloudstatic.net/pic/smilies/wink.gif
       existing_files.add(strm_file)
       
       if write_strm(strm_file, url):
           count += 1
       else:
           skipped += 1
   
   print(f"Movies: {count} new, {skipped} already existed"https://cloudstatic.net/pic/smilies/wink.gif
   return existing_files

def process_tv(entries):
   """Process TV series entries into STRM files"""
   count = 0
   skipped = 0
   existing_files = set()
   
   for entry in entries:
       group = entry.get('group', '')
       
       # Skip unwanted groups
       if any(skip in group for skip in SKIP_GROUPS):
           continue
           
       # Only process TV groups
       if not any(tg in group for tg in TV_GROUPS):
           continue
       
       name = clean_name(entry.get('name', 'Unknown'))
       if not name:
           continue
           
       url = entry.get('url', '')
       if not url:
           continue
       
       # Try to parse as episode
       show, season, episode = parse_episode(name)
       
       if show and season and episode:
           # Proper TV episode structure
           season_folder = f"Season {season:02d}"
           episode_name = f"{show} S{season:02d}E{episode:02d}"
           strm_file = os.path.join(TV_DIR, show, season_folder, f"{episode_name}.strm"https://cloudstatic.net/pic/smilies/wink.gif
       else:
           # Can't parse episode info — put in show folder
           strm_file = os.path.join(TV_DIR, name, f"{name}.strm"https://cloudstatic.net/pic/smilies/wink.gif
       
       existing_files.add(strm_file)
       
       if write_strm(strm_file, url):
           count += 1
       else:
           skipped += 1
   
   print(f"TV Shows: {count} new, {skipped} already existed"https://cloudstatic.net/pic/smilies/wink.gif
   return existing_files

def process_xxx(entries):
   """Process adult content entries into STRM files"""
   count = 0
   skipped = 0
   existing_files = set()
   
   for entry in entries:
       group = entry.get('group', '')
       
       if any(skip in group for skip in SKIP_GROUPS):
           continue
           
       if not any(xg in group for xg in XXX_GROUPS):
           continue
       
       name = clean_name(entry.get('name', 'Unknown'))
       if not name:
           continue
           
       url = entry.get('url', '')
       if not url:
           continue
       
       xxx_folder = os.path.join(XXX_DIR, name)
       strm_file = os.path.join(xxx_folder, f"{name}.strm"https://cloudstatic.net/pic/smilies/wink.gif
       existing_files.add(strm_file)
       
       if write_strm(strm_file, url):
           count += 1
       else:
           skipped += 1
           
   print(f"XXX VOD: {count} new, {skipped} already existed"https://cloudstatic.net/pic/smilies/wink.gif
   return existing_files

def clean_old_files(directory, current_files):
   """Remove STRM files no longer in M3U"""
   removed = 0
   for root, dirs, files in os.walk(directory):
       for file in files:
           if file.endswith('.strm'):
               filepath = os.path.join(root, file)
               if filepath not in current_files:
                   try:
                       os.remove(filepath)
                       removed += 1
                   except OSError:
                       pass
       # Remove empty directories safely
       for d in dirs:
           dirpath = os.path.join(root, d)
           try:
               if not os.listdir(dirpath):
                   os.rmdir(dirpath)
           except OSError:
               pass
   print(f"Removed {removed} old STRM files from {directory}"https://cloudstatic.net/pic/smilies/wink.gif

def main():
   print("=" * 50)
   print("M3U to STRM Converter"https://cloudstatic.net/pic/smilies/wink.gif
   print("=" * 50)
   
   # Create output directories
   for d in [MOVIES_DIR, TV_DIR, XXX_DIR]:
       os.makedirs(d, exist_ok=True)
   
   # Download M3U
   content = download_m3u(M3U_URL)
   if not content:
       print("Failed to download M3U — aborting"https://cloudstatic.net/pic/smilies/wink.gif
       return
   
   # Parse M3U
   entries = parse_m3u(content)
   if not entries:
       print("No entries found — aborting"https://cloudstatic.net/pic/smilies/wink.gif
       return
   
   # Process movies
   print("\nProcessing movies..."https://cloudstatic.net/pic/smilies/wink.gif
   movie_files = process_movies(entries)
   
   # Process TV shows
   print("\nProcessing TV shows..."https://cloudstatic.net/pic/smilies/wink.gif
   tv_files = process_tv(entries)
   
   # Process XXX VOD
   print("\nProcessing XXX VOD..."https://cloudstatic.net/pic/smilies/wink.gif
   xxx_files = process_xxx(entries)
   
   # Clean old files if enabled
   if CLEAN_SYNC:
       print("\nCleaning old files..."https://cloudstatic.net/pic/smilies/wink.gif
       clean_old_files(MOVIES_DIR, movie_files)
       clean_old_files(TV_DIR, tv_files)
       clean_old_files(XXX_DIR, xxx_files)
   
   print("\n" + "=" * 50)
   print("Done!"https://cloudstatic.net/pic/smilies/wink.gif
   print("=" * 50)

if __name__ == "__main__":
   main()

Posted

Thanks for sharing.

whubbartt
Posted

So this is my first time writing a converter and I noticed some issues with REGEX processing of Series this is version 2 along with instructions.

This now will ask you questions to establish the necessary variables and store them in a json file. It will email you sms the results each time it runs.

This is only ver 2 so it may not be full proof yet but from my testing it works pretty darn good. Iv'e attached the script to this post also. This is a very smart script from version 1 and there is a lot of logic added to determine proper save structure.

Enjoy!

IPTV VOD converter for Jellyfin, Emby, Plex Linux Setup Guide

This script automatically pulls a streaming playlist and turns every video into a tiny placeholder file (.strm) on your server. This allows media servers like Emby or Plex to list the videos in your library and stream them directly without taking up massive amounts of hard drive space.

🔒 Hardware & Database Friendly

  • Smart Scanning: The script only adds new videos. If a file already exists on your server, the script skips it entirely. It will never delete or overwrite your files.
  • Saves SSD Life: Because it doesn’t delete and recreate files, it protects your Solid State Drives (NVMe/SSDs) from unnecessary wear and tear.
  • Protects Emby/Plex: Your watch history, custom posters, and metadata are perfectly safe because the script never forces your media server to think files were deleted.

🛠️ How to Setup and Run the Script

You do not need to edit the script code, copy text, or download any playlist files to make this work. Just follow these steps:

Step 1: Put the Script on Your Server

  1. Copy the script file (e.g., IPTVconverter.py) onto your server into the folder where you want it to live.
  2. The script is already programmed to automatically download the latest streaming list from the web, so you don't need to put any other files in this directory.
  3. This is a Python 3 script. Run by simply typing python3 IPTVconverter.py
  4. After initial run put it in your scheduler to run once a day.
  5. I worked hard on this so do Enjoy!

 

 

 

#!/usr/bin/env python3

"""

================================================================================

UNIVERSAL IPTV STRM SYNC ENGINE (FIXED VERIFICATION & 50-ITEM SPLIT SMS BURST)

================================================================================

"""

 

import os

import sys

import json

import re

import urllib.request

import ssl

import smtplib

from email.mime.text import MIMEText

 

CONFIG_FILE = "iptv_config.json"

 

CARRIER_MAP = {

    "verizon": "vtext.com",

    "att": "txt.att.net",

    "tmobile": "tmomail.net",

    "sprint": "messaging.sprintpcs.com",

    "cricket": "sms.cricketwireless.net",

    "boost": "myboostmobile.com"

}

 

def confirm_input(value_to_show):

    while True:

        print(f"👉 You entered/selected: {value_to_show}")

        choice = input("🤔 Is this correct? (y/n): ").strip().lower()

        if choice in ['y', 'yes']:

            return True

        if choice in ['n', 'no']:

            print("🔄 Let's try that question again...\n")

            return False

        print("❌ Please type 'y' for yes or 'n' for no.")

 

def scan_m3u_groups(url):

    print(f"\n🔍 Analyzing playlist architecture and isolating VOD assets...")

    try:

        req = urllib.request.Request(url, headers={'User-Agent': 'VLC/3.0.0'})

        context = ssl._create_unverified_context()

        detected_groups = set()

       

        with urllib.request.urlopen(req, context=context, timeout=45) as response:

            current_group = None

            for line in response:

                line = line.decode('utf-8', errors='ignore').strip()

                if line.startswith('#EXTINF:'):

                    group_match = re.search(r'group-title="([^"]*)"', line)

                    current_group = group_match.group(1) if group_match else None

                elif line.startswith('http') and current_group:

                    if not any(live_indicator in line.lower() for live_indicator in ['/xmltv', '.xml', '/epg']):

                        detected_groups.add(current_group)

                    current_group = None

            return list(detected_groups)

    except Exception as e:

        print(f"⚠️  Could not pre-scan M3U groups: {e}")

        return []

 

def auto_classify_groups(detected_groups):

    movies, tv, xxx = [], [], []

    tv_indicators = ['series', 'tv show', 'tv-show', 'season', 's0', 's1', 's2', 'episodes', 'shows', 'tv']

    xxx_indicators = ['xxx', 'adult', '18+', 'nsfw', 'pink', 'erotic']

   

    for group in detected_groups:

        g_lower = group.lower()

        if any(x in g_lower for x in xxx_indicators):

            xxx.append(group)

        elif any(t in g_lower for t in tv_indicators):

            tv.append(group)

        else:

            movies.append(group)

    return movies, tv, xxx

 

def run_wizard():

    if os.path.exists(CONFIG_FILE):

        with open(CONFIG_FILE, 'r') as f:

            return json.load(f)

 

    config = {}

    print("\n==================================================")

    print("        IPTV SYNC SETUP (INTELLIGENT WIZARD)      ")

    print("==================================================")

   

    while True:

        url = input("👉 Paste your IPTV M3U URL: ").strip()

        if not url.startswith(("http://", "https://")):

            print("❌ Invalid URL format.\n")

            continue

        if confirm_input(url):

            config['M3U_URL'] = url

            break

 

    all_groups = scan_m3u_groups(config['M3U_URL'])

    m_groups, t_groups, x_groups = auto_classify_groups(all_groups)

   

    config['MOVIE_GROUPS'] = m_groups if m_groups else ["Movie VOD"]

    config['TV_GROUPS'] = t_groups if t_groups else ["TV VOD"]

    config['XXX_GROUPS'] = x_groups

 

    print("\n📁 --- MEDIA STORAGE LOCAL PATHS ---")

    while True:

        path = input("👉 Movies directory path: ").strip()

        if confirm_input(path):

            config['MOVIES'] = path

            break

 

    while True:

        path = input("👉 TV Shows directory path: ").strip()

        if not path:

            config['TV'] = "/tmp/tv_unused"

            config['TV_GROUPS'] = []

            break

        if confirm_input(path):

            config['TV'] = path

            break

 

    if x_groups:

        while True:

            path = input("👉 Adult VOD directory path (Or ENTER to skip): ").strip()

            if not path:

                config['XXX'] = "/tmp/xxx_unused"

                config['XXX_GROUPS'] = []

                break

            if confirm_input(path):

                config['XXX'] = path

                break

    else:

        config['XXX'] = "/tmp/xxx_unused"

        config['XXX_GROUPS'] = []

 

    print("\n📱 --- SMS NOTIFICATIONS (OPTIONAL) ---")

    want_sms = False

    while True:

        phone = input("👉 Enter your 10-digit phone number (Or ENTER to skip): ").strip()

        if not phone:

            config['PHONE'] = ""

            break

        clean_phone = re.sub(r'\D', '', phone)

        if len(clean_phone) != 10:

            print("❌ Invalid entry.\n")

            continue

        if confirm_input(clean_phone):

            config['PHONE'] = clean_phone

            want_sms = True

            break

 

    if want_sms:

        while True:

            carrier = input("👉 Enter your mobile carrier name: ").strip().lower()

            if carrier not in CARRIER_MAP:

                print("❌ Unsupported carrier profile.\n")

                continue

            if confirm_input(carrier.upper()):

                config['CARRIER_NAME'] = carrier

                config['CARRIER_DOMAIN'] = CARRIER_MAP[carrier]

                break

 

        print("\n📧 --- OUTGOING SMTP MAIL ROUTER ---")

        while True:

            smtp_server = input("👉 Outgoing SMTP Server Host: ").strip()

            if confirm_input(smtp_server):

                config['SMTP_SERVER'] = smtp_server

                break

 

        while True:

            print("\n🔒 [SMTP SECURITY NOTICE]")

            smtp_port = input("👉 Enter secure SMTP Port [Suggested: 587]: ").strip()

            try:

                port_val = int(smtp_port)

                if confirm_input(port_val):

                    config['SMTP_PORT'] = port_val

                    break

            except ValueError:

                print("❌ Numerical integers only.")

 

        while True:

            email_user = input("👉 Sender Email Address Username: ").strip()

            if confirm_input(email_user):

                config['EMAIL_USER'] = email_user

                break

               

        while True:

            email_pass = input("👉 Email Account Password: ").strip()

            if confirm_input("********"):

                config['EMAIL_PASS'] = email_pass

                break

 

    config['SKIP_GROUPS'] = ["TRY", "Tested", "ZZZ"]

    config['CLEAN_SYNC'] = True

 

    print("\n🛠  Initializing storage paths...")

    for key in ['MOVIES', 'TV', 'XXX']:

        if config[key] not in ["/tmp/tv_unused", "/tmp/xxx_unused"]:

            os.makedirs(config[key], exist_ok=True)

            print(f"    ✅ Target Ready: {config[key]}")

 

    with open(CONFIG_FILE, 'w') as f:

        json.dump(config, f, indent=4)

    return config

 

def clean_title_casing(title):

    title = re.sub(r'[\[\(\{][^\]\}\)]*[\]\}\)]', '', title)

    title = re.sub(r'[<>:"/\\|?*]', '', title)

    title = re.sub(r'\s+', ' ', title).strip()

    title = re.sub(r'\bL\s+a\b', 'LA', title, flags=re.I)

   

    words = title.split()

    minor_words = {'a', 'an', 'the', 'and', 'but', 'or', 'for', 'on', 'at', 'to', 'by', 'of', 'with', 'in', 'vs', 'from'}

    caps_words = {'au', 'us', 'uk', 'fbi', 'abc', 'dc', 'tv', 'la', 'swat', 'ncis', 'csi', 'sec', 'snl'}

   

    final_words = []

    for i, word in enumerate(words):

        lw = word.lower()

        if lw == 'x':

            final_words.append('X')

        elif lw in caps_words:

            final_words.append(word.upper())

        elif lw in minor_words and i > 0:

            final_words.append(lw)

        else:

            final_words.append(word.capitalize())

           

    result = " ".join(final_words).strip()

    return re.sub(r'\s*-\s*$', '', result)

 

def strip_group_prefixes(name, cfg):

    all_prefixes = cfg.get('MOVIE_GROUPS', []) + cfg.get('TV_GROUPS', []) + cfg.get('XXX_GROUPS', [])

    for prefix in all_prefixes:

        if name.startswith(prefix + ","):

            name = name[len(prefix) + 1:].strip()

        elif name.startswith(prefix + " "):

            name = name[len(prefix):].strip()

    return name

 

def parse_tv_meta(raw_name, cfg):

    raw_name = re.sub(r'\b(repack|proper|internal|unrated|extended)\b', '', raw_name, flags=re.I)

    raw_name = strip_group_prefixes(raw_name, cfg)

   

    date_match = re.search(r'\b(19\d{2}|20\d{2})[\s._](0[1-9]|1[0-2])[\s._](0[1-9]|[12]\d|3[01])\b', raw_name)

    if date_match:

        year, month, day = date_match.group(1), date_match.group(2), date_match.group(3)

        parts = re.split(r'\b' + year + r'\b', raw_name, maxsplit=1)

        show_clean = clean_title_casing(parts[0])

        return f"{show_clean} ({year})", year, f"{month}{day}"

 

    year_match = re.search(r'\b(19\d{2}|20\d{2})\b', raw_name)

    if year_match:

        year = year_match.group(1)

        if not re.search(r'[sS]' + year, raw_name):

            parts = re.split(r'\b' + year + r'\b', raw_name, maxsplit=1)

            show_clean = clean_title_casing(parts[0])

            if not any(ep_tag in parts[1].lower() for ep_tag in ['s', 'e', 'season', 'episode']):

                return f"{show_clean} ({year})", "01", "01"

 

    multi_match = re.search(r'\bS(\d+)\s*[eE](\d+)(?:[-_]*[eE]?\d+)+\b', raw_name)

    if multi_match:

        s_num = multi_match.group(1).zfill(2)

        e_num = multi_match.group(2).zfill(2)

        scrubbed = re.split(r'\bS\d+\s*[eE]\d+', raw_name, flags=re.I)[0]

        return clean_title_casing(scrubbed), s_num, e_num

 

    sxx_match = re.search(r'\bS(\d+)\s*[eE](\d+)\b', raw_name)

    if sxx_match:

        s_num = sxx_match.group(1).zfill(2)

        e_num = sxx_match.group(2).zfill(2)

        scrubbed = re.split(r'\bS\d+\s*[eE]\d+', raw_name, flags=re.I)[0]

        return clean_title_casing(scrubbed), s_num, e_num

 

    season_tag_match = re.search(r'\bS(?:eason)?\s*(\d+)\b', raw_name, re.I)

    if season_tag_match:

        s_num = season_tag_match.group(1).zfill(2)

        scrubbed = re.split(r'\bS(?:eason)?\s*\d+', raw_name, flags=re.I)[0]

        ep_match = re.search(r'\bE(?:pisode)?\s*(\d+)\b', raw_name, re.I)

        e_num = ep_match.group(1).zfill(2) if ep_match else "01"

        return clean_title_casing(scrubbed), s_num, e_num

 

    ep_tag_match = re.search(r'(?:[-_\s]E(?:pisode)?)\s*[-_]?(\d+)\b', raw_name, re.I)

    if ep_tag_match:

        e_num = ep_tag_match.group(1).zfill(2)

        scrubbed = re.split(r'(?:[-_\s]E(?:pisode)?)\s*[-_]?\d+', raw_name, flags=re.I)[0]

        return clean_title_casing(scrubbed), "01", e_num

 

    num_match = re.search(r'[-_\s](\d{1,3})\b(?=\s*(?:720p|1080p|2160p|4k|hevc|x264|x265|web|hdtv|h264|h265|proper|repack|\(|\[|$))', raw_name, re.I)

    if num_match:

        e_num = num_match.group(1).zfill(2)

        scrubbed = raw_name.split(num_match.group(0))[0]

        return clean_title_casing(scrubbed), "01", e_num

 

    scrubbed = re.sub(r'\b(720p|1080p|2160p|4k|hevc|x264|x265|h264|h265|web|hdtv).*$', '', raw_name, flags=re.I)

    return clean_title_casing(scrubbed), "01", "01"

 

def download_m3u(url):

    print(f"Downloading M3U from {url}...")

    try:

        req = urllib.request.Request(url, headers={'User-Agent': 'VLC/3.0.0'})

        context = ssl._create_unverified_context()

        with urllib.request.urlopen(req, context=context, timeout=45) as response:

            content = response.read().decode('utf-8', errors='ignore')

        print(f"Successfully downloaded {len(content):,} bytes")

        return content

    except Exception as e:

        print(f"ERROR downloading M3U: {e}")

        return None

 

def parse_m3u(content):

    entries = []

    lines = content.splitlines()

    current = {}

   

    for line in lines:

        line = line.strip()

        if line.startswith('#EXTINF:'):

            current = {}

            group_match = re.search(r'group-title="([^"]*)"', line)

            current['group'] = group_match.group(1) if group_match else ''

            display_name = line.split(',')[-1].strip() if ',' in line else ''

            name_match = re.search(r'tvg-name="([^"]*)"', line)

            current['name'] = name_match.group(1) if name_match and name_match.group(1) else display_name

            if not current['name']:

                current['name'] = display_name

        elif line.startswith('http') and current:

            if not any(live_indicator in line.lower() for live_indicator in ['/xmltv', '.xml', '/epg']):

                current['url'] = line

                entries.append(current)

            current = {}

    print(f"Isolated {len(entries):,} total VOD media entries from playlist.")

    return entries

 

def write_strm(filepath, url):

    os.makedirs(os.path.dirname(filepath), exist_ok=True)

    if not os.path.exists(filepath):

        with open(filepath, 'w', encoding='utf-8') as f:

            f.write(url)

        return True

    return False

 

def process_movies(entries, cfg):

    count, skipped = 0, 0

    active_files = set()

    newly_added_titles = []

    skip_grps = cfg.get('SKIP_GROUPS', [])

    mov_grps = cfg.get('MOVIE_GROUPS', [])

   

    tv_indicators = ['series', 'tv show', 'tv-show', 'season', 's0', 's1', 's2', 'episodes', 'shows']

    xxx_indicators = ['xxx', 'adult', '18+', 'nsfw', 'pink', 'erotic']

 

    for entry in entries:

        group = entry.get('group', '')

        g_lower = group.lower()

        if any(skip.lower() in g_lower for skip in skip_grps):

            continue

           

        if mov_grps:

            if group not in mov_grps:

                continue

        else:

            if any(x in g_lower for x in xxx_indicators) or any(t in g_lower for t in tv_indicators):

                continue

           

        raw_name = entry.get('name', 'Unknown')

        name = clean_title_casing(strip_group_prefixes(raw_name, cfg))

        url = entry.get('url', '')

        if not name or not url:

            continue

           

        movie_folder = os.path.join(cfg['MOVIES'], name)

        strm_file = os.path.join(movie_folder, f"{name}.strm")

        active_files.add(strm_file)

       

        if write_strm(strm_file, url):

            count += 1

            newly_added_titles.append(name)

        else:

            skipped += 1

    print(f"Movies: {count} deployed, {skipped} verified current.")

    return active_files, count, skipped, newly_added_titles

 

def process_tv(entries, cfg):

    count, skipped = 0, 0

    active_files = set()

    newly_added_titles = []

    skip_grps = cfg.get('SKIP_GROUPS', [])

    tv_grps = cfg.get('TV_GROUPS', [])

   

    if cfg.get('TV') == "/tmp/tv_unused":

        return active_files, 0, 0, newly_added_titles

       

    tv_indicators = ['series', 'tv show', 'tv-show', 'season', 's0', 's1', 's2', 'episodes', 'shows']

 

    for entry in entries:

        group = entry.get('group', '')

        g_lower = group.lower()

        if any(skip.lower() in g_lower for skip in skip_grps):

            continue

           

        if tv_grps:

            if group not in tv_grps:

                continue

        else:

            if not any(t in g_lower for t in tv_indicators):

                continue

           

        raw_name = entry.get('name', 'Unknown')

        url = entry.get('url', '')

        if not raw_name or not url:

            continue

           

        show_base, s_num, e_num = parse_tv_meta(raw_name, cfg)

        season_folder = f"Season {s_num}"

        episode_name = f"{show_base} - S{s_num}E{e_num}"

       

        strm_file = os.path.join(cfg['TV'], show_base, season_folder, f"{episode_name}.strm")

        active_files.add(strm_file)

       

        if write_strm(strm_file, url):

            count += 1

            newly_added_titles.append(episode_name)

        else:

            skipped += 1

    print(f"TV Shows: {count} deployed, {skipped} verified current.")

    return active_files, count, skipped, newly_added_titles

 

def process_xxx(entries, cfg):

    count, skipped = 0, 0

    active_files = set()

    newly_added_titles = []

    skip_grps = cfg.get('SKIP_GROUPS', [])

    xxx_grps = cfg.get('XXX_GROUPS', [])

   

    if cfg.get('XXX') == "/tmp/xxx_unused":

        return active_files, 0, 0, newly_added_titles

       

    xxx_indicators = ['xxx', 'adult', '18+', 'nsfw', 'pink', 'erotic']

 

    for entry in entries:

        group = entry.get('group', '')

        g_lower = group.lower()

        if any(skip.lower() in g_lower for skip in skip_grps):

            continue

           

        if xxx_grps:

            if group not in xxx_grps:

                continue

        else:

            if not any(x in g_lower for x in xxx_indicators):

                continue

           

        raw_name = entry.get('name', 'Unknown')

        name = clean_title_casing(strip_group_prefixes(raw_name, cfg))

        url = entry.get('url', '')

        if not name or not url:

            continue

           

        xxx_folder = os.path.join(cfg['XXX'], name)

        strm_file = os.path.join(xxx_folder, f"{name}.strm")

        active_files.add(strm_file)

       

        if write_strm(strm_file, url):

            count += 1

            newly_added_titles.append(name)

        else:

            skipped += 1

    print(f"XXX VOD: {count} deployed, {skipped} verified current.")

    return active_files, count, skipped, newly_added_titles

 

def clean_old_files(directory, current_files):

    removed = 0

    if not os.path.exists(directory) or directory in ("/tmp/movies_unused", "/tmp/tv_unused", "/tmp/xxx_unused"):

        return removed

       

    for root, dirs, files in os.walk(directory, topdown=False):

        for file in files:

            if file.endswith('.strm'):

                filepath = os.path.join(root, file)

                if filepath not in current_files:

                    try:

                        os.remove(filepath)

                        removed += 1

                    except Exception:

                        pass

        for d in dirs:

            dirpath = os.path.join(root, d)

            try:

                if not os.listdir(dirpath):

                    os.rmdir(dirpath)

            except Exception:

                pass

    return removed

 

def send_sms_raw(cfg, subject, body_text):

    if not cfg.get('PHONE'):

        return

       

    recipient = f"{cfg['PHONE']}@{cfg['CARRIER_DOMAIN']}"

    try:

        msg = MIMEText(body_text, 'plain', 'utf-8')

        msg['From'] = f"Tower Sync <{cfg.get('EMAIL_USER', '')}>" 

        msg['To'] = recipient

        msg['Subject'] = subject

       

        port = int(cfg.get('SMTP_PORT', 25))

        server_host = cfg.get('SMTP_SERVER', '')

       

        if port == 465:

            context = ssl._create_unverified_context()

            server = smtplib.SMTP_SSL(server_host, port, context=context, timeout=20)

        else:

            server = smtplib.SMTP(server_host, port, timeout=20)

            if port == 587:

                context = ssl._create_unverified_context()

                server.starttls(context=context)

               

        if cfg.get('EMAIL_USER') and cfg.get('EMAIL_PASS'):

            server.login(cfg['EMAIL_USER'], cfg['EMAIL_PASS'])

           

        server.send_message(msg)

        server.quit()

    except Exception as e:

        print(f"⚠️  SMS Send Failed [{subject}]: {e}")

 

def main():

    print("=" * 60)

    print("      UNIVERSAL IPTV AUTOMATED SYNCHRONIZATION ENGINE       ")

    print("=" * 60)

   

    cfg = run_wizard()

    m3u_content = download_m3u(cfg.get('M3U_URL', ''))

    if not m3u_content:

        return

       

    entries = parse_m3u(m3u_content)

    if not entries:

        return

       

    print("\nProcessing Movies Library Section...")

    movie_files, m_new, m_skip, m_added_names = process_movies(entries, cfg)

   

    print("\nProcessing TV Library Section...")

    tv_files, t_new, t_skip, t_added_names = process_tv(entries, cfg)

   

    print("\nProcessing Adult Library Section...")

    xxx_files, x_new, x_skip, x_added_names = process_xxx(entries, cfg)

   

    m_rem, t_rem, x_rem = 0, 0, 0

    if cfg.get('CLEAN_SYNC', True):

        print("\nExecuting Mirror Synchronization Cleanups...")

        m_rem = clean_old_files(cfg.get('MOVIES', ''), movie_files)

        t_rem = clean_old_files(cfg.get('TV', ''), tv_files)

        x_rem = clean_old_files(cfg.get('XXX', ''), xxx_files)

        print(f"Removed old STRM files: Movies: -{m_rem} | TV: -{t_rem} | XXX: -{x_rem}")

       

    # --- BURST 1: Send Core Metric Numbers Immediately ---

    core_report = (

        f"Tower Sync Complete\n"

        f"Movies: +{m_new}/-{m_rem} (Total: {len(movie_files)})\n"

        f"TV: +{t_new}/-{t_rem} (Total: {len(tv_files)})\n"

        f"XXX: +{x_new}/-{x_rem} (Total: {len(xxx_files)})\n"

        f"Playlist entries: {len(entries):,}"

    )

    print(f"\nDispatching main metrics to {cfg['PHONE']}...")

    send_sms_raw(cfg, "Tower Stats", core_report)

   

    # --- BURST 2: Send Title Additions Limited to Top 50 to Avoid Carrier Filters ---

    total_added = len(m_added_names) + len(t_added_names) + len(x_added_names)

    if total_added > 0:

        added_lines = ["New Additions Preview:"]

       

        if m_added_names:

            added_lines.append("\n🎬 MOVIES:")

            for name in m_added_names[:50]:

                added_lines.append(f"• {name}")

            if len(m_added_names) > 50:

                added_lines.append(f"...and {len(m_added_names)-50} more")

               

        if t_added_names:

            added_lines.append("\n📺 TV:")

            for name in t_added_names[:50]:

                added_lines.append(f"• {name}")

            if len(t_added_names) > 50:

                added_lines.append(f"...and {len(t_added_names)-50} more")

               

        if x_added_names:

            added_lines.append("\n🌶️ XXX:")

            for name in x_added_names[:50]:

                added_lines.append(f"• {name}")

            if len(x_added_names) > 50:

                added_lines.append(f"...and {len(x_added_names)-50} more")

               

        preview_report = "\n".join(added_lines)

        print("Dispatching true item addition preview text...")

        send_sms_raw(cfg, "Tower Details", preview_report)

   

    print("\n" + "=" * 60)

    print("                  SYNC PIPELINE COMPLETE!                    ")

    print("=" * 60)

 

if __name__ == "__main__":

    main()

 

 

 

 

 

 

IPTVconverter.py

whubbartt
Posted

It would be best to just download the IPTVconverter.py from my previous post cause it looks like emojis were inserted into the code

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now
×
×
  • Create New...