webmagic源码分析

/ 技术 / 无站内评论 / 535浏览

什么是webmagic

前点评网大牛黄亿华开发,代码质量极佳,参考了业界最优秀的爬虫Scrapy 设计。在码云上抓取框架排名总第二,最优质项目排名第一。

webmagic是一个开源的Java垂直爬虫框架,目标是简化爬虫的开发流程,让开发者专注于逻辑功能的开发。webmagic的核心非常简单,但是覆盖爬虫的整个流程,也是很好的学习爬虫开发的材料。

webmagic的主要特色:

git地址:https://gitee.com/flashsword20/webmagic

快速入门

maven

  <dependency>
  <groupId>us.codecraft</groupId>
  <artifactId>webmagic-core</artifactId>
  <version>0.7.3</version>
</dependency>
<dependency>
  <groupId>us.codecraft</groupId>
  <artifactId>webmagic-extension</artifactId>
  <version>0.7.3</version>
</dependency>

第一个爬虫

这里通过page.addTargetRequests()方法来增加要抓取的URL,并通过page.putField()来保存抽取结果。page.getHtml().xpath()则是按照某个规则对结果进行抽取,这里抽取支持链式调用。调用结束后,toString()表示转化为单个String,all()则转化为一个String列表。

Spider是爬虫的入口类。Pipeline是结果输出和持久化的接口,这里ConsolePipeline表示结果输出到控制台。

ps:看完是不是觉得很简单?用户无需考虑其他东西,最简单只需实现开发PageProcessor(页面处理接口,要提取哪些元素等等。。)

  public class OschinaBlogPageProcesser implements PageProcessor {

  private Site site = Site.me().setDomain("my.oschina.net");

  @Override
  public void process(Page page) {
      List<String> links = page.getHtml().links().regex("http://my\\.oschina\\.net/flashsword/blog/\\d+").all();
      //提取当前页面url,重新加入种子池中
      page.addTargetRequests(links);
      //分析节点数据
      page.putField("title", page.getHtml().xpath("//div[@class='BlogEntity']/div[@class='BlogTitle']/h1").toString());
      page.putField("content", page.getHtml().$("div.content").toString());
      page.putField("tags",page.getHtml().xpath("//div[@class='BlogTags']/a/text()").all());
  }

  @Override
  public Site getSite() {
      return site;

  }

//启动
  public static void main(String[] args) {
      Spider.create(new OschinaBlogPageProcesser()).addUrl("http://my.oschina.net/flashsword/blog")
            .addPipeline(new ConsolePipeline()).run();
  }
}

分析webmagic源码

1.1前言

做抓取也有半年多了,还是停留在抓包请求的工作中,在最近项目的重构中意识到自己对抓取项目架构了解的欠缺,最近看了两个抓取框架的源码,大致了解了抓取项目架构正确的玩法。

1.2 总体架构

WebMagic的结构分为DownloaderPageProcessorSchedulerPipeline四大组件,并由Spider将它们彼此组织起来。这四大组件对应爬虫生命周期中的下载、处理、管理和持久化等功能。WebMagic的设计参考了Scapy,但是实现方式更Java化一些。

而Spider则将这几个组件组织起来,让它们可以互相交互,流程化的执行,可以认为Spider是一个大的容器,它也是WebMagic逻辑的核心。

WebMagic总体架构图如下:

1.2.1 WebMagic的四个组件

1.Downloader

Downloader负责从互联网上下载页面,以便后续处理。WebMagic默认使用了Apache HttpClient作为下载工具。

2.PageProcessor

PageProcessor负责解析页面,抽取有用信息,以及发现新的链接。WebMagic使用Jsoup作为HTML解析工具,并基于其开发了解析XPath的工具Xsoup

在这四个组件中,PageProcessor对于每个站点每个页面都不一样,是需要使用者定制的部分。

3.Scheduler

Scheduler负责管理待抓取的URL,以及一些去重的工作。WebMagic默认提供了JDK的内存队列来管理URL,并用集合来进行去重。也支持使用Redis进行分布式管理。

除非项目有一些特殊的分布式需求,否则无需自己定制Scheduler。

4.Pipeline

Pipeline负责抽取结果的处理,包括计算、持久化到文件、数据库等。WebMagic默认提供了“输出到控制台”和“保存到文件”两种结果处理方案。

Pipeline定义了结果保存的方式,如果你要保存到指定数据库,则需要编写对应的Pipeline。对于一类需求一般只需编写一个Pipeline

1.2.2 用于数据流转的对象

1. Request

Request是对URL地址的一层封装,一个Request对应一个URL地址。

它是PageProcessor与Downloader交互的载体,也是PageProcessor控制Downloader唯一方式。

