线程和进程
线程池和进程池
协程
多线程 示例:
# Basic multithreading: run a worker function on a separate thread
# while the main thread keeps counting.
from threading import Thread


def func():
    """Worker: print "func" with a counter, 1000 times."""
    for n in range(1000):
        print("func", n)


if __name__ == '__main__':
    worker = Thread(target=func)  # bind the thread to its target callable
    worker.start()                # both threads now run concurrently
    for n in range(1000):
        print("main", n)
继承Thread类
# Second way to write a thread: subclass Thread and override run().
from threading import Thread


class MyThread(Thread):
    """Thread subclass; the work lives in the overridden run() method."""

    def run(self):
        # Executed on the child thread once start() is called —
        # never invoke run() directly if you want concurrency.
        for n in range(1000):
            print("子线程", n)


if __name__ == '__main__':
    child = MyThread()
    child.start()
    for n in range(1000):
        print("主线程", n)
多进程 示例:
第一种写法:
# Multiprocessing, first form: pass a target callable to Process.
from multiprocessing import Process


def func():
    """Child-process worker: print a counter 1000 times."""
    for n in range(1000):
        print("子进程", n)


if __name__ == '__main__':
    # The __main__ guard is mandatory for multiprocessing on
    # spawn-based platforms (Windows/macOS).
    child = Process(target=func)
    child.start()
    for n in range(1000):
        print("主进程", n)
第二种写法:
# Multiprocessing, second form: subclass Process and override run().
from multiprocessing import Process


class MyProcess(Process):
    """Process subclass; run() is executed in the child process."""

    def run(self):
        for n in range(1000):
            print("MyProcess", n)


if __name__ == "__main__":
    child = MyProcess()
    child.start()  # start() forks/spawns, then calls run() in the child
    for n in range(1000):
        print("main", n)
传参问题
# Passing arguments to a thread: args must be a tuple
# (note the trailing comma for a 1-element tuple).
from threading import Thread


def func(name):
    """Print the given name with a counter, 1000 times."""
    for n in range(1000):
        print(name, n)


if __name__ == '__main__':
    for who in ("周杰伦", "王力宏"):
        Thread(target=func, args=(who,)).start()
Python中，线程池和进程池的写法几乎一样，只不过线程池用 ThreadPoolExecutor，进程池用 ProcessPoolExecutor。
线程池 存放固定数量的线程,由线程池分配任务
示例:
# Thread pool: 50 workers share 100 submitted tasks.
from concurrent.futures import ThreadPoolExecutor


def fn(name):
    """Task body: print the task's name with a counter."""
    for n in range(1000):
        print(name, n)


if __name__ == '__main__':
    # Leaving the `with` block waits for every submitted task to finish.
    with ThreadPoolExecutor(50) as pool:
        for n in range(100):
            pool.submit(fn, name=f"线程{n}")
    print("123")
例2:使用的库不同,方法也不同
# Example 2: a different library, a different API —
# multiprocessing.dummy.Pool is a *thread* pool with the Pool interface.
import time
from multiprocessing.dummy import Pool

start_time = time.time()


def get_page(name):
    """Simulate downloading one page; time.sleep(2) stands in for network I/O.

    FIX: the original named this parameter ``str``, shadowing the builtin.
    Renamed to ``name`` (callers invoke it positionally via ``pool.map``,
    so the rename is backward compatible).
    """
    print("正在下载 :", name)
    time.sleep(2)
    print('下载成功:', name)


name_list = ['xiaozi', 'aa', 'bb', 'cc']

pool = Pool(4)                 # 4 worker threads
pool.map(get_page, name_list)  # blocks until every task has finished
pool.close()
pool.join()

end_time = time.time()
print(end_time - start_time)   # ~2s: four 2s tasks ran concurrently
# 进程池 (process pool): same API as the thread pool,
# only the executor class changes.
from concurrent.futures import ProcessPoolExecutor


def fn(name):
    """Task body: print the task's name with a counter."""
    for n in range(1000):
        print(name, n)


if __name__ == '__main__':
    # Leaving the `with` block waits for all 100 submitted tasks.
    with ProcessPoolExecutor(50) as pool:
        for n in range(100):
            pool.submit(fn, name=f"线程{n}")
    print("123")
