杨记

碎片化学习令人焦虑,系统化学习使人进步

0%

高性能异步爬虫

线程和进程

线程池和进程池

协程

多线程

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
# Multithreading demo: the child thread and the main thread print concurrently.
from threading import Thread  # thread class


def func():
    """Worker: print a counter 1000 times on the child thread."""
    for n in range(1000):
        print("func", n)


if __name__ == '__main__':
    worker = Thread(target=func)  # create the thread and hand it a task
    worker.start()  # marks the thread runnable; actual scheduling is up to the CPU

    for n in range(1000):
        print("main", n)

继承Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Multithreading via subclassing.
from threading import Thread  # thread class


class MyThread(Thread):  # inherit from Thread
    """Thread subclass; run() is what executes when the thread starts."""

    def run(self):  # fixed name -> invoked automatically on start()
        for n in range(1000):
            print("子线程", n)


if __name__ == '__main__':
    t = MyThread()
    # t.run()  # a plain method call -> runs single-threaded????
    t.start()  # start the thread

    for n in range(1000):
        print("主线程", n)

多进程

示例:

第一种写法:

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process


def func():
    """Worker: print a counter 1000 times in the child process."""
    for n in range(1000):
        print("子进程", n)


if __name__ == '__main__':
    child = Process(target=func)
    child.start()
    for n in range(1000):
        print("主进程", n)

第二种写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process


class MyProcess(Process):
    """Process subclass; run() is what executes in the child process."""

    def run(self):
        for n in range(1000):
            print("MyProcess", n)


if __name__ == "__main__":
    proc = MyProcess()
    proc.start()
    for n in range(1000):
        print("main", n)

传参问题

1
2
3
4
5
6
7
8
9
10
11
12
13
from threading import Thread


def func(name):
    """Worker: print the given name with a counter 1000 times."""
    for n in range(1000):
        print(name, n)


if __name__ == '__main__':
    t1 = Thread(target=func, args=("周杰伦",))  # args must be a tuple
    t1.start()

    t2 = Thread(target=func, args=("王力宏",))
    t2.start()

Python中,线程池和进程池的写法几乎一样,只不过线程池是ThreadPoolExecutor,进程池是ProcessPoolExecutor

线程池

存放固定数量的线程,由线程池分配任务

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from concurrent.futures import ThreadPoolExecutor


def fn(name):
    """Pool worker: print the given name with a counter 1000 times."""
    for n in range(1000):
        print(name, n)


if __name__ == '__main__':
    # create a thread pool of 50 workers
    with ThreadPoolExecutor(50) as pool:
        for i in range(100):
            pool.submit(fn, name=f"线程{i}")
    # leaving the with-block waits for every task in the pool to finish
    print("123")

例2:使用的库不同,方法也不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
# thread-pool class from multiprocessing's "dummy" (thread-backed) module
from multiprocessing.dummy import Pool

# run the downloads with a thread pool and time them
start_time = time.time()


def get_page(name):
    """Pretend to download *name*: print, sleep 2s, print again.

    NOTE: the parameter was originally named ``str``, shadowing the builtin —
    renamed to ``name``.
    """
    print("正在下载 :", name)
    time.sleep(2)
    print('下载成功:', name)


name_list = ['xiaozi', 'aa', 'bb', 'cc']

# instantiate a thread pool with 4 workers
pool = Pool(4)
# hand every element of the list to get_page
pool.map(get_page, name_list)
pool.close()
pool.join()
end_time = time.time()
print(end_time - start_time)

进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import ProcessPoolExecutor


def fn(name):
    """Pool worker: print the given name with a counter 1000 times."""
    for n in range(1000):
        print(name, n)


if __name__ == '__main__':
    # create a process pool of 50 workers
    with ProcessPoolExecutor(50) as pool:
        for i in range(100):
            pool.submit(fn, name=f"线程{i}")
    # leaving the with-block waits for every task in the pool to finish
    print("123")

协程

宏观上看似多个任务并发执行,微观上实际是单线程内的任务切换,对CPU的利用率高。

