Hello World
Spiga

F#中的异步及并行模式(3 - 上):代理的基本使用

2010-03-15 01:35 by 老赵, 3970 visits

在本系列的第3部分中,我们会来探索F#中轻量级的,交互式的代理,以及与代理有关的一些模式,包括“隔离的内部状态”。(译注:由于原文较长,因此译文分为两段,目前是第一段,讲解了F#中异步代理的基本使用方式。)

  • 第1部分描述了F#作为一个并行及异步语言,是如何支持轻量级的响应操作,并给出了CPU异步并行和I/O异步并行两种模式。
  • 第2部分描述了如何从异步计算或后台计算单元中获得结果。

模式4:您的第一个代理

我们来观察您所创建的第一个异步代理:

type Agent<'T> = MailboxProcessor<'T>
 
let agent =
   Agent.Start(fun inbox ->
     async { while true do
               let! msg = inbox.Receive()
               printfn "got message '%s'" msg } )

这个代理不断地异步等待消息,并将它们打印出来。在这段代码中,每个消息都是一个字符串,且agent的类型是:

agent.Post "hello!"

这便会打印出:

got message 'hello!'

也可以这样发送多条消息:

for i in 1 .. 10000 do
   agent.Post (sprintf "message %d" i)

这样便可以打印出10000条消息。

您可以认为每个代理对象都包含一个消息队列(或管道),并在消息到达时进行响应。一个委托一般都使用异步的循环等待来消息并进行处理。如在上面的例子中,代理使用while循环进行处理。

许多读者可能已经对代理颇为熟悉了。如Erlang,它便是基于代理设计的(在那里被称为进程)。而不久之前,一个基于.NET平台的实验性的孵化型语言,Axum,也注重了基于代理编程的重要性。Axum与F#中的代理设计相互影响,而其他包含轻量级线程的语言也强调了基于代理的组合与设计。

上面的例子一开始创建了一个类型的缩写:Agent,它代表了F#类库中基于内存的代理类型“MailboxProcessor”。如果您愿意的话也可以使用这个完整的名字,不过我更喜欢简单的命名。

您的第一批10万个代理

代理对象非常轻量,这是因为它基于F#的异步编程模型。例如,您可以在一个.NET进程中创建成百上千,甚至更多个代理。例如,我们来创建10万个简单的代理对象:

let agents =
    [ for i in 0 .. 100000 ->
       Agent.Start(fun inbox ->
         async { while true do
                   let! msg = inbox.Receive()
                   if i % 10000 = 0 then
                       printfn "agent %d got message '%s'" i msg } ) ]

您可以这样向每个代理对象发送消息:

for agent in agents do
    agent.Post "ping!

每第1万个代理对象会在收到消息时打印信息。这个代理集合在处理消息时非常迅速,只要几秒钟时间。代理和内存中的消息处理非常快。

很显然,代理并不与.NET线程直接对应──您不可能在单个应用程序中创建10万的线程(在32位操作系统中,即便1000个线程也已经太多了)。相反,在代理等待消息时,它实际上只是表现为一个回调函数,一些对象分配,以及代理所引用的闭包等等。在收到消息之后,代理的工作会在一个线程池(默认便是.NET线程池)中分配并执行。

尽管需要10万个代理的情况并不多见,不过2000多个代理倒是很正常的。接下来我们便会看到这样一些例子。

高伸缩的Web服务器处理请求

在F#编程中,异步代理的思想其实是一种在多个环境中反复出现的设计模式。在F#中,我们经常使用“代理”这个词表示一种随时发生的,特别是通过循环,或是处理消息,或是产生结果的异步计算。

例如,在以后的文章中,我们会来关注如何使用F#构建伸缩性强的TCP或HTTP服务器应用程序,并将它们部署到EC2或是Windows Azure中去。这里我们打算用“股票服务器”作为例子,它接受TCP或HTTP连接,并向客户端返回一系列的股票信息。每个客户端会每隔一秒钟收到一条股票信息。这个服务最终会以单个URL或REST API的形式发布。

