Lua使用协程实现多线程

协程能够实现一种协作式多线程。每个协程都等价于一个线程。一对yield-resume可以将执行权在不同线程之间切换。

不过,与普通的多线程的不同,协程是非抢占的。当一个协程正在运作时,是无法从外部停止它的。只有当协程显式地要求时它才会挂起执行。对于有些应用而言,这并没有问题,而对于另外一些应用则不行。当不存在抢占时,编程简单得多。由于在程序中所有的线程间同步都是显式的,所以我们无须为线程同步问题抓狂,只需要确保一个协程只在它的临界区之外调用yield即可。

不过,对于非抢占式多线程来说,只要有一个线程调用了阻塞操作,整个程序在该操作完成前都会阻塞。对于很多应用来说,这种行为是无法接受的,而这也正是导致许多程序员不把协程看作传统多线程的一种实现的原因。
让我们假设一个典型的多线程的场景:我们希望通过HTTP下载多个远程文件。为了下载多个远程文件,我们必须先知道如何下载一个远程文件。要下载一个文件,必须先打开一个到对应站点的连接,然后发送下载文件的请求,接收文件,最后关闭连接。在Lua语言中,可以按以下步骤来完成这项任务。首先,加载LuaSocket库:

1
local socket = require "socket"

然后,定义主机和要下载的文件。在本例中,我们从Lua语言官网下载Lua5.3手册:
1
2
host = "www.lua.org"
file = "/manual/5.3/manual.html"

接下来,打开一个TCP连接,连接到站点的80端口:
1
c = assert(socket.connect(host,80))

这步操作返回一个连接对象,可以用它来发送下载文件的请求:
1
2
local request = string.format("GET %s HTTP/1.0\r\nhost: %s\r\n\r\n",file,host)
c:send(request)

接下来,以1KB为一块读取文件,并将每块写入到标准输出中:
1
2
3
4
repeat
local s ,status,partial = c:receive(2^10)
io.write(s or partial)
until status == "closed"

函数receive要么返回它读取到的字符串,要么在发生错误时返回nil外加错误码及出错前读取到的内容。当主机关闭连接时,把输入流中剩余的内容打印出来,然后退出接收循环。
下载完文件后,关闭连接:
1
c:close()

既然我们知道了如何下载一个文件,那么再回到下载多个文件的问题上。最简单的做法是逐个地下载文件。不过,这种串行的做法太慢了,它只能在下载完一个文件后再下载一个文件。当读取一个远程文件时,程序把大部分的时间耗费在了等待数据到达上。更确切地说,程序将时间耗费在了对receive的阻塞调用上。因此,如果一个程序能够同时并行下载所有文件的话,就会快很多。当一个连接没有可用数据时,程序便可以从其他连接读取数据。很明显,协程为构造这种并发下载的代码结构提供了一种简单的方式。我们可以为每个下载任务创建一个新线程,当一个线程无可用数据时,它就可以将控制权传递给一个简单的调度器,这个调度器再去调用其他的线程。
在用协程重写程序前,我们先把之前下载的代码重写成一个函数。

示例 下载Web页面的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
function download (host,file)
local c = assert(socket.connect(host,80))
local count = 0
local request = string.format("GET %s HTTP/1.0\r\nhost:%s\r\n\r\n",file,host)
c:send(request)
while true do
local s ,status = receive(c)
count = count + #s
if status == "closed" then break end
end
c:close()
print(file,count)
end

由于我们对远程文件的内容并不感兴趣,所以不需要将文件内容写入到标准输出中,只要计算并输出文件大小即可。
在新版本中,我们使用一个辅助函数receiver从连接接收数据。在串行的下载方式中,receive的代码如下:
1
2
3
4
function receive(connection)
local s,status,partial = connection:receive(2^10)
return s or partial,status
end

在并行的实现中,这个函数在接收数据时不能阻塞。因此,在没有足够的可用数据时,该函数会挂起,如下:
1
2
3
4
5
6
7
8
function receive(connection)
connection:settimeout(0) --不阻塞
local s ,status,partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial,status
end

调用settimeout(0)是的后续所有对连接进行的操作不会则塞。如果返回状态为”timeout”,就表示该操作在返回时还未完成。此时,线程就会挂起。传递给yield的非假参数通知调度器线程仍在执行任务中。请注意,即使在超时的情况下,连接也会返回超时前已读取到的内容,也就是变量partial中的内容。
示例 调度器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
tasks = {}			-- 所有活跃任务的列表
function get (host,file)
local co = coroutine.wrap(function()
download(host,file)
end)
table.insert(tasks,co)
end

function dispatch ()
local i = 1
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
end
local res = tasks[i]()
if not res then
table.remove(tasks,i)
else
i = i + 1
end
end
end

在tasks为调度器保存着所有正在运行中的线程的列表。函数get保证每个下载任务运行在一个独立的线程中。调度器本身主要就是一个循环,它遍历所有的线程,逐个唤醒它们。调度器还必须在线程完成任务后,将该线程从列表中删除。在所有线程都完成运行后,调度器停止循环。
最后,主程序创建所有需要的线程并调起调度器。例如,如果要从Lua官网下载几个发行包,主程序可能如下:
1
2
3
4
5
6
get("www.lua.org","/ftp/lua-5.3.2.tar.gz")
get("www.lua.org","/ftp/lua-5.3.1.tar.gz")
get("www.lua.org","/ftp/lua-5.3.0.tar.gz")
get("www.lua.org","/ftp/lua-5.2.4.tar.gz")
get("www.lua.org","/ftp/lua-5.2.3.tar.gz")
dispatch()

在笔者的机器上,串行实现花了15秒下载到这些文件,而协程实现比串行实现快了三倍多。
尽管速度提高了,但最后一种实现还有很大的优化空间。当至少由一个线程有数据可读取时不会有问题;然而,如果所有的线程都没有数据可读,调度程序就会陷入忙等待,不断地从一个线程切换到另一个线程来检查是否有数据可读。这样,会导致协程版的实现比串行版实现耗费多达3倍的CPU时间。
为了避免这样的情况,可以使用LuaSocket中的函数select,该函数允许程序阻塞直到一组套接字的状态发生改变。要实现这种改动,只需要修改调度器即可。
示例 使用select的调度器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function dispatch()
local i = 1
local timedout = {}
while true do
if tasks[i] == nil then
if tasks[1] == nil then
break
end
i = 1
timedout ={}
end
local res = tasks[i]()
if not res then
table.remove(tasks,i)
else
i = i + 1
timeout[#timedout + 1] = res
if #timedout == #tasks then
socket.select(timedout)
end
end
end
end

在循环中,新的调度器将所有超时的连接收集到表timedout中。请记住,函数receive将这种超时的连接传递给yield,然后由resume返回。如果所有的连接均超时,那么调度器调用select等待这些连接的状态就会发生改变。这个最终的实现与上一个使用协程的实现一样快。另外,由于它不会有忙等待,所以与串行实现耗费的CPU资源一样多。