协程 宏观上是多线程,微观上是单线程,对CPU的利用率高。
# Minimal coroutine: calling an async def returns a coroutine object;
# nothing runs until an event loop drives it.
import asyncio


async def func():
    print("你好啊, 我叫赛利亚")


if __name__ == '__main__':
    coro = func()       # just a coroutine object — not executed yet
    asyncio.run(coro)   # the event loop runs it to completion
# Getting a coroutine's return value: run it directly, or wrap it in a
# Task and read the result from a done-callback.
import asyncio


async def request(url):
    """Pretend to request *url* and return it as the 'response'."""
    print('正在请求的url是', url)
    print('请求成功,', url)
    return url


def callback_func(task):
    # task.result() returns the coroutine's return value
    # (or re-raises its exception).
    print(task.result())


if __name__ == '__main__':
    # NOTE: get_event_loop()/run_until_complete is the legacy pattern;
    # asyncio.run() is preferred in modern code.
    loop = asyncio.get_event_loop()

    # Demo 1: run a coroutine directly.
    loop.run_until_complete(request('www.baidu.com'))

    # Demo 2: Task + done-callback.
    # FIX: the original reused the already-awaited coroutine `c`, which
    # raises RuntimeError — a coroutine object can be awaited only once,
    # so a fresh coroutine is created here.
    task = loop.create_task(request('www.baidu.com'))
    task.add_done_callback(callback_func)
    loop.run_until_complete(task)
示例:
# Three coroutines run concurrently: total wall time is ~4s
# (the longest sleep), not 3+2+4 = 9s.
import asyncio
import time


async def func1():
    print("你好啊, 我叫潘金莲")
    await asyncio.sleep(3)
    print("你好啊, 我叫潘金莲")
    return 'func1'


async def func2():
    print("你好啊, 我叫王建国")
    await asyncio.sleep(2)
    print("你好啊, 我叫王建国")
    return 'func2'


async def func3():
    print("你好啊, 我叫李雪琴")
    await asyncio.sleep(4)
    print("你好啊, 我叫李雪琴")
    return 'func3'


async def main():
    # Wrap each coroutine in a Task so all three are scheduled at once.
    workers = [asyncio.create_task(coro()) for coro in (func1, func2, func3)]
    await asyncio.wait(workers)
    # Results print in creation order, regardless of finish order.
    for worker in workers:
        print(worker.result())
    return 'main'


if __name__ == '__main__':
    started = time.time()
    print(asyncio.run(main()))
    print(time.time() - started)
# Multi-task coroutines: three simulated downloads overlap,
# total time ~2s instead of 6s.
import asyncio
import time


async def request(url):
    """Simulate downloading *url*; the 2s sleep stands in for network latency."""
    print('正在下载', url)
    await asyncio.sleep(2)
    print('下载完毕', url)


async def _download_all(urls):
    # create_task must be called while the loop is running, i.e. inside
    # a coroutine — this replaces the old ensure_future-before-the-loop trick.
    stasks = [asyncio.create_task(request(url)) for url in urls]
    await asyncio.wait(stasks)


start = time.time()
urls = ['www.baidu.com', 'www.sogou.com', 'www.goubanjia.com']
# FIX: asyncio.get_event_loop()/run_until_complete as a script entry point
# is deprecated since Python 3.10; asyncio.run() creates and closes the
# loop itself and produces the same output here.
asyncio.run(_download_all(urls))
print(time.time() - start)
多任务协程:
# Multi-task coroutines (legacy loop API): wrap each coroutine with
# ensure_future, then drive them all with run_until_complete + wait.
import asyncio
import time


async def request(url):
    """Simulate downloading *url*; the 2s sleep stands in for network latency."""
    print('正在下载', url)
    await asyncio.sleep(2)
    print('下载完毕', url)


start = time.time()

urls = ['www.baidu.com', 'www.sogou.com', 'www.goubanjia.com']

# ensure_future registers each coroutine as a Future on the default loop.
stasks = [asyncio.ensure_future(request(u)) for u in urls]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(stasks))

print(time.time() - start)  # ~2s: the three downloads overlapped
# aiohttp模块: download several images concurrently.
import asyncio
import aiohttp

urls = [
    "http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg"
]


