Hello World
Spiga

ActorLite:一个轻量级Actor模型实现(中)

2009-05-14 09:29 by 老赵, 26974 visits

上一篇文章的反响来看,似乎大家对于这一话题并没有太大兴趣。而这篇文章将会为大家带来一个简单但完整的Actor模型实现。此外,在下一篇文章中……可能会出现一些让您觉得有趣的东西。:)

任务分配逻辑

如上文所述,这次要实现的是一个非常简单的Actor模型,使用基于事件的分配方式,直接把任务交给.NET自带的线程池去使用。不过我们又该什么时候把一个Actor推入线程池的执行队列呢?这其实取决于我们执行Actor的两个“基本原则”:

  • 如果Actor的邮箱中包含消息,那么要尽早执行。
  • 对于单个Actor对象来说,它的消息是顺序执行的。

因此,我们有两个“时机”可以把一个Actor交由线程池去执行:

  • 当Actor接收到一个消息(且该Actor处于“等待”状态)
  • 当Actor执行完一个消息(且Actor的邮箱中存在更多消息)

显然,在进行操作时需要小心处理并发造成的问题,因为一个“执行完”和多个“接受到”事件可能同时出现。如果操作不当,则容易出现各种错误的情况:

  • 某个Actor的邮箱未空,却已停止执行。
  • 同一个Actor的两个消息被并行地处理。
  • Actor的邮箱已经没有消息,却被要求再次执行。

至于并行控制的方式,就请关注下面的实现吧。

简单的Actor模型实现

Actor模型中最关键的莫过于Actor对象的实现。一个Actor的功能有如下三种:

  • 将消息放入邮箱
  • 接受并处理消息
  • 循环/退出循环

因此Actor抽象类对外的接口大致如下:

public abstract class Actor<T> : IActor
{
    protected abstract void Receive(T message);

    protected void Exit() { ... }

    public void Post(T message) { ... }
}

三个方法的签名应该已经充分说明了各自的含义。不过IActor又是什么呢?请看它的定义:

internal interface IActor
{
    void Execute();

    bool Existed { get; }

    int MessageCount { get; }

    ActorContext Context { get; }
}

这是一个internal修饰的类型,这意味着它的访问级别被限制在程序集内部。IActor接口的作用是作为一个统一的类型,交给Dispatcher——也就是Actor模型的任务分发逻辑所使用的。IActor接口的前三个成员很容易从名称上理解其含义,那么ActorContext又是做什么用的呢?

internal class ActorContext
{
    public ActorContext(IActor actor)
    {
        this.Actor = actor;
    }

    public IActor Actor { get; private set; }

    ...
}

public abstract class Actor<T> : IActor
{
    protected Actor()
    {
        this.m_context = new ActorContext(this);
    }

    private ActorContext m_context;
    ActorContext IActor.Context
    {
        get
        {
            return this.m_context;
        }
    }

    ...
}

在多线程的环境中,进行一些同步控制是非常重要的事情。线程同步的常用手段是lock,不过如果要减小锁的粒度,那么势必会使用Interlocked类下的CAS等原子操作,而那些操作只能针对最基础的域变量,而不能针对经过封装的属性或方法等成员。ActorContext便包含了用于同步控制,以及其他直接表示Actor内部状态各种字段的对象。这样,我们便可以通过ActorContext对象来实现一个Lock-Free的链表或队列。您可以会说,那么为什么要用独立的ActorContext类型,而不直接把字段放置在统一的基类(例如ActorBase)中呢?这有两点原因,第一点是所谓的“统一控制”便于管理,而第二点才是更为关键的:后文会涉及到F#对这Actor模型的使用,只可惜F#在对待父类的internal成员时有一个bug,因此不得不把相关实现替换成接口(IActor)。不过这不是本文的主题,我们下次再讨论F#的问题。

ActorContext目前只有一个字段——没错,只需要一个,这个字段便是表示状态的m_status。

