Frontera 0.6 文档

Frontera 是一个爬虫工具箱,它可以让你构建任何规模和任意目的的爬虫。

Frontera 提供 crawl frontier 框架,这个框架可以帮助解决*何时抓取下一个URL*、*下个抓取的URL是什么*和检查*抓取结果*等问题。

Frontera 还为所有的爬虫组件提供了复制、分片、隔离的特性,这可以方便的扩展爬虫规模和将爬虫做成分布式。

Fronteta 包含完全支持 Scrapy 的组件,可以使用Scrapy的所有功能创建爬虫。尽管它最初是为Scrapy设计的,但是它也可以完美契合其他任何的框架或系统,因为它可以作为一个框架提供通用的工具箱。

介绍

这一章的目的是介绍 Frontera 的概念,通过阅读本章,你可以知道 Frontera 的设计理念和确定它能不能满足你的需求。

Frontera 概览

Frontera 是 crawl frontier 的实现,用于在从网络下载之前累积URL /链接的网络爬虫组件。 Frontera的主要特征:

  • 面向在线处理,
  • 分布式爬虫和后端架构,
  • 可定制抓取策略,
  • Scrapy易于集成,
  • 集成 SQLAlchemy 支持关系型数据库(Mysql, PostgreSQL, sqlite 等等), 集成 HBase 非常好得支持键值对数据库,
  • 使用 ZeroMQ and Kafka 为分布式爬虫实现消息总线,
  • 使用 Graph Manager 创建伪站点地图和模拟抓取,进行精确抓取逻辑调优。
  • 透明的传输层概念(message bus)和通信协议,
  • 纯 Python 实现 。
  • 支持 Python 3 。

使用案例

下面是一些 crawl frontier 适用的案例:

  • 与爬虫的 URL 排序/排队隔离(例如,需要远端服务器管理排序/排队的分布式爬虫集群),
  • 需要存储 URL 的元信息(在一些地方验证它的内容),
  • 需要高级的 URL 排序逻辑,但在爬虫或者抓取器中很难维护。
一次抓取,少量网站

这种情况下使用单进程可能是最好的选择。 Frontier 提供以下现成的优先级模型:

  • FIFO,
  • LIFO,
  • 广度优先 (BFS),
  • 深度优先 (DFS),
  • 基于提供的得分,从 0.0 映射到 1.0。

如果网站很大,抓取所有网页太浪费, Frontera 可以控制爬虫抓取最重要的网页。

分布式抓取, 少量网站

如果考虑提高抓取速度可以使用分布式爬虫模式。在这种模式下,Frontera 为爬虫进程分发任务,并且只有一个后端实例。请求任务通过你选择的 message bus 进行分发,通过自定义分区调整任务分发策略。默认情况下请求任务是随机分发给爬虫的,抓取速度可以在爬虫中设置。

也考虑一下代理服务,比如 Crawlera

重新抓取

有一组网站,并且需要以及时(或其他)方式重新抓取它们。Frontera 提供了简单的重新抓取后端,根据设置的时间间隔定期抓取已经抓取的网页。这个后端使用关系系数据库持久化数据,并可以应用在单进程模式或者分布式爬虫模式中。

看门狗案例 - 当需要通知文档变化时,也可以使用这样的后端和少量的自定义。

广度抓取

这种使用案例要求完全的分布式:爬虫和后端都是分布式。除了运行 spiders,还应该运行 strategy worker (s) 和 db worker (s),这取决于选择的分区策略。

Frontera可用于与大规模网络抓取相关的一系列广泛任务:

  • 广泛的网页抓取,任意数量的网站和页面(我们在45M文档卷和100K网站上做过测试),

  • 以主机为中心的抓取:当您有超过100个网站时,

  • 聚焦抓取:

    • 主题:您搜索关于某个预定义主题的页面,
    • PageRank,HITS或其他链接图算法指导。

下面是一些真实世界的问题:

  • 抓取网络中的内容检索构建搜索引擎。
  • 网络图的各种研究工作:收集链接,统计,图结构,跟踪域名计数等。
  • 更普遍的集中抓取任务:比如,您搜索的是大中心的网页,并且频繁更改时间。

运行模式

下图展示了运行模式的架构图:

_images/high-level-arc.png

单进程

Frontera 与 fetcher 在相同的过程中实例化(例如在 Scrapy 中)。要实现这个,需要设置 BACKENDBackend 的子类。这种模式适合那种少量文档并且时间要求不紧的应用。

分布式爬虫

爬虫是分布式的,但后端不是。后端运行在 db worker 中,并通过 message bus 与爬虫通信。

  1. 将爬虫进程中的 BACKEND 设置为 MessageBusBackend
  2. 在 DB worker 中 BACKEND 应该指向 Backend 的子类。
  3. 每个爬虫进程应该有它自己的 SPIDER_PARTITION_ID,值为从0到 SPIDER_FEED_PARTITIONS
  4. 爬虫和 DB worker 都应该将 MESSAGE_BUS 设置为你选择的消息总线类或者其他你自定义的实现。

此模式适用于需要快速获取文档,同时文档的数量相对较小的应用。

分布式爬虫和后端

爬虫和后端都是分布式的。后端分成了两部分: strategy workerdb worker。strategy worker 实例被分配给他们自己的 spider log 部分。

  1. 将爬虫进程中的 BACKEND 设置为 MessageBusBackend
  2. DB workers 和 SW workers 的 BACKEND 应该指向 DistributedBackend 的子类。同时还需要配置您选择的后端。
  3. 每个爬虫进程应该有它自己的 SPIDER_PARTITION_ID,值为从0到 SPIDER_FEED_PARTITIONS。最后一个必须可以被所有 DB worker 实例访问。
  4. 每个 SW worker 应该有自己的 SCORING_PARTITION_ID,值为从0到 SPIDER_LOG_PARTITIONS。最后一个必须可以被所有 SW worker 实例访问。
  5. 爬虫和所有的 worker 都应该将 MESSAGE_BUS 设置为你选择的消息总线类或者其他你自定义的实现。

在这种模式下,只有 Kafka 消息总线、SqlAlchemy 和 Habse 后端是默认支持的。

此模式适用于广度优先抓取和网页数量巨大的情况。

单进程模式快速入门

1. 创建您的爬虫

按照通常的方式创建 Scrapy 项目。输入您要存储代码的目录,然后运行:

scrapy startproject tutorial

这会创建一个 tutorial 目录,包含以下内容:

tutorial/
    scrapy.cfg
    tutorial/
        __init__.py
        items.py
        pipelines.py
        settings.py
        spiders/
            __init__.py
            ...

这些是最基本的:

  • scrapy.cfg: 项目的配置文件
  • tutorial/: 项目的 python 模块,后续您将从这里引用您的代码。
  • tutorial/items.py: 项目的 items 文件。
  • tutorial/pipelines.py: 项目的 pipelines 文件。
  • tutorial/settings.py: 项目的 settings 文件。
  • tutorial/spiders/: 放爬虫的目录。

2. 安装 Frontera

请看 安装指南.

3. 集成您的爬虫和 Frontera

这篇文章 集成 Scrapy 详细介绍了这一步。

4. 选择您的后端

为 Frontera 设置内置的后端,比如内存中的BFS后端(广度优先):

BACKEND = 'frontera.contrib.backends.memory.BFS'

5. 运行爬虫

按照通常的方式从命令行启动 Scrapy 爬虫:

scrapy crawl myspider

就是这样! 您成功将您的爬虫与 Frontera 集成了。

还有什么?

您已经看到了一个使用 Frontera 集成 Scrapy 的例子,但是这个仅仅是最基本的功能。Frontera 还提供了许多让 frontier 管理更加简单、有效率的强大功能,比如:

  • 内置 database storage 支持存储抓取数据。
  • 通过 API 可以方便的 与 Scrapy 集成 或者与其他爬虫集成。
  • 通过使用 ZeroMq 或 Kafka 和分布式的后端,实现 两种分布式抓取模式
  • 通过 自定义您的后端 创建不同抓取策略或者逻辑。
  • 使用 middlewares 插入您自己的 request/response 修改策略。
  • 使用 Graph Manager 创建假的网站地图,并可以不用爬虫而可以重现抓取过程。
  • 记录您的 Scrapy 抓取结果 ,后续可以用它测试 frontier。
  • 您可以用 hook 的方式使用日志工具,捕捉错误和调试您的 frontiers。

分布式模式快速入门

这篇文档教您在本地快速搭建单机、多进程的 Frontera 系统。我们将使用 SQLite 和 ZeroMQ 构建可能最简单的 Frontera 系统。如果要搭建生产环境下的 Frontera 系统,请参考 集群安装指南

前提

Here is what services needs to be installed and configured before running Frontera: 以下是运行 Frontera 之前需要安装和配置的:

  • Python 2.7+ 或 3.4+
  • Scrapy
安装 Frontera

Ubuntu 系统, 在命令行中输入:

$ pip install frontera[distributed,zeromq,sql]

得到一个爬虫例子代码

首先从 Github 上下载 Frontera:

$ git clone https://github.com/scrapinghub/frontera.git

examples/general-spider 中有一个普通的爬虫例子。

这是一个很普通的爬虫,它仅仅从下载的内容中抽取链接。它同样包含了一些配置文件,请参考 settings reference 获取更多的信息。

启动集群

首先,让我们启动 ZeroMQ 代理。

$ python -m frontera.contrib.messagebus.zeromq.broker

你应该看到代理打印出 spider 与 DB worker 之间传递信息的统计信息。

后续所有的命令都可以在``general-spider``的根目录下执行。

第二步,启动 DB worker。:

$ python -m frontera.worker.db --config frontier.workersettings

你应该注意点 DB worker 正在输出信息。此时没有向 ZeroMQ 发送信息是正常的,因为现在系统中缺乏种子URL。

在目录下有一批西班牙的URL,让我们把它们当做种子来启动爬虫。启动爬虫:

$ scrapy crawl general -L INFO -s FRONTERA_SETTINGS=frontier.spider_settings -s SEEDS_SOURCE=seeds_es_smp.txt -s SPIDER_PARTITION_ID=0
$ scrapy crawl general -L INFO -s FRONTERA_SETTINGS=frontier.spider_settings -s SPIDER_PARTITION_ID=1

最后应该有两个爬虫进程在运行。每个爬虫应该读取自己的Frontera配置,并且第一个应该使用 SEEDS_SOURCE 选项来读取种子和启动 Frontera 集群。

一段时间以后,种子会被准备好,以供爬虫抓取。此时爬虫已经被启动了。现在你可以周期性的检查 DB worker 的输出或者 metadata 表信息来确认爬虫确实在运行。

集群安装指南

这个指南的目标是教你如何初始化爬虫集群,在实践过程中一些步骤可能需要微调。这篇指南假设你使用 Kafka作为消息总线(官方推荐),当然用 Zero MQ 也是可以的,但是可靠性会差点。

需要决定的事情

  • DB worker 和 Strategy worker 的数量。

启动之前需要安装的

  • Kafka,
  • HBase (推荐 1.0.x 或更高的版本),
  • DNS Service (推荐但并不是必须的).

启动之前需要实现的

配置 Kafka

为 Kafka 消息总线创建所有需要的 topic

配置 HBase

  • 创建一个 namespace crawler (请参照 HBASE_NAMESPACE),
  • 确保原生支持 Snappy 压缩。

配置 Frontera

每个 Frontera 组件需要自己的配置模块,但是一些配置项是共享的,所以我们推荐创建一个公有的配置模块,并在自有的配置中引入这个公有模块。

  1. 创建一个公有模块并添加如下信息:

    from __future__ import absolute_import
    from frontera.settings.default_settings import MIDDLEWARES
    MAX_NEXT_REQUESTS = 512
    SPIDER_FEED_PARTITIONS = 2 # number of spider processes
    SPIDER_LOG_PARTITIONS = 2 # worker instances
    MIDDLEWARES.extend([
        'frontera.contrib.middlewares.domain.DomainMiddleware',
        'frontera.contrib.middlewares.fingerprint.DomainFingerprintMiddleware'
    ])
    
    QUEUE_HOSTNAME_PARTITIONING = True
    KAFKA_LOCATION = 'localhost:9092' # your Kafka broker host:port
    SCORING_TOPIC = 'frontier-scoring'
    URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'
    
  2. 创建 workers 的公有模块:

    from __future__ import absolute_import
    from .common import *
    
    BACKEND = 'frontera.contrib.backends.hbase.HBaseBackend'
    
    MAX_NEXT_REQUESTS = 2048
    NEW_BATCH_DELAY = 3.0
    
    HBASE_THRIFT_HOST = 'localhost' # HBase Thrift server host and port
    HBASE_THRIFT_PORT = 9090
    
  3. 创建 DB worker 配置模块:

    from __future__ import absolute_import
    from .worker import *
    
    LOGGING_CONFIG='logging-db.conf' # if needed
    
  4. 创建 Strategy worker 配置模块:

    from __future__ import absolute_import
    from .worker import *
    
    CRAWLING_STRATEGY = '' # path to the crawling strategy class
    LOGGING_CONFIG='logging-sw.conf' # if needed
    

