Hello World
Spiga

适合C# Actor的消息执行方式(5):一个简单的网络爬虫

2009-07-27 09:13 by 老赵, 7984 visits

之前的几篇文章大都在摆一些“小道理”,有经验的朋友容易想象出来其中的含义,不过对于那些还不了解Actor模型的朋友来说,这些内容似乎有些太过了。此外,乒乓测试虽然经典,但是不太容易说明问题。因此,今天我们就来看一个简单的有些简陋的网络爬虫,对于Actor模型的使用来说,它至少比乒乓测试能够说明问题。对了,我们先来使用那“中看不中用”的消息执行方式。

功能简介

这个网络爬虫的功能还是用于演示,先来列举出它的实现目标吧:

  • 给出一个初始链接,然后抓取它的HTML并分析出所有html链接,然后继续爬,不断爬,直到爬完所有链接为止。
  • 多线程运行,我们可以指定由多少个爬虫同时工作。
  • 多个爬虫组成一个“工作单元”,程序中可以同时出现多个工作单元,工作单元之间互相独立。
  • 能简化的地方便简化,如一切不涉及任何永久性存储(也就是说,只使用内存),没有太复杂的容错机制。

的确很简单吧?那么,现在您不妨先在脑海中想象一下,在不用Actor模型的时候您会怎么实现这个功能。然后,我们就要动手使用ActorLite这个小类库了。

协议制定

正如我们不断强调的那样,在Actor模型中唯一的通信方式便是互相发送消息。于是使用Actor模型的第一步往往便是设计Actor类型,以及它们之间传递的消息。在这个简单的场景中,我们会定义两种Actor类型。一是Monitor,二是Crawler。一个Monitor便代表一个“工作单元”,它管理了多个爬虫,即Crawler。

Monitor将负责在合适的时候创建Crawler,并向其发送一个消息,让其开始工作。在我们的系统中,我们使用ICrawlRequestHandler接口来表示这个消息:

public interface ICrawlRequestHandler
{
    void Crawl(Monitor monitor, string url);
}

在接受到上面的Crawl消息后,Crawler将去抓取指定的url对象,并将结果发还给Monitor。在这里我们要求报告Cralwer向Monitor报告“成功”和“失败”两种消息1

public interface ICrawlResponseHandler
{
    void Succeeded(Crawler crawler, string url, List<string> links);
    void Failed(Crawler crawler, string url, Exception ex);
}

我们使用“接口”这种方式定义了“消息组”,把Succeeded和Failed两种关系密切的消息绑定在一起。如果抓取成功,则Crawler会从抓取内容中获得额外的链接,并发还给Monitor——失败的时候自然就发还一个异常对象。此外,无论是成功还是失败,我们都会把Crawler对象交给Monitor,Monitor会安排给Crawler新的抓取任务。

因此,Monitor和Cralwer类的定义大约应该是这样的:

public class Monitor : Actor<Action<ICrawlResponseHandler>>, ICrawlResponseHandler
{
    protected override void Receive(Action<ICrawlResponseHandler> message)
    {
        message(this);
    }

    #region ICrawlResponseHandler Members

    void ICrawlResponseHandler.Succeeded(Crawler crawler, string url, List<string> links)
    {
        ...
    }

    void ICrawlResponseHandler.Failed(Crawler crawler, string url, Exception ex)
    {
        ...
    }

    #endregion
}

public class Crawler : Actor<Action<ICrawlRequestHandler>>, ICrawlRequestHandler
{
    protected override void Receive(Action<ICrawlRequestHandler> message)
    {
        message(this);
    }

    #region ICrawlRequestHandler Members

    void ICrawlRequestHandler.Crawl(Monitor monitor, string url)
    {
        ...
    }

    #endregion
}

Crawler实现

我们先从简单的Crawler类的实现开始。Crawler类只需要实现ICrawlRequestHandler接口的Crawl方法即可:

