Lua连续教程之Lua线程和状态

Lua语言不支持真正的多线程,即不支持共享内存的抢占式线程。原因有两个,其一是IOS C没有提供这样的功能,因此也没有可移植的方法能在Lua中实现这种机制:

其二,也是更重要的原因,在于我们认为在Lua中引入多线程不是一个好主意。
多线程一般用于底层编程。像信号量和监视器这样的同步机制一般都是操作系统上下文提供的,而非应用程序提供。要查找和纠正多线程相关的bug是很困难的,其中有些Bug还会导致安全隐患。此外,程序中的一些需要同步的临界区还可能由于同步而导致性能问题。
多线程的这些问题源于线程抢占和共享内存,因此如果使用非抢先式的线程后者不使用共享内存就可以避免这些问题。Lua语言同时支持这两种方案。Lua语言的线程是协作式的,因此可以避免因不可预知的线程切换而带来的问题。另一方面,Lua状态之间不共享内存,因此也为Lua语言中实现并行化提供了良好基础。

多线程

在Lua语言中,协程的本质就是线程。我们可以认为协程是带有良好编程接口的线程,也可以认为线程是带有底层API的协程。
从C API的角度来看,把线程当作一个栈会比较有用;而从实现的角度来看,栈实际上就是线程。每个站都保存着一个线程中挂起的函数调用信息,外加每个函数调用的参数和局部变量。换句话说,一个栈包括了一个线程得以继续运行所需的所有信息。因此,多个线程就意味着多个独立的栈。
Lua语言中 CAPI的大多数函数操作的特定的栈,Lua是如何知道应该使用哪个栈的呢?当调用lua_pushnumber时,是怎么制定将数字压入何处的呢?秘密在于lua_State类型,即这些函数的第一个参数,它不仅表示一个Lua状态,还表示带有该状态的一个线程。
当创建一个Lua状态时,Lua就会自动用这个状态创建一个主线程,并返回代表该线程的lua_State。这个主线程永远不会被垃圾回收,它只会调用lua_close关闭状态时随着状态一起释放。与线程无关的程序会在这个主线程中运行所有的代码。
调用lua_newthread可以在一个状态中创建其他的线程:

1
lua_State *lua_newthread (lua_State *L);

该函数会将新线程作为一个”thread”类型的值压入栈中,并返回一个表示新线程的lua_State类型的指针。例如,考虑如下的语句:
1
L1 = lua_newthread(L);

执行上述代码后,我们就有了两个线程L1和L,它们都在内部引用了相同的Lua状态。每个线程都有其自己的栈。新线程L1从空栈开始运行,而老线程L在其栈顶会引用这个新线程:
1
2
printf("%d\n",lua_gettop(L1));  -- 0
printf("%d\n",luaL_typename(L,-1)); -- thread

除主线程以外,线程和其他的Lua对象一样都是垃圾回收的对象。当新建一个线程时,新创建的线程会被压入栈中,这样就保证了新线程不会被垃圾收集。永远不要使用未被正确锚定在Lua状态中的线程。所有对LuaAPI的调用都有可能回收未锚定的线程,即使是在正在使用这个线程的函数调用。例如,考虑如下的代码:
1
2
3
lua_State *L1 = lua_newthread(L);
lua_pop(L,1);
lua_pushstring(L1,"hello");

调用lua_pushstring可能会触发垃圾收集器并回收L1,从而导致应用崩溃,尽管L1正在被使用。要避免这种情况,应该在诸如一个已锚定线程的栈、注册表或Lua变量中保留一个对使用中线程的引用。
一旦拥有一个新线程,我们就可以像使用主线程一样来使用它了。我们可以将元素压入栈中,或者从栈中弹出元素,还可以用它来调用函数等等。例如,如下代码在新线程中调用了f(5),然后将结果传递到老线程中:
1
2
3
4
lua_getglobal(L1,"f");  /* 假设'f'是一个全局函数 */
lua_pushinteger(L1,5);
lua_call(L1,1,1);
lua_xmove(L1,L,1);

函数lua_xmove可以在同一个Lua状态的两个栈之间移动Lua值。一个形如lua_xmove(F,T,n)的调用会从栈F中弹出n个元素,并将它们压入栈T中。
不过,对于这类用法,我们不需要用新线程,用主线程就足够了。使用多线程的主要目的是实现协程,从而可以挂起某些协程的执行,并在之后恢复执行。因此,我们需要用到函数lua_resume:
1
int lua_resume (lua_State *L,lua_State *from, int narg);

