异步编程与响应式框架
2010-09-14 20:14 by 老赵, 13488 visits本文原发表于《程序员》杂志9月刊,现将全文公开至此。前几天的技术交流会上我也讲了这部分内容,配合着看会有更好效果。
此外我想再多几句。《程序员》杂志在某些方面必须尽快做出调整。就拿我这篇文章来说,代码除了被三栏的版式搞得支离破碎以外,其中必要的空格也莫名奇妙地少了许多。我认为《程序员》如今在内容方面已经有了显著提高,但是在这种“表面”工夫上也得抓紧才好,这也是国内同类杂志中“最专业者”所应有的风范。
前言
异步操作是强大的,它是许多高伸缩性架构的基石。异步操作在许多情况下是必须的,例如在客户端保持用户界面的响应能力,以及在日益兴起的云计算场景中。但是,异步编程又是十分困难的,它让这让许多程序员敬而远之。因此,越来越多的编程语言都对异步编程提供了相当程度的支持,其中的典型代表便是F#中的异步工作流以及Scala的Actor模型。不过目前的一些主流编程语言,如C#或是JavaScript,它们在设计之时并没有在异步编程上考虑太多,我们便会根据它们的语言特性,提供合适的异步编程模型及其实现。而本文介绍的便是其中一例:响应式编程(Reactive Programming)模型及响应式框架(Reactive Framework,简称Rx)。
异步编程的难点
异步编程之所以困难,主要有三大难点。
首先是对于状态的维护。在普通编程中,我们已经习惯了根据各种状态采取不同做法的编程方式。在异步编程中,状态对于操作的影响则往往更为复杂。例如,我们在编写一个鼠标“拖动及绘图”的行为时,一般会采用这样的逻辑:
- 在MouseDown事件中将isDragging标记设为true,表示“拖动开始”,并记录当前鼠标位置prevPos。
- 在MouseUp事件中将isDragging标记设为false,表示“拖动结束”。
- 在MouseMove事件中检查isDragging标记,如果为true,根据鼠标当前位置currPos和之前记录的prevPos进行绘图,并将currPos的值写入prevPos。
仅在这样一个最基本的场景中,我们便需要编写三个事件处理器(Event Handler),控制isDragging,prevPos等外部状态,并根据这些状态决定事件触发时的效果。这样的例子数不胜数,尤其是在各式拖放操作中,几乎都会涉及大量状态的控制(例如,判断物体是否进入某个特定区域)。
异步编程的另一个难点,在于异步操作之间的组合及交互。例如在如上的简单拖放操作中,我们便涉及到了MouseDown,MouseUp及MouseMove三个事件。从某些角度来说,客户端的UI事件还是比较容易处理的,因为它们往往都是在单一线程上依次执行。但是在另外一些场景中,如云计算时,我们往往会同时发起多个异步操作,并根据这些操作的结果进行后续处理,甚至还会有一个额外的超时监控,这样便很有可能会出现并发操作的竞争(Race)情况,这将会成为程序复杂度的灾难。
此外,异步操作还会破坏“代码局部性(Code Locality)”,这可能也是异步操作中最为常见的阻碍。程序员早已习惯了“线性”地表达逻辑,但即便是多个顺序执行的异步操作,也会因为大量的回调函数而将算法拆得支离破碎,更何况还会出现各种循环及条件判断。同时,在线性的代码中,我们可以使用“局部变量”保存状态,而在编写异步代码时则需要手动地在多个函数中传递状态。此外,由于逻辑被拆分至多个方法,因此我们也无法使用传统的try/catch进行统一异常处理。
推模型与拉模型
平时我们使用最多的便是“交互式(Interactive)”的编程方式,采用的是组件之间的相互调用来表现逻辑。例如,对象A向对象B请求数据并等待返回,待对象B完成并返还数据之后A才继续进行后面的操作。交互式编程的一个典型应用便是GoF23中的迭代器(Iterator)模式,它在.NET中的实现为IEnumerable及IEnumerator接口,例如:
void Traverse(IEnumerable<int> source) { var etor = source.GetEnumerator(); while (etor.MoveNext()) { Console.WriteLine(etor.Current); } }
为了更好地说明问题,这里我们将标准的foreach操作展开为传统的迭代器使用形式,并省略了using语句。在使用时,我们先调用一个IEnumerable对象的GetEnumerator方法,获得一个迭代器,再根据MoveNext及Current进行遍历。在调用MoveNext时,迭代器会去“准备”下一个元素,并根据存在与否返回true或者false。试想,如果其中某个MoveNext的“准备”工作涉及到一个耗时较长的操作,则迭代器的使用者也必须眼巴巴地等待其返回。
这是一种“拉(Pull)”模型,数据由消费者(Consumer)从生产者(Producer)那里主动“拉”来。这是一种同步的交互方式,数据消费者会依赖于数据生产者的表现。这就好比我们去食堂吃饭时必须主动去取餐,此时则必须从队伍的最后排起,我们什么时候能结束等待并进行下一步操作(即“吃饭”),则要看食堂的生产速度如何。很显然,有些时候这种交互方式是不可接受的,例如我们在实现一个搜索引擎的“关键字提示”功能时,不可能让用户在输入一个字符后,必须等待远程的提示请求返回才能继续输入下一个字符。
而与交互式编程对应的便是“响应式(Reactive)”编程。响应式编程是一种基于“改变”的编程方式。例如在交互式编程中,A = B + C这样的表达式意味着将B与C之和赋给A,而此后B与C的改变都与A无关。而在响应式编程中,A会去“响应”B或C的变化,即一旦B或C改变之后,A的值也会随之变化。响应式编程的一个典型应用便是GoF23中的观察者(Observer)模式。与迭代器的IEnumerable/IEnumerator不同,在之前的.NET框架中并没有对这样一种编程模型指定“标准化(Formallized)”接口,不过在.NET 4.0的基础类库中增加了IObservable及IObserver接口,签名如下:
public interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); } public interface IObserver<in T> { void OnCompleted(); void OnError(Exception error); void OnNext(T value); }
如果我们仔细比较“迭代器”与“观察者”的标准化接口,则会发现它们是完全“对偶(dual)”的:
- IEnumerable.GetEnumerator方法“输出”一个IEnumerater对象;IObservable.Subscribe方法“输入”一个IObserver对象。
- 在遍历元素用尽时,IEnumerator.MoveNext方法返回false;在响应内容用尽时,IObserver.OnCompleted方法被调用。
- 在有新元素需要遍历时,IEnumerator.MoveNext方法返回true,并通过Current属性“输出”;在有新元素需要响应时,IObserver.OnNext方法被调用,并通过参数“输入”。
- 在出现错误时,IEnumerator.MoveNext方法会“抛出”一个异常;在出现错误时,IObserver.OnError方法会被调用,并通过参数“接受”异常信息。
至于IObservable.Subscribe方法返回的IDisposable对象,则用于“退定”操作,即让输入的IObserver对象再也不需要继续响应IObservable对象的新元素了。
从比较中可以看出,如果说IEnumerator对象是由数据消费者使用的话,那么IObserver对象则是由数据的生产者,即IObservable对象使用的。换句话说,数据是由数据的生产者“推”给数据消费者的,是一种“推(Push)”模型。在这种异步的交互方式中,数据消费者不必依赖于数据生产者的表现。这就好比我们去饭店吃饭,点菜后便可坐下和同伴聊聊天或是用手机上上网,而作为菜品的生产者,饭店,则会在产出之后主动端上桌来。这么做无疑解放了数据的消费者,例如用户可以在文本框里不断地输入字符,而只需等远程服务器将提示结果“推”给客户端后再显示即可。
许多模型都可以统一至标准的生产者接口IObservable上,如MouseMove事件便可以认做是“永不停止的MouseEventArgs对象的生产者”。而另一方面,单个异步操作则可以被视为“只产出单个数据便结束的生产者”。
LINQ to Observable
在许多人眼中,C# 3.0中新增的LINQ特性只是一种用于操作数据的DSL,它的主要作用也仅仅是针对IEnumerable或是IQueryable的数据操作。事实上,LINQ本身的能力远不止此。LINQ是一种非常简单的语言特性,编译器只是将LINQ查询语句转化为“字面等价”的“LINQ标准方法”调用(如Where,Select等等)和“和Lambda表达式”参数(如x => x > 0)而已。但是这里的关键便是“字面等价”四个字,LINQ本身并不规定“LINQ标准方法”是对象的实例方法还是扩展方法,“Lambda表达式”是构造出一个匿名函数还是表达式树,这一切都是由利用LINQ的类库来决定的。因此,微软能够基于LINQ实现了PLINQ这样的并行类库。直到最近的访谈中,LINQ的设计者Erik Meijer依旧认为LINQ还是被低估了,他们自己也还在继续挖掘LINQ的更多能力。
虽然LINQ本身在语法上只是一些方法调用,但是它在语义上是针对数据流的一系列操作,因此LINQ to Object,LINQ to SQL以及PLINQ可以认为是最自然,最符合LINQ语义的应用。如今,微软的“云编程能力团队(Cloud Programmability Team)”在其“响应式框架(Reactive Framework,简称Rx)”提供了LINQ to Observable,这又是另一个LINQ的经典使用案例。IObservable本身可以被认为是一股“推送数据”的数据流,因此也可以对其进行“过滤”或是“投影”等操作,这便是LINQ的应用场景,LINQ to Observable是对IObservable接口实现的一系列LINQ标准方法。因此,我们可以认为,LINQ to Observable是一套与LINQ to Object对偶的类库,事实上在响应式框架中,还有一套与LINQ to Queryable对偶的LINQ to Qbservable,请注意第一个字母是Q,我们可以把它看作是LINQ to Observable的“远程查询”版本。
利用LINQ to Object可以编写出声明式(表示“做什么”)的代码,其可读性往往远高于等价的命令式(表示“怎么做”)代码。LINQ to Observable也有类似的效果。假设现在有一个需求:利用ADWS键控制小球的位置。传统的写法可能是这样的:
void OnKeyPress(object sender, KeyPressEventArgs e) { // 如果游戏已经开始 if (isPlaying) { // 向左且小球没有超出边界 if (e.KeyChar == 'a' && ball.Left > 0) { ball.Left -= 5; } // 向上且小球没有超出边界 else if (e.KeyChar == 'w' && ball.Top > 0) { ball.Top -= 5; } else ... } else ... }
由于KeyPress事件总是不断触发,因此我们只能它的事件处理器中进行判断各种状态,采取不同措施。而如果我们利用LINQ to Observable,则几乎是另外一种思维方式:
// 过滤出isPlaying时的keyPress事件 var keyPress = GetKeyPress().Where(_ => isPlaying); // 过滤出向左移动的事件 var moveLeft = from ev in keyPress where ev.EventArgs.KeyChar == 'a' where ball.Left > 0 select ev; moveLeft.Subscribe(_ => ball.Left -= 5); // 过滤出向上移动的事件 var moveTop = from ev in keyPress where ev.EventArgs.KeyChar == 'w' where ball.Top > 0 select ev; moveTop.Subscribe(_ => ball.Top -= 5);
我们可以将“KeyPress事件”视为“推送KeyPressEventArgs对象”这一数据流的数据源(由GetKeyPress方法返回),那么如今的代码便是使用LINQ过滤出“需要”的数据,并针对真正需要的那部分进行响应。这么做,便将“条件”与“操作”解耦,显著增强了代码的语义表达能力。事实上,只要补充一些辅助方法,可以利用LINQ表示更为完整复杂的逻辑。例如,微软咨询师Matthew Podwysocki便在博客中展示过一段代码,基于LINQ to Observable实现了创建一个WebRequest对象,设置属性,异步发送及下载数据的一系列操作。
更多扩展
.NET基础类库针对IEnumerable定义了大量的函数式的辅助方法,开发人员可以直接将它们组合运用在项目中。除了标准的LINQ操作方法之外,响应式框架中同样定义了大量辅助方法,可以配合LINQ to Observable组合使用。例如本文开头所设想的鼠标“拖动及绘图”功能,便可以使用如下代码完成:
var mouseMove = GetMouseMove(); var mouseDiff = mouseMove.Zip(mouseMove.Skip(1), (prev, curr) => new { PrevPos = new Point(prev.EventArgs.X, prev.EventArgs.Y), CurrPos = new Point(curr.EventArgs.X, curr.EventArgs.Y) }); var mouseDrag = from _ in GetMouseDown() from diff in mouseDiff.TakeUntil(GetMouseUp()) select diff; mouseDrag.Subscribe(diff => DrawLine(diff.PrevPos, diff.CurrPos));
在这段代码中,我们首先将mouseMove事件使用Skip跳开一项,再与自身通过Zip方法组合成mouseDiff,这是一个输出相邻两次MouseMove事件坐标的数据源;接着,我们利用LINQ从触发MouseDown事件开始,向mouseDiff数据源获取每一项diff,直至(TakeUntil)触发MouseUp事件,以此生成最终的mouseDrag;最后再将绘图功能订阅至这个数据源上。您会发现此时我们已经无须手动维护操作过程中的各种状态了,从事件的“开始”到“结束”均使用响应式框架的辅助方法“声明”而来。
以上便是一个利用了Skip,Zip,TakeUntil等辅助方法的例子。当然,这些辅助方法在IEnumerable上都有语义相同的对应操作,而在响应式框架中还有更多辅助方法是针对特性异步场景的。假设我们现在要编写一个即时翻译功能,同时发起三个请求,将中文分别翻译至英语、法语及西班牙语,并显示最先返回的两个结果(真是个奇怪的需求)。此外,我们不会在用户输入每个字符的时候便发起一个远程请求,而是在用户停止输入0.5秒之后才根据当前的输入框中的文字进行提示。于是我们可以编写这样的代码:
var limit = TimeSpan.FromSeconds(0.5); var translate = from _ in GetKeyPress().Throttle(limit) let text = this.txtInput.Text where text.Length > 0 let english = Bing.Translate(text, "en") let french = Bing.Translate(text, "fr") let spanish = Bing.Translate(text, "es") from result in Observable.Join( english.And(french).Then((en, fr) => new { English = en, French = fr, Spanish = "" }), english.And(spanish).Then((en, es) => new { English = en, French = "", Spanish = es }), french.And(spanish).Then((fr, es) => new { English = "", French = fr, Spanish = es })) select result; translate.Subscribe(...);
这里用到了Throottle方法,它会过滤某个数据源的输出,确保在该数据源“静默”特定时间之后,才将最近的一条数据推送至外部。此外,这里还使用了Observable.Join方法控制多个数据源,根据返回结果的先后获得合适的结果。响应式框架提供了大量针对某种异步场景的辅助方法,例如用于定期推送数据的Interval方法,从一个数据源根据特定条件进行采样的Sample方法,合并多个数据源的ForkJoin方法,以及表示流程控制的For,While,If等等。这些方法内部会维护各种所需要的状态,为我们打理各种复杂的竞争情况,以此节省了开发人员的精力。
如果这些还不能满足我们的要求,我们也可以根据自己的需要开发特定的辅助方法,就像我们在使用LINQ to Object时为IEnumerable所作的各种扩展那样。响应式框架也提供了一系列Subject类型,简化了IObservable自定义扩展的开发过程。由于响应式框架尚未正式发布,微软目前建立了一个Wiki,用于展示关于各辅助方法及Subject类的使用示例及其他相关信息。
响应式框架的JavaScript版本
响应式编程的重要使用场景之一便是与用户交互的GUI界面。例如,Silverlight禁止任何阻塞的IO操作,换言之Silverlight中的所有网络操作都是异步的,微软也正是出于简化异步开发的目的才设计了响应式框架(事实上响应式框架已经集成到Silverlight Toolkit中)。不过与Silverlight相比,基于浏览器的原生JavaScript应用程序无疑使用地更为广泛。对于这样的应用程序来说,动画是异步的,AJAX请求也是异步的,我们几乎可以断言,如果有一套面向JavaScirpt应用程序的响应式框架,一定会比面向Silverlight的框架更有意义得多。
微软也想到了这一点。之前我们讨论的“响应式框架”,其实只是响应式编程模型的一种实现。更确切地说,我们只是讨论了这套框架的.NET版本,微软还提供了JavaScript版本的响应式框架。JavaScript版本的API与.NET版本几乎完全一致,例如我们之前讨论的拖放操作,使用JavaScript即可写作:
var target = $("#dragTarget"); var mouseMove = target.toObservable("mousemove"); var mouseDiff = mouseMove.Zip(mouseMove.Skip(1), function(prev, curr) { return { PrevPos: { x: prev.clientX, y: prev.clientY }, CurrPos: { x: curr.clientX, y: curr.clientY } }; }); var mouseDown = target.toObservable("mousedown"); var mouseUp = target.toObservable("mouseup"); var mouseDrag = mouseDown.SelectMany(function() { mouseDiff.TakeUntil(mouseUp); }); mouseDrag.Subscribe(...);
由于没有C#中的LINQ查询语言,我们只能直接使用展开后的方法,如SelectMany来编写逻辑。JavaScript版本的响应式框架还提供了一系列的“胶合”层,能够与jQuery,Dojo,MooTools,Prototype等流行框架同时使用。例如,上一段代码中的toObservable便是在jQuery根对象上扩展的方法。
总结
异步编程在用户交互式界面及一些云计算场景中尤其重要。微软的云编程能力团队针对.NET平台和JavaScirpt分别提供了一套响应式框架,希望以此简化异步程序的开发。不过,这套响应式框架所表现出的理念是通用的。而且,事实上只要是拥有匿名函数及闭包的语言,例如Scala,Python,Ruby等等,实现这样一套框架其实都不是十分困难的事情。
原因是:程序没有记忆能力,不能记忆之前的操作和状态。假如程序有了记忆能力,能查询之前的状态,就能大大简化异步编程了。让程序拥有记忆的方法是,使用AOP为某些状态改变的操作增加监视,这样可以在程序中查询某个状态的变化。
于是可以这样定义(伪码),将一个消息序列定义为一个新消息,例如Drag消息。
drag = ( mousedown, mousemove, mousemove,... )
含义是,一个drag消息是由一个mousedown消息和后面数个mousemove消息组成的,而drag消息的参数自然就包括了第一个mousedown的x,y(起始位置),一系列的x,y(路径)所组成。
当序列后面出现了一个mouseup消息,drag消息就再也不能发出。剩下的工作,就是响应这个自定义的drag消息来绘图,而且,消息参数很丰富,所以撤销操作也变得简单了。
我这上面说的不知道c#F#里面能怎么搞,反正lisp和prolog我是搞得出来的。