Hello World
Spiga

适合C# Actor的消息执行方式(6):协变与逆变

2009-08-03 09:19 by 老赵, 8105 visits

上一篇文章中,我们实现了一个简单的爬虫,并指出了这种方式的缺陷。现在,我们就来看一下,如何使用C# 4.0中所引入的“协变和逆变”特性来改进这种消息执行方式,这也是我认为在“普适Actor模型”中最合适的做法。这次,我们动真格的了,我们会一条一条地改进前文提出的缺陷。

协变与逆变

在以前的几篇文章中,我们一直挂在嘴边的说法便是消息于Actor类型的“耦合”太高。例如在简单的爬虫实现中,Crawler接受的消息为Crawl(Monitor, string),它的第一个参数为Monitor类型。但是在实际应用中,这个参数很可能需要是各种类型,唯一的“约束”只是它必须能够接受一个ICrawlResponseHandler类型的消息,这样我们就能把抓取的结果传递给它。至于操作Crawler对象的是Monitor还是Robot,还是我们单元测试时动态创建的Mock对象(这很重要),Crawler一概不会关心。

但就是这个约束,在以前的实现中,我们必须让这个目标继承Actor<ICrawlResponseHandler>,这样它也就无法接受其他类型的消息了。例如Monitor还要负责一些查询操作我们该怎么办呢?幸运的是,在.NET 4.0中,我们只需要让这个目标实现这样一个接口即可:

public interface IPort<out T>
{
    void Post(Action<T> message);
}

瞅到out关键字了没?事实上,还有一个东西您在这里还没有看到,这便是Action委托在.NET 4.0中的签名:

public delegate void Action<in T>(T obj);

就在这样一个简单的示例中,协变和逆变所需要的in和out都出现了。这意味着如果有两个类型Parent和Child,其中Child是Parent的子类(或Parent接口的实现),那么实现了IPort<Child>的对象便可以自动赋值给IPort<Parent>类型的参数或引用1。使用代码来说明问题可能会更清楚一些:

public class Parent
{
    public void ParentMethod() { };
}

public class Child : Parent { }

static void Main(string[] args)
{
    IPort<Child> childPort = new ChildPortType();
    IPort<Parent> parentPort = childPort; // 自动转化
    parentPort.Post(p => p.ParentMethod()); // 可以接受Action<Parent>类型作为消息
}

这意味着,我们可以把ICrawlRequestHandler和ICrawlResponseHandler类型写成下面的形式:

internal interface ICrawlRequestHandler
{
    void Crawl(IPort<ICrawlResponseHandler> collector, string url);
}

internal interface ICrawlResponseHandler
{
    void Succeeded(IPort<ICrawlRequestHandler> crawler, string url, string content, List<string> links);
    void Failed(IPort<ICrawlRequestHandler> crawler, string url, Exception ex);
}

如今,Monitor和Crawler便可以写成如下模样:

internal class Crawler : Actor<Action<Crawler>>, IPort<Crawler>, ICrawlRequestHandler
{
    protected override void Receive(Action<Crawler> message) { message(this); }

    #region ICrawlRequestHandler Members

    void ICrawlRequestHandler.Crawl(IPort<ICrawlResponseHandler> collector, string url)
    {
        try
        {
            string content = new WebClient().DownloadString(url);

            var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast<Match>();
            var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
            collector.Post(m => m.Succeeded(this, url, content, links));
        }
        catch (Exception ex)
        {
            collector.Post(m => m.Failed(this, url, ex));
        }
    }

    #endregion
}

public class Monitor : Actor<Action<Monitor>>, IPort<Monitor>, ICrawlResponseHandler
{
    protected override void Receive(Action<Monitor> message) { message(this); }

    #region ICrawlResponseHandler Members
    void ICrawlResponseHandler.Succeeded(...) { ... }
    void ICrawlResponseHandler.Failed(...) { ... }
    #endregion