void ICrawlRequestHandler.Crawl(Monitor monitor, string url)
{
    try
    {
        string content = new WebClient().DownloadString(url);

        var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast<Match>();
        var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
        monitor.Post(m => m.Succeeded(this, url, links));
    }
    catch (Exception ex)
    {
        monitor.Post(m => m.Failed(this, url, ex));
    }
}

没错,使用WebClient下载页面内容只需要一行代码就可以了。然后便是使用正则表达式提取出页面上所有的链接。很显然这里是有问题的,因为我们我只分析出以“http://”开头的地址,但是无视其他的“相对地址”——不过作为一个小实验来说已经足够说明问题了。最后自然是使用Post方法将结果发还给Monitor。在抛出异常的情况下,这几行代码的逻辑也非常自然。

Monitor实现

Monitor相对来说便略显复杂了一些。我们知道,Monitor要负责控制Crawler的数量,那么必然需要负责维护一些必要的字段:

private HashSet<string> m_allUrls; // 所有待爬或爬过的url
private Queue<string> m_readyToCrawl; // 待爬的url

public int MaxCrawlerCount { private set; get; } // 最大爬虫数目
public int WorkingCrawlerCount { private set; get; } // 正在工作的爬虫数目

public Monitor(int maxCrawlerCount)
{
    this.m_allUrls = new HashSet<string>();
    this.m_readyToCrawl = new Queue<string>();
    this.MaxCrawlerCount = maxCrawlerCount;
    this.WorkingCrawlerCount = 0;
}

Monitor要处理的自然是ICrawlResponseHandler中的Succeeded或Failed方法:

void ICrawlResponseHandler.Succeeded(Crawler crawler, string url, List<string> links)
{
    Console.WriteLine("{0} crawled, {1} link(s).", url, links.Count);

    foreach (var newUrl in links)
    {
        if (!this.m_allUrls.Contains(newUrl))
        {
            this.m_allUrls.Add(newUrl);
            this.m_readyToCrawl.Enqueue(newUrl);
        }
    }

    this.DispatchCrawlingTasks(crawler);
}

void ICrawlResponseHandler.Failed(Crawler crawler, string url, Exception ex)
{
    Console.WriteLine("{0} error occurred: {1}.", url, ex.Message);
    this.DispatchCrawlingTasks(crawler);
}

在抓取成功时,Monitor将遍历links列表中的所有地址,如果发现新的url,则加入相关集合中。在抓取失败的情况下,我们也只是简单的继续下去而已。而“继续”则是由DispatchCrawlingTasks方法实现的,我们需要传入一个“可复用”的Crawler对象:

private void DispatchCrawlingTasks(Crawler reusableCrawler)
{
    if (this.m_readyToCrawl.Count <= 0)
    {
        this.WorkingCrawlerCount--;
        return;
    }

    var url = this.m_readyToCrawl.Dequeue();
    reusableCrawler.Post(c => c.Crawl(this, url));

    while (this.m_readyToCrawl.Count > 0 &&
        this.WorkingCrawlerCount < this.MaxCrawlerCount)
    {
        var newUrl = this.m_readyToCrawl.Dequeue();
        new Crawler().Post(c => c.Crawl(this, newUrl));

        this.WorkingCrawlerCount++;
    }
}

如果已经没有需要抓取的内容了,则直接抛弃Crawler对象即可,否则则分派一个新任务。接着便不断创建新的爬虫,分配新的抓取任务,直到爬虫数额用满,或者没有需要抓取的内容位置。

使用

我们使用区区几十行代码遍实现了一个简单的多线程爬虫,其中一个关键便是使用了Actor模型。使用Actor模型,对象之间通过消息传递进行交互。而且对于单个Actor对象来说,消息的执行完全是线程安全的。因此,我们只要作用最直接的逻辑便可以完成整个实现,从而回避了内存共享的并行模式中所使用的互斥体、锁等各类组件。

