Coverage for src/iptvtools/models.py: 11%

155 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-31 13:48 +0000

1#!/usr/bin/env python 

2"""Playlist which contains all the channels' information. 

3 

4File: models.py 

5Author: huxuan 

6Email: i(at)huxuan.org 

7""" 

8 

9import logging 

10import os.path 

11import random 

12import re 

13import sys 

14import time 

15from typing import Any 

16 

17from tqdm import tqdm 

18 

19from iptvtools import parsers, utils 

20from iptvtools.constants import defaults, tags 

21 

22 

23class Playlist: 

24 """Playlist model.""" 

25 

26 def __init__( 

27 self, 

28 channel_exclude: str, 

29 channel_include: str, 

30 group_exclude: str, 

31 group_include: str, 

32 max_height: int, 

33 min_height: int, 

34 inputs: list[str], 

35 interval: int, 

36 skip_connectivity_check: bool, 

37 output: str, 

38 replace_group_by_source: bool, 

39 resolution_on_title: bool, 

40 sort_keys: list[str], 

41 templates: list[str], 

42 timeout: int, 

43 udpxy: str, 

44 ) -> None: 

45 """Init for Playlist.""" 

46 self.channel_exclude = channel_exclude 

47 self.channel_include = channel_include 

48 self.group_exclude = group_exclude 

49 self.group_include = group_include 

50 self.max_height = max_height 

51 self.min_height = min_height 

52 self.inputs = inputs 

53 self.interval = interval 

54 self.skip_connectivity_check = skip_connectivity_check 

55 self.output = output 

56 self.replace_group_by_source = replace_group_by_source 

57 self.resolution_on_title = resolution_on_title 

58 self.sort_keys = sort_keys 

59 self.templates = templates 

60 self.timeout = timeout 

61 self.udpxy = udpxy 

62 self.data: dict[str, Any] = {} 

63 self.id_url: dict[str, Any] = {} 

64 self.inaccessible_urls: set[str] = set() 

65 self.low_res_urls: set[str] = set() 

66 self.high_res_urls: set[str] = set() 

67 self.tvg_url = None 

68 

69 def export(self) -> None: 

70 """Export playlist information.""" 

71 res = [] 

72 res.append(tags.M3U) 

73 if self.tvg_url is not None: 

74 res[0] += f' x-tvg-url="{self.tvg_url}"' 

75 for url in sorted(self.data, key=self.__custom_sort): 

76 if ( 

77 url in self.inaccessible_urls 

78 or url in self.low_res_urls 

79 or url in self.high_res_urls 

80 ): 

81 continue 

82 

83 entry = self.data[url] 

84 params_dict = entry.get("params", {}) 

85 if self.replace_group_by_source: 

86 params_dict["group-title"] = self.data[url]["source"] 

87 params = " ".join( 

88 [f'{key}="{value}"' for key, value in params_dict.items()] 

89 ) 

90 duration = entry["duration"] 

91 title = entry["title"] 

92 if self.resolution_on_title: 

93 height = self.data[url].get("height") 

94 title += f" [{utils.height_to_resolution(height)}]" 

95 

96 res.append(f"{tags.INF}:{duration} {params},{title}\n{url}") 

97 

98 with open(self.output, "w", encoding="utf-8") as f: 

99 f.write("\n".join(res)) 

100 

101 def parse(self) -> None: 

102 """Parse contents.""" 

103 self._parse(self.inputs) 

104 logging.debug(self.data) 

105 self._parse(self.templates, is_template=True) 

106 logging.debug(self.data) 

107 

108 def _parse(self, sources: list[str], is_template: bool = False) -> None: 

109 """Parse playlist sources.""" 

110 template_order = 0 

111 for source in sources: 

112 source_name = os.path.splitext(os.path.basename(source))[0] 

113 current_item = {} 

114 skip = False 

115 is_first_line = True 

116 for line in parsers.parse_content_to_lines(source): 

117 if not line: 

118 continue 

119 if is_first_line: 

120 is_first_line = False 