    private void DispatchCrawlingTasks(IPort<ICrawlRequestHandler> reusableCrawler)
    {
        if (this.m_readyToCrawl.Count <= 0)
        {
            this.WorkingCrawlerCount--;
        }

        var url = this.m_readyToCrawl.Dequeue();
        reusableCrawler.Post(c => c.Crawl(this, url));

        while (this.m_readyToCrawl.Count > 0 &&
            this.WorkingCrawlerCount < this.MaxCrawlerCount)
        {
            var newUrl = this.m_readyToCrawl.Dequeue();
            IPort<ICrawlRequestHandler> crawler = new Crawler();
            crawler.Post(c => c.Crawl(this, newUrl));

            this.WorkingCrawlerCount++;
        }
    }
}

Monitor的具体实现和上篇文章区别不大,您可以参考文章末尾给出的完整代码,并配合前文的分析来理解,这里我们只关注被标红的两行代码。

在第一行中我们创建了一个Crawler类型的对象,并把它赋值给IPort<ICrawlerRequestHandler>类型的变量中。请注意,Crawler对象并没有实现这个接口,它只是实现了IPort<Crawler>及ICrawlerRequestHandler。不过由于IPort<T>支持协变,于是IPort<Crawler>被安全地转换成了IPort<ICrawlerRequestHandler>对象。

第二行中再次发生了协变:ICrawlRequestHandler.Crawel的第一个参数需要IPort<ICrawlResponseHandler>类型的对象,但是this是Monitor类型的,它并没有实现这个接口。不过,和上面描述的一样,由于IPort<T>支持协变,因此这样的类型转化是安全的,允许的。于是在Crawler类便可以操作一个“抽象”,而不是具体的Monitor类型来办事了。

神奇不?但就是这么简单。

“内部”消息控制

在上一篇文章中,我们还提出了Crawler实现的另一个缺点:没有使用异步IO。WebClient本身的DownloadStringAsync方法可以进行异步下载,但是如果在异步完成的后续操作(如分析链接)会在IO线程池中运行,这样我们就很难对任务所分配的运算能力进行控制。我们当时提出,可以把后续操作作为消息发送给Crawler本身,也就是进行“内部”消息控制——可惜的是,我们当时无法做到。不过现在,由于Crawler实现的是IPort<Crawler>接口,因此,我们可以把Crawler内部的任何方法作为消息传递给自身,如下:

internal class Crawler : Actor<Action<Crawler>>, IPort<Crawler>, ICrawlRequestHandler
{
    protected override void Receive(Action<Crawler> message) { message(this); }

    #region ICrawlRequestHandler Members

    public void Crawl(IPort<ICrawlResponseHandler> collector, string url)
    {
        WebClient client = new WebClient();
        client.DownloadStringCompleted += (sender, e) =>
        {
            if (e.Error == null)
            {
                this.Post(c => c.Crawled(collector, url, e.Result));
            }
            else
            {
                collector.Post(c => c.Failed(this, url, e.Error));
            }
        };

        client.DownloadStringAsync(new Uri(url));
    }

    private void Crawled(IPort<ICrawlResponseHandler> collector, string url, string content)
    {
        var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast<Match>();
        var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();

        collector.Post(c => c.Succeeded(this, url, content, links));
    }

    #endregion
}

我们准备了一个private的Crawled方法,如果抓取成功了,我们会把这个方法的调用封装在一条消息中重新发给自身(红色代码)。请注意,这是个私有方法,因此这里完全是在做“内部”消息控制。

开启抓取任务

在上一篇文章中,我们为Monitor添加了一个Start方法,它的作用是启动URL。我们知道,对单个Actor来说消息的处理是线程安全的,但是这个前提是使用“消息”传递的方式进行通信,如果直接调用Start公有方法,便会破坏这种线程安全特性。不过现在的Monitor已经不受接口的限制,可以自由接受任何它可以执行的消息,因此我们只要对外暴露一个Crawl方法即可:

public class Monitor : Actor<Action<Monitor>>, IPort<Monitor>,
    ICrawlResponseHandler,
    IStatisticRequestHandelr
{
    ...

    public void Crawl(string url)
    {
        if (this.m_allUrls.Contains(url)) return;
        this.m_allUrls.Add(url);

        if (this.WorkingCrawlerCount < this.MaxCrawlerCount)
        {
            this.WorkingCrawlerCount++;
            IPort<ICrawlRequestHandler> crawler = new Crawler();
            crawler.Post(c => c.Crawl(this, url));
        }
        else
        {
            this.m_readyToCrawl.Enqueue(url);
        }
    }
}