logging 配置可参考 https://docs.python.org/2/library/logging.config.html 请看 list of loggers.

  1. 设置爬虫配置模块:

    from __future__ import absolute_import
    from .common import *
    
    BACKEND = 'frontera.contrib.backends.remote.messagebus.MessageBusBackend'
    KAFKA_GET_TIMEOUT = 0.5
    
  2. 配置 Scrapy settings 模块. 这个模块在 Scrapy 项目文件夹中,并被 scrapy.cfg 引用 。 添加如下:

    FRONTERA_SETTINGS = ''  # module path to your Frontera spider config module
    
    SCHEDULER = 'frontera.contrib.scrapy.schedulers.frontier.FronteraScheduler'
    
    SPIDER_MIDDLEWARES = {
        'frontera.contrib.scrapy.middlewares.schedulers.SchedulerSpiderMiddleware': 999,
        'frontera.contrib.scrapy.middlewares.seeds.file.FileSeedLoader': 1,
    }
    DOWNLOADER_MIDDLEWARES = {
        'frontera.contrib.scrapy.middlewares.schedulers.SchedulerDownloaderMiddleware': 999,
    }
    

启动集群

首先,启动 DB worker:

# start DB worker only for batch generation
$ python -m frontera.worker.db --config [db worker config module] --no-incoming
...
# Then start next one dedicated to spider log processing
$ python -m frontera.worker.db --no-batches --config [db worker config module]

之后,启动strategy workers,每个 spider log topic 的分区需要对应一个 strategy workers 的实例:

$ python -m frontera.worker.strategy --config [strategy worker config] --partition-id 0
$ python -m frontera.worker.strategy --config [strategy worker config] --partition-id 1
...
$ python -m frontera.worker.strategy --config [strategy worker config] --partition-id N

你应该注意到所有的进程会向 log 中写信息。如果没有数据传递相关的 log 信息也是正常的,因为现在系统中还没有种子 URLS。

让我们在文件中每行放一个 URL 作为种子,来启动爬虫。每个爬虫进程对应一个 spider feed topic 的分区:

$ scrapy crawl [spider] -L INFO -s SEEDS_SOURCE = 'seeds.txt' -s SPIDER_PARTITION_ID=0
...
$ scrapy crawl [spider] -L INFO -s SPIDER_PARTITION_ID=1
$ scrapy crawl [spider] -L INFO -s SPIDER_PARTITION_ID=2
...
$ scrapy crawl [spider] -L INFO -s SPIDER_PARTITION_ID=N

最后你应该启动 N 个爬虫进程。通常一个爬虫实例从 SEEDS_SOURCE 中读取种子发送给 Frontera 集群就足够了。只有爬虫的任务队列为空时才会读取种子。也可以从配置文件中读取 SPIDER_PARTITION_ID

一段时间以后,种子会被准备好,以供爬虫抓取。爬虫真正启动了。

Frontera 概览
明白什么是 Frontera ?它能为你做什么?
运行模式
Frontera的高层体系结构和运行模式。
单进程模式快速入门
使用 Scrapy 作为容器来运行 Frontera。
分布式模式快速入门
引入 SQLite 和 ZeroMQ。
集群安装指南
Setting up clustered version of Frontera on multiple machines with HBase and Kafka. 使用 HBase 和 Kafka 在多台机器上部署 Frontera 集群。

使用 Frontera

安装指南

下面的步骤假定您已经安装了以下必需的软件:

你可以使用 pip 安装 Frontera.

使用 pip 安装:

pip install frontera[option1,option2,...optionN]

选项

Each option installs dependencies needed for particular functionality. 每个选项安装所需的特定功能的依赖。

  • sql - 关系型数据库,
  • graphs - Graph Manager,
  • logging - 彩色日志,
  • tldextract - 可以使用 TLDEXTRACT_DOMAIN_INFO
  • hbase - HBase 分布式后端,
  • zeromq - ZeroMQ 消息总线,
  • kafka - Kafka 消息总线,
  • distributed - workers 依赖.

Frontier 对象

Frontier 使用两种对象类型: Request and Response. 他们各自代表 HTTP 请求和 HTTP 返回.

这两个类会被大多数的 Frontera API 方法调用,根据方法不同可能作为参数也可能作为返回值。

Frontera 同样也会使用这两种对象在内部组件之间传递数据(比如 middlewares 和 backend)。

Request 对象

class frontera.core.models.Request(url, method='GET', headers=None, cookies=None, meta=None, body='')

A Request object represents an HTTP request, which is generated for seeds, extracted page links and next pages to crawl. Each one should be associated to a Response object when crawled.

参数:
  • url (string) – URL to send.
  • method (string) – HTTP method to use.
  • headers (dict) – dictionary of headers to send.
  • cookies (dict) – dictionary of cookies to attach to this request.
  • meta (dict) – dictionary that contains arbitrary metadata for this request, the keys must be bytes and the values must be either bytes or serializable objects such as lists, tuples, dictionaries with byte type items.
body

A string representing the request body.

cookies

Dictionary of cookies to attach to this request.

headers

A dictionary which contains the request headers.

meta

A dict that contains arbitrary metadata for this request. This dict is empty for new Requests, and is usually populated by different Frontera components (middlewares, etc). So the data contained in this dict depends on the components you have enabled. The keys are bytes and the values are either bytes or serializable objects such as lists, tuples, dictionaries with byte type items.

method

A string representing the HTTP method in the request. This is guaranteed to be uppercase. Example: GET, POST, PUT, etc

url

A string containing the URL of this request.

Response 对象

class frontera.core.models.Response(url, status_code=200, headers=None, body='', request=None)

A Response object represents an HTTP response, which is usually downloaded (by the crawler) and sent back to the frontier for processing.

参数:
  • url (string) – URL of this response.
  • status_code (int) – the HTTP status of the response. Defaults to 200.
  • headers (dict) – dictionary of headers to send.
  • body (str) – the response body.
  • request (Request) – The Request object that generated this response.
body

A str containing the body of this Response.

headers

A dictionary object which contains the response headers.

meta

A shortcut to the Request.meta attribute of the Response.request object (ie. self.request.meta).

request

The Request object that generated this response.

status_code

An integer representing the HTTP status of the response. Example: 200, 404, 500.

url

A string containing the URL of the response.

domainfingerprint 字段被 内置 middlewares 添加。

对象唯一识别标志

因为 Frontera 对象会在爬虫和服务器之间传递,所以需要一些机制来唯一标示一个对象。这个识别机制会基于 Frontera 逻辑不同而有所变化(大多数情况是根据后端的逻辑)。

默认 Frontera 会激活 fingerprint middleware ,根据 Request.urlResponse.url 分别生成一个唯一标示,并分别赋值给 Request.meta and Response.meta。你可以使用这个中间件或者自己定义。

一个为 Request 生成指纹的例子:

>>> request.url
'http://thehackernews.com'

>>> request.meta['fingerprint']
'198d99a8b2284701d6c147174cd69a37a7dea90f'

为对象添加其他值

大多数情况下 Frontera 存储了系统运行所需要的参数。

同样的,其他信息也可以存入 Request.metaResponse.meta

例如,激活 domain middleware 会为每个 Request.metaResponse.meta 添加 domain 字段:

>>> request.url
'http://www.scrapinghub.com'

>>> request.meta['domain']
{
    "name": "scrapinghub.com",
    "netloc": "www.scrapinghub.com",
    "scheme": "http",
    "sld": "scrapinghub",
    "subdomain": "www",
    "tld": "com"
}

Middlewares(中间件)

Frontier Middleware 位于 FrontierManagerBackend objects 之间, 根据 frontier data flow 的流程,处理 RequestResponse

Middlewares 是一个轻量级、低层次的系统,可以用来过滤和更改 Frontier 的 requests 和 responses。

激活一个 middleware

要激活 Middleware component, 需要添加它到 MIDDLEWARES setting(这是一个列表,包含类的路径或者一个 Middleware 对象)。

这是一个例子:

MIDDLEWARES = [
    'frontera.contrib.middlewares.domain.DomainMiddleware',
]

Middlewares按照它们在列表中定义的相同顺序进行调用,根据你自己的需要安排顺序。 该顺序很重要,因为每个中间件执行不同的操作,并且您的中间件可能依赖于一些先前(或后续的)执行的中间件。

最后,记住一些 middlewares 需要通过特殊的 setting。详细请参考 each middleware documentation

写你自己的 middleware

写自己的 Frontera middleware 是很简单的。每个 Middleware 是一个继承 Component 的 Python 类。

FrontierManager 会通过下面的方法和所有激活的 middlewares 通信。

class frontera.core.components.Middleware

Methods

Middleware.frontier_start()

Called when the frontier starts, see starting/stopping the frontier.

Middleware.frontier_stop()

Called when the frontier stops, see starting/stopping the frontier.

Middleware.add_seeds(seeds)

This method is called when new seeds are added to the frontier.

参数:seeds (list) – A list of Request objects.
返回:Request object list or None

应该返回 None 或者 Request 的列表。

如果返回 NoneFrontierManager 将不会处理任何中间件,并且种子也不会到达 Backend

如果返回 Request 列表,该列表将会传给下个中间件。这个过程会在每个激活的中间件重复,直到它到达 Backend

如果要过滤任何种子,请不要将其包含在返回的对象列表中。

Middleware.page_crawled(response)

This method is called every time a page has been crawled.

参数:response (object) – The Response object for the crawled page.
返回:Response or None

应该返回 None 或者一个 Response 对象。

如果返回 NoneFrontierManager 将不会处理任何中间件,并且 Backend 不会被通知。

如果返回 Response,它将会被传给下个中间件。这个过程会在每个激活的中间件重复,直到它到达 Backend

如果要过滤页面,只需返回 None。

Middleware.request_error(page, error)

This method is called each time an error occurs when crawling a page.

参数:
  • request (object) – The crawled with error Request object.
  • error (string) – A string identifier for the error.
返回:

Request or None

应该返回 None 或者一个 Request 对象。

如果返回 NoneFrontierManager 将不会和其他任何中间件通信,并且 Backend 不会被通知。

如果返回一个 Response 对象,它将会被传给下个中间件。这个过程会在每个激活的中间件重复,直到它到达 Backend

如果要过滤页面错误,只需返回 None。

Class Methods

Middleware.from_manager(manager)

Class method called from FrontierManager passing the manager itself.

Example of usage:

def from_manager(cls, manager):
    return cls(settings=manager.settings)

内置 middleware 参考

这篇文章描述了 Frontera 所有的 Middleware 组件。如何使用和写自己的 middleware,请参考 middleware usage guide.

有关默认启用的组件列表(及其顺序),请参阅 MIDDLEWARES 设置。

DomainMiddleware
class frontera.contrib.middlewares.domain.DomainMiddleware

This Middleware will add a domain info field for every Request.meta and Response.meta if is activated.

domain object will contain the following fields, with both keys and values as bytes:

  • netloc: URL netloc according to RFC 1808 syntax specifications
  • name: Domain name
  • scheme: URL scheme
  • tld: Top level domain
  • sld: Second level domain
  • subdomain: URL subdomain(s)

An example for a Request object:

>>> request.url
'http://www.scrapinghub.com:8080/this/is/an/url'

>>> request.meta['domain']
{
    "name": "scrapinghub.com",
    "netloc": "www.scrapinghub.com",
    "scheme": "http",
    "sld": "scrapinghub",
    "subdomain": "www",
    "tld": "com"
}