121 if line.startswith(tags.M3U): 

122 res = parsers.parse_tag_m3u(line) 

123 if res.get("tvg-url"): 

124 self.tvg_url = res.get("tvg-url") 

125 continue 

126 if skip: 

127 skip = False 

128 continue 

129 if line.startswith(tags.INF): 

130 current_item = parsers.parse_tag_inf(line) 

131 current_item = utils.unify_title_and_id(current_item) 

132 current_id = current_item["id"] 

133 

134 params = current_item.get("params", {}) 

135 group = params.get("group-title", "") 

136 if not skip and self.group_include: 

137 if re.search(self.group_include, group): 

138 logging.debug(f"Group to include: `{group}`.") 

139 else: 

140 skip = True 

141 if ( 

142 not skip 

143 and self.group_exclude 

144 and re.search(self.group_exclude, group) 

145 ): 

146 skip = True 

147 logging.debug(f"Group to exclude: `{group}`.") 

148 

149 title = current_item.get("title", "") 

150 if not skip and self.channel_include: 

151 if re.search(self.channel_include, title): 

152 logging.debug(f"Channel to include: `{title}`.") 

153 else: 

154 skip = True 

155 if ( 

156 not skip 

157 and self.channel_exclude 

158 and re.search(self.channel_exclude, title) 

159 ): 

160 skip = True 

161 logging.debug(f"Channel to exclude: `{title}`.") 

162 

163 else: 

164 if is_template: 

165 template_order = template_order + 1 

166 for url in self.id_url.get(current_id, []): 

167 current_params = current_item["params"] 

168 current_params["template-order"] = template_order 

169 self.data[url]["params"].update(current_params) 

170 self.data[url]["title"] = current_item["title"] 

171 else: 

172 if self.udpxy: 

173 line = utils.convert_url_with_udpxy(line, self.udpxy) 

174 current_item["source"] = source_name 

175 self.data[line] = current_item 

176 

177 if current_id not in self.id_url: 

178 self.id_url[current_id] = [] 

179 self.id_url[current_id].append(line) 

180 

181 def filter(self) -> None: 

182 """Filter process.""" 

183 urls = list(self.data.keys()) 

184 random.shuffle(urls) 

185 pbar = tqdm(urls, ascii=True) 

186 for url in pbar: 

187 status = "OK" 

188 time.sleep(self.interval) 

189 if self.skip_connectivity_check: 

190 status = "Skipped" 

191 elif self.max_height or self.min_height or self.resolution_on_title: 

192 height = utils.check_stream(url, self.timeout) 

193 if height == 0: 

194 self.inaccessible_urls.add(url) 

195 status = "Inaccessible (0 height)" 

196 elif height < self.min_height: 

197 self.low_res_urls.add(url) 

198 status = "Low Resolution" 

199 elif ( 

200 self.max_height != defaults.MAX_HEIGHT and height > self.max_height 

201 ): 

202 self.high_res_urls.add(url) 

203 status = "High Resolution" 

204 self.data[url]["height"] = height 

205 elif not utils.check_connectivity(url, self.timeout): 

206 self.inaccessible_urls.add(url) 

207 status = "Inaccessible (No connectivity)" 

208 pbar.write(f"{url}, {status}!") 

209 

210 def __custom_sort(self, url: str) -> list[Any]: 

211 """Sort by tvg-id, resolution, template-order and title.""" 

212 res = [] 

213 for key in self.sort_keys: 

214 entry = self.data[url] 

215 if key == "height": 

216 res.append(-entry.get(key, 0)) 

217 elif key == "title": 

218 res.append(entry.get(key, "")) 

219 elif key == "tvg-id": 

220 res.append( 

221 int(re.sub(r"\D", "", entry["params"].get(key, "")) or sys.maxsize) 

222 ) 

223 elif key == "template-order": 

224 res.append(int(entry["params"].get(key) or sys.maxsize)) 

225 elif key == "group-title": 

226 res.append(entry["params"].get(key) or "") 

227 return res