Frontera 0.6 文档¶
Frontera 是一个爬虫工具箱,它可以让你构建任何规模和任意目的的爬虫。
Frontera 提供 crawl frontier 框架,这个框架可以帮助解决*何时抓取下一个URL*、*下个抓取的URL是什么*和检查*抓取结果*等问题。
Frontera 还为所有的爬虫组件提供了复制、分片、隔离的特性,这可以方便的扩展爬虫规模和将爬虫做成分布式。
Fronteta 包含完全支持 Scrapy 的组件,可以使用Scrapy的所有功能创建爬虫。尽管它最初是为Scrapy设计的,但是它也可以完美契合其他任何的框架或系统,因为它可以作为一个框架提供通用的工具箱。
介绍¶
这一章的目的是介绍 Frontera 的概念,通过阅读本章,你可以知道 Frontera 的设计理念和确定它能不能满足你的需求。
Frontera 概览¶
Frontera 是 crawl frontier 的实现,用于在从网络下载之前累积URL /链接的网络爬虫组件。 Frontera的主要特征:
- 面向在线处理,
- 分布式爬虫和后端架构,
- 可定制抓取策略,
- Scrapy易于集成,
- 集成 SQLAlchemy 支持关系型数据库(Mysql, PostgreSQL, sqlite 等等), 集成 HBase 非常好得支持键值对数据库,
- 使用 ZeroMQ and Kafka 为分布式爬虫实现消息总线,
- 使用 Graph Manager 创建伪站点地图和模拟抓取,进行精确抓取逻辑调优。
- 透明的传输层概念(message bus)和通信协议,
- 纯 Python 实现 。
- 支持 Python 3 。
使用案例¶
下面是一些 crawl frontier 适用的案例:
- 与爬虫的 URL 排序/排队隔离(例如,需要远端服务器管理排序/排队的分布式爬虫集群),
- 需要存储 URL 的元信息(在一些地方验证它的内容),
- 需要高级的 URL 排序逻辑,但在爬虫或者抓取器中很难维护。
一次抓取,少量网站¶
这种情况下使用单进程可能是最好的选择。 Frontier 提供以下现成的优先级模型:
- FIFO,
- LIFO,
- 广度优先 (BFS),
- 深度优先 (DFS),
- 基于提供的得分,从 0.0 映射到 1.0。
如果网站很大,抓取所有网页太浪费, Frontera 可以控制爬虫抓取最重要的网页。
分布式抓取, 少量网站¶
如果考虑提高抓取速度可以使用分布式爬虫模式。在这种模式下,Frontera 为爬虫进程分发任务,并且只有一个后端实例。请求任务通过你选择的 message bus 进行分发,通过自定义分区调整任务分发策略。默认情况下请求任务是随机分发给爬虫的,抓取速度可以在爬虫中设置。
也考虑一下代理服务,比如 Crawlera。
重新抓取¶
有一组网站,并且需要以及时(或其他)方式重新抓取它们。Frontera 提供了简单的重新抓取后端,根据设置的时间间隔定期抓取已经抓取的网页。这个后端使用关系系数据库持久化数据,并可以应用在单进程模式或者分布式爬虫模式中。
看门狗案例 - 当需要通知文档变化时,也可以使用这样的后端和少量的自定义。
广度抓取¶
这种使用案例要求完全的分布式:爬虫和后端都是分布式。除了运行 spiders,还应该运行 strategy worker (s) 和 db worker (s),这取决于选择的分区策略。
Frontera可用于与大规模网络抓取相关的一系列广泛任务:
广泛的网页抓取,任意数量的网站和页面(我们在45M文档卷和100K网站上做过测试),
以主机为中心的抓取:当您有超过100个网站时,
聚焦抓取:
- 主题:您搜索关于某个预定义主题的页面,
- PageRank,HITS或其他链接图算法指导。
下面是一些真实世界的问题:
- 抓取网络中的内容检索构建搜索引擎。
- 网络图的各种研究工作:收集链接,统计,图结构,跟踪域名计数等。
- 更普遍的集中抓取任务:比如,您搜索的是大中心的网页,并且频繁更改时间。
运行模式¶
下图展示了运行模式的架构图:

单进程¶
Frontera 与 fetcher 在相同的过程中实例化(例如在 Scrapy 中)。要实现这个,需要设置 BACKEND
为 Backend
的子类。这种模式适合那种少量文档并且时间要求不紧的应用。
分布式爬虫¶
爬虫是分布式的,但后端不是。后端运行在 db worker 中,并通过 message bus 与爬虫通信。
- 将爬虫进程中的
BACKEND
设置为MessageBusBackend
- 在 DB worker 中
BACKEND
应该指向Backend
的子类。 - 每个爬虫进程应该有它自己的
SPIDER_PARTITION_ID
,值为从0到SPIDER_FEED_PARTITIONS
。 - 爬虫和 DB worker 都应该将
MESSAGE_BUS
设置为你选择的消息总线类或者其他你自定义的实现。
此模式适用于需要快速获取文档,同时文档的数量相对较小的应用。
分布式爬虫和后端¶
爬虫和后端都是分布式的。后端分成了两部分: strategy worker 和 db worker。strategy worker 实例被分配给他们自己的 spider log 部分。
- 将爬虫进程中的
BACKEND
设置为MessageBusBackend
- DB workers 和 SW workers 的
BACKEND
应该指向DistributedBackend
的子类。同时还需要配置您选择的后端。 - 每个爬虫进程应该有它自己的
SPIDER_PARTITION_ID
,值为从0到SPIDER_FEED_PARTITIONS
。最后一个必须可以被所有 DB worker 实例访问。 - 每个 SW worker 应该有自己的
SCORING_PARTITION_ID
,值为从0到SPIDER_LOG_PARTITIONS
。最后一个必须可以被所有 SW worker 实例访问。 - 爬虫和所有的 worker 都应该将
MESSAGE_BUS
设置为你选择的消息总线类或者其他你自定义的实现。
在这种模式下,只有 Kafka 消息总线、SqlAlchemy 和 Habse 后端是默认支持的。
此模式适用于广度优先抓取和网页数量巨大的情况。
单进程模式快速入门¶
1. 创建您的爬虫¶
按照通常的方式创建 Scrapy 项目。输入您要存储代码的目录,然后运行:
scrapy startproject tutorial
这会创建一个 tutorial 目录,包含以下内容:
tutorial/
scrapy.cfg
tutorial/
__init__.py
items.py
pipelines.py
settings.py
spiders/
__init__.py
...
这些是最基本的:
- scrapy.cfg: 项目的配置文件
- tutorial/: 项目的 python 模块,后续您将从这里引用您的代码。
- tutorial/items.py: 项目的 items 文件。
- tutorial/pipelines.py: 项目的 pipelines 文件。
- tutorial/settings.py: 项目的 settings 文件。
- tutorial/spiders/: 放爬虫的目录。
还有什么?¶
您已经看到了一个使用 Frontera 集成 Scrapy 的例子,但是这个仅仅是最基本的功能。Frontera 还提供了许多让 frontier 管理更加简单、有效率的强大功能,比如:
- 内置 database storage 支持存储抓取数据。
- 通过 API 可以方便的 与 Scrapy 集成 或者与其他爬虫集成。
- 通过使用 ZeroMq 或 Kafka 和分布式的后端,实现 两种分布式抓取模式 。
- 通过 自定义您的后端 创建不同抓取策略或者逻辑。
- 使用 middlewares 插入您自己的 request/response 修改策略。
- 使用 Graph Manager 创建假的网站地图,并可以不用爬虫而可以重现抓取过程。
- 记录您的 Scrapy 抓取结果 ,后续可以用它测试 frontier。
- 您可以用 hook 的方式使用日志工具,捕捉错误和调试您的 frontiers。
分布式模式快速入门¶
这篇文档教您在本地快速搭建单机、多进程的 Frontera 系统。我们将使用 SQLite 和 ZeroMQ 构建可能最简单的 Frontera 系统。如果要搭建生产环境下的 Frontera 系统,请参考 集群安装指南。
前提¶
Here is what services needs to be installed and configured before running Frontera: 以下是运行 Frontera 之前需要安装和配置的:
- Python 2.7+ 或 3.4+
- Scrapy
得到一个爬虫例子代码¶
首先从 Github 上下载 Frontera:
$ git clone https://github.com/scrapinghub/frontera.git
在 examples/general-spider
中有一个普通的爬虫例子。
这是一个很普通的爬虫,它仅仅从下载的内容中抽取链接。它同样包含了一些配置文件,请参考 settings reference 获取更多的信息。
启动集群¶
首先,让我们启动 ZeroMQ 代理。
$ python -m frontera.contrib.messagebus.zeromq.broker
你应该看到代理打印出 spider 与 DB worker 之间传递信息的统计信息。
后续所有的命令都可以在``general-spider``的根目录下执行。
第二步,启动 DB worker。:
$ python -m frontera.worker.db --config frontier.workersettings
你应该注意点 DB worker 正在输出信息。此时没有向 ZeroMQ 发送信息是正常的,因为现在系统中缺乏种子URL。
在目录下有一批西班牙的URL,让我们把它们当做种子来启动爬虫。启动爬虫:
$ scrapy crawl general -L INFO -s FRONTERA_SETTINGS=frontier.spider_settings -s SEEDS_SOURCE=seeds_es_smp.txt -s SPIDER_PARTITION_ID=0
$ scrapy crawl general -L INFO -s FRONTERA_SETTINGS=frontier.spider_settings -s SPIDER_PARTITION_ID=1
最后应该有两个爬虫进程在运行。每个爬虫应该读取自己的Frontera配置,并且第一个应该使用 SEEDS_SOURCE
选项来读取种子和启动 Frontera 集群。
一段时间以后,种子会被准备好,以供爬虫抓取。此时爬虫已经被启动了。现在你可以周期性的检查 DB worker 的输出或者 metadata
表信息来确认爬虫确实在运行。
集群安装指南¶
这个指南的目标是教你如何初始化爬虫集群,在实践过程中一些步骤可能需要微调。这篇指南假设你使用 Kafka作为消息总线(官方推荐),当然用 Zero MQ 也是可以的,但是可靠性会差点。
需要决定的事情
- DB worker 和 Strategy worker 的数量。
启动之前需要安装的¶
- Kafka,
- HBase (推荐 1.0.x 或更高的版本),
- DNS Service (推荐但并不是必须的).
启动之前需要实现的¶
- Crawling strategy
- 爬虫代码
配置 Kafka¶
为 Kafka 消息总线创建所有需要的 topic¶
- spider log (frontier-done (see
SPIDER_LOG_TOPIC
)), 设置 topic 分区数与 Strategy worker 实例数相同, - spider feed (frontier-todo (see
SPIDER_FEED_TOPIC
)), 设置 topic 分区数与爬虫数相同, - scoring log (frontier-score (see
SCORING_LOG_TOPIC
))
配置 HBase¶
- 创建一个 namespace
crawler
(请参照HBASE_NAMESPACE
), - 确保原生支持 Snappy 压缩。
配置 Frontera¶
每个 Frontera 组件需要自己的配置模块,但是一些配置项是共享的,所以我们推荐创建一个公有的配置模块,并在自有的配置中引入这个公有模块。
创建一个公有模块并添加如下信息:
from __future__ import absolute_import from frontera.settings.default_settings import MIDDLEWARES MAX_NEXT_REQUESTS = 512 SPIDER_FEED_PARTITIONS = 2 # number of spider processes SPIDER_LOG_PARTITIONS = 2 # worker instances MIDDLEWARES.extend([ 'frontera.contrib.middlewares.domain.DomainMiddleware', 'frontera.contrib.middlewares.fingerprint.DomainFingerprintMiddleware' ]) QUEUE_HOSTNAME_PARTITIONING = True KAFKA_LOCATION = 'localhost:9092' # your Kafka broker host:port SCORING_TOPIC = 'frontier-scoring' URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'
创建 workers 的公有模块:
from __future__ import absolute_import from .common import * BACKEND = 'frontera.contrib.backends.hbase.HBaseBackend' MAX_NEXT_REQUESTS = 2048 NEW_BATCH_DELAY = 3.0 HBASE_THRIFT_HOST = 'localhost' # HBase Thrift server host and port HBASE_THRIFT_PORT = 9090
创建 DB worker 配置模块:
from __future__ import absolute_import from .worker import * LOGGING_CONFIG='logging-db.conf' # if needed
创建 Strategy worker 配置模块:
from __future__ import absolute_import from .worker import * CRAWLING_STRATEGY = '' # path to the crawling strategy class LOGGING_CONFIG='logging-sw.conf' # if needed
logging 配置可参考 https://docs.python.org/2/library/logging.config.html 请看 list of loggers.
设置爬虫配置模块:
from __future__ import absolute_import from .common import * BACKEND = 'frontera.contrib.backends.remote.messagebus.MessageBusBackend' KAFKA_GET_TIMEOUT = 0.5
配置 Scrapy settings 模块. 这个模块在 Scrapy 项目文件夹中,并被 scrapy.cfg 引用 。 添加如下:
FRONTERA_SETTINGS = '' # module path to your Frontera spider config module SCHEDULER = 'frontera.contrib.scrapy.schedulers.frontier.FronteraScheduler' SPIDER_MIDDLEWARES = { 'frontera.contrib.scrapy.middlewares.schedulers.SchedulerSpiderMiddleware': 999, 'frontera.contrib.scrapy.middlewares.seeds.file.FileSeedLoader': 1, } DOWNLOADER_MIDDLEWARES = { 'frontera.contrib.scrapy.middlewares.schedulers.SchedulerDownloaderMiddleware': 999, }
启动集群¶
首先,启动 DB worker:
# start DB worker only for batch generation
$ python -m frontera.worker.db --config [db worker config module] --no-incoming
...
# Then start next one dedicated to spider log processing
$ python -m frontera.worker.db --no-batches --config [db worker config module]
之后,启动strategy workers,每个 spider log topic 的分区需要对应一个 strategy workers 的实例:
$ python -m frontera.worker.strategy --config [strategy worker config] --partition-id 0
$ python -m frontera.worker.strategy --config [strategy worker config] --partition-id 1
...
$ python -m frontera.worker.strategy --config [strategy worker config] --partition-id N
你应该注意到所有的进程会向 log 中写信息。如果没有数据传递相关的 log 信息也是正常的,因为现在系统中还没有种子 URLS。
让我们在文件中每行放一个 URL 作为种子,来启动爬虫。每个爬虫进程对应一个 spider feed topic 的分区:
$ scrapy crawl [spider] -L INFO -s SEEDS_SOURCE = 'seeds.txt' -s SPIDER_PARTITION_ID=0
...
$ scrapy crawl [spider] -L INFO -s SPIDER_PARTITION_ID=1
$ scrapy crawl [spider] -L INFO -s SPIDER_PARTITION_ID=2
...
$ scrapy crawl [spider] -L INFO -s SPIDER_PARTITION_ID=N
最后你应该启动 N 个爬虫进程。通常一个爬虫实例从 SEEDS_SOURCE
中读取种子发送给 Frontera 集群就足够了。只有爬虫的任务队列为空时才会读取种子。也可以从配置文件中读取 SPIDER_PARTITION_ID
。
一段时间以后,种子会被准备好,以供爬虫抓取。爬虫真正启动了。
- Frontera 概览
- 明白什么是 Frontera ?它能为你做什么?
- 运行模式
- Frontera的高层体系结构和运行模式。
- 单进程模式快速入门
- 使用 Scrapy 作为容器来运行 Frontera。
- 分布式模式快速入门
- 引入 SQLite 和 ZeroMQ。
- 集群安装指南
- Setting up clustered version of Frontera on multiple machines with HBase and Kafka. 使用 HBase 和 Kafka 在多台机器上部署 Frontera 集群。
使用 Frontera¶
安装指南¶
下面的步骤假定您已经安装了以下必需的软件:
- Python 2.7+ 或 3.4+
- pip 和 setuptools Python 包。
你可以使用 pip 安装 Frontera.
使用 pip 安装:
pip install frontera[option1,option2,...optionN]
选项¶
Each option installs dependencies needed for particular functionality. 每个选项安装所需的特定功能的依赖。
- sql - 关系型数据库,
- graphs - Graph Manager,
- logging - 彩色日志,
- tldextract - 可以使用
TLDEXTRACT_DOMAIN_INFO
- hbase - HBase 分布式后端,
- zeromq - ZeroMQ 消息总线,
- kafka - Kafka 消息总线,
- distributed - workers 依赖.
Frontier 对象¶
Frontier 使用两种对象类型: Request
and Response
. 他们各自代表 HTTP 请求和 HTTP 返回.
这两个类会被大多数的 Frontera API 方法调用,根据方法不同可能作为参数也可能作为返回值。
Frontera 同样也会使用这两种对象在内部组件之间传递数据(比如 middlewares 和 backend)。
Request 对象¶
-
class
frontera.core.models.
Request
(url, method='GET', headers=None, cookies=None, meta=None, body='')¶ A
Request
object represents an HTTP request, which is generated for seeds, extracted page links and next pages to crawl. Each one should be associated to aResponse
object when crawled.参数: - url (string) – URL to send.
- method (string) – HTTP method to use.
- headers (dict) – dictionary of headers to send.
- cookies (dict) – dictionary of cookies to attach to this request.
- meta (dict) – dictionary that contains arbitrary metadata for this request, the keys must be bytes and the values must be either bytes or serializable objects such as lists, tuples, dictionaries with byte type items.
-
body
¶ A string representing the request body.
Dictionary of cookies to attach to this request.
-
headers
¶ A dictionary which contains the request headers.
-
meta
¶ A dict that contains arbitrary metadata for this request. This dict is empty for new Requests, and is usually populated by different Frontera components (middlewares, etc). So the data contained in this dict depends on the components you have enabled. The keys are bytes and the values are either bytes or serializable objects such as lists, tuples, dictionaries with byte type items.
-
method
¶ A string representing the HTTP method in the request. This is guaranteed to be uppercase. Example:
GET
,POST
,PUT
, etc
-
url
¶ A string containing the URL of this request.
Response 对象¶
-
class
frontera.core.models.
Response
(url, status_code=200, headers=None, body='', request=None)¶ A
Response
object represents an HTTP response, which is usually downloaded (by the crawler) and sent back to the frontier for processing.参数: - url (string) – URL of this response.
- status_code (int) – the HTTP status of the response. Defaults to 200.
- headers (dict) – dictionary of headers to send.
- body (str) – the response body.
- request (Request) – The Request object that generated this response.
-
body
¶ A str containing the body of this Response.
-
headers
¶ A dictionary object which contains the response headers.
-
meta
¶ A shortcut to the
Request.meta
attribute of theResponse.request
object (ie. self.request.meta).
-
status_code
¶ An integer representing the HTTP status of the response. Example:
200
,404
,500
.
-
url
¶ A string containing the URL of the response.
domain
和 fingerprint
字段被 内置 middlewares 添加。
对象唯一识别标志¶
因为 Frontera 对象会在爬虫和服务器之间传递,所以需要一些机制来唯一标示一个对象。这个识别机制会基于 Frontera 逻辑不同而有所变化(大多数情况是根据后端的逻辑)。
默认 Frontera 会激活 fingerprint middleware ,根据 Request.url
和 Response.url
分别生成一个唯一标示,并分别赋值给 Request.meta
and
Response.meta
。你可以使用这个中间件或者自己定义。
一个为 Request
生成指纹的例子:
>>> request.url
'http://thehackernews.com'
>>> request.meta['fingerprint']
'198d99a8b2284701d6c147174cd69a37a7dea90f'
为对象添加其他值¶
大多数情况下 Frontera 存储了系统运行所需要的参数。
同样的,其他信息也可以存入 Request.meta
和 Response.meta
例如,激活 domain middleware 会为每个 Request.meta
和
Response.meta
添加 domain
字段:
>>> request.url
'http://www.scrapinghub.com'
>>> request.meta['domain']
{
"name": "scrapinghub.com",
"netloc": "www.scrapinghub.com",
"scheme": "http",
"sld": "scrapinghub",
"subdomain": "www",
"tld": "com"
}
Middlewares(中间件)¶
Frontier Middleware
位于
FrontierManager
和
Backend
objects 之间, 根据 frontier data flow 的流程,处理 Request
和 Response
。
Middlewares 是一个轻量级、低层次的系统,可以用来过滤和更改 Frontier 的 requests 和 responses。
激活一个 middleware¶
要激活 Middleware
component, 需要添加它到
MIDDLEWARES
setting(这是一个列表,包含类的路径或者一个 Middleware
对象)。
这是一个例子:
MIDDLEWARES = [
'frontera.contrib.middlewares.domain.DomainMiddleware',
]
Middlewares按照它们在列表中定义的相同顺序进行调用,根据你自己的需要安排顺序。 该顺序很重要,因为每个中间件执行不同的操作,并且您的中间件可能依赖于一些先前(或后续的)执行的中间件。
最后,记住一些 middlewares 需要通过特殊的 setting。详细请参考 each middleware documentation 。
写你自己的 middleware¶
写自己的 Frontera middleware 是很简单的。每个 Middleware
是一个继承 Component
的 Python 类。
FrontierManager
会通过下面的方法和所有激活的 middlewares 通信。
-
class
frontera.core.components.
Middleware
¶ Methods
-
Middleware.
frontier_start
()¶ Called when the frontier starts, see starting/stopping the frontier.
-
Middleware.
frontier_stop
()¶ Called when the frontier stops, see starting/stopping the frontier.
-
Middleware.
add_seeds
(seeds)¶ This method is called when new seeds are added to the frontier.
参数: seeds (list) – A list of Request
objects.返回: Request
object list orNone
应该返回
None
或者Request
的列表。如果返回
None
,FrontierManager
将不会处理任何中间件,并且种子也不会到达Backend
。如果返回
Request
列表,该列表将会传给下个中间件。这个过程会在每个激活的中间件重复,直到它到达Backend
。如果要过滤任何种子,请不要将其包含在返回的对象列表中。
-
Middleware.
page_crawled
(response)¶ This method is called every time a page has been crawled.
参数: response (object) – The Response
object for the crawled page.返回: Response
orNone
应该返回
None
或者一个Response
对象。如果返回
None
,FrontierManager
将不会处理任何中间件,并且Backend
不会被通知。如果返回
Response
,它将会被传给下个中间件。这个过程会在每个激活的中间件重复,直到它到达Backend
。如果要过滤页面,只需返回 None。
-
Middleware.
request_error
(page, error)¶ This method is called each time an error occurs when crawling a page.
参数: - request (object) – The crawled with error
Request
object. - error (string) – A string identifier for the error.
返回: Request
orNone
应该返回
None
或者一个Request
对象。如果返回
None
,FrontierManager
将不会和其他任何中间件通信,并且Backend
不会被通知。如果返回一个
Response
对象,它将会被传给下个中间件。这个过程会在每个激活的中间件重复,直到它到达Backend
。如果要过滤页面错误,只需返回 None。
- request (object) – The crawled with error
Class Methods
-
Middleware.
from_manager
(manager)¶ Class method called from
FrontierManager
passing the manager itself.Example of usage:
def from_manager(cls, manager): return cls(settings=manager.settings)
-
内置 middleware 参考¶
这篇文章描述了 Frontera 所有的 Middleware
组件。如何使用和写自己的 middleware,请参考 middleware usage guide.。
有关默认启用的组件列表(及其顺序),请参阅 MIDDLEWARES 设置。
DomainMiddleware¶
-
class
frontera.contrib.middlewares.domain.
DomainMiddleware
¶ This
Middleware
will add adomain
info field for everyRequest.meta
andResponse.meta
if is activated.domain
object will contain the following fields, with both keys and values as bytes:- netloc: URL netloc according to RFC 1808 syntax specifications
- name: Domain name
- scheme: URL scheme
- tld: Top level domain
- sld: Second level domain
- subdomain: URL subdomain(s)
An example for a
Request
object:>>> request.url 'http://www.scrapinghub.com:8080/this/is/an/url' >>> request.meta['domain'] { "name": "scrapinghub.com", "netloc": "www.scrapinghub.com", "scheme": "http", "sld": "scrapinghub", "subdomain": "www", "tld": "com" }
If
TEST_MODE
is active, It will accept testing URLs, parsing letter domains:>>> request.url 'A1' >>> request.meta['domain'] { "name": "A", "netloc": "A", "scheme": "-", "sld": "-", "subdomain": "-", "tld": "-" }
UrlFingerprintMiddleware¶
-
class
frontera.contrib.middlewares.fingerprint.
UrlFingerprintMiddleware
¶ This
Middleware
will add afingerprint
field for everyRequest.meta
andResponse.meta
if is activated.Fingerprint will be calculated from object
URL
, using the function defined inURL_FINGERPRINT_FUNCTION
setting. You can write your own fingerprint calculation function and use by changing this setting. The fingerprint must be bytes.An example for a
Request
object:>>> request.url 'http//www.scrapinghub.com:8080' >>> request.meta['fingerprint'] '60d846bc2969e9706829d5f1690f11dafb70ed18'
-
frontera.utils.fingerprint.
hostname_local_fingerprint
(key)¶ This function is used for URL fingerprinting, which serves to uniquely identify the document in storage.
hostname_local_fingerprint
is constructing fingerprint getting first 4 bytes as Crc32 from host, and rest is MD5 from rest of the URL. Default option is set to make use of HBase block cache. It is expected to fit all the documents of average website within one cache block, which can be efficiently read from disk once.参数: key – str URL 返回: str 20 bytes hex string
DomainFingerprintMiddleware¶
-
class
frontera.contrib.middlewares.fingerprint.
DomainFingerprintMiddleware
¶ This
Middleware
will add afingerprint
field for everyRequest.meta
andResponse.meta
domain
fields if is activated.Fingerprint will be calculated from object
URL
, using the function defined inDOMAIN_FINGERPRINT_FUNCTION
setting. You can write your own fingerprint calculation function and use by changing this setting. The fingerprint must be bytesAn example for a
Request
object:>>> request.url 'http//www.scrapinghub.com:8080' >>> request.meta['domain'] { "fingerprint": "5bab61eb53176449e25c2c82f172b82cb13ffb9d", "name": "scrapinghub.com", "netloc": "www.scrapinghub.com", "scheme": "http", "sld": "scrapinghub", "subdomain": "www", "tld": "com" }
规范 URL 解算器 是一种特殊的 middleware 对象,用来识别网页的规范 URL,并根据这个修改 request 或者 response 的元数据。通常规范 URL 解算器是在调用后端方法之前最后一个执行的 middleware。
此组件的主要目的是防止元数据记录重复和混淆与其相关联的抓取器行为。原因是:
- 不同的重定向链将指向相同的文档。
- 同一份文件可以通过多个不同的URL访问。
精心设计的系统具有自己的稳定算法,为每个文档选择正确的 URL。另见 Canonical link element 。
规范 URL 解算器在Frontera Manager初始化期间使用 CANONICAL_SOLVER
设置中的类来实例化。
内置规范 URL 解算器参考¶
基本的¶
用作默认值。
-
class
frontera.contrib.canonicalsolvers.basic.
BasicCanonicalSolver
¶ Implements a simple CanonicalSolver taking always first URL from redirect chain, if there were redirects. It allows easily to avoid leaking of requests in Frontera (e.g. when request issued by
get_next_requests()
never matched inpage_crawled()
) at the price of duplicating records in Frontera for pages having more than one URL or complex redirects chains.
后端¶
Frontier Backend
是抓取逻辑/策略所在的地方,本质上是你的爬虫的大脑。 Queue
, Metadata
和 States
是为了放置低级代码的类,相反,后端类运行更高级的代码。Frontera 内置了内存或数据库方式实现的 Queue, Metadata 和 States,它们可以在你自定义的后端类中使用或者实例化 FrontierManager
和后端独立使用。
后端方法在 Middleware
之后被 FrontierManager 调用,根据 frontier data flow 使用 hooks 处理 Request
和 Response
。
与中间件可以激活许多不同的实例不同,每个 Frontera 只能使用一种后端。
激活一个后端¶
要激活 Frontera 后端组件,请通过 BACKEND
设置进行设置。
这是一个例子
BACKEND = 'frontera.contrib.backends.memory.FIFO'
请记住,某些后端可能需要额外配置其他 settings。 更多信息,请参阅 backends documentation 。
写你自己的后端¶
每个后端组件是一个 Python 类,它继承自 Backend
或
DistributedBackend
,且使用 Queue
, Metadata
和 States
中的一个或多个。
FrontierManager
将通过下列方法与激活的后端通信。
-
class
frontera.core.components.
Backend
¶ Methods
-
Backend.
frontier_start
()¶ Called when the frontier starts, see starting/stopping the frontier.
返回: None.
-
Backend.
frontier_stop
()¶ Called when the frontier stops, see starting/stopping the frontier.
返回: None.
-
Backend.
finished
()¶ Quick check if crawling is finished. Called pretty often, please make sure calls are lightweight.
返回: boolean
-
Backend.
add_seeds
(seeds)¶ This method is called when new seeds are added to the frontier.
参数: seeds (list) – A list of Request
objects.返回: None.
-
Backend.
page_crawled
(response)¶ This method is called every time a page has been crawled.
参数: response (object) – The Response
object for the crawled page.返回: None.
-
Backend.
request_error
(page, error)¶ This method is called each time an error occurs when crawling a page.
参数: - request (object) – The crawled with error
Request
object. - error (string) – A string identifier for the error.
返回: None.
- request (object) – The crawled with error
-
Backend.
get_next_requests
(max_n_requests, **kwargs)¶ Returns a list of next requests to be crawled.
参数: - max_next_requests (int) – Maximum number of requests to be returned by this method.
- kwargs (dict) – A parameters from downloader component.
返回: list of
Request
objects.
Class Methods
-
Backend.
from_manager
(manager)¶ Class method called from
FrontierManager
passing the manager itself.Example of usage:
def from_manager(cls, manager): return cls(settings=manager.settings)
Properties
-
queue
¶
-
states
¶
-
metadata
¶
-
-
class
frontera.core.components.
DistributedBackend
¶
继承 Backend 的所有方法,并且还有两个类方法,它们在 strategy worker 和 db worker 实例化期间被调用。
Backend 应通过这些类与低级存储进行通信:
Metadata¶
-
class
frontera.core.components.
Metadata
¶ Methods
-
Metadata.
add_seeds
(seeds)¶ This method is called when new seeds are added to the frontier.
参数: seeds (list) – A list of Request
objects.
-
已知的实现是: MemoryMetadata
和 sqlalchemy.components.Metadata
。
Queue¶
-
class
frontera.core.components.
Queue
¶ Methods
-
Queue.
get_next_requests
(max_n_requests, partition_id, **kwargs)¶ Returns a list of next requests to be crawled, and excludes them from internal storage.
参数: - max_next_requests (int) – Maximum number of requests to be returned by this method.
- kwargs (dict) – A parameters from downloader component.
返回: list of
Request
objects.
-
Queue.
schedule
(batch)¶ Schedules a new documents for download from batch, and updates score in metadata.
参数: batch – list of tuples(fingerprint, score, request, schedule), if schedule
is True, then document needs to be scheduled for download, False - only update score in metadata.
-
Queue.
count
()¶ Returns count of documents in the queue.
返回: int
-
已知的实现是: MemoryQueue
和 sqlalchemy.components.Queue
。
States¶
-
class
frontera.core.components.
States
¶ Methods
-
States.
update_cache
(objs)¶ Reads states from meta[‘state’] field of request in objs and stores states in internal cache.
参数: objs – list or tuple of Request
objects.
-
States.
set_states
(objs)¶ Sets meta[‘state’] field from cache for every request in objs.
参数: objs – list or tuple of Request
objects.
-
States.
flush
(force_clear)¶ Flushes internal cache to storage.
参数: force_clear – boolean, True - signals to clear cache after flush
-
States.
fetch
(fingerprints)¶ Get states from the persistent storage to internal cache.
参数: fingerprints – list document fingerprints, which state to read
-
已知的实现是: MemoryStates
和 sqlalchemy.components.States
。
内置后端引用¶
本文介绍了与 Frontera 捆绑在一起的所有后端组件。
要知道默认激活的 Backend
请看 BACKEND
设置。
基本算法¶
一些内置的 Backend
对象实现基本算法,如 FIFO/LIFO or DFS/BFS,用于页面访问排序。
它们之间的差异将在使用的存储引擎上。例如,memory.FIFO
和
sqlalchemy.FIFO
将使用相同的逻辑,但使用不同的存储引擎。
所有这些后端变体都使用相同的 CommonBackend
类实现具有优先级队列的一次访问爬网策略。
-
class
frontera.contrib.backends.
CommonBackend
¶
内存后端¶
这组 Backend
对象将使用 heapq 模块作为队列和本机字典作为 basic algorithms 的存储。
SQLAlchemy 后端¶
这组 Backend
对象将使用 SQLAlchemy 作为 basic algorithms 的存储。
默认情况下,它使用内存模式的 SQLite 数据库作为存储引擎,但可以使用 any databases supported by SQLAlchemy 。
如果你想使用你自己的 declarative sqlalchemy models ,你可以使用 SQLALCHEMYBACKEND_MODELS
设置。
这个 setting 使用一个字典,其中 key
代表要定义的模型的名称,value
代表了这个模型。
有关用于 SQLAlchemy 后端的所有 settings,请查看 settings 。
-
class
frontera.contrib.backends.sqlalchemy.
FIFO
¶
-
class
frontera.contrib.backends.sqlalchemy.
LIFO
¶
定时重爬后端¶
基于自定义 SQLAlchemy 后端和队列。从种子开始抓取。种子被抓取后,每一个新的
文件将被安排立即抓取。每个文档被抓取之后,将会在由 SQLALCHEMYBACKEND_REVISIT_INTERVAL
设置的时间间隔后再次抓取。
定时重爬后端当前没有实现优先级。 在长时间运行时,爬虫可能会闲置,因为 没有可用的抓取任务,但有任务等待他们的预定的访问时间。
HBase 后端¶
-
class
frontera.contrib.backends.hbase.
HBaseBackend
¶
更适合大规模抓取。设置请参考 HBase 后端 。请考虑调整块缓存以适应平均网页块的大小。要实现这一点,建议使用 hostname_local_fingerprint
,可以让相同域名的网页放在一起。这个函数可以通过 URL_FINGERPRINT_FUNCTION
设置。
消息总线¶
消息总线是传输层抽象机制。Frontera 提供了接口和几个实现。同一时间只能使用一种类型的消息总线,并通过 MESSAGE_BUS
设置。
爬虫进程可以使用
-
class
frontera.contrib.backends.remote.messagebus.
MessageBusBackend
(manager)¶
和消息总线进行通信。
内置消息总线参考¶
ZeroMQ¶
这是默认选项,使用轻量级的 ZeroMQ 库实现
可以使用 ZeroMQ 消息总线设置 配置。
ZeroMQ 需要按照 ZeroMQ 库,并且启动broker进程,请参考 启动集群 。
总的来说,使用 ZeroMQ 消息总线是为了用最少的部署实现 PoC (Patch Output Converter 成批输出转换程序)。因为它很容易 在组件的数据流未正确调整或启动过程中发生消息丢失,所以请参照下面的顺序启动组件:
- db worker
- strategy worker
- spiders
不幸的是,停止执行未完成抓取的爬虫时,无法避免消息丢失。如果你的爬虫程序对少量的信息丢失敏感的话,我建议你使用 Kafka。
警告!ZeroMQ消息总线不支持多个 SW worker 和 DB worker, 每种 woker 只能有一个实例。
协议¶
根据数据流,Frontera 使用几种消息类型来编码它的消息。每种消息是用 msgpack 或 JSON 序列化的 python 对象。可以使用 MESSAGE_BUS_CODEC
选择编解码器模块,并且需要导出编码器和解码器类。
以下是子类实现自己的编解码器所需的类:
-
class
frontera.core.codec.
BaseEncoder
¶ -
encode_add_seeds
(seeds)¶ Encodes add_seeds message
参数: seeds (list) – A list of frontier Request objects 返回: bytes encoded message
-
encode_page_crawled
(response)¶ Encodes a page_crawled message
参数: response (object) – A frontier Response object 返回: bytes encoded message
-
encode_request_error
(request, error)¶ Encodes a request_error message
参数: - request (object) – A frontier Request object
- error (string) – Error description
返回: bytes encoded message
-
encode_request
(request)¶ Encodes requests for spider feed stream.
参数: request (object) – Frontera Request object 返回: bytes encoded message
-
encode_update_score
(request, score, schedule)¶ Encodes update_score messages for scoring log stream.
参数: - request (object) – Frontera Request object
- score (float) – score
- schedule (bool) – True if document needs to be scheduled for download
返回: bytes encoded message
-
encode_new_job_id
(job_id)¶ Encodes changing of job_id parameter.
参数: job_id (int) – 返回: bytes encoded message
-
encode_offset
(partition_id, offset)¶ Encodes current spider offset in spider feed.
参数: - partition_id (int) –
- offset (int) –
返回: bytes encoded message
-
抓取策略¶
使用 cluster
例子和 frontera.worker.strategies.bfs
模型进行参考。
Use cluster
example and frontera.worker.strategies.bfs
module for reference. 一般来说,你需要写一个
抓取策略子类,参照:
-
class
frontera.worker.strategies.
BaseCrawlingStrategy
(manager, mb_stream, states_context)¶ Interface definition for a crawling strategy.
Before calling these methods strategy worker is adding ‘state’ key to meta field in every
Request
with state of the URL. Pleases refer for the states to HBaseBackend implementation.After exiting from all of these methods states from meta field are passed back and stored in the backend.
Methods
-
classmethod
from_worker
(manager, mb_stream, states_context)¶ Called on instantiation in strategy worker.
参数: - manager –
class: Backend <frontera.core.manager.FrontierManager> instance - mb_stream –
class: UpdateScoreStream <frontera.worker.strategy.UpdateScoreStream> instance
返回: new instance
- manager –
-
add_seeds
(seeds)¶ Called when add_seeds event is received from spider log.
参数: seeds (list) – A list of Request
objects.
-
page_crawled
(response)¶ Called every time document was successfully crawled, and receiving page_crawled event from spider log.
参数: response (object) – The Response
object for the crawled page.
-
page_error
(request, error)¶ Called every time there was error during page downloading.
参数: - request (object) – The fetched with error
Request
object. - error (str) – A string identifier for the error.
- request (object) – The fetched with error
-
finished
()¶ Called by Strategy worker, after finishing processing each cycle of spider log. If this method returns true, then Strategy worker reports that crawling goal is achieved, stops and exits.
返回: bool
-
close
()¶ Called when strategy worker is about to close crawling strategy.
-
classmethod
该类可以放在任何模块中,并在启动时使用命令行选项或 CRAWLING_STRATEGY
设置传递给 strategy worker。
这个策略类在 strategy worker 中实例化,可以使用自己的存储或任何其他类型的资源。所有来着 spider log 的都会传给这些方法。返回的分数不一定与方法参数相同。finished()
方法会被周期性的调用来检测抓取目标是否达到了。
使用 Frontier 和 Scrapy¶
Scrapy 中使用 Frontera 非常简单,它包含一组 Scrapy middlewares 和 Scrapy 调度程序,封装了 Frontera 的功能 ,可以使用 Scrapy settings 轻松配置。
激活 frontier¶
Frontera 使用两种不同的中间件:SchedulerSpiderMiddleware
and SchedulerDownloaderMiddleware
和自己的调度程序 FronteraScheduler
。
要在你的 Scrapy 项目中激活 Frontera,只要把它们加入到 SPIDER_MIDDLEWARES, DOWNLOADER_MIDDLEWARES 和 SCHEDULER settings:
SPIDER_MIDDLEWARES.update({
'frontera.contrib.scrapy.middlewares.schedulers.SchedulerSpiderMiddleware': 1000,
})
DOWNLOADER_MIDDLEWARES.update({
'frontera.contrib.scrapy.middlewares.schedulers.SchedulerDownloaderMiddleware': 1000,
})
SCHEDULER = 'frontera.contrib.scrapy.schedulers.frontier.FronteraScheduler'
创建一个 Frontera settings.py
并把它加入到你的 Scrapy 设置中:
FRONTERA_SETTINGS = 'tutorial.frontera.settings'
另一种选择是将这些设置放在 Scrapy 设置模块中。
组织文件¶
当使用 Scrapy 和 frontier 时,我们有以下目录结构:
my_scrapy_project/
my_scrapy_project/
frontera/
__init__.py
settings.py
middlewares.py
backends.py
spiders/
...
__init__.py
settings.py
scrapy.cfg
这些都是基本的:
my_scrapy_project/frontera/settings.py
: Frontera settings 文件。my_scrapy_project/frontera/middlewares.py
: Frontera 的中间件。my_scrapy_project/frontera/backends.py
: Frontera 使用的后端。my_scrapy_project/spiders
: Scrapy spiders 文件夹my_scrapy_project/settings.py
: Scrapy settings 文件scrapy.cfg
: Scrapy 配置文件
Frontier Scrapy settings¶
你可以使用两种方式配置 frontier :
- 使用
FRONTERA_SETTINGS
,可以在 Scrapy 配置文件中指明 Frontera 配置文件的路径,默认是None
- 将 frontier 设置定义到 Scrapy 设置文件中。
通过 Scrapy 配置 frontier¶
Frontier settings 也可以通过 Scrapy 配置。在这种情况下,配置优先级顺序如下:
FRONTERA_SETTINGS
指向的文件中的配置(更高的优先级)- Scrapy 配置文件中的配置
- 默认 frontier 配置
写 Scrapy 爬虫¶
爬虫逻辑¶
创建基本的 Scrapy 爬虫在 Quick start single process 中做了描述。
这也是一个防止爬虫因为队列中请求不足而关闭的好方法:
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = cls(*args, **kwargs)
spider._set_crawler(crawler)
spider.crawler.signals.connect(spider.spider_idle, signal=signals.spider_idle)
return spider
def spider_idle(self):
self.log("Spider idle signal caught.")
raise DontCloseSpider
配置准则¶
您可以进行多种调整,以实现高效的广泛抓取。
添加一个种子加载器,用于启动爬虫进程:
SPIDER_MIDDLEWARES.update({
'frontera.contrib.scrapy.middlewares.seeds.file.FileSeedLoader': 1,
})
适合广泛抓取的各种设置:
HTTPCACHE_ENABLED = False # 关闭磁盘缓存,它在大量抓取中具有较低的命中率
REDIRECT_ENABLED = True
COOKIES_ENABLED = False
DOWNLOAD_TIMEOUT = 120
RETRY_ENABLED = False # 重试可以由 Frontera 本身处理,具体取决于爬网策略
DOWNLOAD_MAXSIZE = 10 * 1024 * 1024 # 最大文档大小,如果未设置,会导致OOM
LOGSTATS_INTERVAL = 10 # 每10秒钟向控制台打印统计
自动限流和并发设置,以方便有礼貌和负责任的抓取:
# auto throttling
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_DEBUG = False
AUTOTHROTTLE_MAX_DELAY = 3.0
AUTOTHROTTLE_START_DELAY = 0.25 # 任何足够小的值,它将在平均运行期间通过瓶颈响应延迟进行调整。
RANDOMIZE_DOWNLOAD_DELAY = False
# concurrency
CONCURRENT_REQUESTS = 256 # 取决于许多因素,应通过实验确定
CONCURRENT_REQUESTS_PER_DOMAIN = 10
DOWNLOAD_DELAY = 0.0
具体参照 Scrapy broad crawling.
Scrapy 种子加载器¶
Frontera 有一些内置的 Scrapy 中间件用于种子装载。
种子装载使用 process_start_requests
方法从源中生成请求,这些请求后续会被加入 FrontierManager
。
激活一个种子加载器¶
只需将种子加载器中间件加入 SPIDER_MIDDLEWARES
中:
SPIDER_MIDDLEWARES.update({
'frontera.contrib.scrapy.middlewares.seeds.FileSeedLoader': 650
})
FileSeedLoader¶
从文件中导入种子。该文件必须是每行一个 URL 的格式:
http://www.asite.com
http://www.anothersite.com
...
你可以使用 #
注释掉某一行:
...
#http://www.acommentedsite.com
...
Settings:
SEEDS_SOURCE
: 种子文件路径
S3SeedLoader¶
从存储在 Amazon S3 中的文件导入种子 Load seeds from a file stored in an Amazon S3 bucket
文件格式应该和 FileSeedLoader 中的一样。
Settings:
SEEDS_SOURCE
: S3 bucket 文件路径。 例如:s3://some-project/seed-urls/
SEEDS_AWS_ACCESS_KEY
: S3 credentials Access KeySEEDS_AWS_SECRET_ACCESS_KEY
: S3 credentials Secret Access Key
Settings¶
Frontera settings 允许你定制所有的组件,包括
FrontierManager
,
Middleware
and
Backend
themselves.
settings 提供了键值映射的全局命名空间可以用来获取配置值。可以通过不同的机制填充设置,如下所述。
内置的配置请查看: Built-in settings reference。
设计 settings¶
当你使用 Frontera 的时候,你需要告诉它你在用什么配置。因为 FrontierManager
是 Frontier 使用的入口,你可以使用 Loading from settings 来进行配置。
当使用一个字符串路径指向配置文件的时候,我们建议下面这种文件结构:
my_project/
frontier/
__init__.py
settings.py
middlewares.py
backends.py
...
这些都是基本的:
frontier/settings.py
: frontier settings 文件。frontier/middlewares.py
: frontier 使用的中间件。frontier/backends.py
: frontier 使用的后端。
如何访问 settings¶
Settings
可以通过
FrontierManager.settings
属性访问, 它传给了
Middleware.from_manager
和
Backend.from_manager
类方法:
class MyMiddleware(Component):
@classmethod
def from_manager(cls, manager):
manager = crawler.settings
if settings.TEST_MODE:
print "test mode is enabled!"
换句话说, settings 可以通过 Settings
访问.
内置 frontier settings¶
下面是所以可用的 Frontera settings,以字母顺序排序,以及它们的默认值和适用范围。
CANONICAL_SOLVER¶
默认: frontera.contrib.canonicalsolvers.Basic
CanonicalSolver
被用来解析规划网址。详情参考 Canonical URL Solver.
SPIDER_LOG_CONSUMER_BATCH_SIZE¶
默认: 512
这是 strategy worker 和 db worker 消费 spider log 流时的批量大小。增加它将使 worker 在每个任务上花更多的时间,但是每个任务处理更多的 item,因此在一段时间内给其它任务留下了更少的时间。减少它将导致在同一时间段内运行多个任务,但总体效率较低。 当你的消费者太快或者太慢的时候使用这个选项。
SCORING_LOG_CONSUMER_BATCH_SIZE¶
默认: 512
这是 db worker 消费 scoring log 流时的批量大小。当你需要调整 scoring log 消费速度的时候使用这个选项。
DELAY_ON_EMPTY¶
默认: 5.0
当队列大小小于 CONCURRENT_REQUESTS
时,向调度器发送请求时的延迟时间。当后端没有请求的时候,这个延迟帮助消耗掉剩余的缓存而无需每次请求都去请求后端。如果对后端的调用花费的时间过长,则增加它,如果你需要更快从种子启动爬虫,则减少它。
LOGGING_CONFIG¶
默认: logging.conf
logging 模块的路径。参考 https://docs.python.org/2/library/logging.config.html#logging-config-fileformat 如果文件不存在,日志将使用 logging.basicConfig()
实例化,将在 CONSOLE 输出日志。这个选项只在 db worker 和 strategy worker 中使用。
The path to a file with logging module configuration.
MAX_NEXT_REQUESTS¶
默认: 64
get_next_requests
API 返回的最大请求数量。在分布式模式中,它应该是 db worker 为每个爬虫生成的请求数或者是每次从消息总线中获取的请求数。在单进程模式下,它应该是每次调用 get_next_requests
获取的请求数量。
MESSAGE_BUS_CODEC¶
默认: frontera.contrib.backends.remote.codecs.msgpack
指向 message bus 编码实现。参考 codec interface description。 默认是 MsgPack。
MIDDLEWARES¶
包含在 frontier 中启用的中间件的列表。详情参考 Activating a middleware
默认:
[
'frontera.contrib.middlewares.fingerprint.UrlFingerprintMiddleware',
]
OVERUSED_SLOT_FACTOR¶
默认: 5.0
(某个 slot 中进行中+队列中的请求)/ (每个slot允许的最大并发送)称作 overused。它只影响 Scrapy scheduler。
SPIDER_LOG_PARTITIONS¶
默认: 1
spider log 分区的数量。这个参数影响 strategy worker (s) 的数量,每个 strategy worker 被分配到自己的分区。
STORE_CONTENT¶
默认: False
Determines if content should be sent over the message bus and stored in the backend: a serious performance killer.
内置指纹中间件设置¶
设置被 UrlFingerprintMiddleware 和 DomainFingerprintMiddleware 使用。
TLDEXTRACT_DOMAIN_INFO¶
默认: False
如果设置为 True
,将使用 tldextract 附加额外的域信息(二级,顶级和子域名)到 meta 字段(参考 为对象添加其他值 )。
内置后端配置¶
SQLAlchemy¶
SQLALCHEMYBACKEND_CACHE_SIZE¶
默认: 10000
SQLAlchemy 元数据的 LRU 缓存大小。它用来缓存对象,这些对象从数据库获得,还缓存抓取的文档。它主要节约了数据库的吞吐量,如果你面临从数据库获得太多数据的问题增加它,如果你想节约内存就减少它。
SQLALCHEMYBACKEND_DROP_ALL_TABLES¶
默认: True
如果你想禁止每次后端初始化的时候删除数据库表则将之设为 False
(例如每次启动 Scrapy 爬虫的时候)。
SQLALCHEMYBACKEND_MODELS¶
默认:
{
'MetadataModel': 'frontera.contrib.backends.sqlalchemy.models.MetadataModel',
'StateModel': 'frontera.contrib.backends.sqlalchemy.models.StateModel',
'QueueModel': 'frontera.contrib.backends.sqlalchemy.models.QueueModel'
}
用来设置后端使用的 SQLAlchemy 模型。主要用来定制化。
重新抓取后端¶
ZeroMQ 消息总线设置¶
消息总线类是 distributed_frontera.messagebus.zeromq.MessageBus
ZMQ_ADDRESS¶
默认: 127.0.0.1
定义 ZeroMQ 套接字应该绑定或连接的位置。可以是主机名或IP地址。现在,ZMQ 只有通过 IPv4进行了测试。IPv6在不久的将来会增加支持。
Kafka 消息总线配置¶
这个消息总线的类是 frontera.contrib.messagebus.kafkabus.MessageBus
KAFKA_LOCATION¶
kafka 代理的主机名和端口号,以 :分割。可以是主机名:端口的字符串对,用逗号(,)分隔。
SCORING_LOG_TOPIC¶
scoring log 数据流的 topic 名字。
Default settings¶
如果没有指定设置,frontier 将使用内置的默认设置。有关默认值的完整列表,请参见 Built-in settings reference 。所有默认设置都可以被覆盖。
- 安装指南
- 安装方法和依赖的选项。
- Frontier 对象
- 理解用来代表网络请求和网络响应的类。
- Middlewares(中间件)
- 过滤或者更改链接和网页的信息。
- 内置规范 URL 解算器参考
- 确认和使用网页的规范url。
- 后端
- 自定义抓取规则和存储方式。
- 消息总线
- 内置消息总线参考。
- 抓取策略
- 为分布式后端实现自己的抓取策略。
- 使用 Frontier 和 Scrapy
- 学习如何使用 Frontera + Scrapy 。
- Settings
- 设置参考。
高级用法¶
什么是 Crawl Frontier?¶
Frontera 一个实现 crawl frontier 的框架。crawler frontier 是爬虫系统的一部分,它决定了爬虫抓取网页时候的逻辑和策略(哪些页面应该被抓取,优先级和排序,页面被重新抓取的频率等)。
通常的 crawl frontier 方案是:

