Coverage for src/iptvtools/models.py: 11%
155 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-31 13:48 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-31 13:48 +0000
1#!/usr/bin/env python
2"""Playlist which contains all the channels' information.
4File: models.py
5Author: huxuan
6Email: i(at)huxuan.org
7"""
9import logging
10import os.path
11import random
12import re
13import sys
14import time
15from typing import Any
17from tqdm import tqdm
19from iptvtools import parsers, utils
20from iptvtools.constants import defaults, tags
class Playlist:
    """Playlist model.

    Holds channel entries collected from M3U sources, keyed by stream URL.
    ``parse`` fills ``data`` / ``id_url``, ``filter`` probes the URLs and
    records the ones to drop, and ``export`` writes the remaining entries to
    ``output``.
    """

    def __init__(
        self,
        channel_exclude: str,
        channel_include: str,
        group_exclude: str,
        group_include: str,
        max_height: int,
        min_height: int,
        inputs: list[str],
        interval: int,
        skip_connectivity_check: bool,
        output: str,
        replace_group_by_source: bool,
        resolution_on_title: bool,
        sort_keys: list[str],
        templates: list[str],
        timeout: int,
        udpxy: str,
    ) -> None:
        """Init for Playlist.

        Args:
            channel_exclude: Regex; entries whose title matches are dropped
                while parsing. Falsy disables the check.
            channel_include: Regex; only entries whose title matches are
                kept while parsing. Falsy disables the check.
            group_exclude: Regex applied to ``group-title`` in the same way
                as ``channel_exclude``.
            group_include: Regex applied to ``group-title`` in the same way
                as ``channel_include``.
            max_height: Streams taller than this are dropped, unless it
                equals ``defaults.MAX_HEIGHT``.
            min_height: Streams shorter than this are dropped.
            inputs: Playlist sources that provide the channels.
            interval: Value passed to ``time.sleep`` before each probe.
            skip_connectivity_check: When true, ``filter`` probes nothing.
            output: Path of the file written by ``export``.
            replace_group_by_source: When true, ``export`` writes the source
                name (file name without extension) as ``group-title``.
            resolution_on_title: When true, ``export`` appends the measured
                resolution to each title.
            sort_keys: Keys used to order exported entries; ``height``,
                ``title``, ``tvg-id``, ``template-order`` and ``group-title``
                are recognised.
            templates: Playlist sources whose entries override the params
                and title of parsed channels sharing the same id.
            timeout: Timeout handed to the ``utils`` probing helpers.
            udpxy: When truthy, URLs are rewritten through
                ``utils.convert_url_with_udpxy`` with this value.
        """
        self.channel_exclude = channel_exclude
        self.channel_include = channel_include
        self.group_exclude = group_exclude
        self.group_include = group_include
        self.max_height = max_height
        self.min_height = min_height
        self.inputs = inputs
        self.interval = interval
        self.skip_connectivity_check = skip_connectivity_check
        self.output = output
        self.replace_group_by_source = replace_group_by_source
        self.resolution_on_title = resolution_on_title
        self.sort_keys = sort_keys
        self.templates = templates
        self.timeout = timeout
        self.udpxy = udpxy
        # URL -> parsed entry (title, params, source, optionally height, ...).
        self.data: dict[str, Any] = {}
        # Channel id -> list of URLs registered for that id.
        self.id_url: dict[str, Any] = {}
        # URLs excluded from the export, grouped by the reason found in filter().
        self.inaccessible_urls: set[str] = set()
        self.low_res_urls: set[str] = set()
        self.high_res_urls: set[str] = set()
        # Taken from the header line of a parsed source, if it declares one.
        self.tvg_url: str | None = None

    def export(self) -> None:
        """Export playlist information.

        Writes the header line followed by one ``INF`` line and one URL line
        per entry to ``self.output`` (UTF-8), ordered by ``sort_keys``. URLs
        recorded as inaccessible, low resolution or high resolution are left
        out.
        """
        lines = [tags.M3U]
        if self.tvg_url is not None:
            lines[0] += f' x-tvg-url="{self.tvg_url}"'

        excluded = self.inaccessible_urls | self.low_res_urls | self.high_res_urls
        for url in sorted(self.data, key=self.__custom_sort):
            if url in excluded:
                continue

            entry = self.data[url]
            # Work on a copy so exporting never rewrites the parsed params.
            params_dict = dict(entry.get("params", {}))
            if self.replace_group_by_source:
                params_dict["group-title"] = entry["source"]
            params = " ".join(
                f'{key}="{value}"' for key, value in params_dict.items()
            )
            duration = entry["duration"]
            title = entry["title"]
            if self.resolution_on_title:
                # filter() stores no height when probing is skipped, so there
                # is nothing to append in that case.
                height = entry.get("height")
                if height is not None:
                    title += f" [{utils.height_to_resolution(height)}]"

            lines.append(f"{tags.INF}:{duration} {params},{title}\n{url}")

        with open(self.output, "w", encoding="utf-8") as f:
            f.write("\n".join(lines))

    def parse(self) -> None:
        """Parse contents.

        Inputs are parsed first so that templates can then override the
        entries they share an id with.
        """
        self._parse(self.inputs)
        logging.debug(self.data)
        self._parse(self.templates, is_template=True)
        logging.debug(self.data)

    def _should_skip(self, group: str, title: str) -> bool:
        """Return True when the include/exclude patterns reject an entry."""
        if self.group_include:
            if not re.search(self.group_include, group):
                return True
            logging.debug("Group to include: `%s`.", group)
        if self.group_exclude and re.search(self.group_exclude, group):
            logging.debug("Group to exclude: `%s`.", group)
            return True
        if self.channel_include:
            if not re.search(self.channel_include, title):
                return True
            logging.debug("Channel to include: `%s`.", title)
        if self.channel_exclude and re.search(self.channel_exclude, title):
            logging.debug("Channel to exclude: `%s`.", title)
            return True
        return False

    def _parse(self, sources: list[str], is_template: bool = False) -> None:
        """Parse playlist sources.

        Args:
            sources: Sources handed to ``parsers.parse_content_to_lines``.
            is_template: When false, every URL line is stored as a channel.
                When true, every URL line bumps the template order and the
                preceding ``INF`` entry overrides the params and title of
                all stored channels with the same id.
        """
        template_order = 0
        for source in sources:
            source_name = os.path.splitext(os.path.basename(source))[0]
            # Entry described by the most recent INF line of this source.
            current_item: dict[str, Any] | None = None
            skip = False
            is_first_line = True
            for line in parsers.parse_content_to_lines(source):
                if not line:
                    continue
                if is_first_line:
                    is_first_line = False
                    if line.startswith(tags.M3U):
                        tvg_url = parsers.parse_tag_m3u(line).get("tvg-url")
                        if tvg_url:
                            self.tvg_url = tvg_url
                        continue
                if skip:
                    # The line following a rejected INF line is dropped.
                    skip = False
                    continue
                if line.startswith(tags.INF):
                    current_item = utils.unify_title_and_id(
                        parsers.parse_tag_inf(line)
                    )
                    group = current_item.get("params", {}).get("group-title", "")
                    title = current_item.get("title", "")
                    skip = self._should_skip(group, title)
                    continue

                # Anything else is treated as the URL of the current entry.
                if current_item is None:
                    # No INF line seen yet in this source: there is no entry
                    # to attach the line to, so ignore it instead of failing
                    # or reusing state from a previous source.
                    logging.debug("Line without preceding entry: `%s`.", line)
                    continue

                current_id = current_item["id"]
                if is_template:
                    template_order += 1
                    overrides = {
                        **current_item.get("params", {}),
                        "template-order": template_order,
                    }
                    for url in self.id_url.get(current_id, []):
                        self.data[url].setdefault("params", {}).update(overrides)
                        self.data[url]["title"] = current_item["title"]
                else:
                    if self.udpxy:
                        line = utils.convert_url_with_udpxy(line, self.udpxy)
                    # Give every URL its own entry so per-URL data (such as
                    # the measured height) is not shared between URLs that
                    # follow the same INF line.
                    entry = dict(current_item)
                    entry["params"] = dict(current_item.get("params", {}))
                    entry["source"] = source_name
                    self.data[line] = entry
                    self.id_url.setdefault(current_id, []).append(line)

    def _check_url(self, url: str, need_height: bool) -> str:
        """Probe one URL, record the outcome and return its status text."""
        if not need_height:
            if utils.check_connectivity(url, self.timeout):
                return "OK"
            self.inaccessible_urls.add(url)
            return "Inaccessible (No connectivity)"

        height = utils.check_stream(url, self.timeout)
        self.data[url]["height"] = height
        if height == 0:
            self.inaccessible_urls.add(url)
            return "Inaccessible (0 height)"
        if height < self.min_height:
            self.low_res_urls.add(url)
            return "Low Resolution"
        # A max_height equal to defaults.MAX_HEIGHT disables the upper bound.
        if self.max_height != defaults.MAX_HEIGHT and height > self.max_height:
            self.high_res_urls.add(url)
            return "High Resolution"
        return "OK"

    def filter(self) -> None:
        """Filter process.

        Visits the parsed URLs in random order, reports a status for each on
        the progress bar and records the URLs that ``export`` must leave out.
        The stream height is measured when a height limit or
        ``resolution_on_title`` is set; otherwise only connectivity is
        checked. Nothing is probed when ``skip_connectivity_check`` is set.
        """
        urls = list(self.data)
        random.shuffle(urls)
        need_height = bool(
            self.max_height or self.min_height or self.resolution_on_title
        )
        pbar = tqdm(urls, ascii=True)
        for url in pbar:
            if self.skip_connectivity_check:
                status = "Skipped"
            else:
                # Pause only when a request is actually about to be made.
                time.sleep(self.interval)
                status = self._check_url(url, need_height)
            pbar.write(f"{url}, {status}!")

    def __custom_sort(self, url: str) -> list[Any]:
        """Build the sort key of ``url`` from ``self.sort_keys``.

        One element is produced per recognised key, in the order given:
        ``height`` (negated, so taller streams come first), ``title``,
        ``tvg-id`` (its digits as an integer), ``template-order`` and
        ``group-title``. Entries lacking ``tvg-id`` digits or a template
        order get ``sys.maxsize`` so they sort last. Unrecognised keys
        contribute nothing.
        """
        entry = self.data[url]
        params = entry.get("params") or {}
        res: list[Any] = []
        for key in self.sort_keys:
            if key == "height":
                res.append(-(entry.get(key) or 0))
            elif key == "title":
                res.append(entry.get(key, ""))
            elif key == "tvg-id":
                digits = re.sub(r"\D", "", str(params.get(key) or ""))
                res.append(int(digits or sys.maxsize))
            elif key == "template-order":
                res.append(int(params.get(key) or sys.maxsize))
            elif key == "group-title":
                res.append(params.get(key) or "")
        return res