1
2
3
4
5
6
7
8
9
import asyncio


async def func():
    """Minimal coroutine: just print a greeting."""
    print("你好啊, 我叫赛利亚")


if __name__ == '__main__':
    coro = func()  # calling an async-def function yields a coroutine object, nothing runs yet
    # print(coro)
    asyncio.run(coro)  # running a coroutine needs asyncio's event loop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio


async def request(url):
    """Simulated request coroutine: print progress and return the url."""
    print('正在请求的url是', url)
    print('请求成功,', url)
    return url


# calling an async-modified function returns a coroutine object
c = request('www.baidu.com')

# create an event loop object explicitly
# (asyncio.get_event_loop() is deprecated when no loop is running)
loop = asyncio.new_event_loop()

# register the coroutine object on the loop and start the loop
loop.run_until_complete(c)
loop.close()

# # task usage
# loop = asyncio.new_event_loop()
# task = loop.create_task(request('www.baidu.com'))  # build a task on the loop
# print(task)
# loop.run_until_complete(task)
# print(task)

# # future usage
# loop = asyncio.new_event_loop()
# task = loop.create_task(request('www.baidu.com'))
# print(task)
# loop.run_until_complete(task)
# print(task.result())


def callback_func(task):
    # task.result() is the return value of the coroutine wrapped in the task
    print(task.result())


# bind a callback
loop = asyncio.new_event_loop()
# BUGFIX: a coroutine object can only be awaited once — the original reused
# `c` here, which raises "cannot reuse already awaited coroutine".
task = loop.create_task(request('www.baidu.com'))
# attach the callback to the task object
task.add_done_callback(callback_func)
loop.run_until_complete(task)
loop.close()

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
import time


async def func1():
    print("你好啊, 我叫潘金莲")
    # time.sleep(3)  # a synchronous call here would break the async scheduling
    await asyncio.sleep(3)  # asynchronous operation
    print("你好啊, 我叫潘金莲")
    return 'func1'


async def func2():
    print("你好啊, 我叫王建国")
    await asyncio.sleep(2)
    print("你好啊, 我叫王建国")
    return 'func2'


async def func3():
    print("你好啊, 我叫李雪琴")
    await asyncio.sleep(4)
    print("你好啊, 我叫李雪琴")
    return 'func3'


# Variant 1:
# if __name__ == '__main__':
#     f1 = func1()
#     f2 = func2()
#     f3 = func3()
#     tasks = [f1, f2, f3]
#     t1 = time.time()
#     # launch several tasks (coroutines) at once
#     asyncio.run(asyncio.wait(tasks))
#     t2 = time.time()
#     print(t2 - t1)

# Variant 2:
async def main():
    """Run func1/2/3 concurrently and print their return values."""
    # first way:
    # f1 = func1()
    # await f1  # await usually sits directly in front of a coroutine object
    # second way (recommended):
    tasks = [
        asyncio.create_task(func1()),  # since py3.8 wrap coroutines in asyncio.create_task()
        asyncio.create_task(func2()),
        asyncio.create_task(func3()),
    ]
    await asyncio.wait(tasks)
    # print each coroutine's return value
    for task in tasks:
        print(task.result())
    return 'main'


if __name__ == '__main__':
    before = time.time()
    # launch several tasks (coroutines) at once
    text = asyncio.run(main())
    print(text)
    after = time.time()
    print(after - before)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time


async def request(url):
    """Pretend to download *url*: print, async-sleep 2s, print again."""
    print('正在下载', url)
    # synchronous blocking code here (e.g. time.sleep) would defeat the async flow
    # time.sleep(2)
    # blocking operations inside asyncio must be awaited via their async versions
    await asyncio.sleep(2)
    print('下载完毕', url)


start = time.time()
urls = ['www.baidu.com', 'www.sogou.com', 'www.goubanjia.com']

# task list: holds multiple task objects.
# FIX: asyncio.get_event_loop()/ensure_future() with no running loop are
# deprecated (and broken on recent Python) — create the loop explicitly
# and schedule the tasks on it instead.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stasks = []
for url in urls:
    stasks.append(loop.create_task(request(url)))

