之前的内容已经大致实现了如何获取网页、分析网页、获取目标内容。接下来的目标是如何让网页抓取进行得更效率些。在进行抓取的时候,时间的消耗主要是在请求等待的时间上,所以一个最容易想到的优化方式就是使用多线程。
多线程
多线程的实现还是比较简单的,下面是一个简单的线程实现方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!python # encoding: utf-8 from threading import Thread def func(): print("this is multi thread") def start(): t = Thread(target=func) t.start() if __name__ == '__main__': start() |
就是这么简单。
线程池
在抓取网页的时候,一个简单的思路就是为每个网页启动一个线程。在要抓取的网页比较少的时候——比如百十来个——这样子还是可行的。但是网页比较多的时候,这样做就不太合理了。因为线程的创建启动和运行都会消耗很多的资源,线程启动太多会耗尽资源导致机器卡死。而且,创建线程后只执行一次也是一种浪费。为了减少线程的创建、实现线程的重复利用,我们需要引入线程池。
可以使用python的ThreadPoolExecutor来创建线程池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!python # encoding: utf-8 from concurrent.futures import ThreadPoolExecutor def func(): print("this is multi thread") def start_pool(): pool = ThreadPoolExecutor(64) for i in range(10): pool.submit(func) if __name__ == '__main__': start_pool() |
也很简单,甚至不比单线程多一行代码。在代码里创建了一个总数为64的线程池,然后在一个循环中每次取出一条线程来执行func函数,没有空闲线程时则会进入等待。
按照这样的思路,我们也可以使用Queue来自己创建线程池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
class CustomThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.__queue = queue def run(self): while True: cmd = self.__queue.get() cmd() self.__queue.task_done() def custom_pool(): queue = Queue(5) for i in range(queue.maxsize): t = CustomThread(queue) t.setDaemon(True) t.start() for i in range(20): queue.put(func) queue.join() |
在上面的代码里创建了一个长度为5的队列,然后参照队列的长度创建了几个线程,并立刻启动。每个线程随时待命,一旦队列里面有了要执行的任务就会拿过来立即执行,并在执行完成后发送通知给队列。queue.join()方法则会在队列中的所有任务执行完成前阻塞住线程,待所有任务执行完成后再继续执行后面的代码。
并行
前面所述的方案是并发处理的方案。并发处理的方案可以充分利用CPU。不过现在的CPU通常都是多核的,为了利用多核CPU的特点,可以考虑使用并行处理的方案。下面是一个进程的演示:
1 2 3 4 5 6 7 8 |
#!python # encoding: utf-8 from multiprocessing import Process def process(): p = Process(target=func) p.start() |
当然也有对应的进程池了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!python # encoding: utf-8 from multiprocessing.pool import Pool from multiprocessing import Process def func(): print("this is multi thread") def process_pool(): pool = Pool(processes=3) for i in range(6): pool.apply_async(func) pool.close() pool.join() |
在我使用爬虫的经历中极少会用到多进程或进程池。不过,既然说起来了就得说个全套,所以才耐着性子把这块儿写完。
很多人可能就是因为要学习爬虫才开始看Python的。不过看了python却只知道爬虫未免有些可惜,我还是希望能够多接触到一些较深入的东西的。多线程和多进程是学习Python的经历中无论如何也不应该绕过的部分,如果不想浅尝而止的话,还是建议多看看。
完整的抓取程序在下面。程序写于半年前,有很多不成熟的地方,也没心思修改了,凑合看吧。因为代码太长所以折叠了起来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
#!python # encoding: utf8 # zhyea.com from urllib.request import Request from urllib import parse import urllib.request import threading from bs4 import BeautifulSoup from queue import Queue class Client: def __init__(self): self.__TIMEOUT = 300 self.__CHARSET = "utf8" self.__HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0"} def get(self, url): request = Request(url, headers=self.__HEADERS) response = urllib.request.urlopen(request, timeout=self.__TIMEOUT) content = response.read().decode(self.__CHARSET) response.close() return content def post(self, url, **paras): data = parse.urlencode(paras).encode(self.__CHARSET) request = Request(url, data, headers=self.__HEADERS) response = urllib.request.urlopen(request, timeout=self.__TIMEOUT) content = response.read().decode(self.__CHARSET) response.close() return content class GrabThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.__queue = queue def run(self): while True: cmd, arg = self.__queue.get() cmd(arg) self.__queue.task_done() class Grabber: def __init__(self): self.__URL_SEARCH = "http://www.xxxxxxx.net/search/" self.__SAVE_PATH = "D://xxxx.log" self.__OUTPUT = open(self.__SAVE_PATH, 'a', encoding='utf-8') self.__queue = Queue() def __write(self, title, magnent): self.__OUTPUT.write( "".join(["\n", title, "\n", magnent, "\n", "---------------------------------------------"])) def __grab_anchor(self, url): content = Client().get(url); magnet_anchors = BeautifulSoup(content, "html.parser").select('a[href^="magnet:?xt"]') for i in range(len(magnet_anchors)): self.__write(magnet_anchors[i].attrs['title'], magnet_anchors[i].attrs['href']) def __grab(self, key): content = Client().post(self.__URL_SEARCH, q=parse.quote(key)); page_anchors = BeautifulSoup(content, "html.parser").select(".pagination > a") if len(page_anchors) == 0: self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key]))) else: page_total = int(page_anchors[-2].get_text()) for i in range(page_total): self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key, "/", "%d" % i]))) self.__queue.join() def start(self): for i in range(32): t = GrabThread(self.__queue) t.daemon = True t.start() keys = ["key1", "key2"] for i in range(len(keys)): self.__grab(keys[i]) self.__OUTPUT.close() if __name__ == "__main__": Grabber().start() |
参考文档
- python cookbook:http://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p07_creating_thread_pool.html
- Python 并发编程之使用多线程和多处理器:http://developer.51cto.com/art/201405/438178.htm
####################
简单易懂,非常好