ActorLite:一个轻量级Actor模型实现(中)
2009-05-14 09:29 by 老赵, 26972 visits从上一篇文章的反响来看,似乎大家对于这一话题并没有太大兴趣。而这篇文章将会为大家带来一个简单但完整的Actor模型实现。此外,在下一篇文章中……可能会出现一些让您觉得有趣的东西。:)
任务分配逻辑
如上文所述,这次要实现的是一个非常简单的Actor模型,使用基于事件的分配方式,直接把任务交给.NET自带的线程池去使用。不过我们又该什么时候把一个Actor推入线程池的执行队列呢?这其实取决于我们执行Actor的两个“基本原则”:
- 如果Actor的邮箱中包含消息,那么要尽早执行。
- 对于单个Actor对象来说,它的消息是顺序执行的。
因此,我们有两个“时机”可以把一个Actor交由线程池去执行:
- 当Actor接收到一个消息(且该Actor处于“等待”状态)
- 当Actor执行完一个消息(且Actor的邮箱中存在更多消息)
显然,在进行操作时需要小心处理并发造成的问题,因为一个“执行完”和多个“接受到”事件可能同时出现。如果操作不当,则容易出现各种错误的情况:
- 某个Actor的邮箱未空,却已停止执行。
- 同一个Actor的两个消息被并行地处理。
- Actor的邮箱已经没有消息,却被要求再次执行。
至于并行控制的方式,就请关注下面的实现吧。
简单的Actor模型实现
Actor模型中最关键的莫过于Actor对象的实现。一个Actor的功能有如下三种:
- 将消息放入邮箱
- 接受并处理消息
- 循环/退出循环
因此Actor抽象类对外的接口大致如下:
public abstract class Actor<T> : IActor { protected abstract void Receive(T message); protected void Exit() { ... } public void Post(T message) { ... } }
三个方法的签名应该已经充分说明了各自的含义。不过IActor又是什么呢?请看它的定义:
internal interface IActor { void Execute(); bool Existed { get; } int MessageCount { get; } ActorContext Context { get; } }
这是一个internal修饰的类型,这意味着它的访问级别被限制在程序集内部。IActor接口的作用是作为一个统一的类型,交给Dispatcher——也就是Actor模型的任务分发逻辑所使用的。IActor接口的前三个成员很容易从名称上理解其含义,那么ActorContext又是做什么用的呢?
internal class ActorContext { public ActorContext(IActor actor) { this.Actor = actor; } public IActor Actor { get; private set; } ... } public abstract class Actor<T> : IActor { protected Actor() { this.m_context = new ActorContext(this); } private ActorContext m_context; ActorContext IActor.Context { get { return this.m_context; } } ... }
在多线程的环境中,进行一些同步控制是非常重要的事情。线程同步的常用手段是lock,不过如果要减小锁的粒度,那么势必会使用Interlocked类下的CAS等原子操作,而那些操作只能针对最基础的域变量,而不能针对经过封装的属性或方法等成员。ActorContext便包含了用于同步控制,以及其他直接表示Actor内部状态各种字段的对象。这样,我们便可以通过ActorContext对象来实现一个Lock-Free的链表或队列。您可以会说,那么为什么要用独立的ActorContext类型,而不直接把字段放置在统一的基类(例如ActorBase)中呢?这有两点原因,第一点是所谓的“统一控制”便于管理,而第二点才是更为关键的:后文会涉及到F#对这Actor模型的使用,只可惜F#在对待父类的internal成员时有一个bug,因此不得不把相关实现替换成接口(IActor)。不过这不是本文的主题,我们下次再讨论F#的问题。
ActorContext目前只有一个字段——没错,只需要一个,这个字段便是表示状态的m_status。
internal class ActorContext { ... public const int WAITING = 0; public const int EXECUTING = 1; public const int EXITED = 2; public int m_status; }
m_status字段的类型为int,而不是枚举,这是为了可以使用Interlocked中的CAS操作。而对这个状态的操作,也正好形成了我们同步操作过程中的“壁垒”。我们的每个Actor在任意时刻都处于三种状态之一:
- 等待(Waiting):邮箱为空,或刚执行完一个消息,正等待分配任务。
- 执行(Executing):正在执行一个消息(确切地说,由于线程池的缘故,它也可能是还在队列中等待,不过从概念上理解,我们认为它“已经”执行了)。
- 退出(Exited):已经退出,不会再执行任何消息。
显然,只有当m_status为WAITING时才能够为Actor分配运算资源(线程)以便执行,而分配好资源(将其推入.NET线程池)之后,它的状态就要变成EXECUTING。这恰好可以用一个原子操作形成我们需要的“壁垒”,可以让多个“请求”,“有且只有一个”成功,即“把Actor的执行任务塞入线程池”。如下:
internal class Dispatcher { ... public void ReadyToExecute(IActor actor) { if (actor.Existed) return; int status = Interlocked.CompareExchange( ref actor.Context.m_status, ActorContext.EXECUTING, ActorContext.WAITING); if (status == ActorContext.WAITING) { ThreadPool.QueueUserWorkItem(this.Execute, actor); } } ... }
CompareExchange方法返回这次原子操作前m_status的值,如果它为WAITING,那么这次操作(也仅有这次操作)成功地将m_status修改为EXECUTING。在这个情况下,Actor将会被放入线程池,将会由Execute方法来执行。从上述实现中我们可以发现,这个方法在多线程的情况下也能够正常工作。那么ReadyToExecute方法该在什么地方被调用呢?应该说是在任何“可能”让Actor开始执行的时候得到调用。按照文章开始的说法,其中一个情况便是“当Actor接收到一个消息时”:
public abstract class Actor<T> : IActor { ... private Queue<T> m_messageQueue = new Queue<T>(); ... public void Post(T message) { if (this.m_exited) return; lock (this.m_messageQueue) { this.m_messageQueue.Enqueue(message); } Dispatcher.Instance.ReadyToExecute(this); } }
而另一个地方,自然是消息“执行完毕”,且Actor的邮箱中还拥有消息的时候,则再次为其分配运算资源。这便是Dispatcher.Execute方法的逻辑:
public abstract class Actor<T> : IActor { ... bool IActor.Existed { get { return this.m_exited; } } int IActor.MessageCount { get { return this.m_messageQueue.Count; } } void IActor.Execute() { T message; lock (this.m_messageQueue) { message = this.m_messageQueue.Dequeue(); } this.Receive(message); } private bool m_exited = false; protected void Exit() { this.m_exited = true; } ... } internal class Dispatcher { ... private void Execute(object o) { IActor actor = (IActor)o; actor.Execute();
当程序执行到此处时,actor的Execute方法已经从邮箱尾部获取了一条消息,并交由用户实现的Receive方法执行。同时,Actor的Exit方法也可能被调用,使它的Exited属性返回true。不过到目前为止,因为ActorContext.m_status一直保持为EXECUTING,因此这段时间中任意新消息所造成的ReadyToExecute方法的调用都不会为Actor再次分配运算资源。不过接下来,我们将会修改m_status,这可能会造成竞争。那么我们又该怎么处理呢?
如果用户调用了Actor.Exit方法,那么它的Exited属性则会返回true,我们可以将m_status设为EXITED,这样Actor再也不会回到WAITING状态,也就避免了无谓的资源分配:
if (actor.Existed) { Thread.VolatileWrite( ref actor.Context.m_status, ActorContext.EXITED); } else {
如果Actor没有退出,那么它会被短暂地切换为WAITING状态。此后如果Actor的邮箱中存在剩余的消息,那么我们会再次调用ReadyToExecute方法“尝试”再次为Actor分配运算资源:
Thread.VolatileWrite( ref actor.Context.m_status, ActorContext.WAITING); if (actor.MessageCount > 0) { this.ReadyToExecute(actor); } } } }
显然,在VolatileWrite和ReadyToExecute方法之间,可能会到来一条新的消息,因而再次引发一次并行地ReadyToExecute调用。不过根据我们之前的分析,这样的竞争并不会造成问题,因此在这方面我们可以完全放心。
至此,我们已经完整地实现了一个简单的Actor模型,逻辑清晰,功能完整——而这一切,仅仅用了不到150行代码。不用怀疑,这的确是事实。
使用示例
Actor模型的关键在于消息传递形式(Message Passing Style)的工作方式,通信的唯一手段便是传递消息。在使用我们的Actor模型之前,我们需要继承Actor<T>类来构建一个真正的Actor类型。例如一个最简单的计数器:
public class Counter : Actor<int> { private int m_value; public Counter() : this(0) { } public Counter(int initial) { this.m_value = initial; } protected override void Receive(int message) { this.m_value += message; if (message == -1) { Console.WriteLine(this.m_value); this.Exit(); } } }
当计数器收到-1以外的数值时,便会累加到它的计数器上,否则便会打印出当前的值并退出。这里无需做任何同步方面的考虑,因为对于单个Actor来说,所有的消息都是依次处理,不会出现并发的情况。Counter的使用自然非常简单:
static void Main(string[] args) { Counter counter = new Counter(); for (int i = 0; i < 10000; i++) { counter.Post(i); } counter.Post(-1); Console.ReadLine(); }
不过您可能会问,这样的调用又有什么作用,又能实现什么呢?您现在可以去网上搜索一些Actor模型解决问题的示例,或者您可以等待下一篇文章中,我们使用F#来操作这个Actor模型。您会发现,配合F#的一些特性,这个Actor模型会变得更加实用,更为有趣。
此外,在下一篇文章里我们也会对这个Actor模型进行简单的性能分析。如果您要把它用在生产环境中,那么可能还需要对它再进行一些细微地调整。
坐个沙发