要启动一个协程,我们可以像使用lua_pcall一样使用lua_resume:将待调用函数压入栈,然后压入协程的参数,并以参数的数量作为参数narg调用lua_resume(参数from是正在执行调用的线程,或为NULL)。这个行为与lua_pcall类似,但有三个不同点。首先,lua_resume中没有表示期望结果数量的参数,它总是返回被调用函数的额所有结果。其次,它没有表示错误处理函数的参数,发生错误时不会进行栈展开,这样我们就可以在错误发生后检查栈的情况。最后,如果正在运行的哈数被挂起,lua_resume就会返回代码LUA_YIELD,并将线程置于一个可以后续再恢复执行的状态中。
当lua_resume返回LUA_YIELD时,线程栈中的可见部分只包含传递给yield的值。调用lua_gettop会返回这些值的个数。如果要将这些值转移到另一个线程,可以使用lua_xmove。
要恢复一个挂起的线程,可以再次调用lua_resume。在这种调用中,Lua假设栈中所有的值都会被调用的yield返回。例如,如果在一个lua_resume返回后到再次调用lua_resume时不改变线程的栈,那么yield会原样返回它产生的值。
通常,我们会把一个Lua函数作为协程启动协程。这个Lua函数可以调用其他Lua函数,并且其中任意一个函数都可以挂起,从而结束对lua_resume的调用。例如,假设有如下定义:
1
2
function foo(x) coroutine.yield(10,x) end
function foo1(x) foo(x + 1); return 3 end

现在运行一下C语言代码:
1
2
3
4
lua_State *L1 = lua_newthread(L);
lua_getglobal(L1,"foo1");
lua_pushinteger(L1,20);
lua_Resume(L1,L,1);

调用lua_resume会返回LUA_YIELD,表示线程已交出了控制权。此时,L1的栈便有了位yield指定的值:
1
2
3
printf("%d\n",lua_gettpo(L1));			--> 2
printf("%lld\n",lua_tointeger(L1,1)); --> 10
printf("%lld\n",lua_tointeger(L1,2)); --> 21

当恢复此线程时,它会从挂起的地方(即调用yield的地方)继续执行。此时,foo会返回到foo1,foo1继而又返回到lua_resume:
1
2
3
lua_resume(L1,L,0);
printf("%d\n",lua_gettpo(L1)); --> 1
printf("%lld\n",lua_tointeger(L1,1)); -->3

第二次调用lua_resume是会返回NULL_OK,表示一个正常的返回。
一个协程也可以调用C语言函数,而C语言函数又可以反过来调用其他Lua函数。我们已经讨论过如何使用延续来让这些Lua函数交出控制权。C语言函数也可以交出控制权。在这这种情况下,它必须提供一个在线恢复时被调用的延续函数。要交出控制权,C语言函数必须调用如下的函数:
1
int lua_yield (lua_State *L, int nresults, int ctx, lua_CFunction k);

在返回语句中我们应该始终使用这个函数,例如:
1
2
3
4
static inf myCfunction(lua_State *L){
...
return lua_yieldk(L,nreseults,ctx, k);
}

这个调用会立即挂起正在运行的协程。参数nresults是将要返回给对应的lua_resume的栈中的值的个数;参数ctx是传递给延续的上下文信息;参数k是延续函数。当协程恢复运行时,控制权会直接交给延续函数k;当协程交出控制权后,myCfunction就不会再有其他任何动作,它必须将所有后续的工作委托给延续函数处理。
让我们看一个典型的例子。假设要编写一个读取数据的函数,如果无数据可读则交出控制权。我们可能会用C语言写出一个这样的函数:
1
2
3
4
5
6
7
8
9
10
11
12
int readK (lua_State *L,int status, lua_KContext ctx){
(void)status;(void)ctx; /* 未使用的参数 */
if (something_to_read()){
lua_pushstring(L,read_some_data());
return 1;
}
else
return lua_yieldk(L,0,0,&readK);
}
int prim_read(lua_State *L){
return readK(L,0,0);
}

在这个示例中,prim_read无须做任何初始化,因此它可以直接调用延续函数。如果有数据可读。readK会读取并返回数据;否则,它会交出控制权。当线程恢复时,prim_read会再次调用延续函数,该延续函数会再次尝试读取数据。
如果C语言函数在交出控制权之后什么都不做,那么它可以不带延续函数调用lua_yieldk或者使用宏lua_yield:
1
return lua_yield(L,nres);

在这一句调用之后,当线程恢复时,控制权会返回到名为myCfunction的函数中。

Lua状态

