Lua协程详解:多线程模拟与非对称协程的实现 | 南锋

南锋

南奔万里空,脱死锋镝余

Lua协程详解:多线程模拟与非对称协程的实现

协程可以颠倒调用者和被调用者的关系,而且这种灵活性解决了软件架构中被称为“谁是老大”或者”谁拥有主循环“的问题。这正是对诸如事件驱动编程、通过构造器构建迭代器和协作式多线程等几个看上去并不相关的问题的泛化,而协程以简单和高效的方式解决了这些问题。

从多线程的角度看,协程与线程类似:协程是一系列的可执行语句,拥有自己的栈、局部变量和指令指针,同时协程又与其他协程共享了全局变量和其他几乎一切资源。线程与协程的主要区别在于,一个多线程程序可以并行运行多个线程,而协程却需要彼此协作地运行,即在任意指定的时刻只能有一个协程运行。且只有当正在运行的协程显式地要求被挂起时其执行才会暂停。

协程基础

Lua语言中协程相关的所有函数都被放在表coroutine中。函数create用于创建新协程,该函数只有一个参数,即协程要执行的代码的函数。函数create返回一个”thread”类型的值,即新协程。通常,函数create的参数是一个匿名函数,例如:

1
2
co = coroutine.create(function()print("hi")end)
print(type(co)) -- thread

一个协程有以下四种状态,即挂起、运行、正常和死亡。我们可以通过函数coroutine.status来检查协程的状态:

1
print(coroutine.statye(co))		-- suspended

如果在交互模式下运行上述代码,最好在最后一行加上一个分号来阻止输出函数resume的返回值。在上例中,协程体只是简单地打印了”hi”后便终止了,然后协程就编程了死亡状态:

1
print(coroutine.status(co)) -- end

到目前为止,协程看上去也就是一种复杂的调用函数的方式。协程的真正强大之处在于函数yield,函数可以让一个运行中的协程挂起自己,然后在后续恢复运行。例如下面这个简单示例:

1
2
3
4
5
6
co = coroutine.create(function()
for i = 1, 10 do
print("co",i)
coroutine.yield()
end
end)

其中,协程进行了一个循环,在循环中输出数字并在每次打印后挂起。当唤醒协程后,它就会开始执行直到遇见第一个yield:

1
coroutine.resume(co)	-- co 	1

此时,如果我们查看协程状态,会发现协程处于挂起状态,因此可以再次恢复运行:

1
print(coroutine.status(co))			-- suspended

从协程的角度看,在挂起期间发生的活动都发生协程调用yield期间。当我们唤醒协程时,函数yield才会最终返回,然后协程会继续执行直到遇到下一个yield或执行结束:

1
2
3
4
5
coroutine.resume(co)	-- co 2
coroutine.resume(co) -- co 3
...
coroutine.resume(co) -- co 10
coroutine.resume(co) -- 不输出任何数据

在最后一次调用resume时,协程体执行完毕并返回,比输出任何数据。如果我们试图再次唤醒它,函数resume将返回false及一条错误信息:

1
2
print(coroutine.resume(co))
-- false cannot resume dead coroutine

请注意,像函数pcall一样,函数resume也运行在保护模式中。因此,如果协程在执行中出错,Lua语言不会显示错误信息,而是将错误信息返回给函数resume。
当协程A唤醒协程B时,协程A既不是挂起状态,也不是运行状态。所以,协程A此时的状态就被称为正常状态。
Lua语言中一个非常有用的机制是通过一对resume-yield来交换数据。第一个resume函数会把所有的额外参数传递给协程的主函数:

1
2
3
4
co = coroutine.create(function(a,b,c)
print("co",a,b,c+2)
end)
coroutine.resume(co,1,2,3) -- co 1 2 5

在函数coroutine.resume的返回值中,第一个返回值为true时表示没有错误,之后的返回值对应函数yield的参数:

1
2
3
4
co = coroutine.create(function(a,b)
coroutine.yield(a+b,a-b)
end)
print(coroutine.resume(co,20,10)) -- true 30 10

与之对应的是,函数 coroutine.yield的返回值是对应的resume的参数:

1
2
3
4
5
6
7
co = coroutine.create(function(x)
print("co1",x)
print("co2",coroutine.yield())
end)