If TEST_MODE is active, It will accept testing URLs, parsing letter domains:

>>> request.url
'A1'

>>> request.meta['domain']
{
    "name": "A",
    "netloc": "A",
    "scheme": "-",
    "sld": "-",
    "subdomain": "-",
    "tld": "-"
}
UrlFingerprintMiddleware
class frontera.contrib.middlewares.fingerprint.UrlFingerprintMiddleware

This Middleware will add a fingerprint field for every Request.meta and Response.meta if is activated.

Fingerprint will be calculated from object URL, using the function defined in URL_FINGERPRINT_FUNCTION setting. You can write your own fingerprint calculation function and use by changing this setting. The fingerprint must be bytes.

An example for a Request object:

>>> request.url
'http//www.scrapinghub.com:8080'

>>> request.meta['fingerprint']
'60d846bc2969e9706829d5f1690f11dafb70ed18'
frontera.utils.fingerprint.hostname_local_fingerprint(key)

This function is used for URL fingerprinting, which serves to uniquely identify the document in storage. hostname_local_fingerprint is constructing fingerprint getting first 4 bytes as Crc32 from host, and rest is MD5 from rest of the URL. Default option is set to make use of HBase block cache. It is expected to fit all the documents of average website within one cache block, which can be efficiently read from disk once.

参数:key – str URL
返回:str 20 bytes hex string
DomainFingerprintMiddleware
class frontera.contrib.middlewares.fingerprint.DomainFingerprintMiddleware

This Middleware will add a fingerprint field for every Request.meta and Response.meta domain fields if is activated.

Fingerprint will be calculated from object URL, using the function defined in DOMAIN_FINGERPRINT_FUNCTION setting. You can write your own fingerprint calculation function and use by changing this setting. The fingerprint must be bytes

An example for a Request object:

>>> request.url
'http//www.scrapinghub.com:8080'

>>> request.meta['domain']
{
    "fingerprint": "5bab61eb53176449e25c2c82f172b82cb13ffb9d",
    "name": "scrapinghub.com",
    "netloc": "www.scrapinghub.com",
    "scheme": "http",
    "sld": "scrapinghub",
    "subdomain": "www",
    "tld": "com"
}

规范 URL 解算器 是一种特殊的 middleware 对象,用来识别网页的规范 URL,并根据这个修改 request 或者 response 的元数据。通常规范 URL 解算器是在调用后端方法之前最后一个执行的 middleware。

此组件的主要目的是防止元数据记录重复和混淆与其相关联的抓取器行为。原因是:

  • 不同的重定向链将指向相同的文档。
  • 同一份文件可以通过多个不同的URL访问。

精心设计的系统具有自己的稳定算法,为每个文档选择正确的 URL。另见 Canonical link element

规范 URL 解算器在Frontera Manager初始化期间使用 CANONICAL_SOLVER 设置中的类来实例化。

内置规范 URL 解算器参考

基本的

用作默认值。

class frontera.contrib.canonicalsolvers.basic.BasicCanonicalSolver

Implements a simple CanonicalSolver taking always first URL from redirect chain, if there were redirects. It allows easily to avoid leaking of requests in Frontera (e.g. when request issued by get_next_requests() never matched in page_crawled()) at the price of duplicating records in Frontera for pages having more than one URL or complex redirects chains.

后端

Frontier Backend 是抓取逻辑/策略所在的地方,本质上是你的爬虫的大脑。 Queue, MetadataStates 是为了放置低级代码的类,相反,后端类运行更高级的代码。Frontera 内置了内存或数据库方式实现的 Queue, Metadata 和 States,它们可以在你自定义的后端类中使用或者实例化 FrontierManager 和后端独立使用。

后端方法在 Middleware 之后被 FrontierManager 调用,根据 frontier data flow 使用 hooks 处理 RequestResponse

与中间件可以激活许多不同的实例不同,每个 Frontera 只能使用一种后端。

激活一个后端

要激活 Frontera 后端组件,请通过 BACKEND 设置进行设置。

这是一个例子

BACKEND = 'frontera.contrib.backends.memory.FIFO'

请记住,某些后端可能需要额外配置其他 settings。 更多信息,请参阅 backends documentation

写你自己的后端

每个后端组件是一个 Python 类,它继承自 BackendDistributedBackend ,且使用 Queue, MetadataStates 中的一个或多个。

FrontierManager 将通过下列方法与激活的后端通信。

class frontera.core.components.Backend

Methods

Backend.frontier_start()

Called when the frontier starts, see starting/stopping the frontier.

返回:None.
Backend.frontier_stop()

Called when the frontier stops, see starting/stopping the frontier.

返回:None.
Backend.finished()

Quick check if crawling is finished. Called pretty often, please make sure calls are lightweight.

返回:boolean
Backend.add_seeds(seeds)

This method is called when new seeds are added to the frontier.

参数:seeds (list) – A list of Request objects.
返回:None.
Backend.page_crawled(response)

This method is called every time a page has been crawled.

参数:response (object) – The Response object for the crawled page.
返回:None.
Backend.request_error(page, error)

This method is called each time an error occurs when crawling a page.

参数:
  • request (object) – The crawled with error Request object.
  • error (string) – A string identifier for the error.
返回:

None.

Backend.get_next_requests(max_n_requests, **kwargs)

Returns a list of next requests to be crawled.

参数:
  • max_next_requests (int) – Maximum number of requests to be returned by this method.
  • kwargs (dict) – A parameters from downloader component.
返回:

list of Request objects.

Class Methods

Backend.from_manager(manager)

Class method called from FrontierManager passing the manager itself.

Example of usage:

def from_manager(cls, manager):
    return cls(settings=manager.settings)

Properties

queue
states
metadata
class frontera.core.components.DistributedBackend

继承 Backend 的所有方法,并且还有两个类方法,它们在 strategy worker 和 db worker 实例化期间被调用。

classmethod DistributedBackend.strategy_worker(manager)
classmethod DistributedBackend.db_worker(manager)

Backend 应通过这些类与低级存储进行通信:

Metadata
class frontera.core.components.Metadata

Methods

Metadata.add_seeds(seeds)

This method is called when new seeds are added to the frontier.

参数:seeds (list) – A list of Request objects.
Metadata.request_error(page, error)

This method is called each time an error occurs when crawling a page.

参数:
  • request (object) – The crawled with error Request object.
  • error (string) – A string identifier for the error.
Metadata.page_crawled(response)

This method is called every time a page has been crawled.

参数:response (object) – The Response object for the crawled page.

已知的实现是: MemoryMetadatasqlalchemy.components.Metadata

Queue
class frontera.core.components.Queue

Methods

Queue.get_next_requests(max_n_requests, partition_id, **kwargs)

Returns a list of next requests to be crawled, and excludes them from internal storage.

参数:
  • max_next_requests (int) – Maximum number of requests to be returned by this method.
  • kwargs (dict) – A parameters from downloader component.
返回:

list of Request objects.

Queue.schedule(batch)

Schedules a new documents for download from batch, and updates score in metadata.

参数:batch – list of tuples(fingerprint, score, request, schedule), if schedule is True, then document needs to be scheduled for download, False - only update score in metadata.
Queue.count()

Returns count of documents in the queue.

返回:int

已知的实现是: MemoryQueuesqlalchemy.components.Queue

States
class frontera.core.components.States

Methods

States.update_cache(objs)

Reads states from meta[‘state’] field of request in objs and stores states in internal cache.

参数:objs – list or tuple of Request objects.
States.set_states(objs)

Sets meta[‘state’] field from cache for every request in objs.

参数:objs – list or tuple of Request objects.
States.flush(force_clear)

Flushes internal cache to storage.

参数:force_clear – boolean, True - signals to clear cache after flush
States.fetch(fingerprints)

Get states from the persistent storage to internal cache.

参数:fingerprints – list document fingerprints, which state to read

已知的实现是: MemoryStatessqlalchemy.components.States

内置后端引用

本文介绍了与 Frontera 捆绑在一起的所有后端组件。

要知道默认激活的 Backend 请看 BACKEND 设置。

基本算法

一些内置的 Backend 对象实现基本算法,如 FIFO/LIFO or DFS/BFS,用于页面访问排序。

它们之间的差异将在使用的存储引擎上。例如,memory.FIFOsqlalchemy.FIFO 将使用相同的逻辑,但使用不同的存储引擎。

所有这些后端变体都使用相同的 CommonBackend 类实现具有优先级队列的一次访问爬网策略。

class frontera.contrib.backends.CommonBackend
内存后端

这组 Backend 对象将使用 heapq 模块作为队列和本机字典作为 basic algorithms 的存储。

class frontera.contrib.backends.memory.BASE

Base class for in-memory Backend objects.

class frontera.contrib.backends.memory.FIFO

In-memory Backend implementation of FIFO algorithm.

class frontera.contrib.backends.memory.LIFO

In-memory Backend implementation of LIFO algorithm.

class frontera.contrib.backends.memory.BFS

In-memory Backend implementation of BFS algorithm.

class frontera.contrib.backends.memory.DFS

In-memory Backend implementation of DFS algorithm.

class frontera.contrib.backends.memory.RANDOM

In-memory Backend implementation of a random selection algorithm.

SQLAlchemy 后端

这组 Backend 对象将使用 SQLAlchemy 作为 basic algorithms 的存储。

默认情况下,它使用内存模式的 SQLite 数据库作为存储引擎,但可以使用 any databases supported by SQLAlchemy

如果你想使用你自己的 declarative sqlalchemy models ,你可以使用 SQLALCHEMYBACKEND_MODELS 设置。

这个 setting 使用一个字典,其中 key 代表要定义的模型的名称,value 代表了这个模型。

有关用于 SQLAlchemy 后端的所有 settings,请查看 settings

class frontera.contrib.backends.sqlalchemy.BASE

Base class for SQLAlchemy Backend objects.

class frontera.contrib.backends.sqlalchemy.FIFO

SQLAlchemy Backend implementation of FIFO algorithm.

class frontera.contrib.backends.sqlalchemy.LIFO

SQLAlchemy Backend implementation of LIFO algorithm.

class frontera.contrib.backends.sqlalchemy.BFS

SQLAlchemy Backend implementation of BFS algorithm.

class frontera.contrib.backends.sqlalchemy.DFS

SQLAlchemy Backend implementation of DFS algorithm.

class frontera.contrib.backends.sqlalchemy.RANDOM

SQLAlchemy Backend implementation of a random selection algorithm.

定时重爬后端

基于自定义 SQLAlchemy 后端和队列。从种子开始抓取。种子被抓取后,每一个新的 文件将被安排立即抓取。每个文档被抓取之后,将会在由 SQLALCHEMYBACKEND_REVISIT_INTERVAL 设置的时间间隔后再次抓取。

定时重爬后端当前没有实现优先级。 在长时间运行时,爬虫可能会闲置,因为 没有可用的抓取任务,但有任务等待他们的预定的访问时间。

class frontera.contrib.backends.sqlalchemy.revisiting.Backend

实现定时重爬后端的 SQLAlchemy Backend 基类。 Base class for SQLAlchemy Backend implementation of revisiting back-end.

HBase 后端
class frontera.contrib.backends.hbase.HBaseBackend

更适合大规模抓取。设置请参考 HBase 后端 。请考虑调整块缓存以适应平均网页块的大小。要实现这一点,建议使用 hostname_local_fingerprint ,可以让相同域名的网页放在一起。这个函数可以通过 URL_FINGERPRINT_FUNCTION 设置。

消息总线

消息总线是传输层抽象机制。Frontera 提供了接口和几个实现。同一时间只能使用一种类型的消息总线,并通过 MESSAGE_BUS 设置。

爬虫进程可以使用

class frontera.contrib.backends.remote.messagebus.MessageBusBackend(manager)

和消息总线进行通信。

内置消息总线参考

ZeroMQ

这是默认选项,使用轻量级的 ZeroMQ 库实现

可以使用 ZeroMQ 消息总线设置 配置。

ZeroMQ 需要按照 ZeroMQ 库,并且启动broker进程,请参考 启动集群

总的来说,使用 ZeroMQ 消息总线是为了用最少的部署实现 PoC (Patch Output Converter 成批输出转换程序)。因为它很容易 在组件的数据流未正确调整或启动过程中发生消息丢失,所以请参照下面的顺序启动组件:

  1. db worker
  2. strategy worker
  3. spiders