除了URL本身外,它还包含一个Key-Value结构的字段extra。你可以在extra中保存一些特殊的属性,然后在其他地方读取,以完成不同的功能。例如附加上一个页面的一些信息等。

2. Page

Page代表了从Downloader下载到的一个页面——可能是HTML,也可能是JSON或者其他文本格式的内容。

Page是WebMagic抽取过程的核心对象,它提供一些方法可供抽取、结果保存等。在第四章的例子中,我们会详细介绍它的使用。

3. ResultItems

ResultItems相当于一个Map,它保存PageProcessor处理的结果,供Pipeline使用。它的API与Map很类似,值得注意的是它有一个字段skip,若设置为true,则不应被Pipeline处理。

1.2.3 控制爬虫运转的引擎--Spider

Spider是WebMagic内部流程的核心。Downloader、PageProcessor、Scheduler、Pipeline都是Spider的一个属性,这些属性是可以自由设置的,通过设置这个属性可以实现不同的功能。Spider也是WebMagic操作的入口,它封装了爬虫的创建、启动、停止、多线程等功能。下面是一个设置各个组件,并且设置多线程和启动的例子。详细的Spider设置请看第四章——爬虫的配置、启动和终止

  public static void main(String[] args) {
  Spider.create(new GithubRepoPageProcessor())
          //从https://github.com/code4craft开始抓    
          .addUrl("https://github.com/code4craft")
          //设置Scheduler,使用Redis来管理URL队列
          .setScheduler(new RedisScheduler("localhost"))
          //设置Pipeline,将结果以json方式保存到文件
          .addPipeline(new JsonFilePipeline("D:\\data\\webmagic"))
          //开启5个线程同时执行
          .thread(5)
          //启动爬虫
          .run();
}

解析源码

Spider是控制核心,所有组件都是该类的成员变量,为何这么定义呢?

如此定义,用户只需通过Spider类便可简单实现一个爬虫,无需关注其他接口实现。

  public class Spider implements Runnable, Task {
//下载组件
  protected Downloader downloader;
//管道组件
  protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
//页面处理组件
  protected PageProcessor pageProcessor;
//调度组件
  protected Scheduler scheduler = new QueueScheduler();

  }

PageProcessor

Spider.create初始化一个PageProcessor,通过源码发现把传入PageProcessor赋值给自身的pageProcessor变量

  
/**
* create a spider with pageProcessor.
*
* @param pageProcessor pageProcessor
* @return new spider
* @see PageProcessor
*/
public static Spider create(PageProcessor pageProcessor) {
  return new Spider(pageProcessor);
}

/**
* create a spider with pageProcessor.
*
* @param pageProcessor pageProcessor
*/
public Spider(PageProcessor pageProcessor) {
  this.pageProcessor = pageProcessor;
  this.site = pageProcessor.getSite();
}

scheduler

接下来调用addUrl,添加我们要抓取的url

  private void addRequest(Request request) {
  if (site.getDomain() == null && request != null && request.getUrl() != null) {
      site.setDomain(UrlUtils.getDomain(request.getUrl()));
  }
  scheduler.push(request, this);
}

将urlpush到scheduler,注意,这里scheduler在启动的时候,是可以不指定的,默认使用jdk自带的阻塞队列来实现调度。当然也可以自己实现该Scheduler接口调用setScheduler覆盖默认的阻塞队列。

  
public Spider setScheduler(Scheduler scheduler) {
  checkIfRunning();
  Scheduler oldScheduler = this.scheduler;
  this.scheduler = scheduler;
  if (oldScheduler != null) {
      Request request;
      while ((request = oldScheduler.poll(this)) != null) {
          this.scheduler.push(request, this);
      }
  }
  return this;
}

Downloader

这里Downloader也一样,可以指定,默认使用httpclient进行下载。可以指定使用驱动自动化工具等。。

  
public Spider setDownloader(Downloader downloader) {
  checkIfRunning();
  this.downloader = downloader;
  return this;
}

Pipeline

Pipeline同上。默认使用ConsolePipeline(打印控制台)

当你对Spider配置好所有参数后,run方法启动抓取,核心方法。

  public Spider addPipeline(Pipeline pipeline) {
  checkIfRunning();
  this.pipelines.add(pipeline);
  return this;
}

public Spider setPipelines(List<Pipeline> pipelines) {
  checkIfRunning();
  this.pipelines = pipelines;
  return this;
}

run()