# the task list must be wrapped in wait()
loop.run_until_complete(asyncio.wait(stasks))
loop.close()

print(time.time() - start)

多任务协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time


async def request(url):
    """Pretend to download *url*: print, async-sleep 2s, print again."""
    print('正在下载', url)
    # synchronous blocking code here (e.g. time.sleep) would defeat the async flow
    # time.sleep(2)
    # blocking operations inside asyncio must be awaited via their async versions
    await asyncio.sleep(2)
    print('下载完毕', url)


start = time.time()
urls = ['www.baidu.com', 'www.sogou.com', 'www.goubanjia.com']

# task list: holds multiple task objects.
# FIX: asyncio.get_event_loop()/ensure_future() with no running loop are
# deprecated (and broken on recent Python) — create the loop explicitly
# and schedule the tasks on it instead.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stasks = []
for url in urls:
    stasks.append(loop.create_task(request(url)))

# the task list must be wrapped in wait()
loop.run_until_complete(asyncio.wait(stasks))
loop.close()

print(time.time() - start)

aiohttp模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import aiohttp

urls = [
    "http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg"
]


async def aiodownload(url):
    """Send the request, fetch the image bytes, save them to a file."""
    name = url.rsplit("/", 1)[1]  # split once from the right, keep item [1]
    async with aiohttp.ClientSession() as session:  # counterpart of requests
        async with session.get(url) as resp:  # counterpart of requests.get()
            # response arrived — write it to a file
            # (aiofiles is worth learning for truly async file writes)
            with open(name, mode="wb") as f:
                f.write(await resp.content.read())  # reading the body is async -> await; text via resp.text()
    print(name, "搞定")


async def main():
    # FIX: asyncio.wait() no longer accepts bare coroutines (removed in
    # Python 3.11) — each one must be wrapped in a Task first.
    tasks = [asyncio.create_task(aiodownload(url)) for url in urls]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())

注意

python asyncio获取协程返回值和使用callback https://www.cnblogs.com/callyblog/p/11216961.html

aiohttp模块

学习链接 https://www.cnblogs.com/lymmurrain/p/13805690.html

aiohttp中拿文本 resp.text改成resp.text()

resp.content改成 resp.content.read()

代理,请求头 见下面的ssl 错误 博客链接https://blog.csdn.net/qq_43965708/article/details/109622238

sess.get(url=url, headers=headers, proxy='http://xxx.xxx.xxx', proxy_auth=auto.proxy_auth) as res:

防止协程被封:降低爬取速度,或者使用多个代理ip

1
2
3
timeout = aiohttp.ClientTimeout(total=600)  # 将超时时间设置为600秒
connector = aiohttp.TCPConnector(limit=50) # 将并发数量降低
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
1
2
3
4
5
6
7
8
9
10
import asyncio  # FIX: the original snippet used asyncio without importing it

import aiohttp


async def fetch():
    """GET python.org and print the response body."""
    async with aiohttp.request('GET',
                               'http://python.org/') as resp:
        assert resp.status == 200
        print(await resp.text())

# put the coroutine on the event loop
# (get_event_loop() is deprecated with no running loop — create one explicitly)
loop = asyncio.new_event_loop()
loop.run_until_complete(fetch())
loop.close()

Session 封装了一个连接池(连接器实例),并且默认情况下支持keepalive。除非在应用程序的生存期内连接到大量未知的不同服务器,否则建议您在应用程序的生存期内使用单个会话以受益于连接池。

不要为每个请求创建Session 。每个应用程序很可能需要一个会话,以完全执行所有请求。

更复杂的情况可能需要在每个站点上进行一次会话,例如,一个会话用于Github,另一个会话用于Facebook API。无论如何,为每个请求建立会话是一个非常糟糕的主意。

会话内部包含一个连接池。连接重用和保持活动状态(默认情况下均处于启用状态)可能会提高整体性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import aiohttp
import asyncio


