杨记

碎片化学习令人焦虑,系统化学习使人进步

0%

m3u8视频爬取

M3U8文件IV值及KEY获取 - 走看看 (zoukankan.com)

使用openssl m3u8解密 EXT-X-KEY:METHOD=AES-128,URI=xxx_gdut17的博客-CSDN博客_ext-x-key

环境篇-Windows下安装OpenSSL_百里杨的博客-CSDN博客_openssl windows

ts

ts是日本高清摄像机拍摄下进行的封装格式,全称为MPEG2-TS。ts即”Transport Stream”的缩写。MPEG2-TS格式的特点就是要求从视频流的任一片段开始都是可以独立解码的。大多数在线播放的视频使用ts格式作流媒体传输。

m3u8

M3U8 是 Unicode 版本的 M3U,用 UTF-8 编码。“M3U” 和 “M3U8” 文件都是苹果公司使用的 HTTP Live Streaming(HLS) 协议格式的基础,这种协议格式可以在 iPhone 和 Macbook 等设备播放。

通常m3u8包含了若干个ts文件的名称,按播放顺序有序排列,还包括版本、是否加密等信息。

image-20221102121449280

电脑浏览器输入m3u8的url不能直接播放,而会作为文本文件下载。如:https://cdn.zoubuting.com/20221001/v15iQV88/1000kb/hls/index.m3u8

可以在浏览器上安装插件,如Native HLS Playback,即可通过m3u8网址直接播放视频

可能的情况

1)有两层m3u8文件

image-20221102153517747

2)视频没有加密,下完只要合并ts就行

image-20221102153624467

3)有加密和Key,但是没有IV,则IV的长度和Key对齐,全部填0

image-20221102153828472

爬取步骤

  1. 下载m3u8文件,下载m3u8文件中的所有ts视频
  2. 获取密钥key和偏移量IV,对所有切片ts解密
  3. 合并所有的ts为一个视频

对上图中的案例:

解密步骤

1)获取key文件的16进制值:

image-20221102145822913

如果有linux服务器的话,也可以使用hexdump -v -e '16/1' "%02x" 文件名来查看

image-20221102150654958

2)获取IV值

文件中已经有了IV的值,我们只需要去掉前面0x,然后取前16位的字符作为iv,如果m3u8文件中没有IV值,则是16个0

3)用openssl工具对ts视频文件进行解密

1
2
openssl.exe aes-128-cbc -d -in 加密的ts视频文件路径 -out 输出的ts视频文件目录 -K 第一步取到的16进制 -iv 第二步取到的值
openssl.exe aes-128-cbc -d -in 0.ts -out 1.ts -K 5d18d1c81249c55f52a9669e9086f3de -iv fae721f7c61ccb9f8d986454e8e923b9

合并ts视频

windows下可以使用终端命令:

1
2
copy /b E:\\video\\*.ts E:\\合并的视频.ts	# ts视频一定要是有序的
copy /b E:\\video\\10001.ts+E:\\video\\10002.ts E:\\合并的视频.ts # 合并10001.ts和10002.ts

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# 使用说明:输入 m3u8的url 和 最终合并的ts视频名称
import shutil
import requests
import re
import asyncio
import aiohttp
import aiofiles
import os
from Crypto.Cipher import AES


old_dir = 'E:\\ts'
dec_dir = 'E:\\dects'
m3u8_path = 'E:\\Code\\Python\\sources\\index.m3u8'
save_path = 'E:\\video'
loop = asyncio.get_event_loop() # 异步准备


# m3u8文件处理类
class m3u8:
def __init__(self, url=None, m3u8path=None, dirPath=None, videoPath=None, headers=None):
self.m3u8_url = url
self.m3u8_path = m3u8path
self.ts_dir = dirPath
self.video_path = videoPath
self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'} if headers is None else headers

# 下载m3u8文件,url为m3u8文件地址,m3u8_path为保存的路径
def download_m3u8(self):
if self.m3u8_url is None:
return None
resp = session.get(url=self.m3u8_url, headers=self.headers)
resp.close()
with open(self.m3u8_path, mode='w', encoding='utf-8') as f:
f.write(resp.text)
print("index.m3u8下载完成")
return self.m3u8_path

# 统计m3u8时间,m3u8_path为文件路径,返回值类型为float
def countTime(self):
timeLen = 0.0
with open(self.m3u8_path, 'r', encoding='utf-8') as f:
for line in f:
if line.startswith('#EXTINF:'):
timeLen = Decimal(str(timeLen)) + Decimal(line.strip('#EXTINF:,\n'))
return timeLen

# 下载ts入口
def start_download_TS(self, loop):
self.start = 10000001
self.end = loop.run_until_complete(self.__aio_download__(loop)) # end = 10000943
return [self.start, self.end]

# 读取m3u8文件 创建异步任务 每个任务都是下载单个ts
async def __aio_download__(self, loop):
print("切片ts下载中...")
tasks = []
tasksPoolDic = {}
ts_name = self.start
connector = aiohttp.TCPConnector(limit=16) # 将并发数量降低
timeout = aiohttp.ClientTimeout(total=1000) # 将超时时间秒
async with aiohttp.ClientSession(connector=connector, timeout=timeout, headers=self.headers) as aSession:
async with aiofiles.open(self.m3u8_path, mode='r', encoding='utf-8') as f:
async for line in f:
if line.startswith('#'):
continue
ts_url = line.strip()
tasksPoolDic[ts_name] = ts_url # 将每个要下载的url和对应编号放入字典
task = loop.create_task(self.__download_ts__(ts_url, ts_name, aSession))
tasks.append(task)
ts_name += 1
await asyncio.wait(tasks)
print('所有切片ts下载完成')
return ts_name

# 下载ts文件
async def __download_ts__(self, url, name, aSession):
async with aSession.get(url=url) as resp:
async with aiofiles.open(f'{self.ts_dir}\\{name}.ts', mode='wb') as f:
await f.write(await resp.content.read())

# 解密ts入口
def start_dec_ts(self):
key_IV = self.__get_key__()
if key_IV is None:
print('无加密')
return None
print("key = %s \t IV = %s" % (key_IV[0], key_IV[1]))
self.__dec_ts__(key_IV[0], key_IV[1])
print("切片ts解密完成")
return True

# 获取密钥,返回列表[key, IV],无加密返回None
def __get_key__(self):
# 获取密钥
key_url = ''
IV = "0000000000000000"
with open(self.m3u8_path, mode='r', encoding='utf-8') as f:
key_ret = re.search(r'URI="(.*?)"', f.read())
if key_ret is None:
return None
key_url = key_ret.group(1)
IV_ret = re.search(r'IV=(.*?)\n', f.read())
if IV_ret is not None:
IV = IV_ret.group(1)
resp = session.get(key_url, headers=self.headers)
resp.close()
key = resp.content
if IV.startswith('0x'):
IV = IV[2:]
return [key, IV]

# 解密视频
def __dec_ts__(self, key, IV):
aes = AES.new(key=key[:16], IV=bytes(IV, encoding='utf-8')[:16], mode=AES.MODE_CBC)
for tsName in range(self.start, self.end):
with open(f'{self.ts_dir}\\{tsName}.ts', 'rb') as fr:
fcontent = fr.read()
with open(f'{self.ts_dir}\\{tsName}.ts', 'wb') as fw:
fw.write(aes.decrypt(fcontent))

def merge_ts(self):
os.system(f'copy/b {self.ts_dir}\\*.ts "{self.video_path}" > NUL')
print(f'{self.video_path}文件合并完成')


if __name__ == '__main__':


欢迎关注我的其它发布渠道