每次调用luaL_newstate(或lua_newstate)都会创建一个新的Lua状态。不同的Lua状态之间是完全独立的,它们根本不共享数据。也就是说,无论在一个Lua状态中发生了什么,都不会影响其他Lua状态。这也意味着Lua状态之间不能直接通信,因而必须借助一些C语言代码的帮助。例如,给定两个状态L1和L2,如下命令会将L1栈顶的字符串压入L2的栈中:

1
lua_pushstring(L2,lua_tostring(L1,-1));

由于所有数据必须由C语言进行传递,因此Lua状态之间只能交换能够使用C语言表示的类型,例如字符串和数值。其他诸如表之类的类型必须序列化后才能传递。
在支持多线程的系统中,一种有趣的设计是为每个线程创建一个独立的Lua状态。这种设计使得线程类似于POSIX进程,它实现了非共享内存的并发。在本节中,我们会根据这种方法开发一个多线程的原型实现。在这个实现中,将会使用POSIX线程。因为这些代码只使用了一些基础功能,所以将它们移植到其他线程系统中并不难。
我们要开发的系统很简单,其主要目的是演示一个多线程环境中使用多个Lua状态。在这个系统开始运行之后,我们可以为它添加几个高级功能。我们把这个库称为lproc,它只提供4个函数:
1
lproc.start(chunk)

启动一个新进程来运行指定的代码段(一个字符串)。这个库将Lua进程实现为一个C语言进程外加与其相关联的Lua状态。
1
lproc.send(channel,val1,val2,...)

将所有指定值(应为字符串)发送给指定的、由名称(也是一个字符串)标识的通道。
1
lproc.receive(channel)

接收发送给指定通道的值。
1
lproc.exit()

结束一个进程。只有主进程需要这个函数。如果主程序不调用lproc.exit就直接结束,那么整个程序会终止,而不会等待其他进程结束。
这个库通过字符串标识不同的通道,并通过字符串来匹配发送者和接收这。一个发送操作可以发送任意数量的字符串,这些字符串由对应的接收操作返回。所有的通信都是同步的,向通道发送消息的进程会一直阻塞,知道有进程从该通道接收信息,从而通道接收信息的进程会一直阻塞,直至有进程向其发送消息。
lproc的实现像其接口一样简单,它使用了两个循环双向链表,一个用于等待发送消息的进程,另一个用于等待接收消息的进程。lproc使用一个互斥量来控制对着两个链表的访问。每个进程有一个关联的条件变量。当进程要向通道发送一条消息时,它会遍历接收链表以查找一个在该通道上等到的进程。如果找到了这样的进程,它会将该进程从等待链表中删除,并将消息的值从自身转移到找到的进程中,然后通知其他进程;否则,它就将自己插入发送链表,然后等待其条件变量发生变化。接收消息的操作也与此基本类似。
在这种实现中,主要的元素之一就是表示进程的结构体:
1
2
3
4
5
6
7
8
9
10
#include <pthread.h>
#include "lua.h"
#include "luaxlib.h"
typedef struct Proc{
lua_State *L;
pthread_t thread;
pthread_cond_t cond;
const char *channel;
struct Proc *previous, *next;
}Proc;

前两个字段表示进程使用的Lua状态和运行该进程的C线程。第三个字段cond是条件变量,线程会在等待匹配的发送/接收时用它来使自己进入则塞状态。第四个字段保存了进程正在等待的通过。最后两个字段previous和next将进程的结构体组成等待链表。
下面的代码声明了两个等待链表及关联的互斥量:
1
2
3
4
static Proc *waitsend = NULL;
static Proc *waitreceive = NULL;

static pthread_mutex_t kernel_access = PTHERAD_MUTEX_INITIALIZER;

每个进程都需要一个Proc结构体,并且进程脚本调用send或receive时就需要访问这个结构体。这些函数接收的唯一参数就是进程的Lua状态;因此,每个进程都应该将其Proc结构体保存在其Lua状态中。在我们的实现中,每个状态都将其对应的Proc结构体作为完整的用户数据存储在注册表中,关联的键为”_SELF”。辅助函数getself可以从指定的状态中获取相关联的Prco结构体:
1
2
3
4
5
6
7
static Proc *getself(lua_State *L){
Proc *p;
lua_getfield(L<LUA_REGISTRYINDEX,"_SELF");
p = (Proc *)lua_touserdata(L,-1);
lua_pop(L,1);
return p;
}

下一个函数,movevalues,将值从发送进程移动到接收进程:
1
2
3
4
5
6
7
static void movevalues(lua_State *send, lua_State *rec){
int n = lua_gettop(send);
int i;
lua_checkstack(rec,n,"too many results");
for (i = 2; i <= n; i++)
lua_pushstring(rec,lua_tostring(send,i));
}