internal class ActorContext
{
    ...

    public const int WAITING = 0;
    public const int EXECUTING = 1;
    public const int EXITED = 2;

    public int m_status;
}

m_status字段的类型为int,而不是枚举,这是为了可以使用Interlocked中的CAS操作。而对这个状态的操作,也正好形成了我们同步操作过程中的“壁垒”。我们的每个Actor在任意时刻都处于三种状态之一:

  • 等待(Waiting):邮箱为空,或刚执行完一个消息,正等待分配任务。
  • 执行(Executing):正在执行一个消息(确切地说,由于线程池的缘故,它也可能是还在队列中等待,不过从概念上理解,我们认为它“已经”执行了)。
  • 退出(Exited):已经退出,不会再执行任何消息。

显然,只有当m_status为WAITING时才能够为Actor分配运算资源(线程)以便执行,而分配好资源(将其推入.NET线程池)之后,它的状态就要变成EXECUTING。这恰好可以用一个原子操作形成我们需要的“壁垒”,可以让多个“请求”,“有且只有一个”成功,即“把Actor的执行任务塞入线程池”。如下:

internal class Dispatcher
{
    ...

    public void ReadyToExecute(IActor actor)
    {
        if (actor.Existed) return;

        int status = Interlocked.CompareExchange(
            ref actor.Context.m_status,
            ActorContext.EXECUTING,
            ActorContext.WAITING);

        if (status == ActorContext.WAITING)
        {
            ThreadPool.QueueUserWorkItem(this.Execute, actor);
        }
    }

    ...
}

CompareExchange方法返回这次原子操作前m_status的值,如果它为WAITING,那么这次操作(也仅有这次操作)成功地将m_status修改为EXECUTING。在这个情况下,Actor将会被放入线程池,将会由Execute方法来执行。从上述实现中我们可以发现,这个方法在多线程的情况下也能够正常工作。那么ReadyToExecute方法该在什么地方被调用呢?应该说是在任何“可能”让Actor开始执行的时候得到调用。按照文章开始的说法,其中一个情况便是“当Actor接收到一个消息时”:

public abstract class Actor<T> : IActor
{
    ...

    private Queue<T> m_messageQueue = new Queue<T>();

    ...

    public void Post(T message)
    {
        if (this.m_exited) return;

        lock (this.m_messageQueue)
        {
            this.m_messageQueue.Enqueue(message);
        }

        Dispatcher.Instance.ReadyToExecute(this);
    }
}

而另一个地方,自然是消息“执行完毕”,且Actor的邮箱中还拥有消息的时候,则再次为其分配运算资源。这便是Dispatcher.Execute方法的逻辑:

public abstract class Actor<T> : IActor
{
    ...

    bool IActor.Existed
    {
        get
        {
            return this.m_exited;
        }
    }

    int IActor.MessageCount
    {
        get
        {
            return this.m_messageQueue.Count;
        }
    }

    void IActor.Execute()
    {
        T message;
        lock (this.m_messageQueue)
        {
            message = this.m_messageQueue.Dequeue();
        }

        this.Receive(message);
    }

    private bool m_exited = false;

    protected void Exit()
    {
        this.m_exited = true;
    }

    ...
}

internal class Dispatcher
{
    ...

    private void Execute(object o)
    {
        IActor actor = (IActor)o;
        actor.Execute();

当程序执行到此处时,actor的Execute方法已经从邮箱尾部获取了一条消息,并交由用户实现的Receive方法执行。同时,Actor的Exit方法也可能被调用,使它的Exited属性返回true。不过到目前为止,因为ActorContext.m_status一直保持为EXECUTING,因此这段时间中任意新消息所造成的ReadyToExecute方法的调用都不会为Actor再次分配运算资源。不过接下来,我们将会修改m_status,这可能会造成竞争。那么我们又该怎么处理呢?

如果用户调用了Actor.Exit方法,那么它的Exited属性则会返回true,我们可以将m_status设为EXITED,这样Actor再也不会回到WAITING状态,也就避免了无谓的资源分配:

         if (actor.Existed)
        {
            Thread.VolatileWrite(
                ref actor.Context.m_status,
                ActorContext.EXITED);
        }
        else
        {

如果Actor没有退出,那么它会被短暂地切换为WAITING状态。此后如果Actor的邮箱中存在剩余的消息,那么我们会再次调用ReadyToExecute方法“尝试”再次为Actor分配运算资源:

            Thread.VolatileWrite(
                ref actor.Context.m_status,
                ActorContext.WAITING);

            if (actor.MessageCount > 0)
            {
                this.ReadyToExecute(actor);
            }
        }
    }
}