frontier 以 URL 列表初始化,我们称之为种子。 一旦边界初始化,爬虫程序会询问下一步应该访问哪些页面。 当爬虫开始访问页面并获取结果时,它将通知 frontier 每个页面响应以及页面中包含的超链接。 这些链接被 frontier 当做新的请求加入,安装抓取策略进行抓取。
这个过程(请求新的任务/通知结果)会一直重复直到达到爬虫的结束条件。一些爬虫可能不会停止,我们称之为永久爬虫。
Frontier 抓取策略几乎可以基于任何的逻辑。常见情况是基于得分/优先级,它们通过一个或多个页面的属性(新鲜度,更新时间,某些条款的内容相关性等)计算得来。也可以基于很简单的逻辑,比如 FIFO/LIFO 或 DFS/BFS 。
根据 frontier 的逻辑,可能需要持久存储系统来存储,更新或查询页面信息。 其他系统可能是100%不稳定的,并且不会在不同爬虫之间共享任何信息。
更多 crawl frontier 理论请参照 Christopher D. Manning, Prabhakar Raghavan & Hinrich Schütze 写的 URL frontier 文章。
Graph Manager¶
Graph Manager 是一种将网站地图表示为图形的工具。
它可以很容易地用于测试 frontier。我们可以向 graph manager 查询页面来伪造爬虫的请求/响应,并且不使用爬虫就可以知道每个页面的提取链接。你可以使用你的伪造测试或者 Frontier Tester tool 。
你可以用它来定义你自己的网站,用来测试或者使用 Scrapy Recorder 来记录抓取过程并可以在之后重现。
定义一个网站图¶
网站的页面及其链接可以轻松定义为有向图,其中每个节点表示页面边缘之间的链接。
我们使用一个非常简单的站点表示,一个起始页面 A 里面有一个链接到 B, C, D 里面的链接。 我们可以用这个图表来表示网站:

我们使用列表来表示不同的网站页面和元组来定义页面及其链接,上面的例子
site = [
('A', ['B', 'C', 'D']),
]
请注意,我们不需要定义没有链接的页面,但是我们也可以将其用作有效的表示
site = [
('A', ['B', 'C', 'D']),
('B', []),
('C', []),
('D', []),
]
一个更复杂的网站:

可以表示为:
site = [
('A', ['B', 'C', 'D']),
('D', ['A', 'D', 'E', 'F']),
]
注意 D 链接到自己和它的父节点 A 。
同样的,一个页面可以有多个父节点:

site = [
('A', ['B', 'C', 'D']),
('B', ['C']),
('D', ['C']),
]
为了简化示例,我们不使用网址表示,但当然url是可以用于网站图:

site = [
('http://example.com', ['http://example.com/anotherpage', 'http://othersite.com']),
]
使用 Graph Manager¶
一旦我们将我们的站点定义为一个图表,我们就可以开始使用 Graph Manager 了。
我们必须先创建我们的图表管理器
>>> from frontera.utils import graphs
>>> g = graphs.Manager()
使用 add_site 方法添加网站
>>> site = [('A', ['B', 'C', 'D'])]
>>> g.add_site(site)
这个管理器现在被初始化并准备好使用。
我们可以得到图表中的所有页面
>>> g.pages
[<1:A*>, <2:B>, <3:C>, <4:D>]
星号表示页面是种子,如果我们想要获取站点图形的种子
>>> g.seeds
[<1:A*>]
我们可以用 get_page 获取一个单独页面, 如果页面不存在就返回 None
>>> g.get_page('A')
<1:A*>
>>> g.get_page('F')
None
CrawlPage 对象¶
页面使用 CrawlPage
对象表示:
例子:
>>> p = g.get_page('A')
>>> p.id
1
>>> p.url
u'A'
>>> p.status # defaults to 200
u'200'
>>> p.is_seed
True
>>> p.links
[<2:B>, <3:C>, <4:D>]
>>> p.referers # No referers for A
[]
>>> g.get_page('B').referers # referers for B
[<1:A*>]
添加页面和链接¶
网站图也可以定义为单独添加页面和链接,我们可以用这种方式定义相同的图形:
>>> g = graphs.Manager()
>>> a = g.add_page(url='A', is_seed=True)
>>> b = g.add_link(page=a, url='B')
>>> c = g.add_link(page=a, url='C')
>>> d = g.add_link(page=a, url='D')
add_page 和 add_link 可以随时和 add_site 配合使用:
>>> site = [('A', ['B', 'C', 'D'])]
>>> g = graphs.Manager()
>>> g.add_site(site)
>>> d = g.get_page('D')
>>> g.add_link(d, 'E')
添加多个网站¶
多个网站可以加入管理器:
>>> site1 = [('A1', ['B1', 'C1', 'D1'])]
>>> site2 = [('A2', ['B2', 'C2', 'D2'])]
>>> g = graphs.Manager()
>>> g.add_site(site1)
>>> g.add_site(site2)
>>> g.pages
[<1:A1*>, <2:B1>, <3:C1>, <4:D1>, <5:A2*>, <6:B2>, <7:C2>, <8:D2>]
>>> g.seeds
[<1:A1*>, <5:A2*>]
或者使用 add_site_list 方法
>>> site_list = [
[('A1', ['B1', 'C1', 'D1'])],
[('A2', ['B2', 'C2', 'D2'])],
]
>>> g = graphs.Manager()
>>> g.add_site_list(site_list)
Graphs 数据库¶
Graph Manager 使用 SQLAlchemy 来存储和重现图。
默认使用 SQLite 内存模式当成数据库引擎,但是 any databases supported by SQLAlchemy 中提到的都可以支持。
使用 SQLite 的例子
>>> g = graphs.Manager(engine='sqlite:///graph.db')
默认情况下,每个新添加都会进行更改,稍后可以加载图
>>> graph = graphs.Manager(engine='sqlite:///graph.db')
>>> graph.add_site(('A', []))
>>> another_graph = graphs.Manager(engine='sqlite:///graph.db')
>>> another_graph.pages
[<1:A1*>]
可以使用 clear_content 参数重置数据库
>>> g = graphs.Manager(engine='sqlite:///graph.db', clear_content=True)
使用状态码¶
为了使用图重新创建/模拟爬虫,可以为每个页面定义HTTP响应代码。
一个404错误的例子
>>> g = graphs.Manager()
>>> g.add_page(url='A', status=404)
可以使用元组列表以下列方式为站点定义状态代码:
>>> site_with_status_codes = [
((200, "A"), ["B", "C"]),
((404, "B"), ["D", "E"]),
((500, "C"), ["F", "G"]),
]
>>> g = graphs.Manager()
>>> g.add_site(site_with_status_codes)
新页面的默认状态码值为200。
一个简单的伪造爬虫例子¶
使用 Frontier Tester tool 能更好的完成 Frontier 的测试,但这里就是一个例子,说明如何伪造 frontier:
from frontera import FrontierManager, Request, Response
from frontera.utils import graphs
if __name__ == '__main__':
# Load graph from existing database
graph = graphs.Manager('sqlite:///graph.db')
# Create frontier from default settings
frontier = FrontierManager.from_settings()
# Create and add seeds
seeds = [Request(seed.url) for seed in graph.seeds]
frontier.add_seeds(seeds)
# Get next requests
next_requets = frontier.get_next_requests()
# Crawl pages
while (next_requests):
for request in next_requests:
# Fake page crawling
crawled_page = graph.get_page(request.url)
# Create response
response = Response(url=crawled_page.url, status_code=crawled_page.status)
# Update Page
page = frontier.page_crawled(response=response
links=[link.url for link in crawled_page.links])
# Get next requests
next_requets = frontier.get_next_requests()
如何使用¶
Graph Manager 可以结合 Frontier Tester 测试 frontiers,也可以结合 Scrapy Recordings 。
记录 Scrapy 抓取过程¶
Scrapy Recorder 是一套 Scrapy middlewares ,可以让您记录scrapy抓取过程并将其存储到 Graph Manager 中。
这可以用于执行 frontier 测试,而不必再次爬取整个站点,甚至不用使用 Scrapy。
激活 recorder¶
recorder 使用了两个中间件: CrawlRecorderSpiderMiddleware
和 CrawlRecorderDownloaderMiddleware
。
要在 Scrapy 项目中激活 recorder,只需要将他们加入 SPIDER_MIDDLEWARES 和 DOWNLOADER_MIDDLEWARES 配置中:
SPIDER_MIDDLEWARES.update({
'frontera.contrib.scrapy.middlewares.recording.CrawlRecorderSpiderMiddleware': 1000,
})
DOWNLOADER_MIDDLEWARES.update({
'frontera.contrib.scrapy.middlewares.recording.CrawlRecorderDownloaderMiddleware': 1000,
})
选择你的存储引擎¶
因为 recorder 内部使用 Graph Manager 存储抓取的网页,所以你可以选择存储引擎,参照 different storage engines 。
我们使用 RECORDER_STORAGE_ENGINE
配置存储引擎:
RECORDER_STORAGE_ENGINE = 'sqlite:///my_record.db'
您还可以选择重置数据库表或仅重置数据:
RECORDER_STORAGE_DROP_ALL_TABLES = True
RECORDER_STORAGE_CLEAR_CONTENT = True
运行爬虫¶
和之前一样从命令行运行爬虫:
scrapy crawl myspider
一旦完成,抓取过程会被记录。
如果你需要取消记录,可以设置 RECORDER_ENABLED
scrapy crawl myspider -s RECORDER_ENABLED=False
Recorder 设置¶
以下是所有可用Scrapy Recorder设置的列表,按字母顺序排列,以及默认值及其应用范围。
Frontera 集群优化¶
为什么爬行速度如此之低?¶
寻找瓶颈。
- 所有请求都针对少量的几个网站,
- DNS 解析(参考 DNS Service ),
- strategy worker 性能问题
- db worker 生成任务不足
- HBase 相应时间过长
- 集群内的网络过载。
优化 HBase¶
- 在 HBase 中增加块缓存。
- 在每个 HBase 区服务器上部署 Thrift 服务器,并将负载从 SW 传播到 Thrift。
- 启用 Snappy 压缩 (参照
HBASE_USE_SNAPPY
).
优化 Kafka¶
- 将日志大小降至最低,并优化系统以避免在 Kafka 存储大量数据。 一旦写入数据,它应尽可能快地消耗。
- 使用SSD或甚至RAM存储 Kafka logs,
- 启用 Snappy 压缩。
各种组件之间的流量控制¶
MAX_NEXT_REQUESTS
用于控制批量任务大小。 在爬虫配置中,它控制每个 get_next_requests
调用返回多少任务。同时在 DB worker 中配置它,它会设置每个分区生成的任务数。 设置这些参数时要牢记:
- DB worker 和爬虫值必须保持一致,以避免消息总线过载和消息丢失。 换句话说,DB worker 产生的任务要比爬虫消耗的要少一些,因为即使DB worker还没来得及产生新的任务,蜘蛛应该仍然可以获取新的页面。
- 爬虫消费率取决于许多因素:互联网连接延迟,蜘蛛解析/抓取工作量,延迟和自动限制设置,代理使用等。
- 保持爬虫任务队列总是满的,以防止蜘蛛空闲。
- 一般建议是设置 DB worker值比爬虫大2-4倍。
- 批量生成任务数量不应太大,这样才不会在后端产生太多的负载,并允许系统对队列更改做出快速反应。
- 注意有关丢失的消息的警告。
DNS 服务¶
除了提到的 前提 你可能还需要一个专用的 DNS 服务。特别是在你的爬虫会产生大量 DNS 请求的情况下。在广度优先抓取或者其他短时间内访问大量网站的情况下,使用专用的 DNS 服务都是正确的。
由于负载巨大,DNS 服务最终可能会被您的网络提供商阻止。
DNS策略有两种选择:
- 递归DNS解析,
- 利用上游服务器(大规模的DNS缓存像OpenDNS或Verizon)。
第二个仍然容易阻塞。
NLnet 实验室发布的 DNS 服务软件很好用 https://www.unbound.net/ 。它允许选择上述策略之一,并维护本地DNS缓存。
请查看 Scrapy 选项 REACTOR_THREADPOOL_MAXSIZE
和 DNS_TIMEOUT
。
- 什么是 Crawl Frontier?
- 学习 Crawl Frontier 理论。
- Graph Manager
- 定义假的抓取规则来测试你的 frontier 。
- 记录 Scrapy 抓取过程
- 创建 Scrapy 抓取记录,并在之后重现他们。
- Frontera 集群优化
- 机器部署和微调信息。
- DNS 服务
- DNS 服务搭建简介。
开发者文档¶
架构概述¶
本文档介绍了Frontera Manager管道,分布式组件以及它们的交互方式。
单进程¶
下图显示了Frontera管道的架构,其组件(由数字引用)和系统内发生的数据流的轮廓。 有关组件的简要说明,请参见以下有关它们的更多详细信息的链接。 数据流也在下面描述。