于是我们便可以向Monitor发送消息,让其抓取特定的URL:

string[] urls =
{
    "http://www.cnblogs.com/dudu/",
    "http://www.cnblogs.com/TerryLee/",
    "http://www.cnblogs.com/JeffreyZhao/"
};

Random random = new Random(DateTime.Now.Millisecond);
Monitor monitor = new Monitor(10);
foreach (var url in urls)
{
    var urlToCrawl = url;
    monitor.Post(m => m.Crawl(urlToCrawl));
    Thread.Sleep(random.Next(1000, 3000));
}

上面的代码会每隔1到3秒发出一个抓取请求。由于我们使用了消息传递的方式进行通信,因此对于Monitor来说,这一切都是线程安全的。我们可以随时随地为Monitor添加抓取任务。

接受多种消息(协议)

我们再观察一下Monitor的签名:

class Monitor : Actor<Action<Monitor>>, IPort<Monitor>, ICrawlResponseHandler

可以发现,如今的Monitor已经和它实现的协议没有一对一的关系了。也就是说,它可以添加任意功能,可以接受任意类型的消息,我们只要让它实现另一个接口即可。于是乎,我们再要一个“查询”功能2

public interface IStatisticRequestHandelr
{
    void GetCrawledCount(IPort<IStatisticResponseHandler> requester);
    void GetContent(IPort<IStatisticResponseHandler> requester, string url);
}

public interface IStatisticResponseHandler
{
    void ReplyCrawledCount(int count);
    void ReplyContent(string url, string content);
}

为了让Monior支持查询,我们还需要为它添加这样的代码:

public class Monitor : Actor<Action<Monitor>>, IPort<Monitor>,
    ICrawlResponseHandler,
    IStatisticRequestHandelr
{
    ...

    #region IStatisticRequestHandelr Members

    void IStatisticRequestHandelr.GetCrawledCount(IPort<IStatisticResponseHandler> requester)
    {
        requester.Post(r => r.ReplyCrawledCount(this.m_urlContent.Count));
    }

    void IStatisticRequestHandelr.GetContent(IPort<IStatisticResponseHandler> requester, string url)
    {
        string content;
        if (!this.m_urlContent.TryGetValue(url, out content))
        {
            content = null;
        }

        requester.Post(r => r.ReplyContent(url, content));
    }

    #endregion
}

最后,我们来尝试着使用这个“查询”功能。首先,我们编写一个测试用的TestStatisticPort类:

public class TestStatisticPort : IPort<IStatisticResponseHandler>, IStatisticResponseHandler
{
    private IPort<IStatisticRequestHandelr> m_statisticPort;

    public TestStatisticPort(IPort<IStatisticRequestHandelr> statisticPort)
    {
        this.m_statisticPort = statisticPort;
    }

    public void Start()
    {
        while (true)
        {
            Console.ReadLine();
            this.m_statisticPort.Post(s => s.GetCrawledCount(this));
        }
    }

    #region IPort<IStatisticResponseHandler> Members

    void IPort<IStatisticResponseHandler>.Post(Action<IStatisticResponseHandler> message)
    {
        message(this);
    }

    #endregion

    #region IStatisticResponseHandler Members

    void IStatisticResponseHandler.ReplyCrawledCount(int count)
    {
        Console.WriteLine("Crawled: {0}", count);
    }

    void IStatisticResponseHandler.ReplyContent(string url, string content) { ... }

    #endregion
}

当调用Start方法时,控制台将会等待用户敲击回车键。当按下回车键时,TestStatisticPort将会向Monitor发送一个IStatisticRequestHandler.GetCrawledCount消息。Monitor回复之后,屏幕上便会显示当前已经抓取成功的URL数目。例如,我们可以编写如下的测试代码:

static void Main(string[] args)
{
    var monitor = new Monitor(5);
    monitor.Post(m => m.Crawl("http://www.cnblogs.com/"));

    TestStatisticPort testPort = new TestStatisticPort(monitor);
    testPort.Start();
}

