Coverage for src/iptvtools/utils.py: 23%
82 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-31 13:48 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-31 13:48 +0000
1#!/usr/bin/env python
2"""Relevant Utilities.
4File: utils.py
5Author: huxuan
6Email: i(at)huxuan.org
7"""
9import json
10import logging
11import socket
12import struct
13from subprocess import (
14 PIPE,
15 Popen,
16 TimeoutExpired,
17)
18from typing import Any
19from urllib.parse import urlparse
21import requests
23from iptvtools.config import Config
# Base ffprobe command line: print the video streams of the input as compact
# JSON, without the banner and without log output. `probe` appends the url.
PROBE_COMMAND = (
    "ffprobe -hide_banner -show_streams -select_streams v -of json=c=1 -v quiet"
)

# URL schemes that are handled as UDP streams instead of HTTP ones.
UDP_SCHEME = ("udp", "rtp")
def convert_url_with_udpxy(orig_url: str, udpxy: str) -> str:
    """Convert a UDP/RTP url into the matching udpxy HTTP url.

    Args:
        orig_url: Stream url to convert.
        udpxy: Base url of the udpxy proxy; a trailing slash is tolerated.

    Returns:
        ``{udpxy}/{scheme}/{address}`` when the scheme of ``orig_url`` is listed
        in ``UDP_SCHEME``; otherwise ``orig_url`` unchanged.
    """
    parsed_url = urlparse(orig_url)
    if parsed_url.scheme not in UDP_SCHEME:
        return orig_url
    # Multicast urls are often written as ``udp://@239.0.0.1:1234``. The part up
    # to "@" is userinfo, not part of the address, so it must not be forwarded.
    address = parsed_url.netloc.rpartition("@")[2]
    # Avoid a double slash when the proxy base already ends with "/".
    base = udpxy.rstrip("/")
    return f"{base}/{parsed_url.scheme}/{address}"
def unify_title_and_id(item: dict[str, Any]) -> dict[str, Any]:
    """Normalize the title of an entry and derive its id.

    Every key of ``Config.title_unifiers`` found in the title is replaced by
    its mapped value (keys are applied in sorted order). The id starts from
    the ``tvg-name`` parameter when present, otherwise from the unified title,
    and is then rewritten the same way using ``Config.id_unifiers``.

    The entry is modified in place and also returned.
    """
    title = item["title"]
    for pattern in sorted(Config.title_unifiers):
        # str.replace leaves the text unchanged when the pattern is absent.
        title = title.replace(pattern, Config.title_unifiers[pattern])
    item["title"] = title

    params = item.get("params", {})
    identifier = params["tvg-name"] if "tvg-name" in params else title
    for pattern in sorted(Config.id_unifiers):
        identifier = identifier.replace(pattern, Config.id_unifiers[pattern])
    item["id"] = identifier

    return item
def probe(url: str, timeout: int | None = None) -> Any:
    """Invoke ffprobe to get stream information.

    Args:
        url: Address of the stream to inspect. Surrounding whitespace is
            ignored.
        timeout: Seconds to wait for ffprobe to finish; ``None`` waits without
            a limit.

    Returns:
        The parsed JSON document printed by ffprobe, or ``None`` when the
        process timed out, printed nothing, or printed output that could not
        be decoded.
    """
    command = PROBE_COMMAND.split()
    # Pass the url as ONE argument. Formatting it into the command string and
    # splitting on whitespace would break a url containing spaces into several
    # ffprobe arguments.
    target = url.strip()
    if target:
        command.append(target)

    outs = None
    with Popen(command, stdout=PIPE, stderr=PIPE) as proc:  # noqa: S603
        try:
            outs, _ = proc.communicate(timeout=timeout)
        except TimeoutExpired:
            # Leaving the ``with`` block closes the pipes and waits for the
            # killed child, so nothing else is needed here.
            proc.kill()

    if not outs:
        return None
    try:
        return json.loads(outs.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Undecodable output is treated like missing output (best effort).
        logging.error(exc)
        return None
def check_stream(url: str, timeout: int | None = None) -> int:
    """Probe a stream and return the largest height among its streams.

    Returns 0 when probing yields nothing or no stream entries are reported.
    A stream entry without a ``height`` counts as 0.
    """
    stream_info = probe(url, timeout)
    streams = stream_info.get("streams") if stream_info else None
    if not streams:
        return 0
    return max(int(entry.get("height", 0)) for entry in streams)
def check_connectivity(url: str, timeout: int | None = None) -> bool:
    """Check whether the stream behind ``url`` is reachable.

    Urls whose scheme is listed in ``UDP_SCHEME`` are handed to
    ``check_udp_connectivity`` (only the network location is passed on);
    every other url goes to ``check_http_connectivity``.
    """
    parts = urlparse(url)
    is_udp = parts.scheme in UDP_SCHEME
    if not is_udp:
        return check_http_connectivity(url, timeout)
    return check_udp_connectivity(parts.netloc, timeout)
def check_udp_connectivity(url: str, timeout: int | None = None) -> bool:
    """Check UDP connectivity by waiting for data from a multicast group.

    Args:
        url: Multicast endpoint written as ``address:port``. A leading
            ``@`` (as in ``@239.0.0.1:1234``) is accepted and ignored.
        timeout: Seconds to wait for data; ``None`` blocks until data arrives.

    Returns:
        ``True`` if any data was received before the timeout, else ``False``.

    Raises:
        ValueError: If ``url`` has no ``:port`` part or the port is not a
            number.
        OSError: If the address is invalid or the socket cannot be set up.
    """
    # ``udp://@group:port`` urls keep the "@" in their netloc; drop everything
    # up to it so that only ``group:port`` is parsed.
    ipaddr, port = url.rpartition("@")[2].rsplit(":", 1)
    # "=" selects standard sizes without padding, giving exactly the 8-byte
    # ``struct ip_mreq``. Native "4sl" is padded to 16 bytes where C long is
    # 8 bytes wide.
    mreq = struct.pack("=4sl", socket.inet_aton(ipaddr), socket.INADDR_ANY)
    # The context manager closes the socket on every path, including errors.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
        sock.settimeout(timeout)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # SO_REUSEPORT is not defined on every platform.
        if hasattr(socket, "SO_REUSEPORT"):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind(("", int(port)))
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        try:
            return bool(sock.recv(10240))
        except TimeoutError:
            return False
def check_http_connectivity(url: str, timeout: int | None = None) -> bool:
    """Check HTTP connectivity.

    Args:
        url: Address to request with HTTP GET.
        timeout: Timeout passed to ``requests.get``; ``None`` means no limit.

    Returns:
        ``True`` when the response status is OK (``Response.ok``); ``False``
        for an error status or when the request raises
        ``requests.RequestException``.
    """
    try:
        # stream=True defers the body download, so the connection stays open
        # until the response is closed. The ``with`` block closes it without
        # reading the stream body.
        with requests.get(url, timeout=timeout, stream=True) as response:
            return response.ok
    except requests.RequestException:
        return False
def height_to_resolution(height: int) -> str:
    """Convert a height into a resolution label.

    A falsy height gives an empty string. Heights of at least 4320, 2160,
    1080 and 720 map to "8K", "4K", "1080p" and "720p" respectively; anything
    smaller is rendered as ``"<height>p"``.
    """
    if not height:
        return ""
    # Ordered from the highest threshold down; the first match wins.
    tiers = (
        (4320, "8K"),
        (2160, "4K"),
        (1080, "1080p"),
        (720, "720p"),
    )
    for minimum, label in tiers:
        if height >= minimum:
            return label
    return f"{height}p"