Recently I wrote a multi-threaded crawler framework modeled on the one in the book. I ran into trouble implementing the multi-process version, but since I am about to start learning Scrapy I have set the multi-process issue aside for now.
First, the caching part. Every time an HTML page is about to be downloaded, the MongoDB database is queried first to check whether the page is already cached; if not, the page is downloaded, and if it is, the cached copy is returned.
A TTL index is created in MongoDB to delete expired cache entries (cached pages are kept for 30 days by default). Because the class implements __getitem__ and __setitem__, the object can be used just like a dictionary.
import pickle
import zlib
from datetime import datetime, timedelta

from pymongo import MongoClient
from bson.binary import Binary


class MongoCache:
    def __init__(self, client=None, expires=timedelta(days=30)):
        # If no MongoClient object is passed in, create a default one
        self.client = MongoClient("localhost", 12345) if client is None else client
        # Use a "cache" database on this connection for the cached data
        self.db = self.client.cache
        # TTL index: MongoDB deletes documents whose "timestamp" is older than `expires`
        self.db.webpage.create_index("timestamp", expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):
        '''Fetch the value cached for this url from the database'''
        record = self.db.webpage.find_one({"_id": url})
        if record:
            # The stored value is a zlib-compressed pickle, so reverse both steps
            return pickle.loads(zlib.decompress(record["result"]))
        else:
            raise KeyError(url + " does not exist")

    def __setitem__(self, url, result):
        '''Store the result for this url in the database'''
        record = {"result": Binary(zlib.compress(pickle.dumps(result))),
                  "timestamp": datetime.utcnow()}
        self.db.webpage.update({"_id": url}, {"$set": record}, upsert=True)
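A minimal usage sketch of the dict-style interface (the URL and payload below are made up for illustration, and MongoDB is assumed to be reachable on localhost:12345 as configured above):

from MongoCache import MongoCache

cache = MongoCache()

# Assignment works like a dict: the value is pickled, zlib-compressed and timestamped
cache["http://example.com"] = {"html": "<html>...</html>", "code": 200}

# Lookup either returns the cached value or raises KeyError if it was never stored / has expired
try:
    result = cache["http://example.com"]
    print(result["code"])
except KeyError:
    print("not cached")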
Next is the class that performs the downloads. It first checks the cache: if the HTML is already cached and the response code is normal, it is taken straight from the cache; otherwise the page is downloaded.
The Throttle class implements the delay between downloads.
import urllib.request
import urllib.parse
import time
import datetime
import socket
import random

DEFAULT_AGENT = "wswp"
DEFAULT_DELAY = 5
DEFAULT_RETRIES = 1
DEFAULT_TIMEOUT = 60


class Downloader:
    '''
    Class for downloading HTML. Parameters:
    proxies: list of proxy IPs; one is chosen at random for each download
    delay: wait time between downloads from the same domain, 5 seconds by default
    user_agent: User-Agent string, "wswp" by default
    num_retries: how many times to retry a failed download
    timeout: download timeout
    cache: cache backend (for example MongoCache)
    '''
    def __init__(self, proxies=None, delay=DEFAULT_DELAY, user_agent=DEFAULT_AGENT,
                 num_retries=DEFAULT_RETRIES, timeout=DEFAULT_TIMEOUT, opener=None, cache=None):
        # Set the global socket timeout
        socket.setdefaulttimeout(timeout)
        self.throttle = Throttle(delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.num_retries = num_retries
        self.opener = opener
        self.cache = cache

    def __call__(self, url):
        '''
        Download with caching; the instance itself is callable
        '''
        print("Start downloading", url)
        result = None
        if self.cache:
            try:
                # Try to fetch the data for this url from the cache
                result = self.cache[url]
            except KeyError:
                # Not cached yet, fall through to a real download
                pass
            else:
                # If the cached download was a server error, download it again
                if result["code"]:
                    if self.num_retries > 0 and 500 < result["code"] < 600:
                        result = None
        # The page is not in the cache (or was invalidated): download it
        if result is None:
            # Honour the per-domain delay
            self.throttle.wait(url)
            if self.proxies:
                # Pick a random proxy IP from the list
                proxy = random.choice(self.proxies)
            else:
                proxy = None
            # Build the request headers
            headers = {"User-agent": self.user_agent}
            # Download the page
            result = self.download(url, headers, proxy=proxy, num_retries=self.num_retries)
            if self.cache:
                # Cache the page if a cache backend is configured
                self.cache[url] = result
        print(url, "downloaded")
        return result["html"]

    def download(self, url, headers, proxy, num_retries, data=None):
        '''
        Download a single page and return the HTML together with its status code
        '''
        # Build the request
        request = urllib.request.Request(url, data, headers or {})
        request.add_header("Cookie", "finger=7360d3c2; UM_distinctid=15c59703db998-0f42b4b61afaa1-5393662-100200-15c59703dbcc1d; pgv_pvi=653650944; fts=1496149148; sid=bgsv74pg; buvid3=56812A21-4322-4C70-BF18-E6D646EA78694004infoc; CNZZDATA2724999=cnzz_eid=214248390-1496147515-https%3A%2F%2Fwww.baidu.com%2F&ntime=1496805293")
        request.add_header("Upgrade-Insecure-Requests", "1")
        opener = self.opener or urllib.request.build_opener()
        if proxy:
            # Route the request through the proxy if one was given
            opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxy))
        try:
            # Download the page
            response = opener.open(request)
            print("status code:", response.code)
            html = response.read().decode()
            code = response.code
        except Exception as e:
            print("Download error:", str(e))
            html = ''
            if hasattr(e, "code"):
                code = e.code
                if num_retries > 0 and 500 < code < 600:
                    # Retry server errors (5xx); "not found" and similar are not retried
                    return self.download(url, headers, proxy, num_retries - 1, data)
            else:
                code = None
        return {"html": html, "code": code}


class Throttle:
    '''
    Enforce a minimum delay between downloads from the same domain
    '''
    def __init__(self, delay):
        self.delay = delay
        self.domains = {}

    def wait(self, url):
        '''
        Pause between consecutive downloads from the same domain
        '''
        # Extract the domain
        domain = urllib.parse.urlparse(url).netloc
        # When was this domain last accessed?
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            # Sleep time = required interval - (now - last access)
            sleep_secs = self.delay - (datetime.datetime.now() - last_accessed).total_seconds()
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        # Record the time of this access
        self.domains[domain] = datetime.datetime.now()
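A minimal usage sketch that wires the downloader to the MongoDB cache (the URL is only an example):

from Downloader import Downloader
from MongoCache import MongoCache

downloader = Downloader(cache=MongoCache(), delay=5, user_agent="wswp")

# The first call hits the network; the second call for the same URL is served from the cache
html = downloader("http://example.com")
html_again = downloader("http://example.com")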
Next is the code that implements the crawling itself.
import time
import threading
import re
import urllib.parse
import datetime

from bs4 import BeautifulSoup

from Downloader import Downloader
from MongoCache import MongoCache

SLEEP_TIME = 1


def get_links(html):
    '''
    Return all links found on a page
    '''
    bs = BeautifulSoup(html, "lxml")
    link_labels = bs.find_all("a")
    return [link_label.get('href', "default") for link_label in link_labels]


def same_domain(url1, url2):
    '''
    Check whether two URLs belong to the same domain
    '''
    return urllib.parse.urlparse(url1).netloc == urllib.parse.urlparse(url2).netloc


def normalize(seed_url, link):
    '''
    Strip the fragment and turn a (possibly relative) link into an absolute URL
    '''
    link, no_need = urllib.parse.urldefrag(link)
    return urllib.parse.urljoin(seed_url, link)


def threader_crawler(seed_url, resource_regiex=None, link_regiex=".*", delay=5, cache=None,
                     download_source_callback=None, user_agent="wswp", proxies=None,
                     num_retries=1, max_threads=10, timeout=60, max_url=500):
    downloaded = []
    crawl_queue = [seed_url]
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                   num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            links = []
            try:
                url = crawl_queue.pop()
            except IndexError:
                # The queue is empty, this worker is done
                break
            else:
                html = D(url)
                downloaded.append(url)
                if download_source_callback:
                    # Hand resource pages over to the callback
                    if resource_regiex and re.match(resource_regiex, url):
                        download_source_callback(url, html)
                # Collect the links on the page that match the link pattern
                links.extend([link for link in get_links(html) if re.match(link_regiex, link)])
                for link in links:
                    link = normalize(seed_url, link)
                    if link not in seen:
                        seen.add(link)
                        if same_domain(seed_url, link):
                            crawl_queue.append(link)
                print("Pages discovered so far:", len(seen))
                print("Pages downloaded so far:", len(downloaded))
                print("Pages still queued:", len(crawl_queue))

    threads = []
    while threads or crawl_queue:
        if len(downloaded) >= max_url:
            return
        # Drop threads that have finished
        threads = [thread for thread in threads if thread.is_alive()]
        while len(threads) < max_threads and crawl_queue:
            thread = threading.Thread(target=process_queue)
            thread.daemon = True
            thread.start()
            threads.append(thread)
            print("Number of threads:", len(threads))
        # Give the worker threads some CPU time before checking again
        time.sleep(SLEEP_TIME)


def main():
    starttime = datetime.datetime.now()
    threader_crawler("http://www.xicidaili.com/", max_threads=1, max_url=10,
                     user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36")
    endtime = datetime.datetime.now()
    print("Time taken:", (endtime - starttime).total_seconds())


if __name__ == "__main__":
    main()
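For example, to run the crawler with the MongoDB cache enabled and more worker threads, the call could look like the following (the seed URL and limits are placeholders, and it assumes the crawler code above is saved as threader_crawler.py):

from MongoCache import MongoCache
from threader_crawler import threader_crawler  # assumed module name

threader_crawler(
    "http://example.com/",   # placeholder seed URL
    cache=MongoCache(),      # repeated runs reuse pages already cached in MongoDB
    max_threads=10,
    max_url=100,
    delay=5,
    user_agent="wswp",
)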
Testing shows that the multi-threaded crawler is far faster than crawling with a single thread. A simple benchmark:
30 threads crawled 100 pages in 31 seconds, 0.31 seconds per page on average
10 threads crawled 100 pages in 69 seconds, 0.69 seconds per page on average
1 thread crawled 100 pages in 774 seconds, 7.74 seconds per page on average
Along the way I also implemented a resource-download callback class for testing, which saves every movie found on the resource pages of Dytt8 (电影天堂) into the database.
import re

from pymongo import MongoClient


class download_source_callback:
    def __init__(self, client=None):
        # Reuse an existing MongoClient if one is given, otherwise create a default one
        if client:
            self.client = client
        else:
            self.client = MongoClient("localhost", 12345)
        self.db = self.client.cache

    def __call__(self, url, html):
        # Regular expressions for the fields on a Dytt8 resource page
        title_regiex = "<title>(.*?)</title>"
        class_regiex = "类 别(.*?)<"
        director_regiex = ".*导 演(.*?)<"
        content_regiex = "简 介(.*?)<br /><br />◎"
        imdb_regiex = "IMDb评分 (.*?)<"
        douban_regiex = "豆瓣评分(.*?)<"
        # The site serves GBK-encoded pages; only decode if we received raw bytes
        if isinstance(html, bytes):
            html = html.decode("gbk", "ignore")

        m = re.search(title_regiex, html)
        title = m.group(1) if m else None
        m = re.search(class_regiex, html)
        class_name = m.group(1) if m else None
        m = re.search(content_regiex, html)
        content = m.group(1).replace("<br />", "") if m else None
        m = re.search(douban_regiex, html)
        douban = m.group(1) if m else None
        m = re.search(imdb_regiex, html)
        imdb = m.group(1) if m else None

        print(title, class_name, content, douban, imdb)
        move = {
            "name": title,
            "class": class_name,
            "introduce": content,
            "douban": douban,
            "imdb": imdb,
        }
        # Upsert the movie record, keyed by its title
        self.db.moves.update({"_id": title}, {"$set": move}, upsert=True)
        print("Saved one movie:", title)


if __name__ == "__main__":
    html = open("f:\资源.txt").read()
    a = download_source_callback()
    a("http://www.dytt8.net/html/gndy/jddy/20170529/54099.html", html)
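To plug it into the crawler, the callback is passed together with a pattern for the resource pages. A rough sketch (the resource_regiex below is a guess at the site's URL layout and the module names are assumptions, so adjust them to the actual project):

from MongoCache import MongoCache
from threader_crawler import threader_crawler                  # assumed module name
from download_source_callback import download_source_callback  # assumed module name

threader_crawler(
    "http://www.dytt8.net/",
    # Only URLs matching this pattern are handed to the callback; the pattern is an illustrative guess
    resource_regiex="http://www.dytt8.net/html/.*",
    download_source_callback=download_source_callback(),
    cache=MongoCache(),
    max_threads=10,
    max_url=100,
)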