coroutine.resume(co,"hi") -- co1 hi
coroutine.resume(co,4,5) -- co2 4 5

最后,当一个协程运行结束时,主函数所返回的值都将变成对应函数resume的返回值:

1
2
3
4
co = coroutine.create(function()
return 6,7
end)
print(coroutine.resume(co)) -- true 6,7

我们很少在同一个协程中用到所有这些机制,但每种机制都有各自的用处。
虽然协程的概念很容易理解,但涉及的细节其实有很多。因此,对于那些已经对协程有一定了解的读者来首,有必要再进行进一步学习前先理清一些细节。Lua语言提供的是所谓的非对称协程,也就是说需要两个函数来控制协程的执行,一个用于挂起协程的执行,另外一个用于恢复协程的执行。而其他一些语言提供的是对称协程,只提供一个函数用于在一个协程和另一个协程之间切换控制权。
一些人将非对称协程称为semi-coroutines。然而,其他人则用相同的术语半协程表示协程的一种受限制版实现。在这种实现中,一个协程只能在它没有调用其他函数时才可以挂起,即在调用栈中没有挂起的调用时。换句话说,只有这种半协程的主函数才能让出执行权。

哪个协程占据主循环

有关协程的最经典实例之一就是生产者-消费者问题。在生产者-消费者问题中设计两个函数,一个函数不断产生值,另一个函数不断地消费这些值。这两个函数可能形式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
function producer()
while true do
local x = io.read() -- 产生新值
send(x) -- 发给消费者
end
end

function consumer( )
while true do
local x = receive() -- 接收来自生产者的值
io.write(x,"\n") -- 消费
end
end

为了简化这个示例,生产者和消费者都是无限循环的;不过,可以很容易地将其修改为没有数据需要处理时退出循环。这里的问题在于如何将send与receive匹配起来,也就是”谁占据主循环“问题的典型示例。其中,生产者和消费者都处于活跃状态,它们各自具有自己的循环,并且都将对方视为一个可调用的服务。对于这个特定的示例,可以很容易地修改其中一个函数的结构,展开它的循环使其成为一个被动的代理。不过,在其他的真实场景下,这样的代码结构改动可能会很不容易。
由于成对的resume-yield可以颠倒调用者与被调用者之间的关系,因此协程提供了一种无须修改生产者和消费者的代码结构就能匹配它们执行顺序的理想工具。当一个协程调用函数yield时,它不是进入了一个新函数,而是返回一个挂起的调用。同样地,对函数resume的调用也不会启动一个新函数,而是返回一个对函数yield的调用。这种特性正好可以用于匹配send和receive,使得双方都认为自己是主动方而对方是被动方。因此,receive唤醒生产者的执行使其能生成一个新值,然后send则让出执行权,将生成的值传递给消费者:

1
2
3
4
5
6
7
8
function receive()
local status, value = coroutine.resume(producer)
return value
end

function send(x)
coroutine.yield(x)
end

当然,生产者现在必须运行在一个协程里:

1
producer - coroutine.create(producer)

在这种设计中,程序通过调用消费者启动。当消费者需要新值时就唤醒生产者,生产者向消费者返回新值后挂起,直到消费者再次将其唤醒。因此,我们将这种设计称为消费者驱动式的设计。另一种方式则是使用生产者驱动式的设计,其中消费者是协程。虽然上述两种设计思路看上去是相反的,但实际上它们的整体思路相同。
我们可以使用过滤器来扩展上述设计。过滤器位于生产者和消费者之间,用于完成一些对数据进行某种变换的任务。过滤器即是一个消费者又是一个生产者,它通过唤醒一个生产者来获得新值,然后又将变换后的值传递给消费者。例如,我们可以在前面代码中添加一个过滤器以实现在每行的起始处插入行号:

示例 使用过滤器的生产者和消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
function receive(prod)
local status, value = coroutine.resume(prod)
return value
end

function send(x)
coroutine.yield(x)
end

function producer()
return coroutine.create(function()
while true do
local x = io.read()
send(x)
end
end)
end
function filter(prod)
return coroutine.create(function()
for line = 1,math.huge do
local x = receive(prod)
x = string.format("%5d %s",line,x)
send(x)
end
end)
end

function consumer(prod)
while true do
local x = receive(prod)
io.write(x,"\n")
end
end