组件¶
Fetcher¶
Fetcher(2)负责从网站(1)中获取网页,并将其提供给管理接下来要抓取哪些页面的 frontier。
Fetcher 可以使用 Scrapy 或任何其他爬虫框架/系统来实现,因为框架提供了通用的 frontier 功能。
在分布式运行模式下,Fetcher由Frontera Manager侧的消息总线生产者和Fetcher侧的消费者替代。
Frontera API / Manager¶
Frontera API(3)的主要入口点是 FrontierManager
对象。Frontier 用户(在我们的案例中是Fetcher(2))将通过它与 Frontier 进行通信。
更多请参考 Frontera API 。
Middlewares¶
Frontier middlewares (4) 是位于Manager(3)和Backend(5)之间的特定钩子。 这些中间件在传入和传出 Frontier 和 Backend 时处理 Request
和 Response
对象。 它们通过插入自定义代码提供了一种方便的扩展功能的机制。 规范URL解算器是一种特殊的中间件,负责替代非规范文档URL。
更改请参考 Middlewares(中间件) 和 内置规范 URL 解算器参考 。
数据流¶
Frontera 的数据流由 Frontier Manager 控制,所有数据都通过 manager-middlewares-backend 流程,如下所示:
- frontier初始化为种子请求列表(种子URL)作为爬虫的入口点。
- fetcher请求一批任务去抓取。
- 每个url都被提取,并且 frontier 被通知回传抓取结果以及页面包含的提取数据。 如果在爬行中出现问题,frontier 也会被通知。
一旦所有 url 被抓取,重复步骤2-3,直到达到 frontier 结束条件。每个循环(步骤2-3)重复被称为 frontier 迭代 。
分布式¶
在分布式模式下运行时,所有 Frontera 进程都使用相同的 Frontera Manager。
整体系统形成一个封闭的圆圈,所有的组件都在无限循环中作为守护进程工作。 有一个 message bus 负责在组件,持久存储和 fetcher(当和提取结合时,fetcher又叫做spider)之间传输消息。 有一个传输和存储层抽象,所以可以插上它自己的实现。 分布式后端运行模式具有三种类型的实例:
- Spiders 或者 fetchers,使用Scrapy(分片)实现。
- 负责解决DNS查询,从互联网获取内容并从内容中进行链接(或其他数据)提取。
- Strategy workers (分片)。
- 运行爬网策略代码:为链接链接,决定链接是否需要被抓取,以及何时停止抓取。
- DB workers (分片)。
- 存储所有元数据,包括分数和内容,并生成新的批量任务以供爬虫下载。
*分片*意味着组件仅消耗分配的分区的消息,例如处理数据流的某些共享。*复制*是组件消耗数据流,而不管分区。
这样的设计允许在线操作。可以更改抓取策略,而无需停止抓取。 爬虫策略 也可以作为单独的模块实现; 包含用于检查爬网停止条件,URL排序和评分模型的逻辑。
Frontera 的设计是对Web友好的,每个主机由不超过一个的爬虫进程下载。这是通过数据流分区实现的。