这是Spider的启动方法,也是Spider核心方法。抓取架构教科书般的教程。

  
public void run() {
  checkRunningStat();
  initComponent();
  logger.info("Spider {} started!",getUUID());
  while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
      final Request request = scheduler.poll(this);
      if (request == null) {
          if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
              break;
          }
          // wait until new url added
          waitNewUrl();
      } else {
          threadPool.execute(new Runnable() {
              @Override
              public void run() {
                  try {
                      processRequest(request);
                      onSuccess(request);
                  } catch (Exception e) {
                      onError(request);
                      logger.error("process request " + request + " error", e);
                  } finally {
                      pageCount.incrementAndGet();
                      signalNewUrl();
                  }
              }
          });
      }
  }
  stat.set(STAT_STOPPED);
  // release some resources
  if (destroyWhenExit) {
      close();
  }
  logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}

我们一步步解析

  
checkRunningStat()

Spider中stat字段维护抓取运行的状态,使用线程安全类AtomicInteger

  
private void checkRunningStat() {
  while (true) {
  //获取当前任务状态
      int statNow = stat.get();
      //正在运行则抛异常
      if (statNow == STAT_RUNNING) {
          throw new IllegalStateException("Spider is already running!");
      }
      //判断之前获取到的任务状态是否跟当前值一致,一致则修改任务状态成运行中。
      if (stat.compareAndSet(statNow, STAT_RUNNING)) {
          break;
      }
  }
}
  
initComponent();

对缺省的组件将初始化默认值。

  protected void initComponent() {
  if (downloader == null) {
  //默认httpclient
      this.downloader = new HttpClientDownloader();
  }
  if (pipelines.isEmpty()) {
  //控制台管道
      pipelines.add(new ConsolePipeline());
  }
  downloader.setThread(threadNum);
  if (threadPool == null || threadPool.isShutdown()) {
  //初始化线程池
      if (executorService != null && !executorService.isShutdown()) {
          threadPool = new CountableThreadPool(threadNum, executorService);
      } else {
          threadPool = new CountableThreadPool(threadNum);
      }
  }
  if (startRequests != null) {
      for (Request request : startRequests) {
          addRequest(request);
      }
      startRequests.clear();
  }
  startTime = new Date();
}
  
final Request request = scheduler.poll(this);

从调度组件中取出一个任务

  if (request == null) {
  if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
      break;
  }
  // wait until new url added
  waitNewUrl();
}

如果任务为空,则休眠进入等待

   else {
      threadPool.execute(new Runnable() {
          @Override
          public void run() {
              try {
                  processRequest(request);
                  onSuccess(request);
              }
          }
      });
  }
}

任务不为空,则提交到线程池中执行。

重点来了!!!!敲黑板

  
processRequest(request);

拿出任务第一件事应该干嘛?

当然是下载页面啊!

  private void processRequest(Request request) {
//使用Spider变量中的downloader组件进行下载
  Page page = downloader.download(request, this);
  if (page.isDownloadSuccess()){
      onDownloadSuccess(request, page);
  } else {
      onDownloaderFail(request);
  }
}

下载成功后干嘛?

当然是解析页面啊!!!

  private void onDownloadSuccess(Request request, Page page) {
  if (site.getAcceptStatCode().contains(page.getStatusCode())){
      //使用Spider变量中的pageProcessor组件进行页面分析
      pageProcessor.process(page);
      //page中有一个targetRequests字段,将页面中继续要抓的请求重新放入调度中心中
      extractAndAddRequests(page, spawnUrl);
      if (!page.getResultItems().isSkip()) {
      //允许多个管道,循环pipeline进行处理页面分析结果
          for (Pipeline pipeline : pipelines) {
              pipeline.process(page.getResultItems(), this);
          }
      }
  }
}
  
onDownloaderFail(request);

失败将进行重试,重新调整优先级加入到调度中心中。

总结

此框架对于传统的页面结构开发效率很高,如果只是页面结构的抓取,单机的情况下我们最少只需开发PageProcessor即可,分布式手动实现Scheduler实现为Redis队列即可。

但是对于网联网高速发展的今天,很多传统项目已经从页面由后端渲染到现在的前后分离。

前后分离的出现让数据不直接展示到页面上,而是通过异步请求接口再进行渲染。会出现以下问题

1.直接抓取页面分提取节点得不到数据,因为是通过异步渲染的数据。

2.没有通用的方式去获取数据。因为异步一般是Json的返回结果,如果我们只需要具体一个字段数据,会导致在庞大的Json响应中去提取,目前没有很方便实现这种功能框架。

3.用户开发的时候可能要学会抓包,学会分析抓包数据,正常情况下还会校验登录状态等。增加抓取难度。

4.所需数据可能由多个请求组成,增加抓取时间成本。


存在缺陷

无法在正式抓取前执行一些操作,例如 登录

case

知乎,知乎在访问的时候首先要登录,登录成功才能允许访问。

想法

提供一个接口,执行登录操作后返回cookie,将cookie保存到后续httpclient中。


计划

开发一套爬虫框架demo

召唤蕾姆
琼ICP备18000156号

鄂公网安备 42011502000211号