不幸的是,停止执行未完成抓取的爬虫时,无法避免消息丢失。如果你的爬虫程序对少量的信息丢失敏感的话,我建议你使用 Kafka。

警告!ZeroMQ消息总线不支持多个 SW worker 和 DB worker, 每种 woker 只能有一个实例。
Kafka

使用这个类

使用 Kafka 消息总线配置 配置。

需要运行 Kafka 服务,这个服务更适合大规模采集。

协议

根据数据流,Frontera 使用几种消息类型来编码它的消息。每种消息是用 msgpack 或 JSON 序列化的 python 对象。可以使用 MESSAGE_BUS_CODEC 选择编解码器模块,并且需要导出编码器和解码器类。

以下是子类实现自己的编解码器所需的类:

class frontera.core.codec.BaseEncoder
encode_add_seeds(seeds)

Encodes add_seeds message

参数:seeds (list) – A list of frontier Request objects
返回:bytes encoded message
encode_page_crawled(response)

Encodes a page_crawled message

参数:response (object) – A frontier Response object
返回:bytes encoded message
encode_request_error(request, error)

Encodes a request_error message

参数:
  • request (object) – A frontier Request object
  • error (string) – Error description
返回:

bytes encoded message

encode_request(request)

Encodes requests for spider feed stream.

参数:request (object) – Frontera Request object
返回:bytes encoded message
encode_update_score(request, score, schedule)

Encodes update_score messages for scoring log stream.

参数:
  • request (object) – Frontera Request object
  • score (float) – score
  • schedule (bool) – True if document needs to be scheduled for download
返回:

bytes encoded message

encode_new_job_id(job_id)

Encodes changing of job_id parameter.

参数:job_id (int) –
返回:bytes encoded message
encode_offset(partition_id, offset)

Encodes current spider offset in spider feed.

参数:
  • partition_id (int) –
  • offset (int) –
返回:

bytes encoded message

class frontera.core.codec.BaseDecoder
decode(buffer)

Decodes the message.

参数:buffer (bytes) – encoded message
返回:tuple of message type and related objects
decode_request(buffer)

Decodes Request objects.

参数:buffer (bytes) – serialized string
返回:object Request

可用的编解码器

MsgPack

Module: frontera.contrib.backends.remote.codecs.msgpack

JSON

A JSON codec for Frontera. Implemented using native json library.

Module: frontera.contrib.backends.remote.codecs.json

抓取策略

使用 cluster 例子和 frontera.worker.strategies.bfs 模型进行参考。 Use cluster example and frontera.worker.strategies.bfs module for reference. 一般来说,你需要写一个 抓取策略子类,参照:

class frontera.worker.strategies.BaseCrawlingStrategy(manager, mb_stream, states_context)

Interface definition for a crawling strategy.

Before calling these methods strategy worker is adding ‘state’ key to meta field in every Request with state of the URL. Pleases refer for the states to HBaseBackend implementation.

After exiting from all of these methods states from meta field are passed back and stored in the backend.

Methods

classmethod from_worker(manager, mb_stream, states_context)

Called on instantiation in strategy worker.

参数:
  • manager
    class:Backend <frontera.core.manager.FrontierManager> instance
  • mb_stream
    class:UpdateScoreStream <frontera.worker.strategy.UpdateScoreStream> instance
返回:

new instance

add_seeds(seeds)

Called when add_seeds event is received from spider log.

参数:seeds (list) – A list of Request objects.
page_crawled(response)

Called every time document was successfully crawled, and receiving page_crawled event from spider log.

参数:response (object) – The Response object for the crawled page.
page_error(request, error)

Called every time there was error during page downloading.

参数:
  • request (object) – The fetched with error Request object.
  • error (str) – A string identifier for the error.
finished()

Called by Strategy worker, after finishing processing each cycle of spider log. If this method returns true, then Strategy worker reports that crawling goal is achieved, stops and exits.

返回:bool
close()

Called when strategy worker is about to close crawling strategy.

该类可以放在任何模块中,并在启动时使用命令行选项或 CRAWLING_STRATEGY 设置传递给 strategy worker

这个策略类在 strategy worker 中实例化,可以使用自己的存储或任何其他类型的资源。所有来着 spider log 的都会传给这些方法。返回的分数不一定与方法参数相同。finished() 方法会被周期性的调用来检测抓取目标是否达到了。

使用 Frontier 和 Scrapy

Scrapy 中使用 Frontera 非常简单,它包含一组 Scrapy middlewares 和 Scrapy 调度程序,封装了 Frontera 的功能 ,可以使用 Scrapy settings 轻松配置。

激活 frontier

Frontera 使用两种不同的中间件:SchedulerSpiderMiddleware and SchedulerDownloaderMiddleware 和自己的调度程序 FronteraScheduler

要在你的 Scrapy 项目中激活 Frontera,只要把它们加入到 SPIDER_MIDDLEWARES, DOWNLOADER_MIDDLEWARESSCHEDULER settings:

SPIDER_MIDDLEWARES.update({
    'frontera.contrib.scrapy.middlewares.schedulers.SchedulerSpiderMiddleware': 1000,
})

DOWNLOADER_MIDDLEWARES.update({
    'frontera.contrib.scrapy.middlewares.schedulers.SchedulerDownloaderMiddleware': 1000,
})

SCHEDULER = 'frontera.contrib.scrapy.schedulers.frontier.FronteraScheduler'

创建一个 Frontera settings.py 并把它加入到你的 Scrapy 设置中:

FRONTERA_SETTINGS = 'tutorial.frontera.settings'

另一种选择是将这些设置放在 Scrapy 设置模块中。

组织文件

当使用 Scrapy 和 frontier 时,我们有以下目录结构:

my_scrapy_project/
    my_scrapy_project/
        frontera/
            __init__.py
            settings.py
            middlewares.py
            backends.py
        spiders/
            ...
        __init__.py
        settings.py
     scrapy.cfg

这些都是基本的:

  • my_scrapy_project/frontera/settings.py: Frontera settings 文件。
  • my_scrapy_project/frontera/middlewares.py: Frontera 的中间件。
  • my_scrapy_project/frontera/backends.py: Frontera 使用的后端。
  • my_scrapy_project/spiders: Scrapy spiders 文件夹
  • my_scrapy_project/settings.py: Scrapy settings 文件
  • scrapy.cfg: Scrapy 配置文件

运行爬虫

只要按照常规从命令行运行你的 Scrapy 爬虫:

scrapy crawl myspider

Frontier Scrapy settings

你可以使用两种方式配置 frontier :

  • 使用 FRONTERA_SETTINGS,可以在 Scrapy 配置文件中指明 Frontera 配置文件的路径,默认是 None
  • 将 frontier 设置定义到 Scrapy 设置文件中。
通过 Scrapy 配置 frontier

Frontier settings 也可以通过 Scrapy 配置。在这种情况下,配置优先级顺序如下:

  1. FRONTERA_SETTINGS 指向的文件中的配置(更高的优先级)
  2. Scrapy 配置文件中的配置
  3. 默认 frontier 配置

写 Scrapy 爬虫

爬虫逻辑

创建基本的 Scrapy 爬虫在 Quick start single process 中做了描述。

这也是一个防止爬虫因为队列中请求不足而关闭的好方法:

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
    spider = cls(*args, **kwargs)
    spider._set_crawler(crawler)
    spider.crawler.signals.connect(spider.spider_idle, signal=signals.spider_idle)
    return spider

def spider_idle(self):
    self.log("Spider idle signal caught.")
    raise DontCloseSpider
配置准则

您可以进行多种调整,以实现高效的广泛抓取。

添加一个种子加载器,用于启动爬虫进程:

SPIDER_MIDDLEWARES.update({
    'frontera.contrib.scrapy.middlewares.seeds.file.FileSeedLoader': 1,
})

适合广泛抓取的各种设置:

HTTPCACHE_ENABLED = False   # 关闭磁盘缓存,它在大量抓取中具有较低的命中率
REDIRECT_ENABLED = True
COOKIES_ENABLED = False
DOWNLOAD_TIMEOUT = 120
RETRY_ENABLED = False   # 重试可以由 Frontera 本身处理,具体取决于爬网策略
DOWNLOAD_MAXSIZE = 10 * 1024 * 1024  # 最大文档大小,如果未设置,会导致OOM
LOGSTATS_INTERVAL = 10  # 每10秒钟向控制台打印统计

自动限流和并发设置,以方便有礼貌和负责任的抓取:

# auto throttling
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_DEBUG = False
AUTOTHROTTLE_MAX_DELAY = 3.0
AUTOTHROTTLE_START_DELAY = 0.25     # 任何足够小的值,它将在平均运行期间通过瓶颈响应延迟进行调整。
RANDOMIZE_DOWNLOAD_DELAY = False

# concurrency
CONCURRENT_REQUESTS = 256           # 取决于许多因素,应通过实验确定
CONCURRENT_REQUESTS_PER_DOMAIN = 10
DOWNLOAD_DELAY = 0.0

具体参照 Scrapy broad crawling.

Scrapy 种子加载器

Frontera 有一些内置的 Scrapy 中间件用于种子装载。

种子装载使用 process_start_requests 方法从源中生成请求,这些请求后续会被加入 FrontierManager

激活一个种子加载器

只需将种子加载器中间件加入 SPIDER_MIDDLEWARES 中:

SPIDER_MIDDLEWARES.update({
    'frontera.contrib.scrapy.middlewares.seeds.FileSeedLoader': 650
})
FileSeedLoader

从文件中导入种子。该文件必须是每行一个 URL 的格式:

http://www.asite.com
http://www.anothersite.com
...

你可以使用 # 注释掉某一行:

...
#http://www.acommentedsite.com
...

Settings:

  • SEEDS_SOURCE: 种子文件路径
S3SeedLoader

从存储在 Amazon S3 中的文件导入种子 Load seeds from a file stored in an Amazon S3 bucket

文件格式应该和 FileSeedLoader 中的一样。

Settings:

  • SEEDS_SOURCE: S3 bucket 文件路径。 例如: s3://some-project/seed-urls/
  • SEEDS_AWS_ACCESS_KEY: S3 credentials Access Key
  • SEEDS_AWS_SECRET_ACCESS_KEY: S3 credentials Secret Access Key

Settings

Frontera settings 允许你定制所有的组件,包括 FrontierManager, Middleware and Backend themselves.

settings 提供了键值映射的全局命名空间可以用来获取配置值。可以通过不同的机制填充设置,如下所述。

内置的配置请查看: Built-in settings reference

设计 settings

当你使用 Frontera 的时候,你需要告诉它你在用什么配置。因为 FrontierManager 是 Frontier 使用的入口,你可以使用 Loading from settings 来进行配置。

当使用一个字符串路径指向配置文件的时候,我们建议下面这种文件结构:

my_project/
    frontier/
        __init__.py
        settings.py
        middlewares.py
        backends.py
    ...

这些都是基本的:

  • frontier/settings.py: frontier settings 文件。
  • frontier/middlewares.py: frontier 使用的中间件。
  • frontier/backends.py: frontier 使用的后端。

如何访问 settings

Settings 可以通过 FrontierManager.settings 属性访问, 它传给了 Middleware.from_managerBackend.from_manager 类方法:

class MyMiddleware(Component):

    @classmethod
    def from_manager(cls, manager):
        manager = crawler.settings
        if settings.TEST_MODE:
            print "test mode is enabled!"

换句话说, settings 可以通过 Settings 访问.

Settings 类

class frontera.settings.Settings

内置 frontier settings

下面是所以可用的 Frontera settings,以字母顺序排序,以及它们的默认值和适用范围。

AUTO_START

默认: True

是否允许 frontier 自动启动。参考 Starting/Stopping the frontier

BACKEND

默认: 'frontera.contrib.backends.memory.FIFO'

Backend 使用的后端。详情参考 Activating a backend

BC_MIN_REQUESTS

默认: 64

广泛的抓取队列的获取操作将继续重试,直到收集到指定的请求数。最大重试次数硬编码为3。

BC_MIN_HOSTS

默认: 24

持续从队列中获取请求任务,直到请求的 host 种类到达最小值。最大重试次数硬编码为3。