consumer(filter(producer()))

代码的最后一行只是简单地创建出所需的各个组件,将这些组件连接在一起,然后启动消费者。

将协程用作迭代器

我们可以讲循环迭代器视为生产者-消费者模式的一种特例:一个迭代器会生产由循环体消费的内容。因此,用协程来实现迭代器看上去就很合适。的确。协程为实现这类任务提供了一种强大的工具。同时,协程最关键的特行是能够颠倒调用者与被调用者之间的关系。有了这种特性,我们在编写迭代器时就无须担心如何保存连续调用之间的状态了。
为了说明这类用途,让我们编写一个遍历指定数组所有排列的迭代器。要直接编写这种迭代器并不容易,但如果要编写一个递归函数来产生所有的排列则不是很难。思路很简单,只要依次将每个数组元素放到最后一个位置,然后递归地生成其余元素的所有排列即可。

示例 一个生成排列的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
function permgen(a,n)
n = n or #a
if n <= 1 then
printResult(a)
else
for i = 1, n do
a[n],a[i] = a[i], a[n]
permgen(a,n-1)
a[n],a[i] = a[i], a[n]
end
end
end

-- 还需要定义一个合适的函数printResult来输出结果,并使用恰当的参数调用premgen
function printResult(a)
for i = 1, #a do io.write(a[i],"") end
io.write("\n")
end

permgen({1,2,3,4})
-- 2 3 4 1
-- 3 2 4 1
-- 3 4 2 1
...
-- 2 1 3 4
-- 1 2 3 4

当有了生成器之后,将其转换为迭代器就很容易了。首先,我们把printResult改为yield:

1
2
3
4
5
6
function permgen(a,n)
n = n or #a
if n <= 1 then
coroutine.yield(a)
else
同前

然后,我们定义一个将生成器放入协程运行并创建迭代函数的工厂。迭代器只是简单地唤醒协程,让其生产下一个排列:

1
2
3
4
5
6
7
function permutations(a)
local co = coroutine.create(function()permgen(a) end)
return function()
local code ,res = coroutine.resume(co)
return res
end
end

有了上面的这些,在for循环中遍历一个数组的所有排列就非常简单了:

1
2
3
4
5
6
7
8
9
10
for p in permutations{"a","b","c"} do
printResult(p)
end

-- b c a
-- c b a
-- c a b
-- a c b
-- b a c
-- a b c

函数permutations使用了Lua语言中一种常见的模式,就是唤醒对应协程的调用包装在一个函数中。由于这种模式比较常见,所以Lua语言专门提供了一种特殊的函数coroutine.wrap来完成这个功能。与函数create类似,函数wrap也用来创建一个新的协程。但不同的是,函数wrap返回的不是协程本身而是一个函数,当这个函数被调用时会唤醒协程。与原始的函数resume不同,该函数的第一个返回值不是错误代码,当遇到错误时该函数会抛出异常,我们可以使用函数wrap改写permutations:

1
2
3
function permutations(a)
return coroutine.wrap(function() permgen(a) end)
end

通常,函数coroutine.wrap比函数coroutine.create更易于使用。它为我们提供了对于操作协程而言所需的功能,即一个唤醒协程的函数。不过,该函数缺乏灵活性。我们无法检查通过函数wrap所创建的协程的状态,也无法检查运行的异常。

事件驱动式编程

虽然第一眼看上去不是特别明显,但实际上传统的时间驱动编程伴随的典型问题就是衍生自who-is-the-boss问题。
在典型的事件驱动平台下,一个外部的实体向我们程序中所谓的事件循环或运行循环生成事件。这里,我们的代码很明显不是主循环。我们的程序变成了事件循环的附属品,使得我们的程序成为了一组无须任何显式关联的、相互独立的事件处理程序的集合。
再举一个更加具体的例子,假设有一个与libuv类似的异步I/O库,该库中有四个与我们的示例有关的函数:

1
2
3
4
lib.runloop();
lib.readline(stream,callback);
lib.writeline(stream,line,callback);
lib.stop();

第一个函数运行事件循环,在其中处理所有发生的事件并调用对应的回调函数。一个典型的事件驱动程序初始化某些机制然后调用这个函数,这个函数就变成了应用的主循环。第二个函数指示库从指定的流中读取一行,并在读取完成后带着读取的结果调用指定的回调函数。第三个函数与第二个函数类似,只是该函数写入一行。最后一个函数打破事件循环,通常用于结束程序。