async def aiodownload(url):
    """Fetch one image and save it under the last path segment of *url*."""
    name = url.rsplit("/", 1)[1]
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # resp.content.read() yields raw bytes
            # (aiohttp's analogue of requests' .content).
            with open(name, mode="wb") as f:
                f.write(await resp.content.read())
    print(name, "搞定")


async def main():
    # FIX: asyncio.wait() no longer accepts bare coroutines (it is an
    # error since Python 3.11) — wrap each one in a Task first.
    tasks = [asyncio.create_task(aiodownload(url)) for url in urls]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())
注意 python asyncio获取协程返回值和使用callback https://www.cnblogs.com/callyblog/p/11216961.html
aiohttp模块
学习链接 https://www.cnblogs.com/lymmurrain/p/13805690.html
aiohttp中拿文本 resp.text改成resp.text()
resp.content改成 resp.content.read()
代理,请求头 见下面的ssl 错误 博客链接https://blog.csdn.net/qq_43965708/article/details/109622238
sess.get(url=url, headers=headers, proxy='http://xxx.xxx.xxx', proxy_auth=auto.proxy_auth) as res:
防止协程被封:降低爬取速度,或者使用多个代理ip
1 2 3 timeout = aiohttp.ClientTimeout(total=600 ) connector = aiohttp.TCPConnector(limit=50 ) async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
# One-off request without an explicit session.
# FIX: the original used asyncio.get_event_loop() but never imported
# asyncio, so it crashed with NameError.
import asyncio

import aiohttp


async def fetch():
    """GET python.org and print the response body."""
    # aiohttp.request opens a throwaway connection — fine for a single
    # request; use a shared ClientSession for anything more.
    async with aiohttp.request('GET', 'http://python.org/') as resp:
        assert resp.status == 200
        print(await resp.text())


loop = asyncio.get_event_loop()
loop.run_until_complete(fetch())
Session 封装了一个连接池 (连接器 实例),并且默认情况下支持keepalive。除非在应用程序的生存期内连接到大量未知的不同服务器,否则建议您在应用程序的生存期内使用单个会话以受益于连接池。
不要为每个请求创建Session 。每个应用程序很可能需要一个会话,以完全执行所有请求。
更复杂的情况可能需要在每个站点上使用一个会话，例如，一个会话用于Github，另一个会话用于Facebook API。无论如何，为每个请求建立会话都是一个非常糟糕的主意。
会话内部包含一个连接池。连接重用和保持活动状态(默认情况下均处于启用状态)可能会提高整体性能。
# Recommended aiohttp pattern: one shared ClientSession for the whole app.
import aiohttp
import asyncio


async def fetch(client, url):
    """GET *url* with the shared *client* session and return the body text."""
    async with client.get(url) as resp:
        assert resp.status == 200
        return await resp.text()


async def main():
    # FIX: the original referenced an undefined name `url` (NameError);
    # it is defined here.
    url = 'http://python.org/'
    async with aiohttp.ClientSession() as client:
        html = await fetch(client, url)
        print(html)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
aiohttp 指定 tls 版本 (ssl 错误) https://blog.csdn.net/qq_31720329/article/details/82023393
1 aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host www.mdnkids.com:443 ssl:<ssl.SSLContext object at 0x1096634a8> [None]
解决:
# Workaround for the "Cannot connect to host ... ssl:<ssl.SSLContext ...>"
# handshake error: disable certificate verification on the connector and
# run the loop on a background thread.
import asyncio
import sys
import threading

import aiohttp
from tools import auto  # project-local proxy address / credentials


# FIX: WindowsSelectorEventLoopPolicy exists only on Windows; the original
# called it unconditionally and crashed with AttributeError elsewhere.
if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


async def quest(url, headers):
    """GET *url* through the configured proxy and return the raw body bytes."""
    # ssl=False skips certificate verification — this is what sidesteps
    # the TLS error, at the cost of security.
    con = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=con, trust_env=True) as sess:
        async with sess.get(url=url, headers=headers,
                            proxy=auto.proxies, proxy_auth=auto.proxy_auth) as res:
            return await res.read()