不过有没有发现,我们没有一个入口可以“开启”一个抓取任务啊,Monitor类中还缺少了点什么。好吧,那么我们补上一个Start方法:

public class Monitor : Actor<Action<ICrawlResponseHandler>>, ICrawlResponseHandler
{
    ...

    public void Start(string url)
    {
        this.m_allUrls.Add(url);
        this.WorkingCrawlerCount++;
        new Crawler().Post(c => c.Crawl(this, url));
    }
}

于是,我们便可以这样打开一个或多个抓取任务:

static class Program
{
    static void Main(string[] args)
    {
        new Monitor(5).Start("http://www.cnblogs.com/");
        new Monitor(10).Start("http://www.csdn.net/");

        Console.ReadLine();
    }
}

这里我们新建两个工作单元,也就是启动了两个抓取任务。一是使用5个爬虫抓取cnblogs.com,二是使用10个爬虫抓取csdn.net。

缺陷

这里的缺陷是什么?其实很明显,您发现了吗?

使用Actor模型可以保证消息执行的线程安全,不过很明显Start方法并非如此,我们只能用它来“开启”一个抓取任务。但是如果我们想再次“手动”提交一个需要抓取的URL怎么办?所以理想的方法,其实也应该是向Monitor发送一个消息来启动第一个URL抓取任务。需要补充,则发送多个URL即可。可是,这个消息定义在什么地方才合适呢?我们的Monitor类已经实现了Actor<Action<ICrawlResponseHandler>>,已经没有办法接受另一个接口作为消息了,不是吗?

这就是一个致命的限制:一个Actor虽然可以实现多个接口,但只能接受其中一个作为消息。同样的,如果我们要为Monitor提供其他功能,例如“查询”某个URL的抓取状态,也因为同样的原因而无法实现。还有,便是在前几篇文章中谈到的问题了。Crawler和Monitor直接耦合,我们向Crawler发送的消息只能携带一个Monitor对象。

最后,便是一个略显特别的问题了。我们这里使用WebClient的DownloadString方法来获取网页的内容,但是这是个同步IO操作,理想的做法中我们应该使用异步的方法。所以,我们可以这么写:

void ICrawlRequestHandler.Crawl(Monitor monitor, string url)
{
    WebClient webClient = new WebClient();
    webClient.DownloadStringCompleted += (sender, e) =>
    {
        if (e.Error == null)
        {
            var matches = Regex.Matches(e.Result, @"href=""(http://[^""]+)""").Cast<Match>();
            var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
            monitor.Post(m => m.Succeeded(this, url, links));
        }
        else
        {
            monitor.Post(m => m.Failed(this, url, e.Error));
        }
    };
    webClient.DownloadStringAsync(new Uri(url));
}

如果您还记得老赵在最近一篇文章中关于IO线程池的讨论,就可以了解到DownloadStringCompleted事件的处理方法会在统一的IO线程池中运行,这样我们无法控制其运算能力。因此,我们应该在回调函数中向Crawler自己发送一条消息表示抓取完毕……呃,但是我们现在做不到啊。

嗯,下次再说吧。

相关文章

 

本文完整代码:http://gist.github.com/154815

Creative Commons License

本文基于署名 2.5 中国大陆许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名赵劼(包含链接),具体操作方式可参考此处。如您有任何疑问或者授权方面的协商,请给我留言

Add your comment