数据流¶
我们从爬虫开始吧。爬虫内的用户定义的种子URL通过 spider log 流发送给 strategy workers 和 DB workers。strategy workers 使用状态缓存决定抓取哪些页面,为每个页面分配一个分数,并将结果发送到 scoring log 流。
DB Worker存储各种元数据,包括内容和分数。另外 DB Worker 检查爬虫消费者的偏移量,并在需要时生成新的任务,并将其发送到 spider feed 流。爬虫消耗这些任务,下载每个页面并从中提取链接。然后将链接发送到 ‘Spider Log’ 流,并将其存储和记分。 这样,流量将无限期地重复。
Frontera API¶
本节介绍了 Frontera 核心API,适用于中间件和后端的开发人员。
Frontera API / Manager¶
Frontera API的主要入口点是 FrontierManager
对象,通过from_manager类方法传递给中间件和后端。该对象提供对所有Frontera核心组件的访问,并且是中间件和后端访问它们并将其功能挂接到Frontera中的唯一方法。
FrontierManager
负责加载安装的中间件和后端,以及用于管理整个 frontier 的数据流。
从settings加载¶
尽管 FrontierManager
可以通过参数初始化,但最常用的初始化方法还是使用 Frontera Settings 。
这个可以通过 from_settings
类方法实现,使用字符串路径:
>>> from frontera import FrontierManager
>>> frontier = FrontierManager.from_settings('my_project.frontier.settings')
或者一个 BaseSettings
对象:
>>> from frontera import FrontierManager, Settings
>>> settings = Settings()
>>> settings.MAX_PAGES = 0
>>> frontier = FrontierManager.from_settings(settings)
也可以无参数初始化,这种情况下 frontier 会使用 默认配置
>>> from frontera import FrontierManager, Settings
>>> frontier = FrontierManager.from_settings()
Frontier Manager¶
-
class
frontera.core.manager.
FrontierManager
(request_model, response_model, backend, middlewares=None, test_mode=False, max_requests=0, max_next_requests=0, auto_start=True, settings=None, canonicalsolver=None, db_worker=False, strategy_worker=False)¶ The
FrontierManager
object encapsulates the whole frontier, providing an API to interact with. It’s also responsible of loading and communicating all different frontier components.参数: - request_model (object/string) – The
Request
object to be used by the frontier. - response_model (object/string) – The
Response
object to be used by the frontier. - backend (object/string) – The
Backend
object to be used by the frontier. - middlewares (list) – A list of
Middleware
objects to be used by the frontier. - test_mode (bool) – Activate/deactivate frontier test mode.
- max_requests (int) – Number of pages after which the frontier would stop (See Finish conditions).
- max_next_requests (int) – Maximum number of requests returned by
get_next_requests
method. - auto_start (bool) – Activate/deactivate automatic frontier start (See starting/stopping the frontier).
- settings (object/string) – The
Settings
object used by the frontier. - canonicalsolver (object/string) – The
CanonicalSolver
object to be used by frontier. - db_worker (bool) – True if class is instantiated in DB worker environment
- strategy_worker (bool) – True if class is instantiated in strategy worker environment
Attributes
-
request_model
¶ The
Request
object to be used by the frontier. Can be defined withREQUEST_MODEL
setting.
-
response_model
¶ The
Response
object to be used by the frontier. Can be defined withRESPONSE_MODEL
setting.
-
middlewares
¶ A list of
Middleware
objects to be used by the frontier. Can be defined withMIDDLEWARES
setting.
-
test_mode
¶ Boolean value indicating if the frontier is using frontier test mode. Can be defined with
TEST_MODE
setting.
-
max_requests
¶ Number of pages after which the frontier would stop (See Finish conditions). Can be defined with
MAX_REQUESTS
setting.
-
max_next_requests
¶ Maximum number of requests returned by
get_next_requests
method. Can be defined withMAX_NEXT_REQUESTS
setting.
-
auto_start
¶ Boolean value indicating if automatic frontier start is activated. See starting/stopping the frontier. Can be defined with
AUTO_START
setting.
-
iteration
¶ Current frontier iteration.
-
n_requests
¶ Number of accumulated requests returned by the frontier.
-
finished
¶ Boolean value indicating if the frontier has finished. See Finish conditions.
API Methods
-
start
()¶ Notifies all the components of the frontier start. Typically used for initializations (See starting/stopping the frontier).
返回: None.
-
stop
()¶ Notifies all the components of the frontier stop. Typically used for finalizations (See starting/stopping the frontier).
返回: None.
-
add_seeds
(seeds)¶ Adds a list of seed requests (seed URLs) as entry point for the crawl.
参数: seeds (list) – A list of Request
objects.返回: None.
-
get_next_requests
(max_next_requests=0, **kwargs)¶ Returns a list of next requests to be crawled. Optionally a maximum number of pages can be passed. If no value is passed,
FrontierManager.max_next_requests
will be used instead. (MAX_NEXT_REQUESTS
setting).参数: - max_next_requests (int) – Maximum number of requests to be returned by this method.
- kwargs (dict) – Arbitrary arguments that will be passed to backend.
返回: list of
Request
objects.
-
page_crawled
(response)¶ Informs the frontier about the crawl result.
参数: response (object) – The Response
object for the crawled page.返回: None.
-
request_error
(request, error)¶ Informs the frontier about a page crawl error. An error identifier must be provided.
参数: - request (object) – The crawled with error
Request
object. - error (string) – A string identifier for the error.
返回: None.
- request (object) – The crawled with error
Class Methods
-
classmethod
from_settings
(settings=None, db_worker=False, strategy_worker=False)¶ Returns a
FrontierManager
instance initialized with the passed settings argument. If no settings is given, frontier default settings are used.
- request_model (object/string) – The
启动/停止 frontier¶
有时,frontier 组件需要执行初始化和最终化操作。frontier 通过 start()
和 stop()
方法去通知不同组件启动或者停止。
默认 auto_start
值是激活的,这意味着在创建 FrontierManager
对象后,组件将被通知。如果您需要对初始化不同组件时进行更精细的控制,请停用 auto_start
并手动调用frontier API start()
和 stop()
方法。
当 auto_start
处于激活状态时,Frontier stop()
方法不会自动调用(因为frontier 不知道抓取状态)。如果您需要通知 frontier 组件,您应该手动调用该方法。
Frontier 迭代¶
一旦 frontier 运行,通常的过程就是 data flow 部分所描述的过程。
爬虫调用 get_next_requests()
方法请求接下来要抓取的页面。每次 frontier 返回一个非空列表(可用数据),就是我们所说的前沿迭代。当前 frontier 迭代可以通过 iteration
属性访问。
结束 frontier¶
抓取过程可以被爬虫程序或者 Frontera 停止。当返回最大页数时,Frontera 将结束。此限制由 max_requests
属性控制( MAX_REQUESTS
设置)。
如果 max_requests
设置为0,那么 frontier 会无限抓取下去。
一旦 frontier 完成,get_next_requests
方法将不再返回任何页面,并且 finished
属性将为True。
组件对象¶
-
class
frontera.core.components.
Component
¶ Interface definition for a frontier component The
Component
object is the base class for frontierMiddleware
andBackend
objects.FrontierManager
communicates with the active components using the hook methods listed below.Implementations are different for
Middleware
andBackend
objects, therefore methods are not fully described here but in their corresponding section.Attributes
-
name
¶ The component name
Abstract methods
-
frontier_start
()¶ Called when the frontier starts, see starting/stopping the frontier.
-
frontier_stop
()¶ Called when the frontier stops, see starting/stopping the frontier.
-
add_seeds
(seeds)¶ This method is called when new seeds are added to the frontier.
参数: seeds (list) – A list of Request
objects.
-
page_crawled
(response)¶ This method is called every time a page has been crawled.
参数: response (object) – The Response
object for the crawled page.
-
request_error
(page, error)¶ This method is called each time an error occurs when crawling a page.
参数: - request (object) – The crawled with error
Request
object. - error (string) – A string identifier for the error.
- request (object) – The crawled with error
Class Methods
-
classmethod
from_manager
(manager)¶ Class method called from
FrontierManager
passing the manager itself.Example of usage:
def from_manager(cls, manager): return cls(settings=manager.settings)
-
测试模式¶
在某些情况下,在测试中,frontier 组件需要采用与通常不同的方式(例如,在测试模式下解析域URL时,domain middleware 会接受诸如 'A1'
或者 'B1'
之类的非有效URL)。
组件可以通过 test_mode
属性知道 frontier 是否处于测试模式。
使用 frontier 的其他方法¶
与 frontier 通信也可以通过HTTP API或队列系统等其他机制完成。这些功能暂时不可用,但希望包含在将来的版本中。
Frontier + Requests¶
为了结合 frontier 和 Requests ,提供了 RequestsFrontierManager
类。
这个类是一个简单的 FrontierManager
封装,它使用 Requests 对象 (Request
/Response
),将他们和 frontier 相互转换。
和 FrontierManager
一样使用,使用你的 settings 初始化它。 get_next_requests
将返回 Requests Request
对象。
一个例子:
import re
import requests
from urlparse import urljoin
from frontera.contrib.requests.manager import RequestsFrontierManager
from frontera import Settings
SETTINGS = Settings()
SETTINGS.BACKEND = 'frontera.contrib.backends.memory.FIFO'
SETTINGS.LOGGING_MANAGER_ENABLED = True
SETTINGS.LOGGING_BACKEND_ENABLED = True
SETTINGS.MAX_REQUESTS = 100
SETTINGS.MAX_NEXT_REQUESTS = 10
SEEDS = [
'http://www.imdb.com',
]
LINK_RE = re.compile(r'href="(.*?)"')
def extract_page_links(response):
return [urljoin(response.url, link) for link in LINK_RE.findall(response.text)]
if __name__ == '__main__':
frontier = RequestsFrontierManager(SETTINGS)
frontier.add_seeds([requests.Request(url=url) for url in SEEDS])
while True:
next_requests = frontier.get_next_requests()
if not next_requests:
break
for request in next_requests:
try:
response = requests.get(request.url)
links = [requests.Request(url=url) for url in extract_page_links(response)]
frontier.page_crawled(response=response)
frontier.links_extracted(request=request, links=links)
except requests.RequestException, e:
error_code = type(e).__name__
frontier.request_error(request, error_code)
例子¶
这个项目包含了 examples
文件夹,其中有使用 Frontera 的代码
examples/
requests/
general-spider/
scrapy_recording/
scripts/
- requests: Example script with Requests library.
- general-spider: Scrapy 整合示例项目。
- scrapy_recording: Scrapy 记录示例项目。
- scripts: 一些简单的脚本。
注解
这个例子可能需要安装额外的库才能工作.
你可以使用 pip 来安装它们:
pip install -r requirements/examples.txt
cluster¶
是一个大型可扩展爬虫程序,用于抓取每个域有限制的大量网站。它在HBase中保留每个域的状态,并在安排新的下载请求时使用它。设计用于使用 HBase 在分布式后端运行模式下运行。
scripts¶
一些关于如何使用不同 frontier 组件的示例脚本。
Tests¶
Frontera 测试使用 pytest 工具实现。
您可以使用pip安装 pytest 和测试中使用的附加必需库:
pip install -r requirements/tests.txt
写 tests¶
所有功能(包括新功能和错误修复)都必须包含一个测试用例,以检查它是否按预期工作,所以如果你希望他们能够早点上线,请尽快写好相关的测试。
后端 testing¶
有一个继承 pytest 的类用来测试 Backend
:
BackendTest
比方说,你想测试你的后端 MyBackend
并为每个测试方法调用创建一个新的 frontier 实例,你可以定义一个这样的测试类:
class TestMyBackend(backends.BackendTest):
backend_class = 'frontera.contrib.backend.abackend.MyBackend'
def test_one(self):
frontier = self.get_frontier()
...
def test_two(self):
frontier = self.get_frontier()
...
...
如果它使用一个数据库文件,你需要在每次测试之前和之后进行清理:
class TestMyBackend(backends.BackendTest):
backend_class = 'frontera.contrib.backend.abackend.MyBackend'
def setup_backend(self, method):
self._delete_test_db()
def teardown_backend(self, method):
self._delete_test_db()
def _delete_test_db(self):
try:
os.remove('mytestdb.db')
except OSError:
pass
def test_one(self):
frontier = self.get_frontier()
...
def test_two(self):
frontier = self.get_frontier()
...
...
测试后端执行顺序¶
为了测试 Backend
抓取顺序你可以使用 BackendSequenceTest
类。
BackendSequenceTest
将依据传过来的网站图进行一遍完整的抓取,并返回后端访问网页的顺序。
比如你想测试一个按照字母顺序抓取网页的后端。你可以这样写测试:
class TestAlphabeticSortBackend(backends.BackendSequenceTest):
backend_class = 'frontera.contrib.backend.abackend.AlphabeticSortBackend'
SITE_LIST = [
[
('C', []),
('B', []),
('A', []),
],
]
def test_one(self):
# Check sequence is the expected one
self.assert_sequence(site_list=self.SITE_LIST,
expected_sequence=['A', 'B', 'C'],
max_next_requests=0)
def test_two(self):
# Get sequence and work with it
sequence = self.get_sequence(site_list=SITE_LIST,
max_next_requests=0)
assert len(sequence) > 2
...
测试基本算法¶
如果你的后端使用 基本算法逻辑 中的一个,你可以继承对应的测试类,之后顺序会被自动测试:
from tests import backends
class TestMyBackendFIFO(backends.FIFOBackendTest):
backend_class = 'frontera.contrib.backends.abackend.MyBackendFIFO'
class TestMyBackendLIFO(backends.LIFOBackendTest):
backend_class = 'frontera.contrib.backends.abackend.MyBackendLIFO'
class TestMyBackendDFS(backends.DFSBackendTest):
backend_class = 'frontera.contrib.backends.abackend.MyBackendDFS'
class TestMyBackendBFS(backends.BFSBackendTest):
backend_class = 'frontera.contrib.backends.abackend.MyBackendBFS'
class TestMyBackendRANDOM(backends.RANDOMBackendTest):
backend_class = 'frontera.contrib.backends.abackend.MyBackendRANDOM'
Logging¶
Frontera 使用 Python 原生日志系统。这允许用户通过配置文件(参考 LOGGING_CONFIG
)或者在运行时配置 logger。
Logger 配置语法在这里 https://docs.python.org/2/library/logging.config.html
使用的Loggers¶
- kafka
- hbase.backend
- hbase.states
- hbase.queue
- sqlalchemy.revisiting.queue
- sqlalchemy.metadata
- sqlalchemy.states
- sqlalchemy.queue
- offset-fetcher
- messagebus-backend
- cf-server
- db-worker
- strategy-worker
- messagebus.kafka
- memory.queue
- memory.dequequeue
- memory.states
- manager.components
- manager
- frontera.contrib.scrapy.schedulers.FronteraScheduler
测试一个 Frontier¶
Frontier Tester是一个帮助类,用于方便 frontier 测试。
基本上它基于 Frontier 运行一个模拟的爬取过程,爬虫信息是使用Graph Manager实例伪造的。
创建一个 Frontier Tester¶
FrontierTester 需要一个 Graph Manager 和一个
FrontierManager
实例:
>>> from frontera import FrontierManager, FrontierTester
>>> from frontera.utils import graphs
>>> graph = graphs.Manager('sqlite:///graph.db') # Crawl fake data loading
>>> frontier = FrontierManager.from_settings() # Create frontier from default settings
>>> tester = FrontierTester(frontier, graph)
运行一个 Test¶
tester 已经被实例化,现在只需调用 run 函数运行:
>>> tester.run()
当 run 方法被调用 tester 将:
- 从图中添加所有的种子。
- 向 frontier 询问新的任务。
- 模拟页面响应,并通知 frontier 关于页面抓取及其链接。
重复步骤1和2,直到抓取或 frontier 结束。
测试完成后,抓取页面 sequence
可作为 frontier Request
对象列表使用。
测试参数¶
在某些测试用例中,您可能需要将所有页面添加为种子,这可以通过参数 add_all_pages
来完成:
>>> tester.run(add_all_pages=True)
每个 get_next_requests
调用的最大返回页数可以使用 frontier settings进行设置,也可创建 FrontierTester 时使用 max_next_pages
参数进行修改:
>>> tester = FrontierTester(frontier, graph, max_next_pages=10)
使用的例子¶
一个使用 graph 测试数据和 basic backends 的例子
from frontera import FrontierManager, Settings, FrontierTester, graphs
def test_backend(backend):
# Graph
graph = graphs.Manager()
graph.add_site_list(graphs.data.SITE_LIST_02)
# Frontier
settings = Settings()
settings.BACKEND = backend
settings.TEST_MODE = True
frontier = FrontierManager.from_settings(settings)
# Tester
tester = FrontierTester(frontier, graph)
tester.run(add_all_pages=True)
# Show crawling sequence
print '-'*40
print frontier.backend.name
print '-'*40
for page in tester.sequence:
print page.url
if __name__ == '__main__':
test_backend('frontera.contrib.backends.memory.heapq.FIFO')
test_backend('frontera.contrib.backends.memory.heapq.LIFO')
test_backend('frontera.contrib.backends.memory.heapq.BFS')
test_backend('frontera.contrib.backends.memory.heapq.DFS')
F.A.Q.¶
如何并行有效下载?¶
通常,URL排序的设计意味着从同一个域获取许多URL。 如果抓取过程需要有礼貌,则必须保留一些延迟和请求率。 另一方面,下载器同时可以同时下载许多网址(例如100)。 因此,来自同一域的URL的并发导致下载器连接池资源的低效浪费。
这是一个简短的例子。 想像一下,我们有来自许多不同域的10K网址的队列。 我们的任务是尽快取得它。 在下载期间,我们希望对每个主机有礼貌和并限制RPS。 同时,我们有一个优先级,它倾向于对来自同一个域的URL进行分组。 当抓取工具正在请求批量的URL以获取时,将从同一主机获取数百个URL。 由于RPS限制和延迟,下载器将无法快速获取它们。 因此,从队列中挑选统一域的URL可以让我们浪费时间,因为下载器的连接池浪费了大部分时间。
解决方案是在下载程序中为 Frontera 后端提供主机名/ ip(通常但不是必需的)使用。 我们在方法 get_next_requests
中有一个关键字参数,用于传递这些统计信息到 Fronter a后端。 任何类型的信息都可以通过。 这个参数通常设置在 Frontera 之外,然后通过 FrontierManagerWrapper
子类传递给CF到后端。
贡献指引¶
- 所有的问题和讨论请使用 Frontera google group 。
- 提交补丁请使用 Github repo 的 pull request 。
- Github issues 中请提交 Frontera 将来能受益的问题。请将自己运行 Frontera 的问题提交到 google group 。
我们总是乐意接受有文档和测试的解决方案。
术语表¶
- spider log
- 来自爬虫的编码消息流。每个消息都是从文档中提取,通常是链接,分数,分类结果。
- scoring log
- 从 strategy worker 到 db worker,包含更新事件和调度标志(如果链接需要调度下载)的分数。
- spider feed
- 从 db worker 到爬虫,包含新的一批需要抓取的文档。
- strategy worker
- 一种特殊的 worker,运行抓取策略代码:为链接评分,决定链接是否需要被调度(查询 state cache )和合适停止抓取。这种 worker 是分片的。
- db worker
- 负责和存储数据库通信,主要存储元数据和内容,同时拉取新的任务去下载。
- state cache
- 内存数据结构,包含文档是否被抓取的状态信息。 定期与持久存储同步。
- message bus
- 传输层抽象机制。提供传输层抽象和实现的接口。
- spider
- 从 Web 检索和提取内容,使用 spider feed 作为输入队列,将结果存储到 spider log 。在这个文档中,提取器被用作同义词。
- 架构概述
- 了解 Frontera 如何工作和它的不同组件。
- Frontera API
- 学习如何使用 frontier 。
- Frontier + Requests
- 学习如何使用 Frontera + Requests 。
- 例子
- 一些使用 Frontera 的示例工程和示例脚本。
- Tests
- 如果运行和写 Frontera 的测试用例。
- Logging
- 使用 python 原生日志系统创建的一些 loggers 。
- 测试一个 Frontier
- 使用一个简单的方法测试你的 frontier。
- F.A.Q.
- 常见问题。
- 贡献指引
- 如何贡献。
- 术语表
- 术语表。