def forever(loop):
    """Adopt *loop* on the current (background) thread and run it forever."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


if __name__ == '__main__':
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:79.0) Gecko/20100101 Firefox/79.0'}
    url = 'https://baidu.com'

    loop = asyncio.new_event_loop()
    t = threading.Thread(target=forever, args=(loop,))
    # FIX: Thread.setDaemon() is deprecated; assign the attribute instead.
    t.daemon = True
    t.start()

    # Submit the coroutine to the background loop and block for its result.
    ret = asyncio.run_coroutine_threadsafe(quest(url, headers), loop)
    print(ret.result())
爬取视频案例 可能已经运行不了了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 """ 思路: 1. 拿到主页面的页面源代码, 找到iframe 2. 从iframe的页面源代码中拿到m3u8文件的地址 3. 下载第一层m3u8文件 -> 下载第二层m3u8文件(视频存放路径) 4. 下载视频 5. 下载秘钥, 进行解密操作 6. 合并所有ts文件为一个mp4文件 """ import requestsfrom bs4 import BeautifulSoupimport reimport asyncioimport aiohttpimport aiofilesfrom Crypto.Cipher import AES import osdef get_iframe_src (url ): resp = requests.get(url) main_page = BeautifulSoup(resp.text, "html.parser" ) src = main_page.find("iframe" ).get("src" ) return src def get_first_m3u8_url (url ): resp = requests.get(url) obj = re.compile (r'var main = "(?P<m3u8_url>.*?)"' , re.S) m3u8_url = obj.search(resp.text).group("m3u8_url" ) return m3u8_url def download_m3u8_file (url, name ): resp = requests.get(url) with open (name, mode="wb" ) as f: f.write(resp.content) async def download_ts (url, name, session ): async with session.get(url) as resp: async with aiofiles.open (f"video2/{name} " , mode="wb" ) as f: await f.write(await resp.content.read()) print (f"{name} 下载完毕" ) async def aio_download (up_url ): tasks = [] async with aiohttp.ClientSession() as session: async with aiofiles.open ("越狱第一季第一集_second_m3u8.txt" , mode="r" , encoding='utf-8' ) as f: async for line in f: if line.startswith("#" ): continue line = line.strip() ts_url = up_url + line task = asyncio.create_task(download_ts(ts_url, line, session)) tasks.append(task) await asyncio.wait(tasks) def get_key (url ): resp = requests.get(url) return resp.text async def dec_ts (name, key ): aes = AES.new(key=key, IV=b"0000000000000000" , mode=AES.MODE_CBC) async with aiofiles.open (f"video2/{name} " , mode="rb" ) as f1,\ 
aiofiles.open (f"video2/temp_{name} " , mode="wb" ) as f2: bs = await f1.read() await f2.write(aes.decrypt(bs)) print (f"{name} 处理完毕" ) async def aio_dec (key ): tasks = [] async with aiofiles.open ("越狱第一季第一集_second_m3u8.txt" , mode="r" , encoding="utf-8" ) as f: async for line in f: if line.startswith("#" ): continue line = line.strip() task = asyncio.create_task(dec_ts(line, key)) tasks.append(task) await asyncio.wait(tasks) def merge_ts (): lst = [] with open ("越狱第一季第一集_second_m3u8.txt" , mode="r" , encoding="utf-8" ) as f: for line in f: if line.startswith("#" ): continue line = line.strip() lst.append(f"video2/temp_{line} " ) s = " " .join(lst) os.system(f"cat {s} > movie.mp4" ) print ("搞定!" ) def main (url ): iframe_src = get_iframe_src(url) first_m3u8_url = get_first_m3u8_url(iframe_src) iframe_domain = iframe_src.split("/share" )[0 ] first_m3u8_url = iframe_domain+first_m3u8_url download_m3u8_file(first_m3u8_url, "越狱第一季第一集_first_m3u8.txt" ) with open ("越狱第一季第一集_first_m3u8.txt" , mode="r" , encoding="utf-8" ) as f: for line in f: if line.startswith("#" ): continue else : line = line.strip() second_m3u8_url = first_m3u8_url.split("index.m3u8" )[0 ] + line download_m3u8_file(second_m3u8_url, "越狱第一季第一集_second_m3u8.txt" ) print ("m3u8文件下载完毕" ) second_m3u8_url_up = second_m3u8_url.replace("index.m3u8" , "" ) asyncio.run(aio_download(second_m3u8_url_up)) key_url = second_m3u8_url_up + "key.key" key = get_key(key_url) asyncio.run(aio_dec(key)) merge_ts() if __name__ == '__main__' : url = "https://www.91kanju.com/vod-play/541-2-1.html" main(url)