协程(二):协程的应用

发布时间:2011-09-02 09:39:59,
关注:+1400,
评论:+0,
赞美:+7,
不爽:+1

本文标签:
协程
lua

原始出处:
Indie 之路

# 感谢“书香”推荐: @shuxiang29

上一篇中对协程的概念做出了解释和澄清。总的来说,完全协程才算得上是真正意义上的协程,其它如生成器等只是部分实现了协程概念的非完全协程,我们之后主要讨论完全协程。

本篇介绍一些协程的实际应用。协程本质是一种控制抽象,它的价值在于可以简洁优雅地实现一些控制行为。在协程中,控制可以从当前执行上下文跳转到程序的其它位置,并且可以在之后的任意时刻恢复当前执行上下文,控制从跳出点处继续执行。这种行为与Continuation类似,但协程相比之下更容易理解,某些情况下还更加高效。之后会有一篇专门对比这两个概念。

生产者-消费者模型

在前一篇中已有提及,这是协程最典型也最常见的应用场景。Conway提出协程这个概念时所解决的编译器问题就属于生产者-消费者问题。

管道(Pipeline)也是由生产者-消费者模型扩展而来的,管道实际上是由一个生产者,加上一个或多个过滤器(Filter),再加上一个最终的消费者组成的生产者-消费者链。其中过滤器既是生产者又是消费者。以下是由Lua实现的过滤器:

function filter(ant)

return coroutine.wrap(function())

while true do

-- resume antecessor to obtain value

local x = ant()

-- yield transformed value

coroutine.yield(f(x))

end

end

end

只需要一行语句把生产者、过滤器和消费者串联起来就可以创建出一个管道:

consumer(filter(producer()))

生成器(Generator)

生成器实际上可以看作只有生产者的生产者-消费者模型。生成器的主要用途就是实现迭代器(Iterator)。需要提到的一点就是目前大多数语言提供的生成器都是非栈式(Stackful)的,即不能在嵌套调用中直接yield,这样在实现复杂的生成器时会非常麻烦。比如要实现一个二叉树的先序遍历,先看Lua实现:

function preorder(node)

if node then

preorder(node.left)

coroutine.yield(node.key)

preorder(node.right)

end

end

-- create an iterator

function preorder_iterator(tree)

return coroutine.wrap(function()

preorder(tree)

return nil

end)

end

简洁而直接,因为Lua的协程是完全协程,允许在嵌套调用中yield。如果用C#实现:

public static class BinaryTree<T>

{

static IEnumerable<T> Preorder(Node<T> node)

{

if (node != null) {

foreach (var key in Preorder(node.left))

yield return key;

yield return node.key;

foreach (var key in Preorder(node.right))

yield return key;

}

}

public static IEnumerable<T> PreorderIterator(Node<T> tree)

{

foreach (var key in Preorder(tree))

yield return key;

}

}

可以看到,在递归的每一级都需要一个迭代块向上一级yield,直到最顶层。如果生成器更加复杂,比如需要调用一系列辅助方法,那么在每个调用点处都要增加机械的yield代码。

目标导向(Goal-oriented)编程

目标导向编程是指模式匹配(Pattern-matching)或Prolog的查询这样的系统,由用户提出一个形式化定义的目标(Goal),系统会在一系列可选的子目标中寻找直到确认一个解决方案。在寻找过程中常常会需要回溯(Backtracking)机制,这种机制使用完全非对称协程作为生成器很容易实现。

例如用Lua实现一个模式匹配系统,支持匹配常量、可选和序列三种模式的组合:

-- matching a string literal

function prim(str)

return function(S, pos)

local len = string.len(str)

if string.sub(S, pos, pos+len-1) == str then

coroutine.yield(pos + len)

end

end

end

-- alternative patterns (disjunction)

function alt(patt1, patt2)

return function(S, pos)

patt1(S, pos)

patt2(S, pos)

end

end

-- sequence of sub-patterns (conjuction)

function seq(patt1, patt2)

return function(S, pos)

local btpoint = coroutine.wrap(function()

patt1(S, pos)

end)

for npos in btpoint do

patt2(S, npos)

end

end

end

function match(S, patt)

local len = string.len(S)

local m = coroutine.wrap(function() patt(S, 1) end)

for pos in m do

if pos == len+1 then

return true

end

end

return false

end

使用时形如("abc"|"de")."x"的目标可以定义如下:

patt = seq(alt(prim("abc"), prim("de")), prim("x"))

在进行匹配时这个目标被封装到一个协程中,通过循环逐一尝试匹配每个子目标。实现中的每个模式函数接受目标字符串和起始位置作为参数,匹配成功时yield下一个位置给后续匹配,无法继续匹配时返回nil。我尝试了一下使用C#来实现上述代码,得到的编译错误是:不能在匿名方法或lambda表达式内部使用yield语句。于是我便没有继续了,因为已经可以想象最后的完成代码会有多臃肿。这是C#生成器的又一使用限制,的确非常影响表达力。

协作式多任务(Cooperative multitasking)

并发编程所涉及的范围太广,这里仅讨论线程与协程的对比。在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点),因此非常容易使用。下面是一段Lua实现多任务的代码:

-- list of "live" tasks

tasks = {}

-- create a task

function create_task(f)

local co = coroutine.wrap(function() f(); return "ended" end)

table.insert(tasks, co)

end

-- task dispatcher

function dispatcher()

while true do

local n = table.getn(tasks)

if n == 0 then break end -- no more tasks to run

for i = 1, n do

local status = tasks[i]()

if status == "ended" then

table.remove(tasks, i)

break

end