在实现时,我们为每个客户端请求分配一个异步代理(由于只是演示,我们在这里便不断地写入相同的AAPL股票信息):

open System.Net.Sockets
 
/// serve up a stream of quotes
let serveQuoteStream (client: TcpClient) = async {
    let stream = client.GetStream()
    while true do
        do! stream.AsyncWrite( "AAPL 200.38"B )
        do! Async.Sleep 1000.0 // sleep one second}

每个代理会一直运行到客户端连接断开。因为代理非常轻量,因此这个股票服务能够在一台机器上支持数千个并发连接(如果使用云托管服务则会有更好的伸缩性)。而同一时刻会出现多少个代理对象则取决于客户端的数量。

上面的例子演示了使用F#进行网络编程是多么的方便──网络协议在此变成了基于异步代理的数据流读写。在以后的文章中我们会观察更多使用F#进行伸缩性强的TCP/HTTP编程。

代理与隔离状态(命令式)

F#代理编程的一个优秀的关键之处便是其隔离性。隔离性则意味着资源“归属”与某个特定的代理,而不会暴露给其他代理。因此,独立状态对并发的访问及数据竞争是一种良好的保护。

在F#中,异步代理的独立性直接表现为文法上的作用域。例如,下面的代码使用一个字典来累计发送至代理对象的不同消息的次数。内部的字典在文法上是异步代理私有的,因此我们无法在代理外部对字典进行读写。这意味着字典的可变状态实际上是被隔离的,代理保证了对它的非并发的安全访问。

type Agent<'T> = MailboxProcessor<'T>
 
open System.Collections.Generic
let agent =
   Agent.Start(fun inbox ->
     async { let strings = Dictionary<string,int>()
             while true do
               let! msg = inbox.Receive()
               if strings.ContainsKey msg then
                   strings.[msg] <- strings.[msg] + 1
               else
                   strings.[msg] <- 0
               printfn "message '%s' now seen '%d' times" msg strings.[msg] } )

状态隔离是F#的基本特性,它并不是代理所独有的。例如,下面的代码对StreamReader和ResizeArray(这是F#中对System.Collections.Generics.List的别称)的隔离访问。请注意这段代码和.NET类库中的System.IO.File.ReadAllLines方法功能相同:

let readAllLines (file:string) =
    use reader = new System.IO.StreamReader(file)
    let lines = ResizeArray<_>()
    while not reader.EndOfStream do
        lines.Add (reader.ReadLine())
    lines.ToArray()

在这里,reader和ResizeArray对象都无法在函数外部使用。在代理或其他持续计算的情况里,隔离性是至关重要的──状态在这里永远独立于程序运行中的其他部分。

说到底,隔离性是个动态的属性,它经常受到文法的约束。例如,考虑这样一个延迟的,随需加载的读取器,它会读取文件中的所有行:

let readAllLines (file:string) =
    seq { use reader = new System.IO.StreamReader(file)
          while not reader.EndOfStream do
              yield reader.ReadLine() }

隔离状态经常包含可变的值,包括F#中的引用单元。例如,下面的代码在一个引用单元中不断累计接受到的消息个数:

let agent =
   Agent.Start(fun inbox ->
     async { let count = ref 0
             while true do
               let! msg = inbox.Receive()
               incr count
               printfn "now seen a total of '%d' messages" !count } )

再次强调,这里可变的状态被隔离了,确保了对它单线程的安全访问。

在代理中使用循环和隔离状态(函数式)

F#程序员了解两种实现循环的方法:使用“命令式”的while/for以及可变的累加器(refmutable),或是“函数式”风格的调用,将累加状态作为参数传递给一个或多个递归函数。例如,计算文件行数的程序可以使用命令式的方式来写:

let countLines (file:string) =
    use reader = new System.IO.StreamReader(file)
    let count = ref 0
    while not reader.EndOfStream do
        reader.ReadLine() |> ignore
        incr count
    !count

或是递归式的:

let countLines (file:string) =
    use reader = new System.IO.StreamReader(file)
    let rec loop n =
        if reader.EndOfStream then n
        else
            reader.ReadLine() |> ignore
            loop (n+1)
    loop 0

在使用时,两种方式都是可行的:函数式的做法相对更加高级一些,但是大大减少了代码中显式的状态修改次数,且更为通用。两种写法对于F#程序员来说,一般都可以理解,他们还可以将“while”循环转化为等价的“let rec”函数(这是个不错的面试问题!)。

有趣的是,在编写异步循环时的规则也一样:您可以使用命令式的“while”或函数式的“let rec”中的任意一种来定义循环。例如,这里有一个利用递归的异步函数统计消息数量的做法:

let agent =
   Agent.Start(fun inbox ->
     let rec loop n = async {
         let! msg = inbox.Receive()
         printfn "now seen a total of %d messages" (n+1)
         return! loop (n+1)
     }
     loop 0 )

这样我们便可以获得这样的输出:

now seen a total of 0 messages
now seen a total of 1 messages
....
now seen a total of 10000 messages

再提一次,定义代理对象的两种常见模式为命令式的:

let agent =
   Agent.Start(fun inbox ->
     async {
         // isolated imperative state goes here
         ...
         while <condition> do
             // read messages and respond
             ...
     })

及函数式的:

let agent = 
    Agent.Start(fun inbox ->
      let rec loop arg1 arg2 = async { 
          // receive and process messages here
          ...
          return! loop newArg1 newArg2
       }

      loop initialArg1 initialArg2)

再次强调,两种方法在F#都是合理的──使用递归的异步函数可能是更高级的方法,但更为函数式且更为通用。

原文:Async and Parallel Design Patterns in F#: Agents

Creative Commons License

本文基于署名 2.5 中国大陆许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名赵劼(包含链接),具体操作方式可参考此处。如您有任何疑问或者授权方面的协商,请给我留言

Add your comment

17 条回复

  1. 老赵
    admin
    链接

    老赵 2010-03-15 01:43:00

    翻译的不是很好的样子

  2. 星染流云[未注册用户]
    *.*.*.*
    链接

    星染流云[未注册用户] 2010-03-15 04:22:00

    其实这么看倒是没觉得不好。

    例如,在以后的文章中,我们会来关注如何使用F#构建伸缩性强的TCP或HTTP服务器应用程序,并将它们部署到EC2或是Windows Azure中去。


    这周末本来想去试试在Azure上部署F#,却发现免费测试已经结束服务开始收费了,只能等以后有机会再说了。

  3. 苏飞
    *.*.*.*
    链接

    苏飞 2010-03-15 07:05:00

    从来没有坐过赵哥的沙发,今天 可能是赶上了,呵呵F#我不懂,Mark

  4. 兴说:
    *.*.*.*
    链接

    兴说: 2010-03-15 08:53:00

    嗯...感觉使用代理,就跟Erlang上面简单的使用PingPong一样。

  5. 老赵
    admin
    链接

    老赵 2010-03-15 09:13:00

    兴说::嗯...感觉使用代理,就跟Erlang上面简单的使用PingPong一样。


    满足PingPong的方式有许多,几乎每种语言/平台都可以,但是这太简单了,不能代表真正的生产能力,PingPong只是用来测试基本调度性能的。

  6. xiao_p
    *.*.*.*
    链接

    xiao_p 2010-03-15 10:40:00

    @Jeffrey Zhao
    前几天看无废话erlang,发现用的是进程而不是代理,这两个应该是一个意思吧,感觉进程更好理解点,而代理总是让我想到delegate。。。

  7. xiao_p
    *.*.*.*
    链接

    xiao_p 2010-03-15 10:42:00

    话说老外的文章看起来就是累,有条理性的还好,像这样的文章,即使翻译的不错,来来去去看的也别扭死了。

    很多原理性的东西就没交代清楚,如果不是对f#有一些了解的,看起来简直太累了。。。

  8. 老赵
    admin
    链接

    老赵 2010-03-15 10:42:00

    @xiao_p
    一个意思,delegate是委托吧。代理是agent或proxy。
    进程更容易混,你看哪本将进程的书没有强调一下它不是操作系统进程啊。

  9. xiao_p
    *.*.*.*
    链接

    xiao_p 2010-03-15 10:48:00

    Jeffrey Zhao:
    @xiao_p
    一个意思,delegate是委托吧。代理是agent或proxy。
    进程更容易混,你看哪本将进程的书没有强调一下它不是操作系统进程啊。



    我明白这里翻译成进程肯定也很容易让人产生误解,但是,总感觉代理也挺别扭的。

    估计是我的执念?,,呵呵

  10. 老赵
    admin
    链接

    老赵 2010-03-15 10:57:00

    @xiao_p
    嗯,这几篇就是讲怎么用,而不是讲原理的……不过其实原理很简单,以前我也提到过。

  11. xiao_p
    *.*.*.*
    链接

    xiao_p 2010-03-15 12:31:00

    F# 貌似现在最大的问题是没有大的应用可以参考,这个甚至还不如scala,至少twitter还用它开发了点东西。

    我在codeplex上搜了下F#的应用,也是寥寥无几。

    这从老赵的这几篇关于f#的blog在博客园的关注程度上也可见一斑。

    不过,我相信这样的情况会有所改善,尤其是vs2010RC的发布,我身边的人都是选择vs2010而不是用vs2008来开发F#了,F#也终于成为了dotnet平台的主流语言了。

  12. 老赵
    admin
    链接

    老赵 2010-03-15 13:29:00

    @xiao_p
    F#是新语言,正式版还没出呢,呵呵。
    Scala至少也有个5、6年了,这方面肯定没得比。

  13. heaiping
    *.*.*.*
    链接

    heaiping 2010-03-15 20:41:00

    F#是4.0里面的东西,安装个VS 2010就都有了,对于做商业软件的人来说,个人感觉他目前只是一个小小的玩具,爱好而已

  14. 老赵
    admin
    链接

    老赵 2010-03-15 22:42:00

    @heaiping
    F#在.NET 3.5和mono上都可以编译及运行。
    对于做商业软件的人来说,生产力就代表了金钱,还是应该关注这些东西的。

  15. heaiping
    *.*.*.*
    链接

    heaiping 2010-03-15 23:26:00

    Jeffrey Zhao:
    @heaiping
    F#在.NET 3.5和mono上都可以编译及运行。
    对于做商业软件的人来说,生产力就代表了金钱,还是应该关注这些东西的。


    恩,其实我的意思就是说他目前应该还不是很成熟吧,说实话,我目前所做的东西基本都在2.0-3.0之间呢,一天忙啊,好多东西都是赶鸭子上架的时候才开始学习的,感觉很疲惫,很无奈的,睡觉了,老赵多搞点好东西出来,我不一定要学,至少我会了解一下

  16. 老赵
    admin
    链接

    老赵 2010-03-15 23:36:00

    @heaiping
    其实真已经很成熟了。语言一般分两部分,编译器及类库,更主要是后者(因为前者基本不会出问题)。
    F#的编译器搞了4、5年了,不会有bug,而类库么其实完全就是.net fx啊,呵呵。

  17. heaiping
    *.*.*.*
    链接

    heaiping 2010-03-16 09:28:00

    @Jeffrey Zhao
    我今天就看看,我最近安装了vs 2010 里面就有呢,谢谢老赵,我是一位毕业不到两年的小小菜

发表回复

登录 / 登录并记住我 ,登陆后便可删除或修改已发表的评论 (请注意保留评论内容)

昵称:(必填)

邮箱:(必填,仅用于Gavatar

主页:(可选)

评论内容(大于5个字符):

  1. Your Name yyyy-MM-dd HH:mm:ss

使用Live Messenger联系我