示例 异步I/O库的简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
local cmdQueue = {}		-- 挂起操作的队列
local lib = {}

function lib.readline (stream,callback)
local nextCmd = function()
callback(stream:read())
end
table.insert(cmdQueue,nextCmd)
end

function lib.writeline(stream,line,callback)
local nextCmd = function()
callback(stream:write(line))
end
table.insert(cmdQueue,nextCmd)
end

function lib.stop()
table.insert(cmdQueue,"stop")
end

function lib.runloop()
while true do
local nextCmd = table.remove(cmdQueue,1)
if nextCmd == "stop" then
break
else
nextCmd()
end
end
end
return lib

上述代码是一种简单而丑陋的实现。该程序的”事件队列“实际上是一个由挂起操作组成的列表,当这些操作被异步调用时会产生事件。尽管很丑陋,但该程序还是完成了之前我们提到的功能,也使得我们无须使用真实的异步库就可以测试接下来的例子。
现在,让我们编写一个使用这个库的简单程序,这个程序把输入流中的所有行读取到一个表中,然后再逆序将其写到输出流中。如果使用同步I/O,那么代码可能如下:

1
2
3
4
5
6
7
8
9
10
11
local t = {}
local inp = io.input()
local out = io.output()

for line in inp:lines() do
t[#t + 1] = line
end

for i = #t, 1 , -1 do
out:write(t[i],"\n")
end

现在,让我们再使用异步I/O库按照时间驱动的方式重写这个程序。

示例 使用事件驱动方式逆序一个文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
local lib = require "async-lib"
local t = {}
local inp = io.input()
local out = io.output()
local i

-- 写入行的时间处理函数
local function putline()
i = i - 1
if i == 0 then
lib.stop()
else
lib.writeline(out,t[i] .. "\n",putline)
end
end

-- 读取行的事件处理函数
local function getline (line)
if line then
t[#t + 1] = line
lib.readline(inp,getline)
else
i = #t + 1
putline()
end
end

lib.readline(inp,getline)
lib.runloop()

作为一种典型的事件驱动场景,由于主循环位于库中,因此所有的循环都消失了,这些循环被以事件区分的递归调用所取代。尽管我们可以通过使用闭包以后续传递风格进行改进,但仍然不能编写我们自己的循环。如果要这么做,那么必须通过递归来重写。
协程可以让我们使用事件循环来简化循环的代码,其核心思想是使用协程运行主要代码,即在每次调用库时将回调函数设置为唤醒协程的函数然后让出执行权。

示例 是用异步库运行同步代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
local lib = require "async-lib"

function run(code)
local co = coroutine.wrap(function()
code()
lib.stop()
end)
co()
lib.runloop()
end

function putline (stream,line)
local co = coroutine.running()
local callback = (function() coroutine.resume(co) end)
lib.writeline(stream,line,callback)
coroutine.yield()
end

function getline(stream,line)
local co = coroutine.running()
local callback = (function(l) coroutine.resume(co,l) end)
lib.readline(stream,callback)
local line = coroutine.yield()
return line
end

顾名思义,run函数运行通过参数传入的同步代码。该函数首先创建一个协程来运行指定的代码,并在完成后停止事件循环。然后,该函数唤醒协程,进入事件循环。
函数getline和putline模拟了同步I/O。正如之前强调的,这两个函数都调用了恰当的异步函数,这些异步函数被当做唤醒调用协程的回调函数传入。之后,异步函数挂起,然后将控制权返回给事件循环。一旦异步操作完成,事件循环就会调用函数来唤醒触发异步函数的协程。
使用这个库,我们可以在异步库上运行同步代码了。如下示例再次实现了逆序行的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
run(function()
local t = {}
local inp = io.input()
local out = io.output()

while true do
local line = getline(inp)
if not line then break end
t[#t + 1] = line
end

for i = #t, 1 ,-1 do
putline(out,t[i] .. "\n")
end
end)

除了使用get/putline来进行I/O操作和运行在run以内,上述代码与之前的同步示例等价。在同步代码结构的外表之下,程序其实是以事件驱动模式运行的。

+