BC_MAX_REQUESTS_PER_HOST

默认:: 128

如果某个 host 的请求数已经到达单 host 请求的上限,将不包括这些请求。这是针对广泛抓取获取任务数量的建议。

CANONICAL_SOLVER

默认: frontera.contrib.canonicalsolvers.Basic

CanonicalSolver 被用来解析规划网址。详情参考 Canonical URL Solver.

SPIDER_LOG_CONSUMER_BATCH_SIZE

默认: 512

这是 strategy worker 和 db worker 消费 spider log 流时的批量大小。增加它将使 worker 在每个任务上花更多的时间,但是每个任务处理更多的 item,因此在一段时间内给其它任务留下了更少的时间。减少它将导致在同一时间段内运行多个任务,但总体效率较低。 当你的消费者太快或者太慢的时候使用这个选项。

SCORING_LOG_CONSUMER_BATCH_SIZE

默认: 512

这是 db worker 消费 scoring log 流时的批量大小。当你需要调整 scoring log 消费速度的时候使用这个选项。

CRAWLING_STRATEGY

默认: None

抓取策略类的路径,在 strategy worker 中实例化并使用,用来在分布式模式中设置抓取优先级和停止抓取。

DELAY_ON_EMPTY

默认: 5.0

当队列大小小于 CONCURRENT_REQUESTS 时,向调度器发送请求时的延迟时间。当后端没有请求的时候,这个延迟帮助消耗掉剩余的缓存而无需每次请求都去请求后端。如果对后端的调用花费的时间过长,则增加它,如果你需要更快从种子启动爬虫,则减少它。

KAFKA_GET_TIMEOUT

默认: 5.0

从 Kafka 中获取数据的超时时间。

LOGGING_CONFIG

默认: logging.conf

logging 模块的路径。参考 https://docs.python.org/2/library/logging.config.html#logging-config-fileformat 如果文件不存在,日志将使用 logging.basicConfig() 实例化,将在 CONSOLE 输出日志。这个选项只在 db workerstrategy worker 中使用。 The path to a file with logging module configuration.

MAX_NEXT_REQUESTS

默认: 64

get_next_requests API 返回的最大请求数量。在分布式模式中,它应该是 db worker 为每个爬虫生成的请求数或者是每次从消息总线中获取的请求数。在单进程模式下,它应该是每次调用 get_next_requests 获取的请求数量。

MAX_REQUESTS

默认: 0

Frontera完成之后返回的最大请求数总量。如果是0(默认值),frontier 将一直运行。详情参考: Finishing the frontier.

MESSAGE_BUS

默认: frontera.contrib.messagebus.zeromq.MessageBus

指向 message bus 的实现。默认是 ZeroMQ。

MESSAGE_BUS_CODEC

默认: frontera.contrib.backends.remote.codecs.msgpack

指向 message bus 编码实现。参考 codec interface description。 默认是 MsgPack。

MIDDLEWARES

包含在 frontier 中启用的中间件的列表。详情参考 Activating a middleware

默认:

[
    'frontera.contrib.middlewares.fingerprint.UrlFingerprintMiddleware',
]
NEW_BATCH_DELAY

默认: 30.0

在 DB worker 中使用,它是为所有分区产生任务集合的时间间隔。如果分区很忙,它将被忽略掉。

OVERUSED_SLOT_FACTOR

默认: 5.0

(某个 slot 中进行中+队列中的请求)/ (每个slot允许的最大并发送)称作 overused。它只影响 Scrapy scheduler。

REQUEST_MODEL

默认: 'frontera.core.models.Request'

frontier 使用的 Request 模型。

RESPONSE_MODEL

默认: 'frontera.core.models.Response'

frontier 使用的 Response 模型。

SCORING_PARTITION_ID

默认: 0

被 strategy worker 使用,代表 strategy worker 被分配的分区。

SPIDER_LOG_PARTITIONS

默认: 1

spider log 分区的数量。这个参数影响 strategy worker (s) 的数量,每个 strategy worker 被分配到自己的分区。

SPIDER_FEED_PARTITIONS

默认: 1

spider feed 分区的数量。这个参数影响爬虫进程数量。每个爬虫被分配到了自己的分区。

SPIDER_PARTITION_ID

默认: 0

每个爬虫的配置,将爬虫指向分配给自己的分区。

STATE_CACHE_SIZE

默认: 1000000

在被清除之前状态缓存的最大数量。

STORE_CONTENT

默认: False

Determines if content should be sent over the message bus and stored in the backend: a serious performance killer.

TEST_MODE

默认: False

是否开启 frontier 的测试模式。 参考 Frontier test mode

内置指纹中间件设置

设置被 UrlFingerprintMiddlewareDomainFingerprintMiddleware 使用。

URL_FINGERPRINT_FUNCTION

默认: frontera.utils.fingerprint.sha1

用来计算 url 指纹的函数。

DOMAIN_FINGERPRINT_FUNCTION

默认: frontera.utils.fingerprint.sha1

用来计算 domain 指纹的函数。

TLDEXTRACT_DOMAIN_INFO

默认: False

如果设置为 True ,将使用 tldextract 附加额外的域信息(二级,顶级和子域名)到 meta 字段(参考 为对象添加其他值 )。

内置后端配置

SQLAlchemy
SQLALCHEMYBACKEND_CACHE_SIZE

默认: 10000

SQLAlchemy 元数据的 LRU 缓存大小。它用来缓存对象,这些对象从数据库获得,还缓存抓取的文档。它主要节约了数据库的吞吐量,如果你面临从数据库获得太多数据的问题增加它,如果你想节约内存就减少它。

SQLALCHEMYBACKEND_CLEAR_CONTENT

默认: True

如果你想禁止每次后端初始化的时候清理表数据则将之设为 False (例如每次启动 Scrapy 爬虫的时候)。

SQLALCHEMYBACKEND_DROP_ALL_TABLES

默认: True

如果你想禁止每次后端初始化的时候删除数据库表则将之设为 False (例如每次启动 Scrapy 爬虫的时候)。

SQLALCHEMYBACKEND_ENGINE

默认:: sqlite:///:memory:

SQLAlchemy 数据库 URL。 默认使用内存。

SQLALCHEMYBACKEND_ENGINE_ECHO

默认: False

打开/关闭 SQLAlchemy 的冗余输出。调试 SQL 语句的时候有用。

SQLALCHEMYBACKEND_MODELS

默认:

{
    'MetadataModel': 'frontera.contrib.backends.sqlalchemy.models.MetadataModel',
    'StateModel': 'frontera.contrib.backends.sqlalchemy.models.StateModel',
    'QueueModel': 'frontera.contrib.backends.sqlalchemy.models.QueueModel'
}

用来设置后端使用的 SQLAlchemy 模型。主要用来定制化。

重新抓取后端
SQLALCHEMYBACKEND_REVISIT_INTERVAL

默认: timedelta(days=1)