随意敲击几下回车,结果如下:

Crawl Statistic

总结

如今的做法,兼顾了强类型检查,并使用C# 4.0中的协变和逆变特性,把上一篇文章中提出的问题解决了,不知您是否理解了这些内容?只可惜,我们在C# 3.0中还没有协变和逆变。因此,我们还必须思考一个适合C# 3.0的做法。

顺便一提,由于F#不支持协变和逆变,因此本文的做法无法在F#中使用。

相关文章

 

注1:关于协变和逆变特性,我认为脑袋兄的这篇文章讲的非常清楚——您看得头晕了?是的,刚开始了解协变和逆变,以及它们之间的嵌套规则时我也头晕,但是您在掌握之后就会发现,这的确是一个非常有用的特性。

注2:不知您是否发现,与之前internal的Crawl相关接口不同,Statistic相关接口是public的。我们在使用接口作为消息时,也可以通过这种办法来控制哪些消息是可以对外暴露的。这也算是一种额外的收获吧。

 

本文完整代码:http://gist.github.com/160043

Creative Commons License

本文基于署名 2.5 中国大陆许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名赵劼(包含链接),具体操作方式可参考此处。如您有任何疑问或者授权方面的协商,请给我留言

Add your comment

29 条回复

  1. CoolCode
    *.*.*.*
    链接

    CoolCode 2009-08-03 09:47:00

    我最喜欢老赵的文章,虽然有些看不懂。

  2. billlo[未注册用户]
    *.*.*.*
    链接

    billlo[未注册用户] 2009-08-03 09:49:00

    沙發

  3. Test1234[未注册用户]
    *.*.*.*
    链接

    Test1234[未注册用户] 2009-08-03 10:21:00

    CoolCode:我最喜欢老赵的文章,虽然有些看不懂。


    OK `

  4. YeanJay
    *.*.*.*
    链接

    YeanJay 2009-08-03 10:23:00

    努力理解中

  5. 老赵
    admin
    链接

    老赵 2009-08-03 11:41:00

    Actor模型,协变逆变的确是比较生僻的内容,如果觉得哪里需要再详细说说的,兄弟们可以提出。

  6. Coolin
    *.*.*.*
    链接

    Coolin 2009-08-03 12:29:00

    :)

  7. 一抹红
    *.*.*.*
    链接

    一抹红 2009-08-03 12:37:00

    可能是我是菜鸟,理解困难啊,呵呵

  8. 一抹红
    *.*.*.*
    链接

    一抹红 2009-08-03 12:38:00

    可否提供下lambda表达式用法的教程啊

  9. 平静中的疯狂
    *.*.*.*
    链接

    平静中的疯狂 2009-08-03 12:48:00

    走在前列的人

  10. 太极
    *.*.*.*
    链接

    太极 2009-08-03 13:16:00

    我落后了。

  11. billlo[未注册用户]
    *.*.*.*
    链接

    billlo[未注册用户] 2009-08-03 13:42:00

    Actor模型老趙可否列出一些項目常見應用場景呢?
    很多人學技術,學了卻不會靈活運用.當然也包括我.

  12. yyww
    *.*.*.*
    链接

    yyww 2009-08-03 14:14:00

    老赵的文章,不论内容还是排版都非常好看。

  13. 老赵
    admin
    链接

    老赵 2009-08-03 14:24:00

    @billlo
    其实我现在给的这个示例,就是常见应用场景。
    我也再看一些文章,设计思想方面的,有想法了会分享一下。

  14. qiaojie
    *.*.*.*
    链接

    qiaojie 2009-08-03 15:29:00

    @Jeffrey Zhao

    协变来逆变去的太复杂了,没法理解啊。虽然还没仔细理解代码,不过感觉这可能不是一个很好的设计。好的设计应该是容易让人理解的。

  15. PKDA[未注册用户]
    *.*.*.*
    链接

    PKDA[未注册用户] 2009-08-03 16:37:00

    语言搞的很好,但性能一点也没有提高,

  16. 老赵
    admin
    链接

    老赵 2009-08-03 17:03:00

    @qiaojie
    我觉得,这个是否容易理解,也是看对于某个特性是否掌握的缘故吧,如果你了解协变逆变,应该就会好很多了。
    其实纯粹看代码,这些应该还是很清楚吧,不容易的是背后的实现。对于现在的Actor这样看就容易了,就比如它的签名:
    class T : ActorLite<Action<T>>, IPort<T>, IMessage1, IMessage2, ...
    含义就是,一个执行IMessage1,IMessage2等消息的Actor,至于哪里协变哪里逆变了,使用者是不必关心的。

    同样的例子还有比如Scala语言,可以实现出各种美妙的形式,但是其中的API是什么样的?不容易搞清楚。

  17. 老赵
    admin
    链接

    老赵 2009-08-03 17:08:00

    PKDA:语言搞的很好,但性能一点也没有提高,


    如果你是指协变逆变这个特性的话,那的确没有性能提高,因为其实这个能力在以前的CLR中已经支持了,所以这个特性也可以归为“语法糖”。
    如果你是指.NET 4.0的话,那性能是增强了,具体可见相关评测。例如GC在CLR 4.0里有改进,效果很明显的。

  18. DSFA[未注册用户]
    *.*.*.*
    链接

    DSFA[未注册用户] 2009-08-03 17:45:00

    那Actor如果要处理多个消息不是有N个
    class T : ..., IMessage1, IMessage2, ... IMessageN?
    这个有其他办法吗?

  19. 老赵
    admin
    链接

    老赵 2009-08-03 18:36:00

    @DSFA
    需要处理哪些消息,就实现哪些接口咯,你的问题是什么?

  20. qiaojie
    *.*.*.*
    链接

    qiaojie 2009-08-03 23:43:00

    回家又仔细看了一遍代码,总算是把程序看懂了。
    不过我觉得这种实现方式并没有多大实用价值,关键是不能运用到跨进程的分布式系统中去。其实我觉得这种东西用传统的反射机制非常容易实现。

    大致做法如下:
    class Message
    {
    public object Receiver { set; };
    public MethodInfo Method { set; };
    public object[] Parameters{ set; };

    public void Call(object actor);
    }

    class Actor<T>
    {
    object proxy = GenerateProxy(typeof(T));

    static object GenerateProxy(Type t)
    {
    //generator a new type of TProxy
    //that implement all the message functions.
    //The implementation is to marshal the
    //MethodInfo and parameters into a Message stream
    //and post to the local message queue or
    //remote service.
    }

    public object Proxy
    {
    get { return proxy; }
    }

    public void Receive(Message msg)
    {
    msg.Call(this);
    }
    }

    interface ICrawlResponseHandler
    {
    void Succeeded(ICrawlRequestHandler crawler, string url, string content, List<string> links);
    void Failed(ICrawlRequestHandler crawler, string url, Exception ex);
    }

    public interface IStatisticRequestHandelr
    {
    void GetCrawledCount(IStatisticResponseHandler requester);
    void GetContent(IStatisticResponseHandler requester, string url);
    }

    public class Monitor : Actor<Monitor>, ICrawlResponseHandler, IStatisticRequestHandelr
    {
    void ICrawlResponseHandler.Succeeded(...) { ... }
    void ICrawlResponseHandler.Failed(...) { ... }
    void IStatisticRequestHandelr.GetCrawledCount(...) { ... }
    void IStatisticRequestHandelr.GetContent(...) { ... }
    }


    其中GenerateProxy用来动态生成actor的proxy对象,这个proxy对象实现具体actor类中的消息接口, 每个消息函数里做的就是将MethodInfo和参数打包到一个Message对象中,放入消息队列或是传给远程的服务程序。
    不知楼主认为这种传统的实现方式如何?


  21. 老赵
    admin
    链接

    老赵 2009-08-04 09:08:00

    @qiaojie
    我接下来会谈的方法和这个略有接近,不过还是不用反射,反射麻烦性能还差一些,如果可以方便的避免反射,我也就不用了。
    还有就是,写这样一个GenerateProxy方法,不容易啊,我要去翻大量资料,呵呵。
    至于跨进程,包括容易,都是很重要的,以后会搞的。这点微软的Axum已经支持了,而且Scala Actor也已经开始了。

  22. qiaojie
    *.*.*.*
    链接

    qiaojie 2009-08-04 10:19:00

    @Jeffrey Zhao

    用反射生成代码虽然有些麻烦,但也并不是件困难的事情,我就做过这样的事情。当然了,手工写一堆Emit IL的代码不是什么有趣的事情,所以我的做法是先用C#把代码写好再反编译成IL,然后再用Emit来生成代码。至于性能上应该跟硬编码没有差别的。

    不过我现在关心的不是这些实现细节,我比较关心的是如何设计一个高效率的调度算法,我看了一下你的代码,调度算法写的非常简单,每个任务都是直接交给线程池去做,这样每个任务的执行都会产生一次系统线程调度,一个线程调度的开销需要上千CPU周期,而每个任务的计算开销则可能非常小,这样算下来这样不但无法提高性能,反而变得更慢。

  23. 老赵
    admin
    链接

    老赵 2009-08-04 10:27:00

    @qiaojie
    没错,所以这个只是ActorLite,就是越简单越好,如果是真正的应用中,肯定需要重新编写执行的方式。
    不过你说的“每个任务的执行都会产生一次系统线程调度”这个是有问题的,在CLR线程池中,每个线程是自顾自的不断从队列中拿出任务来执行,除非队列拿空,否则是不会出现手工的线程切换的。每个线程执行完一个任务后,不会“主动”进入Sleep等暂停状态,而是再去拿一个任务执行,所有的线程切换也是操作系统负责的。这里其实可以看成1个队列n个Consumer。

    目前ActorLite的性能问题主要有两个:
    1、CLR线程池本身性能不高——不过不是你说的原因,而是实现的问题,CLR 2.0比1.0性能提高很多,据说4.0比2.0又提高不少。不过最好还是自己写一个,因为……只是感觉,“面向Actor模型”的实现,不应该是普通线程池那样的。
    2、如果在某些场景下,队列可能经常变空,这时候调度性能就会下降不少了。

  24. 大漠枭狼
    *.*.*.*
    链接

    大漠枭狼 2009-08-04 16:21:00

    你好,我想请教个问题,百度有啊进入购物车界面里,有个最近浏览过的商品,我想请教下是如何记录客户没有登录时浏览过的商品的信息的?我看了,cookies和网页相关的东西我清理完了,但还是有我浏览过的记录

  25. 老赵
    admin
    链接

    老赵 2009-08-04 16:25:00

    @大漠枭狼
    应该是cookie没清干净吧,用Fiddler看一下请求里有没有cookie。

  26. 大漠枭狼
    *.*.*.*
    链接

    大漠枭狼 2009-08-04 17:18:00

    @Jeffrey Zhao
    我清理干净了,同样是购物车的cookies都没有看到购物车的内容了,老赵,你能不能上百度研究下,youa.baidu.com里面进去点上商品,点个购物车,就会有看到浏览过的商品

  27. 打扰下你们讨论问题[未注册用户]
    *.*.*.*
    链接

    打扰下你们讨论问题[未注册用户] 2009-08-04 17:31:00

    打扰一下你们讨论问题,
    想问下老赵,你模板中“我的情况--language skills”的图叫什么名称,比如XX图,我一直很想知道,呵呵

  28. 老赵
    admin
    链接

    老赵 2009-08-04 19:04:00

    @打扰下你们讨论问题
    好像一个叫雷达图,一个就普通的饼图。

  29. Jijie Chen
    60.194.113.*
    链接

    Jijie Chen 2014-10-08 22:24:16

    这个系列现在看来,仍然十分不错。 感谢分享。

发表回复

登录 / 登录并记住我 ,登陆后便可删除或修改已发表的评论 (请注意保留评论内容)

昵称:(必填)

邮箱:(必填,仅用于Gavatar

主页:(可选)

评论内容(大于5个字符):

  1. Your Name yyyy-MM-dd HH:mm:ss

使用Live Messenger联系我