Hello World
Spiga

适合C# Actor的消息执行方式(3):中看不中用的解决方案

2009-07-17 08:45 by 老赵, 6579 visits

在前两篇文章中,我们了解到Erlang中灵活的模式匹配,以及在C#甚至F#中会都遭遇的尴尬局面。那么现在就应该来设计一个解决方案了,我们如何才能在C#这样的语言里顺畅地使用Actor模型呢?不仅如此,最好我们还能获得其它一些优势。

“消息”、“协议”和“接口”

Actor模型中的对象如果要进行交互,唯一的手段便是发送消息。不同语言/平台上的消息有不同的表现形式,但是它们所传递的信息是一致的:

  1. 做什么事情
  2. 做这件事情需要的数据

例如,Erlang中往往会使用Tag Message的格式作为消息:

{doSomething, Arg1, Arg2, Arg3, ...}

其中,原子doSomthing表示“做什么”,而后面的ArgN便是一个个的参数,使用Erlang中的模式匹配可以很方便地捕获消息中的数据。在C#等语言中,由于并非专为了Actor模型设计,因此一个Message往往只能是一个对象。但是这个对象的职责并没有减轻,因此我们需要自己处理的事情就多了。我们可能会这样做:

  • 学Erlang的Tag Message,但是这样会产生大量丑陋的类型转换操作,并且丧失了静态检查功能。
  • 为每种消息创建不同的Message类型,但是这样会产生大量类类型,每个类型又有各种属性,非常麻烦。

这两种做法在上一篇文章里都有过讨论,感兴趣的朋友可以再去“回味”一番。那么,究竟什么是消息呢?根据我的理解,“消息”其实是这么一种东西:

  1. “消息”表示“发送方”和“接受方”之间的“通信协议”(例如Erlang中的“模式”)。
  2. “消息”表示“发送方”要“接受方”所做的事情,但是并没有要求“接受方”需要怎么做。
  3. 一个Actor可能会会作为“接受方”遵守多种“通信协议”。

经过这样的描述,您是否觉得.NET中有一种东西和“消息”非常接近?没错,那就是“接口”,因为:

  1. “接口”从概念上讲便是一种“协议”。
  2. “接口”表示“能做什么”,但没有限制“怎么做”。
  3. 一个Actor可以实现多个接口,即遵守多种协议。

看上去还真是一一对应啊!那么我们再来深入一步进行对比,“接口”能否传递消息所要表现的信息?答案也是肯定的:

  1. 做什么事情:接口中的一个方法。
  2. 需要的数据:接口的参数。

也就是说,如之前的那条Erlang消息,在C#中便可以表示为:

x.DoSomething(arg1, arg2, arg3, ...)

基于这样的类比,我们发现使用“接口”还可以带来一个额外的东西,那就是“消息组”。如Erlang这样语言,消息与消息之间是完全独立的。.NET中的接口可以包含多个方法,这就是一种“分组”,我们可以利用这种方式来更好地管理有关联的消息。此外,利用.NET中的访问限制符(public,internal等)还可以实现消息的公开和隐藏。而且因为接口的参数是强类型的,所以可以得到编译期的检查,也可以享受编辑工具的代码提示及重构……C#编程里的种种优势似乎我们一个都没有拉下。

看似美好的实现

等一下,接口只是一种“协议”,但是“消息”还必须是一个实体,一个对象,并且“携带”了这个协议才能在Actor之间传递啊。这个对象除了携带协议所需要的数据以外,还要能够告诉接受方究竟该“操作什么”。“操作”带上“数据”,于是我就想到了“委托”。例如,如果我们想要发送一个“协议”,叫做IDoHandler,那么我们便可以构造一个Action<IDoHandler>对象——这正是Lambda表达式的用武之地:

Action<IDoHandler> m = x => x.Do(0, 1, 2, ...);

好,那么我们还是用乒乓测试来尝试一番。我们知道,乒乓测试会让Ping对象和Pong对象相互发送消息,我们各使用一个“消息组”,也就是“接口”来定义消息:

public interface IPongMessageHandler { }

public interface IPingMessageHandler { }

那么,Ping和Pong两个Actor类型又该如何定义呢?我们知道,Ping需要处理Pong发来的消息,因此它需要实现IPongMessageHandler接口,并且需要接受类型为Action<IPongMessageHandler>的消息。Pong与Ping类似,因此它们的定义为:

public class Ping : Actor<Action<IPongMessageHandler>>, IPongMessageHandler
{
    private int m_count;

    public Ping(int count)
    {
        this.m_count = count;
    }

    protected override void Receive(Action<IPongMessageHandler> message)
    {
        message(this);
    }

    ...
}

public class Pong : Actor<Action<IPingMessageHandler>>, IPingMessageHandler
{
    protected override void Receive(Action<IPingMessageHandler> message)
    {
        message(this);
    }

    ...
}

从代码上看,实际操作中我们并不需要让Ping或Pong直接继承Handler接口,只要最终提供一个对象给message执行即可。严格说来,“接口”只是一个“消息组”,具体的“消息”还是要落实到接口中的方法。定义了Ping和Pong之后,我们便可以明确接口中的方法了(确切地说,是明确了方法的参数):

public interface IPongMessageHandler
{
    void Pong(Pong pong);
}

public interface IPingMessageHandler
{
    void Ping(Ping ping);
    void Finish();
}

使用了接口,自然就要提供方法的实现了。我们先从典型而简单的Pong对象看起:

public class Pong : Actor<Action<IPingMessageHandler>>, IPingMessageHandler
{
    ...

    #region IPingMessageHandler Members

    void IPingMessageHandler.Ping(Ping ping)
    {
        Console.WriteLine("Pong received ping");
        ping.Post(h => h.Pong(this));
    }

    void IPingMessageHandler.Finish()
    {
        Console.WriteLine("Finished");
        this.Exit();
    }

    #endregion
}

原本需要在得到消息之后,根据消息的内容作出不同的响应。而现在,消息会被自动转发为接口中的方法调用,我们只需要实现特定的方法即可。在Ping方法中,我们会得到一个Ping类型的对象——于是我们再向它回复一个消息。消息的类型是Action<IPongMessageHandler>,可以看出,使用Lambda表达式构造这样一个消息特别方便。

Ping类也只需要实现IPongMessageHandler即可,只是这段逻辑“略显复杂”:

public class Ping : Actor<Action<IPongMessageHandler>>, IPongMessageHandler
{
    ...

    public void Start(Pong pong)
    {
        pong.Post(h => h.Ping(this));
    }

    #region IPongMessageHandler Members

    void IPongMessageHandler.Pong(Pong pong)
    {
        Console.WriteLine("Ping received pong");

        if (--this.m_count > 0)
        {
            pong.Post(h => h.Ping(this));
        }
        else
        {
            pong.Post(h => h.Finish());
            this.Exit();
        }
    }

    #endregion
}

收到Pong消息之后,将count减1,如果还大于0,则回复一个Ping消息,否则就回复一个Finish并退出。最后启动乒乓测试:

new Ping(5).Start(new Pong());

由于使用了接口作为消息的协议,因此无论是编辑器还是编译器都可以给我们足够的支持。同时,对于消息的处理也无须如上一篇文章那样不断进行判断和类型转换,代码可谓流畅不少。

致命的缺陷

虽说没有完美的东西,但目前的缺陷却是致命的。

在实际使用过程中,消息的“发送方”和消息的“接收方”应该完全无关,它们互不知道对方具体是谁,只应该基于“协议”,也就是“接口”来实现。可惜在上面这段代码中,很多东西都被“强横”地限制住了。例如,Ping消息会附带一个ping对象作为参数,ping对象会等待一个Pong消息。但是,发送Ping消息(并等待Pong消息)的一方很可能是各种类型的Actor,不一定是Ping类型。有朋友可能会说,那么我们把IPingMessageHandler的Ping方法的签名改成这样,不就可以了吗?