这个函数将发送进程的栈中所有的值移动到接收进程的栈中。请注意,在压入任意数量的元素时,需要检查栈空间。
示例定义了函数searchhmatch,该函数会遍历列表以寻找等待指定通道的进程。

示例 用于寻找等待通道的进程的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
static Proc *serachmatch(const char *channel, Proc **list){
Proc *node;
for (node = *list; node != NULL; node = node ->next){
if (strcmp(channel, node ->channel) == 0){
if (*list == node)
*list = (node->next == node)?NULL:node->next;
node->provious->next = node ->next;
node->next->previous = node ->previous;
return node;
}
}
return NULL;
}

如果找到一个进程,那么该函数会将这个进程从列表中移除并返回该进程;否则,该函数会返回NULL。
当找不到匹配的进程时,会调用最后的辅助函数,参见下例
示例 用于在等待列表中新增一个进程的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void waitonlist (lua_State *L, const char *channel, Proc **list){
Proc *p = getself(L);
if (*list == NULL){
*list = p;
p->previous = p->next = p;
}
else {
p->previous = (*list)-> previous;
p->next = *list;
p->previous->next = p->next->previous = p;
}
p->channel = channel;
do{
pthread_cond_wait(&p->cond,&kernel_access);
}while(p->channel);
}

在这种情况下,进程会将自己链接到相应等待链表的末尾,然后进入等待状态,知道另一个进程与之匹配并将其唤醒。当一个进程唤醒另一个进程时,它会将另一个进程的channel字段设置为NULL。因此,如果p->channel不是NULL,那就表示尚未出现与进程p匹配的进程,所以需要继续等待。
有了这些辅助函数,我们就可以编写send和receive了
示例 用于发送和接收消息的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int ll_send(lua_State *L){
Proc *p;
const char *channel = luaL_checkstring(L,1);
pthread_mutex_lock(&kernel_access);
p = searchmatch(channel,&waitreceive);
if(p){
movevalues(L,p->L);
p->channel = NULL;
pthread_cond_signal(&p->cond);
}
else
waitonlist(L,channel,&waitsend);

pthread_mutex_unlock(&kernel_access);
return 0;
}

static int ll_receive (lua_State *L){
Proc *p;
const char *channel = luaL_checkstring(L,1);
lua_settop(L,1);

pthread_mutex_lock(&kernel_access);
p = searchmatch(channel,&waitsend);

if(p){
movevalues(p->L,L);
p->channel = NULL;
pthread_cond_signal(&p->cond);
}
else
waitonlist(L,channel,&waitreceive);

pthread_mutex_unlock(&kernel_access);

return lua_gettop(L) -1;
}

函数llsend先获取通道,然后锁住互斥量并搜索匹配的接收进程。如果找到了,就把待发送的值传递给这个接收进程,然后将接收进程标记为就绪状态并唤醒接收进程。否则,发送进程就将自己放入等待链表。当操作完成后,ll_send解锁互斥量且不向Lua返回任何值。函数ll_receive与之类似,但它会返回所有接收到的值。
现在,让我们看一下如何创建新进程。新进程需要一个新的POSIX线程,而POSIX线程的运行需要一个线程体。我们会在后面的内容中定义这个线程体。在此,先看一下它的原型,这是pthreads所需求的:
1
static void *ll_thread(void *arg);

要创建并运行一个新进程,我们开发的系统必须创建一个新的Lua状态,启动一个新线程,编译指定的代码段,调用该代码段,最后释放其资源。原线程会完成前面三个任务,而新线程则负责其余任务。
函数ll_start可以创建一个新的进程。
示例 用于创建进程的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static int ll_start(lua_State *L){
pthread_t thread;
const char *chunk = luaL_checkstring(L,1);
lua_State *L1 = luaL_newstate();
if(L1 == NULL)
lua_error(L,"unable to create new state");

if (luaL_loadstring(L1,chunk) != 0)
luaL_error(L,"error in thread body:%s",lua_tostring(L1,-1));

if(pthread_create(&thread,NULL,ll_thread,L1)!=0)
luaL_error(L,"unable to create new thread");

pthread_detach(thread);
return 0;
}