16 条回复

  1. 为了梦想
    *.*.*.*
    链接

    为了梦想 2009-07-27 09:25:00

    境界不到,无法做出评论
    佩服楼主的强大

  2. 侯锋
    *.*.*.*
    链接

    侯锋 2009-07-27 09:26:00

    以前用C#写过一个简单的怕虫,但却写不出来好的文章.呵呵!

  3. kenny.guo
    *.*.*.*
    链接

    kenny.guo 2009-07-27 09:29:00

    Actor,学习了

  4. pythonic
    *.*.*.*
    链接

    pythonic 2009-07-27 09:31:00

    占个座先~

  5. 代震军
    *.*.*.*
    链接

    代震军 2009-07-27 09:31:00

    沙发没有了。

  6. FP[未注册用户]
    *.*.*.*
    链接

    FP[未注册用户] 2009-07-27 13:05:00

    赵爷,再花点时间写一下关于函数编程的东西吧,

  7. 老赵
    admin
    链接

    老赵 2009-07-27 23:30:00

    侯锋:以前用C#写过一个简单的怕虫,但却写不出来好的文章.呵呵!


    把你的思路全部讲一遍就OK了。

  8. CCTVONE[未注册用户]
    *.*.*.*
    链接

    CCTVONE[未注册用户] 2009-07-28 14:41:00

    老赵究竟会几门语言啊...看那个图上面有十二个,怪吓人的,我是没这个能耐的!现在只会C#和JavaScript,以前学过C、Java,不过已经忘得差不多了...

  9. ask[未注册用户]
    *.*.*.*
    链接

    ask[未注册用户] 2009-07-28 14:46:00

    有个问题请教老赵,F#究竟要不要学?我看了一些关于F#的文章,都说该语言怎么怎么好,但是我还是热忠于C#,而且不知道F#究竟好在哪里。希望老赵写篇文章来阐述一下F#,期待!

  10. 老赵
    admin
    链接

    老赵 2009-07-28 18:38:00

    @ask
    F#强大,难学,简单说来就是这样……我不看好它能成为一线语言。

  11. 老赵
    admin
    链接

    老赵 2009-07-28 18:38:00

    @CCTVONE
    除了C#,JavaScript,Java,其他语言我可没有说过我“会”……

  12. 老赵
    admin
    链接

    老赵 2009-08-02 21:36:00

    @geo898
    这只是一个小示例,没打算做完整,呵呵。
    有机会我看看啊,谢谢分享。

  13. live-evil
    *.*.*.*
    链接

    live-evil 2009-08-04 15:10:00

    文章收藏按钮放哪里了?

  14. lurga[未注册用户]
    *.*.*.*
    链接

    lurga[未注册用户] 2010-02-22 16:57:00

    关于缺陷,有点疑问,monitor虽然继承了actor接口,但是它不是一个actor,对吧?
    如http://www.cypherpunks.to/erights/history/actors/AITR-844.pdf所述,对于外部系统而言,应该先用new expression创建一个monitor actor,然后让monitor成为actor系统中的receptionist,外部系统通过发送消息给monitor来驱动actor系统完成计算,并将结果通过消息返回给外部系统,这样才符合every thing is actor。

  15. 老赵
    admin
    链接

    老赵 2010-02-22 17:00:00

    @lurga
    Monitor就是这样工作的啊,不是被直接调用,而是消息传递驱动再返回消息的。

  16. lurga[未注册用户]
    *.*.*.*
    链接

    lurga[未注册用户] 2010-02-23 10:54:00

    抱歉,可能是我没仔细看代码。是用start方法来接受消息吧?我刚开始学习actor模型,是否可以用post方法向monitor发送url消息,monitor处理此消息,通过创建一个新的monitor,然后向其传递start消息,这样post方法对于外部系统而言是唯一入口,而且也是线程安全的,main函数照这样写
    static void Main(string[] args)
    {
    Monitor m = new Monitor(5);
    m.Post("http://www.cnblogs.com/");
    m.Post("http://www.csdn.net/");

    Console.ReadLine();
    }

发表回复

登录 / 登录并记住我 ,登陆后便可删除或修改已发表的评论 (请注意保留评论内容)

昵称:(必填)

邮箱:(必填,仅用于Gavatar

主页:(可选)

评论内容(大于5个字符):

  1. Your Name yyyy-MM-dd HH:mm:ss

使用Live Messenger联系我