void Ping(Actor<Action<IPongMessageHandler>> ping)

是的,此时的ping,的确是在“等待Pong消息的Actor对象”。但是,这意味着ping对象它也只能是这个指明的Actor类型了。在实际使用过程中,这几乎是不可能的事情。因为一个Actor很可能会接受各种消息,它很难做到“一心一意”。因此这篇文章所提出的做法,几乎只能满足如乒乓测试这样简单的Actor模型使用场景。我们必须改变。

改变的方式有不少,从“向弱类型妥协”到“利用.NET 4.0中的协变/逆变”,都可以满足不同的场景——不过我们还是下次再说吧。

F#的实现

本文描述的方式也可以运用在在F#中。首先自然还是接口的定义:

type IPingMessageHandler =
    abstract Ping : Ping -> unit
    abstract Finish : unit -> unit

and IPongMessageHandler = 
    abstract Pong : Pong -> unit

以上便是F#中定义接口的方式,与C#相比更为简洁。接着便是Ping类型的实现:

and Ping() =
    inherit (IPongMessageHandler -> unit) Actor()
    let mutable count = 5
    override self.Receive(message) = message self

    member self.Start(pong : Pong) = 
        pong << fun h -> self |> h.Ping
    
    interface IPongMessageHandler with 
        member self.Pong(pong) =
            printfn "Ping received pong"
            count <- count - 1
            if (count > 0) then
                pong << fun h -> self |> h.Ping
            else
                pong << fun h -> h.Finish()
                self.Exit()

Pong类型的实现则更为简单:

and Pong() =
    inherit (IPingMessageHandler -> unit) Actor()
    override self.Receive(message) = message self
    
    interface IPingMessageHandler with
        member self.Ping(ping) =
            printfn "Pong received ping"
            ping << fun h -> self |> h.Pong
        
        member self.Finish() =
            printfn "Finished"
            self.Exit()

启动乒乓测试:

(new Pong()) |> (new Ping()).Start;

相关文章

 

本文代码访问地址:http://gist.github.com/148464

Creative Commons License

本文基于署名 2.5 中国大陆许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名赵劼(包含链接),具体操作方式可参考此处。如您有任何疑问或者授权方面的协商,请给我留言

Add your comment