该函数创建了一个新的Lua状态L1,并在其中编译了指定的代码段。如果有错误发生,该函数会把错误传递给原来的状态L。然后,该函数使用ll_thread作为线程体创建一个新线程,同时将新状态L1作为参数传递给这个线程体。最后,该函数调用pthread_detach通知系统我们不需要该线程的任何运行结果。
每个新线程的线程体都是函数ll_thread,它接手相应的Lua状态,这个Lua状态的栈中只含有预编译的主代码段。
示例 新线程的线程体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int luaopen_lproc(lua_State *L);

static void *ll_thread(void *arg){
lua_State *L = (lua_State *) arg;
Proc *self;

openlibs(L);
luaL_requiref(L,"lproc",luaopen_lproc,1);
lua_pop(L,1);
self = (Proc *)lua_newuserdata(L,sizeof(Proc));
lua_setfield(L,LUA_REGISTRYINDEX, "_SELF");
self->L = L;
self->thread = pthread_self();
self->channel = NULL;
pthread_cond_init(&self->cond,NULL);

if(lua_pcall(L,0,0,0,)!=0)
fprintf(stderr,"thread error: %s",lua_tostring(L,-1));

pthread_cond_destroy(&getself(L)->cond);
lua_close(L);
return NULL;
}

首先,该函数打开Lua标准库和库lproc;之后,它创建并初始化其自身的控制块;然后,调用主代码段;最后,销毁其条件变量并关闭Lua状态。
请注意使用luaL_requiref打开库lproc的用法。这个函数在某种意义上等价于require,但它用指定函数来打开库而没有搜索打开函数。在调用这个打开函数后,luaL_requiref会在表package.loaded中注册结果,这样以后再调用require加载这个库时就无须再次打开库了。当luaL_requiref的最后一个参数为真时,该函数还会在相应的全局变量中注册这个库。
示例演示了这个模块中的最后一个函数
示例 模块lproc的其他函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static inti ll_exit(lua_State *L){
pthread_exit(NULL);
return 0;
}
static const struct luaL_Reg l_funcs[] ={
{"start",ll_start},
{"send",ll_send},
{"receive",ll_receive},
{"exit",ll_exit},
{NULL,NULL}
};

int luaopen_lproc(lua_State *L){
luaL_newlib(L,ll_funcs);
return 1;
}

这两个函数都很简单。函数ll_exit应该只能在主进程结束时由主进程调用,以避免整个程序立即结束。函数luaopen_lproc是用于打开这个模块的标准函数。
正如笔者之前说过的,在Lua语言中这种进程的实现方式非常简单。我们可以对它进行各种改进,这里简单介绍几种。
第一种显而易见的改进是改变对匹配通道的线性查找,更好的选择是用哈希表来寻找通道,并为每个通道设置一个独立的等待列表。
另一种改进涉及创建进程的效率。创建一个新的Lua状态时一个轻量级操作,但打开所有的标准库可不是轻量级的,并且大部分进程可能并不需要用到所有的标准库。我们可以通过对库进行预注册来避免打开无用的库。相对于为每个标准库调用luaL_requiref,使用这种方法时我们只需要将库的打开函数放入表package.preload中即可。当且仅当进程调用require”lib”时,require才会调用这个与库相关的函数来打开库。
示例 注册按需打开的库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void registerlib (lua_State *L, const char *name,lua_CFunction f){
lua_getglobal(L,"package");
lua_getfield(L,-1,"preload");
lua_pushcfunction(L,f);
lua_setfield(L,-2,name);
lua_pop(L,2);
}
static void openlibs(lua_State *L){
luaL_requiref(L,"_G",luaopen_base,1);
lua_requiref(L,"package",luaopen_package,1);
lua_pop(L,2);
registerlib(L,"coroutine",luaopen_coroutine);
registerlib(L,"table",luaopen_table);
registerlib(L,"io",luaopen_io);
registerlib(L,"os",luaopen_os);
registerlib(L,"string",luaopen_string);
registerlib(L,"math",luaopen_math);
registerlib(L,"utf8",luaopen_utf8);
registerlib(L,"debug",luaopen_debug);
}

一般情况都需要打开基础库。另外,我们还需要package库;如果没有package库,就无法通过require来打开其他库。所有其他的库都是可选的。因此,除了调用luaL_openlibs之外,可以在打开新状态时调用我们自己的函数openlibs。当进程需要用到其中任意一个库时,只需显式地调用require,require就会调用相应的luaopen
*函数ll_start可以创建一个新的进程。
另一个改进涉及通信原语。例如,为lproc.send和lproc.receive设置一个等待匹配的时间阀值会非常有用。特别的,当等待时间阈值为零时,这两个函数会成为非阻塞的。在POSIX线程中,可以用pthread_cond_timedwait实现这个功能。