end

end

end

协作式多任务的缺点之一是进行阻塞(Blocking)操作如IO时会阻塞掉整个程序,解决方案是提供一个辅助函数,初始化IO操作之后如果操作不能立即完成就挂起当前协程,Programming in Lua中给出了一个多任务下载的例子:

function download(host, file)

local c = assert(socket.connect(host, 80))

local count = 0

c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")

while true do

local s, status, partial = receive(c)

count = count + #(s or partial)

if status == "closed" then break end

end

c:close()

print(file, count)

end

function receive(connection)

connection:settimeout(0)

local s, status, partial = connection:receive(2^10)

if status == "timeout" then

coroutine.yield(connection)

end

return s or partial, status

end

threads = {}

function get(host, file)

local co = coroutine.create(function()

download(host, file)

end)

table.insert(threads, co)

end

function dispatch()

local i = 1

while true do

if threads[i] == nil then

if threads[1] == nil then break end

i = 1

end

local status, res = coroutine.resume(threads[i])

if not res then

table.remove(threads, i)

else

i = i + 1

end

end

end

这个例子把每个下载任务放在一个协程中,通过settimeout(0)使获取操作不会阻塞,再由调度函数逐一唤醒协程执行,如果获取还未完成就挂起,直到所有下载完成。

协作式多任务的另一个缺点是无法利用多核资源,这一点我认为我们日常所编写的绝大部分应用都没有这个必要,除非是实时性要求非常高(如操作系统)或需要尽可能地榨取机器性能(如Web服务器)的情况下(即使这时多线程也并非唯一选择)。所以在你打算使用多线程时先认真考虑一下是否协程就已经足够。在这篇采访中也有一些关于协程和并发的观点供参考。

异常处理

支持异常处理的语言需要实现两个基础原语:try和raise。try原语包含两项表达:主体和异常处理器。若主体正常返回,则返回值作为try的值,忽略异常处理器。若主体遇到异常条件,则引发(raise)一个异常并立即送给异常处理器,主体的剩余部分被忽略。异常处理器可以返回一个值作为try的值,或重新引发一个异常给更外层的异常处理器。

使用完全非对称协程来实现异常处理很简单:try原语由一个函数实现,此函数接受两个函数(主体和异常处理器)作为参数,然后在一个协程中执行主体函数;raise原语也是一个函数,在其中yield出一个异常即可。

总结

协程还适于实现状态机(每对入口/出口点表现一个状态),参与者模型(Actor model)(实质上也是协作式多任务)等。重要的是理解协程所表达的控制抽象,在此之上能够灵活运用的话,原本一些棘手的控制问题也许就可以简洁优雅地实现出来。

function filter(ant)

return coroutine.wrap(function())

while true do

-- resume antecessor to obtain value

local x = ant()

-- yield transformed value

coroutine.yield(f(x))

end

end

end

consumer(filter(producer()))

function preorder(node)

if node then

preorder(node.left)

coroutine.yield(node.key)

preorder(node.right)

end

end

-- create an iterator

function preorder_iterator(tree)

return coroutine.wrap(function()

preorder(tree)

return nil

end)

end

public static class BinaryTree<T>

{

static IEnumerable<T> Preorder(Node<T> node)

{

if (node != null) {

foreach (var key in Preorder(node.left))

yield return key;

yield return node.key;

foreach (var key in Preorder(node.right))

yield return key;

}

}

public static IEnumerable<T> PreorderIterator(Node<T> tree)

{

foreach (var key in Preorder(tree))

yield return key;

}

}

-- matching a string literal

function prim(str)

return function(S, pos)

local len = string.len(str)

if string.sub(S, pos, pos+len-1) == str then

coroutine.yield(pos + len)

end

end

end

-- alternative patterns (disjunction)

function alt(patt1, patt2)

return function(S, pos)

patt1(S, pos)

patt2(S, pos)

end

end

-- sequence of sub-patterns (conjuction)

function seq(patt1, patt2)

return function(S, pos)

local btpoint = coroutine.wrap(function()

patt1(S, pos)

end)

for npos in btpoint do

patt2(S, npos)

end

end

end

function match(S, patt)

local len = string.len(S)

local m = coroutine.wrap(function() patt(S, 1) end)

for pos in m do

if pos == len+1 then

return true

end

end

return false

end

patt = seq(alt(prim("abc"), prim("de")), prim("x"))

-- list of "live" tasks

tasks = {}

-- create a task

function create_task(f)

local co = coroutine.wrap(function() f(); return "ended" end)

table.insert(tasks, co)

end

-- task dispatcher

function dispatcher()

while true do

local n = table.getn(tasks)

if n == 0 then break end -- no more tasks to run

for i = 1, n do

local status = tasks[i]()

if status == "ended" then

table.remove(tasks, i)

break

end

end

end

end

function download(host, file)

local c = assert(socket.connect(host, 80))

local count = 0

c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")

while true do

local s, status, partial = receive(c)

count = count + #(s or partial)

if status == "closed" then break end

end

c:close()

print(file, count)

end

function receive(connection)

connection:settimeout(0)

local s, status, partial = connection:receive(2^10)

if status == "timeout" then

coroutine.yield(connection)

end

return s or partial, status

end

threads = {}

function get(host, file)

local co = coroutine.create(function()

download(host, file)

end)

table.insert(threads, co)

end

function dispatch()

local i = 1

while true do

if threads[i] == nil then

if threads[1] == nil then break end

i = 1

end

local status, res = coroutine.resume(threads[i])

if not res then

table.remove(threads, i)

else

i = i + 1

end

end

end

发表回复