13 条回复

  1. Ryan Gene
    *.*.*.*
    链接

    Ryan Gene 2009-07-17 09:10:00

    的确是中看不中用

  2. 老赵
    admin
    链接

    老赵 2009-07-17 09:12:00

    @Ryan Gene
    所以我还会接着写下去,呵呵。

  3. Ryan Gene
    *.*.*.*
    链接

    Ryan Gene 2009-07-17 09:17:00

    @Jeffrey Zhao
    恩,那拭目以待

  4. 学F#有什么好书吗[未注册用户]
    *.*.*.*
    链接

    学F#有什么好书吗[未注册用户] 2009-07-17 09:21:00

    学F#有什么好书吗?

  5. 老赵
    admin
    链接

    老赵 2009-07-17 09:22:00

    @Ryan Gene
    其实现在这个方法对于简单场景已经够用了。
    接下去我会提出两种方案,一种是往弱类型方向让一小步,还有一种就是利用.NET 4.0的协变/逆变了。
    但是我平时在用Actor模型写简单小程序时,还是使用这篇文章的方式。

  6. 老赵
    admin
    链接

    老赵 2009-07-17 09:23:00

    @学F#有什么好书吗
    Expert F#是一本好书中的好书,我没有看过相同级别的ASP.NET,C#书。
    也有可能是F#本身内容很丰富,很强大的原因。

  7. 风云
    *.*.*.*
    链接

    风云 2009-07-17 10:48:00

    看了老赵的文章后,我用我以前写的消息总线来实现这个PingPong,
    下面是代码:

    [TestFixture]
        public class PingPongTest
        {
            class Actor
            {
                static MessageRouter MB = new MessageRouter();
    
                public static void Post<TMessage>(string topic, TMessage message)
                {
                    MB.PublishTopic<TMessage>(Topic.Make<TMessage>(topic, message));
                }
    
                public static void Receive<TMessage>(string topic, ObserverHandler<Topic<TMessage>> handler)
                {
                    MB.SubscribeTopic<TMessage>(topic, handler);
                }
    
                public static void Exit<TMessage>(string topic)
                {
                    MB.Remove<Topic<TMessage>>(topic);
                }
            }
    
            private class Topics
            {
                public const string Ping = "Ping";
                public const string Pong = "Pong";
            }
    
            private class Ping
            {
                int m_count = 5;
                public Ping()
                {
                    Actor.Receive<Pong>(Topics.Pong, (s, e) =>
                        {
                            Console.WriteLine("Ping received pong");
    
                            if (--this.m_count > 0)
                                Actor.Post<Ping>(Topics.Ping, this);
                            else
                            {
                                Console.WriteLine("Finished");
                                Actor.Exit<Ping>(Topics.Ping);
                            }
                        });
                }
    
                public void Start()
                {
                    Actor.Post<Ping>(Topics.Ping, this);
                }
            }
    
            private class Pong
            {
                public Pong()
                {
                    Actor.Receive<Ping>(Topics.Ping, (s, e) =>
                        {
                            Console.WriteLine("Pong received ping");
                            Actor.Post<Pong>(Topics.Pong, this);
                        });
                }
            }
    
            [Test]
            public void Test()
            {
                var ping = new Ping();
                var pong = new Pong();
                ping.Start();
            }
        }
    
    


    代码输出:
    Pong received ping
    Ping received pong
    Pong received ping
    Ping received pong
    Pong received ping
    Ping received pong
    Pong received ping
    Ping received pong
    Pong received ping
    Ping received pong
    Finished
    我将会在单独一篇文章中介绍这个实现的,欢迎拍砖

  8. 老赵
    admin
    链接

    老赵 2009-07-17 11:00:00

    @风云
    支持啊,非常期待。

  9. 小标题很漂亮[未注册用户]
    *.*.*.*
    链接

    小标题很漂亮[未注册用户] 2009-07-17 13:32:00

    小标题很漂亮,赞一个!

  10. 老赵
    admin
    链接

    老赵 2009-07-17 14:57:00

    @小标题很漂亮
    谢谢

  11. anonymous..[未注册用户]
    *.*.*.*
    链接

    anonymous..[未注册用户] 2009-07-18 11:30:00

    MQ is Actor, Why not using MQ ?

  12. 老赵
    admin
    链接

    老赵 2009-07-18 12:08:00

    @anonymous..
    和一般的MQ还是不一样的。
    Actor一般都需要是非常轻量级(因为会创建成千上万个),对单个Actor来说线程安全(普通MQ作为Actor会同时执行其中多个消息)。

  13. 11游客[未注册用户]
    *.*.*.*
    链接

    11游客[未注册用户] 2009-10-06 15:47:00

    博主您好:
    我试着动手实现了您的Actor模式,有一个问题。
    如果在Dispatcher中的Execute方法中,在actor没有退出的情况下(if()/else中的else的情况下),调用Thread.Sleep(1000),会出现队列为空的报错。这是为什么呢?修改的代码如下:
    else
    {
    Thread.VolatileWrite(ref actor.Context.m_status, ActorContext.WAITING);
    Thread.Sleep(1000);
    if (actor.MessageCount > 0)
    {
    this.ReadyToExecute(actor);
    }
    }

发表回复

登录 / 登录并记住我 ,登陆后便可删除或修改已发表的评论 (请注意保留评论内容)

昵称:(必填)

邮箱:(必填,仅用于Gavatar

主页:(可选)

评论内容(大于5个字符):

  1. Your Name yyyy-MM-dd HH:mm:ss

使用Live Messenger联系我