重新访问网页的时间,使用 ``datetime.timedelta` 表示。它会影响网页的定期抓取。之前抓取的网页还是使用旧的时间间隔。

HBase 后端
HBASE_BATCH_SIZE

默认: 9216

在发送到HBase之前累计的PUT操作的数量。

HBASE_DROP_ALL_TABLES

默认: False

允许在 worker 启动的时候删除并重建 Hbase 表格。

HBASE_METADATA_TABLE

默认: metadata

网页元数据表格的名字。

HBASE_NAMESPACE

默认: crawler

所有爬虫程序相关表在 HBase 命名空间的名称。

HBASE_QUEUE_TABLE

默认: queue

Hbase 中队列优先级表。

HBASE_STATE_CACHE_SIZE_LIMIT

默认: 3000000

在写入 HBase 和清除之前 strategy workerstate cache 的最多数量。

HBASE_THRIFT_HOST

默认: localhost

HBase Thrift server 主机

HBASE_THRIFT_PORT

默认: 9090

HBase Thrift server 端口

HBASE_USE_FRAMED_COMPACT

默认: False

启用此选项可大大降低传输开销,但是服务器需要正确配置才能使用节俭框架运输和紧凑协议。

HBASE_USE_SNAPPY

默认: False

是否在 Hbase 中使用 snappy 压缩内容和元数据。在HBase中减少磁盘和网络IO的数量,降低响应时间。 HBase必须正确配置以支持Snappy压缩。

ZeroMQ 消息总线设置

消息总线类是 distributed_frontera.messagebus.zeromq.MessageBus

ZMQ_ADDRESS

默认: 127.0.0.1

定义 ZeroMQ 套接字应该绑定或连接的位置。可以是主机名或IP地址。现在,ZMQ 只有通过 IPv4进行了测试。IPv6在不久的将来会增加支持。

ZMQ_BASE_PORT

默认: 5550

所有 ZeroMQ 的基本端口。 它使用6个 sockets ,且6个端口是顺序的.确保[base:base + 5]所表示的端口都是可用的。

Kafka 消息总线配置

这个消息总线的类是 frontera.contrib.messagebus.kafkabus.MessageBus

KAFKA_LOCATION

kafka 代理的主机名和端口号,以 :分割。可以是主机名:端口的字符串对,用逗号(,)分隔。

KAFKA_CODEC

默认: None

Kafka-python 1.0.x 版本使用的压缩方式, 它应该是 None 或者 snappy, gzip``lz4``中的一个。

SPIDER_LOG_DBW_GROUP

默认: dbw-spider-log

Kafka 消费者群组名,被 db workerspider log 使用。

SPIDER_LOG_SW_GROUP

默认: sw-spider-log

Kafka 消费者群组名,被 strategy workerspider log 使用。

SCORING_LOG_DBW_GROUP

默认: dbw-scoring-log

Kafka 消费者群组名,被 db workerscoring log 使用。

SPIDER_FEED_GROUP

默认: fetchers-spider-feed

Kafka 消费者群组名,被 spiderspider feed 使用。

SPIDER_LOG_TOPIC

默认: frontier-done

spider log 数据流的 topic 名字。

SPIDER_FEED_TOPIC

默认: frontier-todo

spider feed 数据流的 topic 名字。

SCORING_LOG_TOPIC

scoring log 数据流的 topic 名字。

Default settings

如果没有指定设置,frontier 将使用内置的默认设置。有关默认值的完整列表,请参见 Built-in settings reference 。所有默认设置都可以被覆盖。

安装指南
安装方法和依赖的选项。
Frontier 对象
理解用来代表网络请求和网络响应的类。
Middlewares(中间件)
过滤或者更改链接和网页的信息。
内置规范 URL 解算器参考
确认和使用网页的规范url。
后端
自定义抓取规则和存储方式。
消息总线
内置消息总线参考。
抓取策略
为分布式后端实现自己的抓取策略。
使用 Frontier 和 Scrapy
学习如何使用 Frontera + Scrapy 。
Settings
设置参考。

高级用法

什么是 Crawl Frontier?

Frontera 一个实现 crawl frontier 的框架。crawler frontier 是爬虫系统的一部分,它决定了爬虫抓取网页时候的逻辑和策略(哪些页面应该被抓取,优先级和排序,页面被重新抓取的频率等)。

通常的 crawl frontier 方案是:

_images/frontier_01.png

frontier 以 URL 列表初始化,我们称之为种子。 一旦边界初始化,爬虫程序会询问下一步应该访问哪些页面。 当爬虫开始访问页面并获取结果时,它将通知 frontier 每个页面响应以及页面中包含的超链接。 这些链接被 frontier 当做新的请求加入,安装抓取策略进行抓取。

这个过程(请求新的任务/通知结果)会一直重复直到达到爬虫的结束条件。一些爬虫可能不会停止,我们称之为永久爬虫。

Frontier 抓取策略几乎可以基于任何的逻辑。常见情况是基于得分/优先级,它们通过一个或多个页面的属性(新鲜度,更新时间,某些条款的内容相关性等)计算得来。也可以基于很简单的逻辑,比如 FIFO/LIFODFS/BFS

根据 frontier 的逻辑,可能需要持久存储系统来存储,更新或查询页面信息。 其他系统可能是100%不稳定的,并且不会在不同爬虫之间共享任何信息。

更多 crawl frontier 理论请参照 Christopher D. Manning, Prabhakar Raghavan & Hinrich Schütze 写的 URL frontier 文章。

Graph Manager

Graph Manager 是一种将网站地图表示为图形的工具。

它可以很容易地用于测试 frontier。我们可以向 graph manager 查询页面来伪造爬虫的请求/响应,并且不使用爬虫就可以知道每个页面的提取链接。你可以使用你的伪造测试或者 Frontier Tester tool

你可以用它来定义你自己的网站,用来测试或者使用 Scrapy Recorder 来记录抓取过程并可以在之后重现。

定义一个网站图

网站的页面及其链接可以轻松定义为有向图,其中每个节点表示页面边缘之间的链接。

我们使用一个非常简单的站点表示,一个起始页面 A 里面有一个链接到 B, C, D 里面的链接。 我们可以用这个图表来表示网站:

_images/site_01.png

我们使用列表来表示不同的网站页面和元组来定义页面及其链接,上面的例子

site = [
    ('A', ['B', 'C', 'D']),
]

请注意,我们不需要定义没有链接的页面,但是我们也可以将其用作有效的表示

site = [
    ('A', ['B', 'C', 'D']),
    ('B', []),
    ('C', []),
    ('D', []),
]

一个更复杂的网站:

_images/site_02.png

可以表示为:

site = [
    ('A', ['B', 'C', 'D']),
    ('D', ['A', 'D', 'E', 'F']),
]

注意 D 链接到自己和它的父节点 A

同样的,一个页面可以有多个父节点:

_images/site_03.png
site = [
    ('A', ['B', 'C', 'D']),
    ('B', ['C']),
    ('D', ['C']),
]

为了简化示例,我们不使用网址表示,但当然url是可以用于网站图:

_images/site_04.png
site = [
    ('http://example.com', ['http://example.com/anotherpage', 'http://othersite.com']),
]

使用 Graph Manager

一旦我们将我们的站点定义为一个图表,我们就可以开始使用 Graph Manager 了。

我们必须先创建我们的图表管理器

>>> from frontera.utils import graphs
>>> g = graphs.Manager()

使用 add_site 方法添加网站

>>> site = [('A', ['B', 'C', 'D'])]
>>> g.add_site(site)

这个管理器现在被初始化并准备好使用。

我们可以得到图表中的所有页面

>>> g.pages
[<1:A*>, <2:B>, <3:C>, <4:D>]

星号表示页面是种子,如果我们想要获取站点图形的种子

>>> g.seeds
[<1:A*>]

我们可以用 get_page 获取一个单独页面, 如果页面不存在就返回 None

>>> g.get_page('A')
<1:A*>
>>> g.get_page('F')
None

CrawlPage 对象

页面使用 CrawlPage 对象表示:

class CrawlPage

CrawlPage 对象表示一个 Graph Manager 页面,且该页面通常在 Graph Manager 生成。

id

自动页面 id。

url

页面 url。

status

代表页面状态码。

is_seed

布尔值表示页面是不是种子页面

当前页面链接到的页面列表。

referers

链接到当前页面的页面列表。

例子:

>>> p = g.get_page('A')
>>> p.id
1

>>> p.url
u'A'

>>> p.status  # defaults to 200
u'200'

>>> p.is_seed
True

>>> p.links
[<2:B>, <3:C>, <4:D>]

>>> p.referers  # No referers for A
[]

>>> g.get_page('B').referers  # referers for B
[<1:A*>]

添加页面和链接

网站图也可以定义为单独添加页面和链接,我们可以用这种方式定义相同的图形:

>>> g = graphs.Manager()
>>> a = g.add_page(url='A', is_seed=True)
>>> b = g.add_link(page=a, url='B')
>>> c = g.add_link(page=a, url='C')
>>> d = g.add_link(page=a, url='D')

add_pageadd_link 可以随时和 add_site 配合使用:

>>> site = [('A', ['B', 'C', 'D'])]
>>> g = graphs.Manager()
>>> g.add_site(site)
>>> d = g.get_page('D')
>>> g.add_link(d, 'E')

添加多个网站

多个网站可以加入管理器:

>>> site1 = [('A1', ['B1', 'C1', 'D1'])]
>>> site2 = [('A2', ['B2', 'C2', 'D2'])]

>>> g = graphs.Manager()
>>> g.add_site(site1)
>>> g.add_site(site2)

>>> g.pages
[<1:A1*>, <2:B1>, <3:C1>, <4:D1>, <5:A2*>, <6:B2>, <7:C2>, <8:D2>]

>>> g.seeds
[<1:A1*>, <5:A2*>]

或者使用 add_site_list 方法

>>> site_list = [
    [('A1', ['B1', 'C1', 'D1'])],
    [('A2', ['B2', 'C2', 'D2'])],
]
>>> g = graphs.Manager()
>>> g.add_site_list(site_list)

Graphs 数据库

Graph Manager 使用 SQLAlchemy 来存储和重现图。

默认使用 SQLite 内存模式当成数据库引擎,但是 any databases supported by SQLAlchemy 中提到的都可以支持。

使用 SQLite 的例子

>>> g = graphs.Manager(engine='sqlite:///graph.db')

默认情况下,每个新添加都会进行更改,稍后可以加载图

>>> graph = graphs.Manager(engine='sqlite:///graph.db')
>>> graph.add_site(('A', []))

>>> another_graph = graphs.Manager(engine='sqlite:///graph.db')
>>> another_graph.pages
[<1:A1*>]

可以使用 clear_content 参数重置数据库

>>> g = graphs.Manager(engine='sqlite:///graph.db', clear_content=True)

使用状态码

为了使用图重新创建/模拟爬虫,可以为每个页面定义HTTP响应代码。

一个404错误的例子

>>> g = graphs.Manager()
>>> g.add_page(url='A', status=404)

可以使用元组列表以下列方式为站点定义状态代码:

>>> site_with_status_codes = [
    ((200, "A"), ["B", "C"]),
    ((404, "B"), ["D", "E"]),
    ((500, "C"), ["F", "G"]),
]
>>> g = graphs.Manager()
>>> g.add_site(site_with_status_codes)

新页面的默认状态码值为200。

一个简单的伪造爬虫例子

使用 Frontier Tester tool 能更好的完成 Frontier 的测试,但这里就是一个例子,说明如何伪造 frontier:

from frontera import FrontierManager, Request, Response
from frontera.utils import graphs

if __name__ == '__main__':
    # Load graph from existing database
    graph = graphs.Manager('sqlite:///graph.db')

    # Create frontier from default settings
    frontier = FrontierManager.from_settings()

    # Create and add seeds
    seeds = [Request(seed.url) for seed in graph.seeds]
    frontier.add_seeds(seeds)

    # Get next requests
    next_requets = frontier.get_next_requests()

    # Crawl pages
    while (next_requests):
        for request in next_requests:

            # Fake page crawling
            crawled_page = graph.get_page(request.url)

            # Create response
            response = Response(url=crawled_page.url, status_code=crawled_page.status)

            # Update Page
            page = frontier.page_crawled(response=response
                                         links=[link.url for link in crawled_page.links])
            # Get next requests
            next_requets = frontier.get_next_requests()

渲染图

图可以被渲染成 png 文件:

>>> g.render(filename='graph.png', label='A simple Graph')

渲染图使用的是 pydot, Graphviz’s 点语言的 python 接口。

如何使用

Graph Manager 可以结合 Frontier Tester 测试 frontiers,也可以结合 Scrapy Recordings

记录 Scrapy 抓取过程

Scrapy Recorder 是一套 Scrapy middlewares ,可以让您记录scrapy抓取过程并将其存储到 Graph Manager 中。

这可以用于执行 frontier 测试,而不必再次爬取整个站点,甚至不用使用 Scrapy。

激活 recorder

recorder 使用了两个中间件: CrawlRecorderSpiderMiddlewareCrawlRecorderDownloaderMiddleware

要在 Scrapy 项目中激活 recorder,只需要将他们加入 SPIDER_MIDDLEWARESDOWNLOADER_MIDDLEWARES 配置中:

SPIDER_MIDDLEWARES.update({
    'frontera.contrib.scrapy.middlewares.recording.CrawlRecorderSpiderMiddleware': 1000,
})

DOWNLOADER_MIDDLEWARES.update({
    'frontera.contrib.scrapy.middlewares.recording.CrawlRecorderDownloaderMiddleware': 1000,
})

选择你的存储引擎

因为 recorder 内部使用 Graph Manager 存储抓取的网页,所以你可以选择存储引擎,参照 different storage engines

我们使用 RECORDER_STORAGE_ENGINE 配置存储引擎:

RECORDER_STORAGE_ENGINE = 'sqlite:///my_record.db'

您还可以选择重置数据库表或仅重置数据:

RECORDER_STORAGE_DROP_ALL_TABLES = True
RECORDER_STORAGE_CLEAR_CONTENT = True

运行爬虫

和之前一样从命令行运行爬虫:

scrapy crawl myspider

一旦完成,抓取过程会被记录。

如果你需要取消记录,可以设置 RECORDER_ENABLED

scrapy crawl myspider -s RECORDER_ENABLED=False

Recorder 设置

以下是所有可用Scrapy Recorder设置的列表,按字母顺序排列,以及默认值及其应用范围。

RECORDER_ENABLED

默认: True

激活或停用中间件。

RECORDER_STORAGE_CLEAR_CONTENT

默认: True

删除 storage database 中的数据。

RECORDER_STORAGE_DROP_ALL_TABLES

默认: True

删除 storage database 中的表。

RECORDER_STORAGE_ENGINE

默认: None

设置 Graph Manager storage engine 来存储记录。

Frontera 集群优化

为什么爬行速度如此之低?

寻找瓶颈。

  • 所有请求都针对少量的几个网站,
  • DNS 解析(参考 DNS Service ),
  • strategy worker 性能问题
  • db worker 生成任务不足
  • HBase 相应时间过长
  • 集群内的网络过载。

优化 HBase

  • 在 HBase 中增加块缓存。
  • 在每个 HBase 区服务器上部署 Thrift 服务器,并将负载从 SW 传播到 Thrift。
  • 启用 Snappy 压缩 (参照 HBASE_USE_SNAPPY).

优化 Kafka

  • 将日志大小降至最低,并优化系统以避免在 Kafka 存储大量数据。 一旦写入数据,它应尽可能快地消耗。
  • 使用SSD或甚至RAM存储 Kafka logs,
  • 启用 Snappy 压缩。

各种组件之间的流量控制

MAX_NEXT_REQUESTS 用于控制批量任务大小。 在爬虫配置中,它控制每个 get_next_requests 调用返回多少任务。同时在 DB worker 中配置它,它会设置每个分区生成的任务数。 设置这些参数时要牢记:

  • DB worker 和爬虫值必须保持一致,以避免消息总线过载和消息丢失。 换句话说,DB worker 产生的任务要比爬虫消耗的要少一些,因为即使DB worker还没来得及产生新的任务,蜘蛛应该仍然可以获取新的页面。
  • 爬虫消费率取决于许多因素:互联网连接延迟,蜘蛛解析/抓取工作量,延迟和自动限制设置,代理使用等。
  • 保持爬虫任务队列总是满的,以防止蜘蛛空闲。
  • 一般建议是设置 DB worker值比爬虫大2-4倍。
  • 批量生成任务数量不应太大,这样才不会在后端产生太多的负载,并允许系统对队列更改做出快速反应。
  • 注意有关丢失的消息的警告。

DNS 服务

除了提到的 前提 你可能还需要一个专用的 DNS 服务。特别是在你的爬虫会产生大量 DNS 请求的情况下。在广度优先抓取或者其他短时间内访问大量网站的情况下,使用专用的 DNS 服务都是正确的。

由于负载巨大,DNS 服务最终可能会被您的网络提供商阻止。

DNS策略有两种选择:

  • 递归DNS解析,
  • 利用上游服务器(大规模的DNS缓存像OpenDNS或Verizon)。

第二个仍然容易阻塞。

NLnet 实验室发布的 DNS 服务软件很好用 https://www.unbound.net/ 。它允许选择上述策略之一,并维护本地DNS缓存。

请查看 Scrapy 选项 REACTOR_THREADPOOL_MAXSIZEDNS_TIMEOUT

什么是 Crawl Frontier?
学习 Crawl Frontier 理论。
Graph Manager
定义假的抓取规则来测试你的 frontier 。
记录 Scrapy 抓取过程
创建 Scrapy 抓取记录,并在之后重现他们。
Frontera 集群优化
机器部署和微调信息。
DNS 服务
DNS 服务搭建简介。

开发者文档

架构概述

本文档介绍了Frontera Manager管道,分布式组件以及它们的交互方式。

单进程

下图显示了Frontera管道的架构,其组件(由数字引用)和系统内发生的数据流的轮廓。 有关组件的简要说明,请参见以下有关它们的更多详细信息的链接。 数据流也在下面描述。

_images/frontier_02.png
组件
Fetcher

Fetcher(2)负责从网站(1)中获取网页,并将其提供给管理接下来要抓取哪些页面的 frontier。

Fetcher 可以使用 Scrapy 或任何其他爬虫框架/系统来实现,因为框架提供了通用的 frontier 功能。

在分布式运行模式下,Fetcher由Frontera Manager侧的消息总线生产者和Fetcher侧的消费者替代。

Frontera API / Manager

Frontera API(3)的主要入口点是 FrontierManager 对象。Frontier 用户(在我们的案例中是Fetcher(2))将通过它与 Frontier 进行通信。

更多请参考 Frontera API

Middlewares

Frontier middlewares (4) 是位于Manager(3)和Backend(5)之间的特定钩子。 这些中间件在传入和传出 Frontier 和 Backend 时处理 RequestResponse 对象。 它们通过插入自定义代码提供了一种方便的扩展功能的机制。 规范URL解算器是一种特殊的中间件,负责替代非规范文档URL。

更改请参考 Middlewares(中间件)内置规范 URL 解算器参考

Backend

frontier Backend(5)是爬行逻辑/策略所在的地方。 它负责接收所有抓取信息并选择接下来要抓取的页面。 Backend 旨在在更高级别上运行,而:class:Queue <frontera.core.components.Queue>, MetadataStates 对象负责低级存储通信代码。

根据实现的逻辑,可能需要一个持久性存储(6)来管理 RequestResponse 对象信息。

更多请参考 后端

数据流

Frontera 的数据流由 Frontier Manager 控制,所有数据都通过 manager-middlewares-backend 流程,如下所示:

  1. frontier初始化为种子请求列表(种子URL)作为爬虫的入口点。
  2. fetcher请求一批任务去抓取。
  3. 每个url都被提取,并且 frontier 被通知回传抓取结果以及页面包含的提取数据。 如果在爬行中出现问题,frontier 也会被通知。

一旦所有 url 被抓取,重复步骤2-3,直到达到 frontier 结束条件。每个循环(步骤2-3)重复被称为 frontier 迭代

分布式

在分布式模式下运行时,所有 Frontera 进程都使用相同的 Frontera Manager。

整体系统形成一个封闭的圆圈,所有的组件都在无限循环中作为守护进程工作。 有一个 message bus 负责在组件,持久存储和 fetcher(当和提取结合时,fetcher又叫做spider)之间传输消息。 有一个传输和存储层抽象,所以可以插上它自己的实现。 分布式后端运行模式具有三种类型的实例:

  • Spiders 或者 fetchers,使用Scrapy(分片)实现。
    负责解决DNS查询,从互联网获取内容并从内容中进行链接(或其他数据)提取。
  • Strategy workers (分片)。
    运行爬网策略代码:为链接链接,决定链接是否需要被抓取,以及何时停止抓取。
  • DB workers (分片)。
    存储所有元数据,包括分数和内容,并生成新的批量任务以供爬虫下载。

*分片*意味着组件仅消耗分配的分区的消息,例如处理数据流的某些共享。*复制*是组件消耗数据流,而不管分区。

这样的设计允许在线操作。可以更改抓取策略,而无需停止抓取。 爬虫策略 也可以作为单独的模块实现; 包含用于检查爬网停止条件,URL排序和评分模型的逻辑。

Frontera 的设计是对Web友好的,每个主机由不超过一个的爬虫进程下载。这是通过数据流分区实现的。

_images/frontera-design.png
数据流

我们从爬虫开始吧。爬虫内的用户定义的种子URL通过 spider log 流发送给 strategy workers 和 DB workers。strategy workers 使用状态缓存决定抓取哪些页面,为每个页面分配一个分数,并将结果发送到 scoring log 流。

DB Worker存储各种元数据,包括内容和分数。另外 DB Worker 检查爬虫消费者的偏移量,并在需要时生成新的任务,并将其发送到 spider feed 流。爬虫消耗这些任务,下载每个页面并从中提取链接。然后将链接发送到 ‘Spider Log’ 流,并将其存储和记分。 这样,流量将无限期地重复。

Frontera API

本节介绍了 Frontera 核心API,适用于中间件和后端的开发人员。

Frontera API / Manager

Frontera API的主要入口点是 FrontierManager 对象,通过from_manager类方法传递给中间件和后端。该对象提供对所有Frontera核心组件的访问,并且是中间件和后端访问它们并将其功能挂接到Frontera中的唯一方法。

FrontierManager 负责加载安装的中间件和后端,以及用于管理整个 frontier 的数据流。

从settings加载

尽管 FrontierManager 可以通过参数初始化,但最常用的初始化方法还是使用 Frontera Settings

这个可以通过 from_settings 类方法实现,使用字符串路径:

>>> from frontera import FrontierManager
>>> frontier = FrontierManager.from_settings('my_project.frontier.settings')

或者一个 BaseSettings 对象:

>>> from frontera import FrontierManager, Settings
>>> settings = Settings()
>>> settings.MAX_PAGES = 0
>>> frontier = FrontierManager.from_settings(settings)

也可以无参数初始化,这种情况下 frontier 会使用 默认配置

>>> from frontera import FrontierManager, Settings
>>> frontier = FrontierManager.from_settings()

Frontier Manager

class frontera.core.manager.FrontierManager(request_model, response_model, backend, middlewares=None, test_mode=False, max_requests=0, max_next_requests=0, auto_start=True, settings=None, canonicalsolver=None, db_worker=False, strategy_worker=False)

The FrontierManager object encapsulates the whole frontier, providing an API to interact with. It’s also responsible of loading and communicating all different frontier components.

参数:
  • request_model (object/string) – The Request object to be used by the frontier.
  • response_model (object/string) – The Response object to be used by the frontier.
  • backend (object/string) – The Backend object to be used by the frontier.
  • middlewares (list) – A list of Middleware objects to be used by the frontier.
  • test_mode (bool) – Activate/deactivate frontier test mode.
  • max_requests (int) – Number of pages after which the frontier would stop (See Finish conditions).
  • max_next_requests (int) – Maximum number of requests returned by get_next_requests method.
  • auto_start (bool) – Activate/deactivate automatic frontier start (See starting/stopping the frontier).
  • settings (object/string) – The Settings object used by the frontier.
  • canonicalsolver (object/string) – The CanonicalSolver object to be used by frontier.
  • db_worker (bool) – True if class is instantiated in DB worker environment
  • strategy_worker (bool) – True if class is instantiated in strategy worker environment

Attributes

request_model

The Request object to be used by the frontier. Can be defined with REQUEST_MODEL setting.

response_model

The Response object to be used by the frontier. Can be defined with RESPONSE_MODEL setting.

backend

The Backend object to be used by the frontier. Can be defined with BACKEND setting.

middlewares

A list of Middleware objects to be used by the frontier. Can be defined with MIDDLEWARES setting.

test_mode

Boolean value indicating if the frontier is using frontier test mode. Can be defined with TEST_MODE setting.

max_requests

Number of pages after which the frontier would stop (See Finish conditions). Can be defined with MAX_REQUESTS setting.

max_next_requests

Maximum number of requests returned by get_next_requests method. Can be defined with MAX_NEXT_REQUESTS setting.

auto_start

Boolean value indicating if automatic frontier start is activated. See starting/stopping the frontier. Can be defined with AUTO_START setting.

settings

The Settings object used by the frontier.

iteration

Current frontier iteration.

n_requests

Number of accumulated requests returned by the frontier.

finished

Boolean value indicating if the frontier has finished. See Finish conditions.

API Methods

start()

Notifies all the components of the frontier start. Typically used for initializations (See starting/stopping the frontier).

返回:None.
stop()

Notifies all the components of the frontier stop. Typically used for finalizations (See starting/stopping the frontier).

返回:None.
add_seeds(seeds)

Adds a list of seed requests (seed URLs) as entry point for the crawl.

参数:seeds (list) – A list of Request objects.
返回:None.
get_next_requests(max_next_requests=0, **kwargs)

Returns a list of next requests to be crawled. Optionally a maximum number of pages can be passed. If no value is passed, FrontierManager.max_next_requests will be used instead. (MAX_NEXT_REQUESTS setting).

参数:
  • max_next_requests (int) – Maximum number of requests to be returned by this method.
  • kwargs (dict) – Arbitrary arguments that will be passed to backend.
返回:

list of Request objects.

page_crawled(response)

Informs the frontier about the crawl result.

参数:response (object) – The Response object for the crawled page.
返回:None.
request_error(request, error)

Informs the frontier about a page crawl error. An error identifier must be provided.

参数:
  • request (object) – The crawled with error Request object.
  • error (string) – A string identifier for the error.
返回:

None.

Class Methods

classmethod from_settings(settings=None, db_worker=False, strategy_worker=False)

Returns a FrontierManager instance initialized with the passed settings argument. If no settings is given, frontier default settings are used.

启动/停止 frontier

有时,frontier 组件需要执行初始化和最终化操作。frontier 通过 start()stop() 方法去通知不同组件启动或者停止。

默认 auto_start 值是激活的,这意味着在创建 FrontierManager 对象后,组件将被通知。如果您需要对初始化不同组件时进行更精细的控制,请停用 auto_start 并手动调用frontier API start()stop() 方法。

auto_start 处于激活状态时,Frontier stop() 方法不会自动调用(因为frontier 不知道抓取状态)。如果您需要通知 frontier 组件,您应该手动调用该方法。

Frontier 迭代

一旦 frontier 运行,通常的过程就是 data flow 部分所描述的过程。

爬虫调用 get_next_requests() 方法请求接下来要抓取的页面。每次 frontier 返回一个非空列表(可用数据),就是我们所说的前沿迭代。当前 frontier 迭代可以通过 iteration 属性访问。

结束 frontier

抓取过程可以被爬虫程序或者 Frontera 停止。当返回最大页数时,Frontera 将结束。此限制由 max_requests 属性控制( MAX_REQUESTS 设置)。

如果 max_requests 设置为0,那么 frontier 会无限抓取下去。

一旦 frontier 完成,get_next_requests 方法将不再返回任何页面,并且 finished 属性将为True。

组件对象

class frontera.core.components.Component

Interface definition for a frontier component The Component object is the base class for frontier Middleware and Backend objects.

FrontierManager communicates with the active components using the hook methods listed below.

Implementations are different for Middleware and Backend objects, therefore methods are not fully described here but in their corresponding section.

Attributes

name

The component name

Abstract methods

frontier_start()

Called when the frontier starts, see starting/stopping the frontier.

frontier_stop()

Called when the frontier stops, see starting/stopping the frontier.

add_seeds(seeds)

This method is called when new seeds are added to the frontier.

参数:seeds (list) – A list of Request objects.
page_crawled(response)

This method is called every time a page has been crawled.

参数:response (object) – The Response object for the crawled page.
request_error(page, error)

This method is called each time an error occurs when crawling a page.

参数:
  • request (object) – The crawled with error Request object.
  • error (string) – A string identifier for the error.

Class Methods

classmethod from_manager(manager)

Class method called from FrontierManager passing the manager itself.

Example of usage:

def from_manager(cls, manager):
    return cls(settings=manager.settings)

测试模式

在某些情况下,在测试中,frontier 组件需要采用与通常不同的方式(例如,在测试模式下解析域URL时,domain middleware 会接受诸如 'A1' 或者 'B1' 之类的非有效URL)。

组件可以通过 test_mode 属性知道 frontier 是否处于测试模式。

使用 frontier 的其他方法

与 frontier 通信也可以通过HTTP API或队列系统等其他机制完成。这些功能暂时不可用,但希望包含在将来的版本中。

Frontier + Requests

为了结合 frontier 和 Requests ,提供了 RequestsFrontierManager 类。

这个类是一个简单的 FrontierManager 封装,它使用 Requests 对象 (Request/Response),将他们和 frontier 相互转换。

FrontierManager 一样使用,使用你的 settings 初始化它。 get_next_requests 将返回 Requests Request 对象。

一个例子:

import re

import requests

from urlparse import urljoin

from frontera.contrib.requests.manager import RequestsFrontierManager
from frontera import Settings

SETTINGS = Settings()
SETTINGS.BACKEND = 'frontera.contrib.backends.memory.FIFO'
SETTINGS.LOGGING_MANAGER_ENABLED = True
SETTINGS.LOGGING_BACKEND_ENABLED = True
SETTINGS.MAX_REQUESTS = 100
SETTINGS.MAX_NEXT_REQUESTS = 10

SEEDS = [
    'http://www.imdb.com',
]

LINK_RE = re.compile(r'href="(.*?)"')


def extract_page_links(response):
    return [urljoin(response.url, link) for link in LINK_RE.findall(response.text)]

if __name__ == '__main__':

    frontier = RequestsFrontierManager(SETTINGS)
    frontier.add_seeds([requests.Request(url=url) for url in SEEDS])
    while True:
        next_requests = frontier.get_next_requests()
        if not next_requests:
            break
        for request in next_requests:
                try:
                    response = requests.get(request.url)
                    links = [requests.Request(url=url) for url in extract_page_links(response)]
                    frontier.page_crawled(response=response)
                    frontier.links_extracted(request=request, links=links)
                except requests.RequestException, e:
                    error_code = type(e).__name__
                    frontier.request_error(request, error_code)

例子

这个项目包含了 examples 文件夹,其中有使用 Frontera 的代码

examples/
    requests/
    general-spider/
    scrapy_recording/
    scripts/
  • requests: Example script with Requests library.
  • general-spider: Scrapy 整合示例项目。
  • scrapy_recording: Scrapy 记录示例项目。
  • scripts: 一些简单的脚本。

注解

这个例子可能需要安装额外的库才能工作.

你可以使用 pip 来安装它们:

pip install -r requirements/examples.txt

requests

一个使用 Requests 库,抓取一个网站所有链接的脚本。

运行:

python links_follower.py

general-spider

一个简单的 Scrapy 爬虫,执行所有的种子任务。包含单进程,分布式爬虫和后端运行模式的配置文件。

查看 分布式模式快速入门 如何运行。

cluster

是一个大型可扩展爬虫程序,用于抓取每个域有限制的大量网站。它在HBase中保留每个域的状态,并在安排新的下载请求时使用它。设计用于使用 HBase 在分布式后端运行模式下运行。

scrapy_recording

一个带有爬虫的简单脚本,可以跟踪站点的所有链接,记录抓取结果。

运行:

scrapy crawl recorder

scripts

一些关于如何使用不同 frontier 组件的示例脚本。

Tests

Frontera 测试使用 pytest 工具实现。

您可以使用pip安装 pytest 和测试中使用的附加必需库:

pip install -r requirements/tests.txt

运行 tests

要运行所有测试,请转到源代码的根目录并运行:

py.test

写 tests

所有功能(包括新功能和错误修复)都必须包含一个测试用例,以检查它是否按预期工作,所以如果你希望他们能够早点上线,请尽快写好相关的测试。

后端 testing

有一个继承 pytest 的类用来测试 Backend: BackendTest

比方说,你想测试你的后端 MyBackend 并为每个测试方法调用创建一个新的 frontier 实例,你可以定义一个这样的测试类:

class TestMyBackend(backends.BackendTest):

    backend_class = 'frontera.contrib.backend.abackend.MyBackend'

    def test_one(self):
        frontier = self.get_frontier()
        ...

    def test_two(self):
        frontier = self.get_frontier()
        ...

    ...

如果它使用一个数据库文件,你需要在每次测试之前和之后进行清理:

class TestMyBackend(backends.BackendTest):

    backend_class = 'frontera.contrib.backend.abackend.MyBackend'

    def setup_backend(self, method):
        self._delete_test_db()

    def teardown_backend(self, method):
        self._delete_test_db()

    def _delete_test_db(self):
        try:
            os.remove('mytestdb.db')
        except OSError:
            pass

    def test_one(self):
        frontier = self.get_frontier()
        ...

    def test_two(self):
        frontier = self.get_frontier()
        ...

    ...

测试后端执行顺序

为了测试 Backend 抓取顺序你可以使用 BackendSequenceTest 类。

BackendSequenceTest 将依据传过来的网站图进行一遍完整的抓取,并返回后端访问网页的顺序。

比如你想测试一个按照字母顺序抓取网页的后端。你可以这样写测试:

class TestAlphabeticSortBackend(backends.BackendSequenceTest):

    backend_class = 'frontera.contrib.backend.abackend.AlphabeticSortBackend'

    SITE_LIST = [
        [
            ('C', []),
            ('B', []),
            ('A', []),
        ],
    ]

    def test_one(self):
        # Check sequence is the expected one
        self.assert_sequence(site_list=self.SITE_LIST,
                             expected_sequence=['A', 'B', 'C'],
                             max_next_requests=0)

    def test_two(self):
        # Get sequence and work with it
        sequence = self.get_sequence(site_list=SITE_LIST,
                            max_next_requests=0)
        assert len(sequence) > 2

    ...

测试基本算法

如果你的后端使用 基本算法逻辑 中的一个,你可以继承对应的测试类,之后顺序会被自动测试:

from tests import backends


class TestMyBackendFIFO(backends.FIFOBackendTest):
    backend_class = 'frontera.contrib.backends.abackend.MyBackendFIFO'


class TestMyBackendLIFO(backends.LIFOBackendTest):
    backend_class = 'frontera.contrib.backends.abackend.MyBackendLIFO'


class TestMyBackendDFS(backends.DFSBackendTest):
    backend_class = 'frontera.contrib.backends.abackend.MyBackendDFS'


class TestMyBackendBFS(backends.BFSBackendTest):
    backend_class = 'frontera.contrib.backends.abackend.MyBackendBFS'


class TestMyBackendRANDOM(backends.RANDOMBackendTest):
    backend_class = 'frontera.contrib.backends.abackend.MyBackendRANDOM'

Logging

Frontera 使用 Python 原生日志系统。这允许用户通过配置文件(参考 LOGGING_CONFIG )或者在运行时配置 logger。

Logger 配置语法在这里 https://docs.python.org/2/library/logging.config.html

使用的Loggers

  • kafka
  • hbase.backend
  • hbase.states
  • hbase.queue
  • sqlalchemy.revisiting.queue
  • sqlalchemy.metadata
  • sqlalchemy.states
  • sqlalchemy.queue
  • offset-fetcher
  • messagebus-backend
  • cf-server
  • db-worker
  • strategy-worker
  • messagebus.kafka
  • memory.queue
  • memory.dequequeue
  • memory.states
  • manager.components
  • manager
  • frontera.contrib.scrapy.schedulers.FronteraScheduler

测试一个 Frontier

Frontier Tester是一个帮助类,用于方便 frontier 测试。

基本上它基于 Frontier 运行一个模拟的爬取过程,爬虫信息是使用Graph Manager实例伪造的。

创建一个 Frontier Tester

FrontierTester 需要一个 Graph Manager 和一个 FrontierManager 实例:

>>> from frontera import FrontierManager, FrontierTester
>>> from frontera.utils import graphs
>>> graph = graphs.Manager('sqlite:///graph.db')  # Crawl fake data loading
>>> frontier = FrontierManager.from_settings()  # Create frontier from default settings
>>> tester = FrontierTester(frontier, graph)

运行一个 Test

tester 已经被实例化,现在只需调用 run 函数运行:

>>> tester.run()

当 run 方法被调用 tester 将:

  1. 从图中添加所有的种子。
  2. 向 frontier 询问新的任务。
  3. 模拟页面响应,并通知 frontier 关于页面抓取及其链接。

重复步骤1和2,直到抓取或 frontier 结束。

测试完成后,抓取页面 sequence 可作为 frontier Request 对象列表使用。

测试参数

在某些测试用例中,您可能需要将所有页面添加为种子,这可以通过参数 add_all_pages 来完成:

>>> tester.run(add_all_pages=True)

每个 get_next_requests 调用的最大返回页数可以使用 frontier settings进行设置,也可创建 FrontierTester 时使用 max_next_pages 参数进行修改:

>>> tester = FrontierTester(frontier, graph, max_next_pages=10)

使用的例子

一个使用 graph 测试数据和 basic backends 的例子

from frontera import FrontierManager, Settings, FrontierTester, graphs


def test_backend(backend):
    # Graph
    graph = graphs.Manager()
    graph.add_site_list(graphs.data.SITE_LIST_02)

    # Frontier
    settings = Settings()
    settings.BACKEND = backend
    settings.TEST_MODE = True
    frontier = FrontierManager.from_settings(settings)

    # Tester
    tester = FrontierTester(frontier, graph)
    tester.run(add_all_pages=True)

    # Show crawling sequence
    print '-'*40
    print frontier.backend.name
    print '-'*40
    for page in tester.sequence:
        print page.url

if __name__ == '__main__':
    test_backend('frontera.contrib.backends.memory.heapq.FIFO')
    test_backend('frontera.contrib.backends.memory.heapq.LIFO')
    test_backend('frontera.contrib.backends.memory.heapq.BFS')
    test_backend('frontera.contrib.backends.memory.heapq.DFS')

F.A.Q.

如何并行有效下载?

通常,URL排序的设计意味着从同一个域获取许多URL。 如果抓取过程需要有礼貌,则必须保留一些延迟和请求率。 另一方面,下载器同时可以同时下载许多网址(例如100)。 因此,来自同一域的URL的并发导致下载器连接池资源的低效浪费。

这是一个简短的例子。 想像一下,我们有来自许多不同域的10K网址的队列。 我们的任务是尽快取得它。 在下载期间,我们希望对每个主机有礼貌和并限制RPS。 同时,我们有一个优先级,它倾向于对来自同一个域的URL进行分组。 当抓取工具正在请求批量的URL以获取时,将从同一主机获取数百个URL。 由于RPS限制和延迟,下载器将无法快速获取它们。 因此,从队列中挑选统一域的URL可以让我们浪费时间,因为下载器的连接池浪费了大部分时间。

解决方案是在下载程序中为 Frontera 后端提供主机名/ ip(通常但不是必需的)使用。 我们在方法 get_next_requests 中有一个关键字参数,用于传递这些统计信息到 Fronter a后端。 任何类型的信息都可以通过。 这个参数通常设置在 Frontera 之外,然后通过 FrontierManagerWrapper 子类传递给CF到后端。

贡献指引

  • 所有的问题和讨论请使用 Frontera google group
  • 提交补丁请使用 Github repo 的 pull request 。
  • Github issues 中请提交 Frontera 将来能受益的问题。请将自己运行 Frontera 的问题提交到 google group 。

我们总是乐意接受有文档和测试的解决方案。

术语表

spider log
来自爬虫的编码消息流。每个消息都是从文档中提取,通常是链接,分数,分类结果。
scoring log
从 strategy worker 到 db worker,包含更新事件和调度标志(如果链接需要调度下载)的分数。
spider feed
db worker 到爬虫,包含新的一批需要抓取的文档。
strategy worker
一种特殊的 worker,运行抓取策略代码:为链接评分,决定链接是否需要被调度(查询 state cache )和合适停止抓取。这种 worker 是分片的。
db worker
负责和存储数据库通信,主要存储元数据和内容,同时拉取新的任务去下载。
state cache
内存数据结构,包含文档是否被抓取的状态信息。 定期与持久存储同步。
message bus
传输层抽象机制。提供传输层抽象和实现的接口。
spider
从 Web 检索和提取内容,使用 spider feed 作为输入队列,将结果存储到 spider log 。在这个文档中,提取器被用作同义词。
架构概述
了解 Frontera 如何工作和它的不同组件。
Frontera API
学习如何使用 frontier 。
Frontier + Requests
学习如何使用 Frontera + Requests 。
例子
一些使用 Frontera 的示例工程和示例脚本。
Tests
如果运行和写 Frontera 的测试用例。
Logging
使用 python 原生日志系统创建的一些 loggers 。
测试一个 Frontier
使用一个简单的方法测试你的 frontier。
F.A.Q.
常见问题。
贡献指引
如何贡献。
术语表
术语表。