1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
import asyncio
import os
import re
import shutil
from decimal import Decimal
from urllib.parse import urljoin

import aiofiles
import aiohttp
import requests
from Crypto.Cipher import AES
# Default working locations used by the script entry point.
old_dir = 'E:\\ts'          # directory holding the downloaded .ts segments
dec_dir = 'E:\\dects'       # directory reserved for decrypted segments
m3u8_path = 'E:\\Code\\Python\\sources\\index.m3u8'  # local copy of the playlist
save_path = 'E:\\video'     # destination of the merged video

# asyncio.get_event_loop() without a current loop is deprecated and raises
# RuntimeError on recent Python versions, so create the loop explicitly and
# register it as the current one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class m3u8:
    """Download, decrypt and merge the TS segments listed in an m3u8 playlist.

    The methods are meant to be called in this order: ``download_m3u8``,
    ``start_download_TS``, ``start_dec_ts`` and finally ``merge_ts``.
    """

    def __init__(self, url=None, m3u8path=None, dirPath=None, videoPath=None, headers=None):
        """Store the locations this job works with.

        Args:
            url: Remote URL of the playlist, or None when a local playlist is used.
            m3u8path: Local path where the playlist is stored / read from.
            dirPath: Directory that receives the downloaded ``.ts`` segments.
            videoPath: Path of the merged output file.
            headers: HTTP headers for every request; a browser user-agent is
                used when omitted.
        """
        self.m3u8_url = url
        self.m3u8_path = m3u8path
        self.ts_dir = dirPath
        self.video_path = videoPath
        self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'} if headers is None else headers

    def _http_get(self, url):
        """GET *url* with the instance headers and return the buffered response.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        # Use a module-level ``session`` when the surrounding script defines
        # one; otherwise fall back to the plain ``requests`` API.
        client = globals().get('session') or requests
        resp = client.get(url=url, headers=self.headers, timeout=30)
        try:
            # Fail loudly instead of saving an error page as playlist/key.
            resp.raise_for_status()
        finally:
            resp.close()
        return resp

    def _resolve_url(self, ref):
        """Turn a playlist reference into an absolute URL when the playlist URL is known."""
        if self.m3u8_url is None:
            return ref
        # urljoin leaves references that are already absolute untouched.
        return urljoin(self.m3u8_url, ref)

    def download_m3u8(self):
        """Fetch the playlist from ``m3u8_url`` and save it to ``m3u8_path``.

        Returns:
            The local playlist path, or None when no URL was configured.
        """
        if self.m3u8_url is None:
            return None
        resp = self._http_get(self.m3u8_url)
        with open(self.m3u8_path, mode='w', encoding='utf-8') as f:
            f.write(resp.text)
        print("index.m3u8下载完成")
        return self.m3u8_path

    def countTime(self):
        """Return the sum of all ``#EXTINF`` durations in the playlist as a Decimal."""
        prefix = '#EXTINF:'
        timeLen = Decimal('0.0')
        with open(self.m3u8_path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.startswith(prefix):
                    continue
                # "#EXTINF:<duration>,[<title>]" -> keep only the duration.
                duration = line[len(prefix):].split(',', 1)[0].strip()
                timeLen += Decimal(duration)
        return timeLen

    def start_download_TS(self, loop):
        """Download every segment of the playlist using the event loop *loop*.

        Segments are stored as ``<number>.ts`` with numbers counting up from
        10000001.

        Returns:
            ``[start, end]`` where ``end`` is one past the last segment number.
        """
        self.start = 10000001
        self.end = loop.run_until_complete(self.__aio_download__(loop))
        return [self.start, self.end]

    async def __aio_download__(self, loop):
        """Schedule one download task per playlist entry and wait for all of them.

        Returns:
            The segment number following the last scheduled segment.

        Raises:
            RuntimeError: if at least one segment could not be downloaded.
        """
        print("切片ts下载中...")
        tasks = []
        ts_name = self.start
        connector = aiohttp.TCPConnector(limit=16)
        timeout = aiohttp.ClientTimeout(total=1000)
        # The total timeout also covers time spent waiting for a free pooled
        # connection, so only let as many requests start as the pool can serve.
        gate = asyncio.Semaphore(16)

        async def fetch(url, name, client):
            async with gate:
                await self.__download_ts__(url, name, client)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout, headers=self.headers) as aSession:
            async with aiofiles.open(self.m3u8_path, mode='r', encoding='utf-8') as f:
                async for line in f:
                    ts_url = line.strip()
                    # Skip tags/comments and blank lines.
                    if not ts_url or ts_url.startswith('#'):
                        continue
                    tasks.append(loop.create_task(fetch(self._resolve_url(ts_url), ts_name, aSession)))
                    ts_name += 1
            if tasks:
                # Wait for every task before the session closes and surface
                # failures instead of leaving them unretrieved.
                results = await asyncio.gather(*tasks, return_exceptions=True)
                failures = [r for r in results if isinstance(r, BaseException)]
                if failures:
                    raise RuntimeError(
                        f'{len(failures)} of {len(tasks)} TS segments failed to download'
                    ) from failures[0]
        print('所有切片ts下载完成')
        return ts_name

    async def __download_ts__(self, url, name, aSession):
        """Download *url* through *aSession* and store it as ``<name>.ts`` in ``ts_dir``."""
        async with aSession.get(url=url) as resp:
            resp.raise_for_status()
            payload = await resp.content.read()
        # Write only after the body was read completely.
        async with aiofiles.open(os.path.join(self.ts_dir, f'{name}.ts'), mode='wb') as f:
            await f.write(payload)

    def start_dec_ts(self):
        """Decrypt the downloaded segments in place when the playlist names a key.

        Returns:
            True after decrypting, or None when the playlist has no key URI.
        """
        key_IV = self.__get_key__()
        if key_IV is None:
            print('无加密')
            return None
        print("key = %s \t IV = %s" % (key_IV[0], key_IV[1]))
        self.__dec_ts__(key_IV[0], key_IV[1])
        print("切片ts解密完成")
        return True

    def __get_key__(self):
        """Read key URI and IV from the playlist and download the key.

        Returns:
            ``[key_bytes, iv_text]`` (``iv_text`` without its ``0x`` prefix),
            or None when the playlist has no ``#EXT-X-KEY`` tag with a URI.
        """
        with open(self.m3u8_path, mode='r', encoding='utf-8') as f:
            # Read once: a second read() on the same handle returns ''.
            playlist = f.read()
        key_tag = re.search(r'^#EXT-X-KEY:(.*)$', playlist, flags=re.MULTILINE)
        if key_tag is None:
            return None
        attributes = key_tag.group(1)
        key_ret = re.search(r'URI="(.*?)"', attributes)
        if key_ret is None:
            return None
        # NOTE(review): RFC 8216 uses the media sequence number as IV when the
        # attribute is absent; the historical ASCII-zero default is kept here
        # - confirm against the target streams.
        IV = "0000000000000000"
        IV_ret = re.search(r'IV=([^,\s]+)', attributes)
        if IV_ret is not None:
            IV = IV_ret.group(1)
        if IV[:2] in ('0x', '0X'):
            IV = IV[2:]
        key = self._http_get(self._resolve_url(key_ret.group(1))).content
        return [key, IV]

    @staticmethod
    def _iv_to_bytes(IV):
        """Convert the IV produced by ``__get_key__`` into the bytes given to AES."""
        if isinstance(IV, (bytes, bytearray)):
            return bytes(IV)[:16]
        if len(IV) == 32:
            # 32 hex digits describe the 16 IV bytes.
            try:
                return bytes.fromhex(IV)
            except ValueError:
                pass
        return bytes(IV, encoding='utf-8')[:16]

    def __dec_ts__(self, key, IV):
        """AES-CBC decrypt every segment from ``self.start`` to ``self.end - 1`` in place."""
        iv_bytes = self._iv_to_bytes(IV)
        for tsName in range(self.start, self.end):
            ts_path = os.path.join(self.ts_dir, f'{tsName}.ts')
            with open(ts_path, 'rb') as fr:
                fcontent = fr.read()
            # A CBC cipher object keeps chaining state between calls, so each
            # segment needs its own cipher starting from the IV.
            aes = AES.new(key=key[:16], IV=iv_bytes, mode=AES.MODE_CBC)
            with open(ts_path, 'wb') as fw:
                fw.write(aes.decrypt(fcontent))

    def merge_ts(self):
        """Concatenate all ``.ts`` files of ``ts_dir`` (sorted by name) into ``video_path``.

        Raises:
            FileNotFoundError: if ``ts_dir`` contains no segment to merge.
        """
        target = os.path.abspath(self.video_path)
        sources = []
        for entry in sorted(os.listdir(self.ts_dir)):
            candidate = os.path.join(self.ts_dir, entry)
            if not entry.lower().endswith('.ts') or not os.path.isfile(candidate):
                continue
            # Never feed the output file back into itself.
            if os.path.abspath(candidate) == target:
                continue
            sources.append(candidate)
        if not sources:
            raise FileNotFoundError(f'no .ts segments found in {self.ts_dir}')
        with open(self.video_path, 'wb') as merged:
            for source in sources:
                with open(source, 'rb') as part:
                    shutil.copyfileobj(part, merged)
        print(f'{self.video_path}文件合并完成')
if __name__ == '__main__':
|