上一篇《神器!五分钟完成大型爬行动物项目!”,我们介绍了一个类似于Scrapy-Feader的开源爬虫框架,并重点介绍了这个框架的一个应用——air spider,这是一个轻量级的爬虫。
接下来,我们来介绍另一个爬虫应用——Spider,它是一个基于redis的分布式爬虫。适用于海量数据采集,支持断点续爬、爬虫报警、数据自动入库等功能。
固定
和AirSpider一样,我们也是通过命令行安装的。
由于Spider是分布式爬虫,可能涉及多个爬虫,所以最好将其创建为一个项目。
创建项目
让我们首先创建项目:
feapder create -p spider-project
创建的项目目录如下所示:
项目创建后,我们需要在开发时将项目设置为工作范围,否则,当引入非对等目录的文件时,编译器会报错。
设置工作区(以pycharm为例):项目->:右键->;将目录标记为->源根目录.
创建爬虫
创建crawler的命令行语句是:
FEA pder create-s <spider _ name & gt& ltspider _ type & gt
AirSpider 对应的 spider_type 值为 1Spider 对应的 spider_type 值为 2BatchSpider 对应的 spider_type 值为 3
spider_type的默认值为1。
所以创建Spider的语句是:
feapder create -s spider_test 2
运行该语句后,我们可以看到spider_test.py文件是在spiders目录中生成的。
对应的文件内容是:
import feapderclass SpiderTest(feapder.Spider): # 自定义数据库,若项目中有setting.py文件,此自定义可删除 __custom_setting__ = dict( REDISDB_IP_PORTS="localhost:6379", REDISDB_USER_PASS="", REDISDB_DB=0 ) def start_requests(self): yield feapder.Request("https://***.baidu.com") def parse(self, request, response): print(response)if __name__ == "__***in__": SpiderTest(redis_key="xxx:xxx").start()
因为Spider是基于redis分布的,模板代码默认给redis配置。关于Redis的配置信息:
REDISDB_IP_PORTS:连接地址,若为集群或哨兵模式,多个连接地址用逗号分开,若为哨兵模式,需要加个REDISDB_SERVICE_NAME参数。REDISDB_USER_PASS:连接密码。REDISDB_DB:数据库。
在主函数中,我们可以看到redis_key有一个参数,是redis中存储任务等信息的关键字前缀,比如redis _ key = & # 8221FEA pder:spider _ test ”,redis中将生成以下内容:
特点
我们在AirSpider里面讲的方法都是蜘蛛支持的。让我们来看看蜘蛛和飞艇的区别。
自动数据仓库
写过爬虫的人都知道,如果要把数据持久化到MySQL数据库中,如果遇到很多字段,会很烦。你需要在解析和拼凑SQL语句后,手工编写大量字段。
Spider帮我们想到了这个问题,可以用框架帮我们自动入库。
剑表
第一步,我们需要在数据库中创建一个数据表。这个大家都知道,这里就不说了。
配置设置
将setting.py中的数据库配置更改为您自己的配置:
# # MYSQLMYSQL_IP = ""MYSQL_PORT = MYSQL_DB = ""MYSQL_USER_NAME = ""MYSQL_USER_PASS = ""
这些是配置。
生成实体类项目
接下来,我们将命令行切换到项目的items目录,并运行命令:
FEA pder create-I <item _ name & gt
我在这里使用数据库中的报告表,所以命令是:
feapder创建-i报告
然后,我们可以在items目录中看到生成的report_item.py实体类。我在这里生成的实体类的内容是:
from feapder import Itemclass ReportItem(Item): """ This class was generated by feapder. com***nd: feapder create -i report. """ __table_name__ = "report" def __init__(self, *args, **kwargs): self.count = None self.emRatingName = None # 评级名称 self.emRatingValue = None # 评级代码 self.encodeUrl = None # 链接 # self.id = None self.indvInduCode = None # 行业代码 self.indvInduName = None # 行业名称 self.lastEmRatingName = None # 上次评级名称 self.lastEmRatingValue = None # 上次评级代码 self.orgCode = None # 机构代码 self.orgName = None # 机构名称 self.orgSName = None # 机构简称 self.predictNextTwoYearEps = None self.predictNextTwoYearPe = None self.predictNextYearEps = None self.predictNextYearPe = None self.predictThisYearEps = None self.predictThisYearPe = None self.publishDate = None # 发表时间 self.ratingChange = None # 评级变动 self.researcher = None # 研究员 self.stockCode = None # 股票代码 self.stockName = None # 股票简称 self.title = None # 报告名称
如果一个字段有默认值或自增量,默认情况下它会被注释掉,可以根据需要打开。您可以看到我的表的id字段在这里做了注释。
如果项目字段太多,并且您不想逐个赋值,您可以按如下方式创建它们:
feapder创建-i报告1
此时生成的实体类是这样的:
class ReportItem(Item): """ This class was generated by feapder. com***nd: feapder create -i report 1. """ __table_name__ = "report 1" def __init__(self, *args, **kwargs): self.count = kwargs.get('count') self.emRatingName = kwargs.get('emRatingName') # 评级名称 self.emRatingValue = kwargs.get('emRatingValue') # 评级代码 self.encodeUrl = kwargs.get('encodeUrl') # 链接 # self.id = kwargs.get('id') self.indvInduCode = kwargs.get('indvInduCode') # 行业代码 self.indvInduName = kwargs.get('indvInduName') # 行业名称 self.lastEmRatingName = kwargs.get('lastEmRatingName') # 上次评级名称 self.lastEmRatingValue = kwargs.get('lastEmRatingValue') # 上次评级代码 self.orgCode = kwargs.get('orgCode') # 机构代码 self.orgName = kwargs.get('orgName') # 机构名称 self.orgSName = kwargs.get('orgSName') # 机构简称 self.predictNextTwoYearEps = kwargs.get('predictNextTwoYearEps') self.predictNextTwoYearPe = kwargs.get('predictNextTwoYearPe') self.predictNextYearEps = kwargs.get('predictNextYearEps') self.predictNextYearPe = kwargs.get('predictNextYearPe') self.predictThisYearEps = kwargs.get('predictThisYearEps') self.predictThisYearPe = kwargs.get('predictThisYearPe') self.publishDate = kwargs.get('publishDate') # 发表时间 self.ratingChange = kwargs.get('ratingChange') # 评级变动 self.researcher = kwargs.get('researcher') # 研究员 self.stockCode = kwargs.get('stockCode') # 股票代码 self.stockName = kwargs.get('stockName') # 股票简称 self.title = kwargs.get('title') # 报告名称
这样,当我们请求返回json数据时,我们可以直接赋值,比如:
response_data = {"title":" 测试"} # 模拟请求回来的数据item = SpiderDataItem(**response_data)
自动存储数据也相对简单。解析数据后,将数据分配给Item,然后产生:
def parse(self, request, response): html = response.content.decode("utf-8") if len(html): content = html.replace('datatable1351846(', '')[:-1] content_json = json.loads(content) print(content_json) for obj in content_json['data']: result = ReportItem() result['orgName'] = obj['orgName'] #机构名称 result['orgSName'] = obj['orgSName'] #机构简称 result['publishDate'] = obj['publishDate'] #发布日期 result['predictNextTwoYearEps'] = obj['predictNextTwoYearEps'] #后年每股盈利 result['title'] = obj['title'] #报告名称 result['stockName'] = obj['stockName'] #股票名称 result['stockCode'] = obj['stockCode'] #股票code result['orgCode'] = obj['stockCode'] #机构code result['predictNextTwoYearPe'] = obj['predictNextTwoYearPe'] #后年市盈率 result['predictNextYearEps'] = obj['predictNextYearEps'] # 明年每股盈利 result['predictNextYearPe'] = obj['predictNextYearPe'] # 明年市盈率 result['predictThisYearEps'] = obj['predictThisYearEps'] #今年每股盈利 result['predictThisYearPe'] = obj['predictThisYearPe'] #今年市盈率 result['indvInduCode'] = obj['indvInduCode'] # 行业代码 result['indvInduName'] = obj['indvInduName'] # 行业名称 result['lastEmRatingName'] = obj['lastEmRatingName'] # 上次评级名称 result['lastEmRatingValue'] = obj['lastEmRatingValue'] # 上次评级代码 result['emRatingValue'] = obj['emRatingValue'] # 评级代码 result['emRatingName'] = obj['emRatingName'] # 评级名称 result['ratingChange'] = obj['ratingChange'] # 评级变动 result['researcher'] = obj['researcher'] # 研究员 result['encodeUrl'] = obj['encodeUrl'] # 链接 result['count'] = int(obj['count']) # 近一月个股研报数 yield result
返回项目后,项目将流入框架的项目缓冲区。每隔0.05秒或者物品数量累计到5000件时,这些物品就会被分批入库。表名是类名,Item的小写去掉了。例如,ReportItem数据将落入报告表中。
接受初步测试
在开发过程中,我们可能需要调试某个请求,常规的方式是修改下发任务的代码。然而,这并不好。改了可能会把之前写的逻辑搞混,或者忘记改回来直接释放,或者调试好的数据入库,污染了数据库中已有的数据,造成了很多本不该发生的问题。
这个框架支持调试爬虫,可以针对某个任务进行调试。它是这样写的:
if __name__ == "__***in__": spider = SpiderTest.to_DebugSpider( redis_key="feapder:spider_test", request=feapder.Request("http://***.baidu.com") ) spider.start()
比较以前的启动模式:
spider = SpiderTest(redis_key="feapder:spider_test")spider.start()
可以看到代码中的to_DebugSpider方法可以直接将原爬虫转换为调试爬虫,然后通过传递request参数抓取指定的任务。
通常情况下,调试是结合断点进行的。在调试模式下,默认情况下,运行生成的数据不会放入存储中。
除了指定request参数,还可以指定request_dict参数和request_dict接收字典的类型,比如request _ dict = { & # 8220网址”:”http://*** . Baidu . com ”},用于交付请求协议。可以传递request_dict或者request _ dict。
运行多个爬网程序
通常,一个项目下可能会有多个爬虫。为了规范起见,建议将启动门户放入项目下的***in.py中,然后通过命令行运行指定的文件。
例如,以下项目:
该项目包含两个spider,***in.py编写如下:
from spiders import *from feapder import Requestfrom feapder import ArgumentParserdef test_spider(): spider = test_spider.TestSpider(redis_key="spider:report") spider.start()def test_spider2(): spider = test_spider.TestSpider2(redis_key="spider:report") spider.start()if __name__ == "__***in__": parser = ArgumentParser(description="Spider测试") parser.add_argument( "--test_spider", action="store_true", help="测试Spider", function=test_spider ) parser.add_argument( "--test_spider2", action="store_true", help="测试Spider2", function=test_spider2 ) parser.start()
ArgumentParser模块在这里用于支持命令行参数,比如运行test_spider:
python3 ***in.py & # 8211测试蜘蛛
分布式的
说白了,分布就是启动多个流程,处理同一批任务。Spider支持多次启动,不重复发送任务。我们可以在多台服务器上部署启动,或者在同一台机器上多次启动。
摘要
至此,我们已经完成了蜘蛛分布式爬虫。还有一些细节是你在使用的时候需要琢磨的。总的来说,这个框架好用,好用,处理一些不是很复杂的场景绰绰有余。你可以尝试重构你的爬虫,尝试这个框架。
本文来自热恋少女投稿,不代表舒华文档立场,如若转载,请注明出处:https://www.chinashuhua.cn/24/496613.html