# pass the shared client session in instead of creating one per request
async def fetch(client, url):
    """GET *url* through the shared *client* session and return the body."""
    async with client.get(url) as resp:
        assert resp.status == 200
        return await resp.text()


async def main():
    # FIX: `url` was undefined in the original snippet (NameError)
    url = 'http://python.org/'
    async with aiohttp.ClientSession() as client:
        html = await fetch(client, url)
        print(html)


# (get_event_loop() is deprecated with no running loop — create one explicitly)
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()

aiohttp 指定 tls 版本 (ssl 错误) https://blog.csdn.net/qq_31720329/article/details/82023393

1
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host www.mdnkids.com:443 ssl:<ssl.SSLContext object at 0x1096634a8> [None]

解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import aiohttp
import asyncio
import threading
from tools import auto
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # add this line (Windows-only policy)
# proxy=auto.proxies, proxy_auth=auto.proxy_auth — replace with your own proxy settings


async def quest(url, headers):
    """GET *url* with TLS verification disabled, through the configured proxy."""
    con = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=con, trust_env=True) as sess:  # add trust_env=True
        async with sess.get(url=url, headers=headers, proxy=auto.proxies, proxy_auth=auto.proxy_auth) as res:
            return await res.read()


def forever(loop):
    """Run *loop* forever on this (daemon) thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


if __name__ == '__main__':
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:79.0) Gecko/20100101 Firefox/79.0'}
    url = 'https://baidu.com'

    loop = asyncio.new_event_loop()
    t = threading.Thread(target=forever, args=(loop,))
    t.daemon = True  # FIX: Thread.setDaemon() is deprecated; set the attribute instead
    t.start()
    ret = asyncio.run_coroutine_threadsafe(quest(url, headers), loop)
    print(ret.result())

爬取视频案例

可能已经运行不了了

image-20220119112527357

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
思路:
1. 拿到主页面的页面源代码, 找到iframe
2. 从iframe的页面源代码中拿到m3u8文件的地址
3. 下载第一层m3u8文件 -> 下载第二层m3u8文件(视频存放路径)
4. 下载视频
5. 下载秘钥, 进行解密操作
6. 合并所有ts文件为一个mp4文件
"""
import requests
from bs4 import BeautifulSoup
import re
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES # pycryptodome
import os
def get_iframe_src(url):
    """Fetch the main page and return the src attribute of its <iframe>."""
    resp = requests.get(url)
    page = BeautifulSoup(resp.text, "html.parser")
    return page.find("iframe").get("src")
    # return "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp"  # for testing
def get_first_m3u8_url(url):
    """Pull the first-layer m3u8 url out of the iframe page's source."""
    resp = requests.get(url)
    # print(resp.text)
    pattern = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S)
    # print(pattern.search(resp.text).group("m3u8_url"))
    return pattern.search(resp.text).group("m3u8_url")
def download_m3u8_file(url, name):
    """Download *url* and write the raw bytes to the file *name*."""
    body = requests.get(url).content
    with open(name, mode="wb") as f:
        f.write(body)
async def download_ts(url, name, session):
    """Download one ts segment through *session* into video2/<name>."""
    async with session.get(url) as resp:
        async with aiofiles.open(f"video2/{name}", mode="wb") as f:
            # write the downloaded bytes into the file
            await f.write(await resp.content.read())
    print(f"{name}下载完毕")
async def aio_download(up_url):  # e.g. https://boba.52kuyun.com/20170906/Moh2l9zV/hls/
    """Download every ts segment listed in the second m3u8 file concurrently."""
    tasks = []
    async with aiohttp.ClientSession() as session:  # prepare the session up front
        async with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding='utf-8') as f:
            async for raw in f:
                if raw.startswith("#"):
                    continue
                # the line is xxxxx.ts — strip the trailing newline/whitespace
                seg = raw.strip()
                # join it onto the base to get the real ts url
                tasks.append(asyncio.create_task(download_ts(up_url + seg, seg, session)))
        await asyncio.wait(tasks)  # wait for every download to finish