显然,在VolatileWrite和ReadyToExecute方法之间,可能会到来一条新的消息,因而再次引发一次并行地ReadyToExecute调用。不过根据我们之前的分析,这样的竞争并不会造成问题,因此在这方面我们可以完全放心。

至此,我们已经完整地实现了一个简单的Actor模型,逻辑清晰,功能完整——而这一切,仅仅用了不到150行代码。不用怀疑,这的确是事实。

使用示例

Actor模型的关键在于消息传递形式(Message Passing Style)的工作方式,通信的唯一手段便是传递消息。在使用我们的Actor模型之前,我们需要继承Actor<T>类来构建一个真正的Actor类型。例如一个最简单的计数器:

public class Counter : Actor<int>
{
    private int m_value;

    public Counter() : this(0) { }

    public Counter(int initial)
    {
        this.m_value = initial;
    }

    protected override void Receive(int message)
    {
        this.m_value += message;

        if (message == -1)
        {
            Console.WriteLine(this.m_value);
            this.Exit();
        }
    }
}

当计数器收到-1以外的数值时,便会累加到它的计数器上,否则便会打印出当前的值并退出。这里无需做任何同步方面的考虑,因为对于单个Actor来说,所有的消息都是依次处理,不会出现并发的情况。Counter的使用自然非常简单:

static void Main(string[] args)
{
    Counter counter = new Counter();
    for (int i = 0; i < 10000; i++)
    {
        counter.Post(i);
    }

    counter.Post(-1);

    Console.ReadLine();
}

不过您可能会问,这样的调用又有什么作用,又能实现什么呢?您现在可以去网上搜索一些Actor模型解决问题的示例,或者您可以等待下一篇文章中,我们使用F#来操作这个Actor模型。您会发现,配合F#的一些特性,这个Actor模型会变得更加实用,更为有趣。

此外,在下一篇文章里我们也会对这个Actor模型进行简单的性能分析。如果您要把它用在生产环境中,那么可能还需要对它再进行一些细微地调整。

Creative Commons License

本文基于署名 2.5 中国大陆许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名赵劼(包含链接),具体操作方式可参考此处。如您有任何疑问或者授权方面的协商,请给我留言

Add your comment

