Coverage for src/iptvtools/utils.py: 23%

82 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-31 13:48 +0000

1#!/usr/bin/env python 

2"""Relevant Utilities. 

3 

4File: utils.py 

5Author: huxuan 

6Email: i(at)huxuan.org 

7""" 

8 

9import json 

10import logging 

11import socket 

12import struct 

13from subprocess import ( 

14 PIPE, 

15 Popen, 

16 TimeoutExpired, 

17) 

18from typing import Any 

19from urllib.parse import urlparse 

20 

21import requests 

22 

23from iptvtools.config import Config 

24 

25PROBE_COMMAND = ( 

26 "ffprobe -hide_banner -show_streams -select_streams v -of json=c=1 -v quiet" 

27) 

28 

29UDP_SCHEME = ( 

30 "udp", 

31 "rtp", 

32) 

33 

34 

35def convert_url_with_udpxy(orig_url: str, udpxy: str) -> str: 

36 """Convert url with udpxy.""" 

37 parsed_url = urlparse(orig_url) 

38 if parsed_url.scheme in UDP_SCHEME: 

39 return f"{udpxy}/{parsed_url.scheme}/{parsed_url.netloc}" 

40 return orig_url 

41 

42 

43def unify_title_and_id(item: dict[str, Any]) -> dict[str, Any]: 

44 """Unify title and id.""" 

45 for title_unifier in sorted(Config.title_unifiers): 

46 if title_unifier in item["title"]: 

47 item["title"] = item["title"].replace( 

48 title_unifier, Config.title_unifiers[title_unifier] 

49 ) 

50 

51 if "tvg-name" in item.get("params", {}): 

52 item["id"] = item["params"]["tvg-name"] 

53 else: 

54 item["id"] = item["title"] 

55 

56 for id_unifier in sorted(Config.id_unifiers): 

57 if id_unifier in item["id"]: 

58 item["id"] = item["id"].replace(id_unifier, Config.id_unifiers[id_unifier]) 

59 

60 return item 

61 

62 

63def probe(url: str, timeout: int | None = None) -> Any: 

64 """Invoke probe to get stream information.""" 

65 outs = None 

66 with Popen( # noqa: S603 

67 f"{PROBE_COMMAND} {url}".split(), stdout=PIPE, stderr=PIPE 

68 ) as proc: 

69 try: 

70 outs, _ = proc.communicate(timeout=timeout) 

71 except TimeoutExpired: 

72 proc.kill() 

73 if outs: 

74 try: 

75 return json.loads(outs.decode("utf-8")) 

76 except json.JSONDecodeError as exc: 

77 logging.error(exc) 

78 return None 

79 

80 

81def check_stream(url: str, timeout: int | None = None) -> int: 

82 """Check stream information and return height.""" 

83 stream_info = probe(url, timeout) 

84 if stream_info and stream_info.get("streams"): 

85 return max([int(stream.get("height", 0)) for stream in stream_info["streams"]]) 

86 return 0 

87 

88 

89def check_connectivity(url: str, timeout: int | None = None) -> bool: 

90 """Check connectivity.""" 

91 parsed_url = urlparse(url) 

92 if parsed_url.scheme in UDP_SCHEME: 

93 return check_udp_connectivity(parsed_url.netloc, timeout) 

94 return check_http_connectivity(url, timeout) 

95 

96 

97def check_udp_connectivity(url: str, timeout: int | None = None) -> bool: 

98 """Check UDP connectivity.""" 

99 ipaddr, port = url.rsplit(":", 1) 

100 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 

101 sock.settimeout(timeout) 

102 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

103 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 

104 sock.bind(("", int(port))) 

105 mreq = struct.pack("4sl", socket.inet_aton(ipaddr), socket.INADDR_ANY) 

106 sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 

107 try: 

108 if sock.recv(10240): 

109 return True 

110 except TimeoutError: 

111 pass 

112 return False 

113 

114 

115def check_http_connectivity(url: str, timeout: int | None = None) -> bool: 

116 """Check HTTP connectivity.""" 

117 try: 

118 return requests.get(url, timeout=timeout, stream=True).ok 

119 except requests.RequestException: 

120 return False 

121 

122 

123def height_to_resolution(height: int) -> str: 

124 """Convert height to resolution.""" 

125 if not height: 

126 return "" 

127 if height >= 4320: 

128 return "8K" 

129 if height >= 2160: 

130 return "4K" 

131 if height >= 1080: 

132 return "1080p" 

133 if height >= 720: 

134 return "720p" 

135 return f"{height}p"