def get_key(url):
    """Download the AES key for the stream.

    FIX: return the raw bytes (``resp.content``) rather than ``resp.text`` —
    an AES-128 m3u8 key is 16 arbitrary bytes, and text-decoding it both
    corrupts the value and yields a ``str``, which ``AES.new`` rejects.
    """
    resp = requests.get(url)
    return resp.content
async def dec_ts(name, key):
    """Decrypt video2/<name> with AES-CBC into video2/temp_<name>."""
    aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
    async with aiofiles.open(f"video2/{name}", mode="rb") as src, \
            aiofiles.open(f"video2/temp_{name}", mode="wb") as dst:
        encrypted = await src.read()           # read the whole encrypted segment
        await dst.write(aes.decrypt(encrypted))  # write the decrypted bytes
    print(f"{name}处理完毕")
async def aio_dec(key):
    """Decrypt every ts segment listed in the second m3u8 file concurrently."""
    tasks = []
    async with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f:
        async for raw in f:
            if raw.startswith("#"):
                continue
            # schedule one async decrypt task per segment
            tasks.append(asyncio.create_task(dec_ts(raw.strip(), key)))
        await asyncio.wait(tasks)
def merge_ts():
    """Concatenate the decrypted segments into movie.mp4.

    mac: cat 1.ts 2.ts 3.ts > xxx.mp4
    windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
    """
    names = []
    with open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f:
        for raw in f:
            if raw.startswith("#"):
                continue
            names.append(f"video2/temp_{raw.strip()}")
    joined = " ".join(names)  # "1.ts 2.ts 3.ts"
    os.system(f"cat {joined} > movie.mp4")
    print("搞定!")
def main(url):
    """End-to-end pipeline: locate the m3u8, download, decrypt and merge the video."""
    # 1. fetch the main page source and find the iframe's url
    iframe_src = get_iframe_src(url)
    # 2. get the download address of the first-layer m3u8 file
    first_m3u8_url = get_first_m3u8_url(iframe_src)
    # grab the iframe's domain, e.g.
    # "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp"
    iframe_domain = iframe_src.split("/share")[0]
    # join them into the real m3u8 download url
    first_m3u8_url = iframe_domain+first_m3u8_url
    # https://boba.52kuyun.com/20170906/Moh2l9zV/index.m3u8?sign=548ae366a075f0f9e7c76af215aa18e1
    # print(first_m3u8_url)
    # 3.1 download the first-layer m3u8 file
    download_m3u8_file(first_m3u8_url, "越狱第一季第一集_first_m3u8.txt")
    # 3.2 download the second-layer m3u8 file
    with open("越狱第一季第一集_first_m3u8.txt", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            else:
                line = line.strip()  # strip whitespace/newlines; e.g. hls/index.m3u8
                # build the second-layer m3u8 download url:
                # https://boba.52kuyun.com/20170906/Moh2l9zV/ + hls/index.m3u8
                # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/index.m3u8
                # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/cFN8o3436000.ts
                second_m3u8_url = first_m3u8_url.split("index.m3u8")[0] + line
                download_m3u8_file(second_m3u8_url, "越狱第一季第一集_second_m3u8.txt")
                print("m3u8文件下载完毕")
    # 4. download the video segments
    second_m3u8_url_up = second_m3u8_url.replace("index.m3u8", "")
    # async coroutines
    asyncio.run(aio_download(second_m3u8_url_up))  # can be commented out while testing
    # 5.1 fetch the key
    key_url = second_m3u8_url_up + "key.key"  # lazy shortcut; properly the key url should be parsed out of the m3u8 file
    key = get_key(key_url)
    # 5.2 decrypt
    asyncio.run(aio_dec(key))
    # 6. merge the ts files into one mp4
    merge_ts()
if __name__ == '__main__':
    url = "https://www.91kanju.com/vod-play/541-2-1.html"
    main(url)

# make simple problems complex; make complex problems simple
# 秒杀()

欢迎关注我的其它发布渠道