18 条回复

  1. Victor
    *.*.*.*
    链接

    Victor 2009-05-14 09:33:00

    坐个沙发

  2. xiao_p(未登录)[未注册用户]
    *.*.*.*
    链接

    xiao_p(未登录)[未注册用户] 2009-05-14 09:36:00

    因为大家关注的领域不同,而这个领域有少有人关注,所以看的人可能比较少吧。

    不过,貌似这个年代不关注并发有点过分,必经都多核时代了。

  3. 老赵
    admin
    链接

    老赵 2009-05-14 09:46:00

    @xiao_p(未登录)
    没事,大家不关注的时候我关注,那么我就走在前面了,嘿嘿。

  4. 墙头草
    *.*.*.*
    链接

    墙头草 2009-05-14 09:57:00

    能不能给份源码看看...

  5. Kuoching Ju
    *.*.*.*
    链接

    Kuoching Ju 2009-05-14 10:13:00

    支持老赵!

  6. CHwANG
    *.*.*.*
    链接

    CHwANG 2009-05-14 10:20:00

    一直在期待老赵的书托系列,问一句,什么时候出2呢?

  7. 老赵
    admin
    链接

    老赵 2009-05-14 10:22:00

    @墙头草
    都在文章里了。

  8. 老赵
    admin
    链接

    老赵 2009-05-14 10:23:00

    @CHwANG
    不知道,其实一直准备了一些书,但是可写的东西太多,所以就凭兴致了……

  9. redmoon
    *.*.*.*
    链接

    redmoon 2009-05-14 10:26:00

    没有兴趣是很正常的!

  10. 菜鸟001[未注册用户]
    *.*.*.*
    链接

    菜鸟001[未注册用户] 2009-05-14 11:29:00

    支持,继续关注

  11. Indigo Dai
    *.*.*.*
    链接

    Indigo Dai 2009-05-14 15:30:00

    大家对目前老赵写的文章不感兴趣,可能是因为有不少人没深入接触消息传递形式这个并发控制方法。
    现在采用消息传递,意图是不是为了减小并发粒度(较信号量、锁),提高并发程度。可能我说得不是很对,还请老赵指正。
    老赵对这方面研究很深啊,每篇文章都有嚼头,老赵一下子就把新鲜概念用语言实现出来了,老赵的境界已经相当高了,从有一些文章没有受到很多人的兴趣就可以看出来了。在老赵这发现了自身的好多不足,谢谢老赵。
    期待下篇关于F#的文章。

  12. 老赵
    admin
    链接

    老赵 2009-05-14 15:32:00

    @Indigo Dai
    应该说是强调了隔离“Isolation”,这样就避免了对共享内存的数据进行并发修改,不用总是在锁啊,互斥体阿这种上面纠缠,简化并行程序开发。

  13. Arthraim[未注册用户]
    *.*.*.*
    链接

    Arthraim[未注册用户] 2009-05-14 19:41:00

    Expecting...

  14. 阿牛
    *.*.*.*
    链接

    阿牛 2009-05-15 13:49:00

    关注中。。。。。

    支持老赵。

  15. applezorro[未注册用户]
    *.*.*.*
    链接

    applezorro[未注册用户] 2009-08-25 00:13:00

    老赵,
    你好,看了你关于Actor的实现,有处小疑问:
    请看代码:
    bool IActor.Existed
    {
    get
    {
    return this.m_exited;
    }
    }

    你是否笔误将IActor.Exited写为了IActor.Existed
    如当作Exited来看,语义通顺;
    而当作Existed来看,语义似乎有些不通畅。
    请核实。

  16. 老赵
    admin
    链接

    老赵 2009-08-25 00:19:00

    @applezorro
    Existed是我写错了,应该是Exited。

  17. 仁面寿星
    *.*.*.*
    链接

    仁面寿星 2009-08-29 11:53:00

    老赵,你好。对于该文有一些不了解,请教下。
    IActor 接口定义了 Execute方法,
    但是在文中的代码中没有实现(类Actor没有实现)
    在类Dispatcher的回调方法Execute中又会执行到Actor的Execute方法。
    因此我推断 Execute应该是类Actor的一个抽象方法,由继承Actor的类根据需要来实现。
    但是在您的使用示例中未看到这点,请问应该是怎样?

    另外 Dispatcher.Instanc属性您也没有实现,我认为 Dispatcher.Instanc应该作为单例模型来实现,不知对否?


    谢谢了。

  18. 仁面寿星
    *.*.*.*
    链接

    仁面寿星 2009-08-29 12:09:00

    抱歉,是我看的时候漏看了一段,看到了,呵呵

发表回复

登录 / 登录并记住我 ,登陆后便可删除或修改已发表的评论 (请注意保留评论内容)

昵称:(必填)

邮箱:(必填,仅用于Gavatar

主页:(可选)

评论内容(大于5个字符):

  1. Your Name yyyy-MM-dd HH:mm:ss

使用Live Messenger联系我