Hi.

I just noticed the co-routine/threading discussion resurfacing on the
mailing list. I had promised a while back to post some documentation on how
to thread Lua and how to make Lua stackless for Lua->Lua calls. Well, things
have happened on my side and the time/motivation for writing such a document
hasn't been there. So right now, I've decided to write a post about
co-routines and Lua. I'm not going to write a technical document about that
right now, but I am going to post some snippets of code from my own
implementation.

Due to non-disclosure agreements with my employer, I probably can't post the
whole code without permission, but I will post snippets of the essential
sections in the hope that it will help people get their code up and running.
Also, the code isn't guaranteed to be bug free. I have run some tests, but
nothing that tests every possibility. Anyway, I hope this will be
useful to some people out there...

First thing to mention is that this code uses the 4.0 final version and not
the beta (there are a few changes in the API between 4.0b and 4.0). Now,
I'll start by showing the modifications done to Lua which are relevant to
co-routines (or threading if you prefer).

1) Piggyback pointer in the lua_State structure. This is mostly so that, in
case of special events like "Sleep", I can know which thread I'm actually
putting to sleep (note that most of my changes are commented with a
"// ZAXIS :" block):

// --- SNIP START ---
struct lua_State {
  /* thread-specific state */
  StkId top;  /* first free slot in the stack */
  StkId stack;  /* stack base */
  StkId stack_last;  /* last free slot in the stack */
  int stacksize;
  StkId Cbase;  /* base for current C function */
  struct lua_longjmp *errorJmp;  /* current error recover point */
  char *Mbuffer;  /* global buffer */
  size_t Mbuffsize;  /* size of Mbuffer */

  // ZAXIS : Piggyback pointer to our internal thread structure
  void *zslThread;

  /* global state */
  Proto *rootproto;  /* list of all prototypes */
  Closure *rootcl;  /* list of all closures */
  Hash *roottable;  /* list of all tables */
  stringtable strt;  /* hash table for strings */
  stringtable udt;   /* hash table for udata */
  Hash *gt;  /* table for globals */
  struct TM *TMtable;  /* table for tag methods */
  int last_tag;  /* last used tag in TMtable */
  struct Ref *refArray;  /* locked objects */
  int refSize;  /* size of refArray */
  int refFree;  /* list of free positions in refArray */
  unsigned long GCthreshold;
  unsigned long nblocks;  /* number of `bytes' currently allocated */
  lua_Hook callhook;
  lua_Hook linehook;
  int allowhooks;
};
// --- SNIP END ---
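
To illustrate the piggyback idea outside of Lua, here is a minimal sketch. ToyState, ToyThread and toy_sleep are hypothetical stand-ins, not the real Lua 4.0 or Z-Axis types. It only shows why a back-pointer stored in the state is handy: a callback that receives nothing but the state can still find the thread that owns it.

```c
#include <stddef.h>

/* Hypothetical stand-ins, NOT the real Lua 4.0 or Z-Axis types. */
typedef struct ToyThread {
    int id;
    unsigned wake_time;
} ToyThread;

typedef struct ToyState {
    int top;            /* placeholder for the real per-thread fields */
    void *user_thread;  /* piggyback pointer, playing the role of zslThread */
} ToyState;

/* A callback that only receives the state can still reach the owning
 * thread through the piggyback pointer. Returns the thread id, or -1
 * when the state is not bound to any thread. */
int toy_sleep(ToyState *S, unsigned now, unsigned delay)
{
    ToyThread *t = (ToyThread *)S->user_thread;
    if (t == NULL)
        return -1;
    t->wake_time = now + delay;
    return t->id;
}
```

The pointer is kept as void* here, as in the struct above, so the interpreter header does not need to know the layout of the thread record.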

2) One thing to note is that in my implementation, I create the "thread
state" and "global state" of the lua_State separately. This way I can
create a process (which is a global environment) and each process can have
multiple threads. One consequence of running multiple threads within the same
global environment is that garbage collection has to be split into
2 operations. Here is the code for the modified GC (I will explain how I use
those functions later in the message):

// --- SNIP START ---
// ZAxis : Separated mark and collection
void lua_collectgarbage_mark (lua_State *L, long limit) {
  markall(L);
  invalidaterefs(L);
}

long lua_collectgarbage_collect (lua_State *L, long limit) {
  unsigned long recovered = L->nblocks;  /* to subtract `nblocks' after gc */
  luaC_collect(L, 0);
  recovered = recovered - L->nblocks;
  L->GCthreshold = (limit == 0) ? 2*L->nblocks : L->nblocks+limit;
  if (L->Mbuffsize > MINBUFFER*2) {  /* is buffer too big? */
    L->Mbuffsize /= 2;  /* still larger than MINBUFFER */
    luaM_reallocvector(L, L->Mbuffer, L->Mbuffsize, char);
  }
  callgcTM(L, &luaO_nilobject);
  return recovered;
}

// ZAxis : Don't do auto GC because of our thread system (every thread must be marked first)
void luaC_checkGC (lua_State *L) {
//  if (L->nblocks >= L->GCthreshold)
//    lua_collectgarbage(L, 0);
}
// --- SNIP END ---

What I did here was basically split up the mark and collection phases of the
GC so I can mark all threads within a process before I do the actual
collection.
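
As a rough illustration of why the split matters, here is a toy mark-and-sweep sketch. It is not Lua's collector: ToyObj, ToyHeap and the toy_* functions are made up for this example, and each object holds at most one reference to keep it short. The mark phase is called once per thread's root set, and the sweep runs a single time afterwards, so an object reachable only from the second thread is not freed just because the first thread does not reference it.

```c
#include <stddef.h>
#include <stdlib.h>

/* Toy heap object; hypothetical, not Lua's real object layout. */
typedef struct ToyObj {
    struct ToyObj *next;  /* link in the shared list of all objects */
    struct ToyObj *ref;   /* at most one outgoing reference */
    int marked;
} ToyObj;

typedef struct ToyHeap { ToyObj *all; } ToyHeap;

ToyObj *toy_new(ToyHeap *h, ToyObj *ref)
{
    ToyObj *o = malloc(sizeof *o);
    if (o == NULL)
        return NULL;
    o->next = h->all;
    o->ref = ref;
    o->marked = 0;
    h->all = o;
    return o;
}

/* Phase 1: mark everything reachable from ONE thread's roots.
 * Call this once for every thread sharing the heap. */
void toy_mark(ToyObj *const *roots, size_t n)
{
    for (size_t i = 0; i < n; i++)
        for (ToyObj *o = roots[i]; o != NULL && !o->marked; o = o->ref)
            o->marked = 1;
}

/* Phase 2: sweep once, after all threads have been marked.
 * Frees unmarked objects, clears marks on survivors, returns count freed. */
size_t toy_collect(ToyHeap *h)
{
    size_t freed = 0;
    ToyObj **p = &h->all;
    while (*p != NULL) {
        ToyObj *o = *p;
        if (o->marked) {
            o->marked = 0;
            p = &o->next;
        } else {
            *p = o->next;
            free(o);
            freed++;
        }
    }
    return freed;
}
```

If the sweep ran right after marking only the first thread, everything owned by the other threads would look like garbage, which is presumably why automatic collection is disabled in the snippet above.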

OK, this pretty much covers the essential changes to the Lua source code. I
have some other modifications in my implementation, but they are mostly to
use our own memory functions, change the math to float instead of
double, and so on, so they are not really relevant to the threading. Now, I
will show some snippets of the threading system that uses Lua.

1) Here is the internal thread structure I use. I won't go through every
field explaining its usage; I'm mostly showing this so you know where
variables in other functions of mine come from. Also note that my threading
system supports both C and Lua threads:

// --- SNIP START ---
typedef struct _ZSLThread
{
	struct _ZSLProcess*	OwnerProcess;
	zslThreadType		ThreadType;
	zslThreadEnv		ThreadEnv;

	// Lua Specific
	lua_State*			luaState;
	Uint8				nArg;
	Uint8				nRes;

	// C Specific
	ZSLThreadFct		Funciton;
	void*				FctParam;

	// Thread system stuff
	jmp_buf			LongJump;
	Uint8*			Stack;
	Uint32			StackSize;

	// Thread status
	Bool				FistRun;
	zslThreadState		ThreadState;
	Uint32			LastTime;
	Uint32			WakeTime;

} ZSLThread;

typedef struct _ZSLProcess
{
	lua_State*			luaState;
	ZSLThread			DummyThread;
} ZSLProcess;
// --- SNIP END ---

2) Now, the first thing that we need to do is create threads. To do so,
we need to create separate process and thread lua_State environments. Here
is the code which does that:

// --- SNIP START ---
void zslLuaClearState(lua_State* L)
{
  L->stack = NULL;
  L->strt.size = L->udt.size = 0;
  L->strt.nuse = L->udt.nuse = 0;
  L->strt.hash = NULL;
  L->udt.hash = NULL;
  L->Mbuffer = NULL;
  L->Mbuffsize = 0;

  L->rootproto = NULL;
  L->rootcl = NULL;
  L->roottable = NULL;
  L->TMtable = NULL;
  L->last_tag = -1;
  L->refArray = NULL;
  L->refSize = 0;
  L->refFree = NONEXT;
  L->nblocks = sizeof(lua_State);
  L->GCthreshold = MAX_INT;  /* to avoid GC during pre-definitions */
  L->callhook = NULL;
  L->linehook = NULL;
  L->allowhooks = 1;
  L->errorJmp = NULL;
}

lua_State* zslLuaCreateEmptyState()
{
  lua_State *L = luaM_new(NULL, lua_State);
  if (L == NULL) return NULL;  /* memory allocation error */
  zslLuaClearState(L);
  return L;
}

lua_State* zslLuaCreateProcess()
{
  lua_State *L = zslLuaCreateEmptyState();
  if (L == NULL) return NULL;  /* memory allocation error */
  L->gt = luaH_new(L, 10);
  luaS_init(L);
  luaX_init(L);
  luaT_init(L);

  // Force upper threshold on GC to avoid GC unless we want it to happen
  L->GCthreshold = MAX_INT;

  return L;
}

lua_State* zslLuaCreateThread(int StackSize)
{
	lua_State *L = zslLuaCreateEmptyState();
	if (L == NULL) return NULL;  /* memory allocation error */
	luaD_init(L, (StackSize == 0) ? DEFAULT_STACK_SIZE :
                                    StackSize+LUA_MINSTACK);

	// Force upper threshold on GC to avoid GC unless we want it to happen
	L->GCthreshold = MAX_INT;

	L->errorJmp = NULL;
	return L;
}
// --- SNIP END ---

Also, when a process is created, the basic Lua API functions must be
registered with that process. Here is the code:

// --- SNIP START ---
#define num_zslluafct 1
static const struct luaL_reg zslLua_RegFct [num_zslluafct] =
{
  {"sleep", zslLua_Sleep}
};

void zslLuaRegisterFunctions (lua_State *L)
{
	lua_newtable(L);
	lua_ref(L, 1);  /* create registry */

	// Register Lua's system libs
	lua_baselibopen(L);
	lua_iolibopen(L);
	lua_strlibopen(L);
	lua_mathlibopen(L);
	lua_dblibopen(L);

	lua_register(L, LUA_ERRORMESSAGE, zslerrormessage);
	lua_register(L,"print",zslprint);

	luaL_openl(L, zslLua_RegFct);
}
// --- SNIP END ---

3) One thing to note... Since we create the global and thread segments of
the lua_State separately, we must have a way to associate them
before we can actually run a thread. The following code takes care of this:

// --- SNIP START ---
lua_State*   zslBindState(ZSLThread* Thread)
{
	lua_State* State 	= Thread->OwnerProcess->luaState;
	State->top		= Thread->luaState->top;
	State->stack	= Thread->luaState->stack;
	State->stack_last	= Thread->luaState->stack_last;
	State->stacksize	= Thread->luaState->stacksize;
	State->Cbase	= Thread->luaState->Cbase;
	State->errorJmp	= Thread->luaState->errorJmp;
	State->Mbuffer	= Thread->luaState->Mbuffer;
	State->Mbuffsize	= Thread->luaState->Mbuffsize;
	State->zslThread	= Thread;

	return State;
}

void		zslUnBindState(ZSLThread* Thread)
{
	lua_State* State = Thread->OwnerProcess->luaState;
	Thread->luaState->top			=State->top			;
	Thread->luaState->stack			=State->stack		;
	Thread->luaState->stack_last		=State->stack_last	;
	Thread->luaState->stacksize		=State->stacksize	;
	Thread->luaState->Cbase			=State->Cbase		;
	Thread->luaState->errorJmp		=State->errorJmp	;
	Thread->luaState->Mbuffer		=State->Mbuffer	    ;
	Thread->luaState->Mbuffsize		=State->Mbuffsize	;

	// Be nice and put clean values in
	State->top			= NULL;
	State->stack		= NULL;
	State->stack_last		= NULL;
	State->stacksize		= 0;
	State->Cbase		= NULL;
	State->errorJmp		= NULL;
	State->Mbuffer	    	= NULL;
	State->Mbuffsize		= 0;

}
// --- SNIP END ---
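
The bind/unbind pattern above can be sketched in a few lines. This is only an illustration under assumptions: ToyState and ToyThreadPart are invented types, and the per-thread fields are grouped into a sub-struct (the real lua_State is flat). Grouping them means one struct assignment copies everything, so a newly added per-thread field cannot be forgotten in one of the two copy functions.

```c
#include <stddef.h>

/* Hypothetical per-thread slice of an interpreter state. */
typedef struct ToyThreadPart {
    int *stack;
    int *top;
    int stacksize;
} ToyThreadPart;

typedef struct ToyState {
    ToyThreadPart th;  /* per-thread part, swapped in and out */
    int globals;       /* stands in for the shared global part */
} ToyState;

/* Load a thread's saved fields into the shared process state. */
void toy_bind(ToyState *process, const ToyThreadPart *saved)
{
    process->th = *saved;
}

/* Save the (possibly changed) fields back and leave clean values behind. */
void toy_unbind(ToyState *process, ToyThreadPart *saved)
{
    *saved = process->th;
    process->th.stack = NULL;
    process->th.top = NULL;
    process->th.stacksize = 0;
}
```

A typical round trip is toy_bind, run some code that moves th.top, then toy_unbind, after which the thread record holds the new stack top and the process state is empty again.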

4) Now that we have a process and a thread set up, we need to initialize the
thread with a Lua function call so it can begin its execution. The following
code takes care of this:

// --- SNIP START ---
/*---------------------------------------------------------------------------
 * FUNCTION:	zslScriptCall
 * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 * Purpose:		Starts up a function call, assumes the function has no params
 *				and return values
 * Arguments:	Thread		- Thread to use for call
 *				Fct			- Name of function to call
 *				Params		- String indicating the params passed as variable args.
 * Returns:
 * Note:		Params is a string that can contain the following chars to indicate
 *				the type of the parameters passed
 *				s = string;
 *				o = object; (note that an extra int param is required to set the
 *				    tag of the value)
 *				n = number;
 *--------------------------------------------------------------------------*/
void			zslScriptCall(zslTID Thread, char* Fct,char* Params, ...)
{
	lua_State* L;
	ZSLThread *Th = (ZSLThread*)Thread;
    va_list vl;
    int i;
	int nArg = 0;

	if ( !((Th->ThreadState==zslThreadState_Init) ||
	       (Th->ThreadState==zslThreadState_Done)) )
		return;

	// Make sure the thread is of the appropriate type
	if ( Th->ThreadEnv != zslThreadEnvLua )
		return;

	// Build temp composite state
	L = zslTempBindState(Thread);

	// Push function name
	lua_getglobal(L,Fct);

	// Push all params
	if (Params != NULL)
	{
		va_start( vl, Params );

		// Step through the list.
		for( i = 0; Params[i] != '\0'; ++i )
		{
			char   *s;
			ZFloat   f;
			void*   o;
			int		ot;

			switch( Params[i] )    // Type to expect.
			{
			case 's':
				s = va_arg( vl, char* );
				lua_pushstring(L,s);
				nArg++;
				break;
			case 'o':
				o = va_arg( vl, void* );
				ot = va_arg( vl, int );
				lua_pushusertag(L,o,ot);
				nArg++;
				break;
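			case 'n':
				// A float passed through "..." is promoted to double, so read a
				// double and convert (this assumes ZFloat is float or double)
				f = (ZFloat)va_arg( vl, double );
				lua_pushnumber(L,f);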
				nArg++;
				break;
			default:
				break;
			}
		}
		va_end( vl );
	}

	// Flag so we start the thread on the first call
	Th->FistRun = TRUE;
	Th->nArg = nArg;
	Th->nRes = 0;

	// Clear temp state
	zslTempUnBindState(Thread);


	// Schedule task
	zslScheduleThread(Thread);
}
// --- SNIP END ---
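
The format-string convention used by zslScriptCall can be tried out without Lua. The sketch below is hypothetical (Arg, ArgKind and collect_args are not part of the original code) and only handles 's' and 'n'. It shows the one detail that is easy to get wrong: a float passed through "..." arrives as a double, so va_arg must ask for double.

```c
#include <stdarg.h>
#include <stddef.h>

typedef enum { ARG_STRING, ARG_NUMBER } ArgKind;

typedef struct {
    ArgKind kind;
    const char *s;  /* valid when kind == ARG_STRING */
    float n;        /* valid when kind == ARG_NUMBER */
} Arg;

/* Decode up to max arguments described by fmt ('s' = string, 'n' = number).
 * Returns the number of arguments decoded, or -1 on an unknown format char. */
int collect_args(Arg *out, int max, const char *fmt, ...)
{
    va_list vl;
    int count = 0;
    int ok = 1;

    if (fmt == NULL)
        return 0;

    va_start(vl, fmt);
    for (size_t i = 0; fmt[i] != '\0' && count < max && ok; i++) {
        switch (fmt[i]) {
        case 's':
            out[count].kind = ARG_STRING;
            out[count].s = va_arg(vl, char *);
            out[count].n = 0.0f;
            count++;
            break;
        case 'n':
            out[count].kind = ARG_NUMBER;
            out[count].s = NULL;
            /* floats are promoted to double in variadic calls */
            out[count].n = (float)va_arg(vl, double);
            count++;
            break;
        default:
            ok = 0;  /* stop: the remaining arguments cannot be trusted */
            break;
        }
    }
    va_end(vl);
    return ok ? count : -1;
}
```

For example, collect_args(args, 4, "sn", "hello", 2.5f) fills two entries. Unlike the snippet above, this sketch stops at an unknown format character instead of skipping it; that is a design choice for the example, not something the original code does.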

5) At this point, we have created the thread, set it up so that it
executes a Lua function, and scheduled it in our system for execution.
The next step is to look at the actual scheduler, which takes care of
finding threads to execute and launching them.

// --- SNIP START ---
Bool	zslProcessThreads(void)
{
	ZbinHeapNode*	Node;
	ZSLThread*		Thread;
	lua_State*		ExecState;
	Bool			Done=FALSE;
	Bool			Processed=FALSE;
	Uint32		Time = zglGetTime();

	// Process all tasks which are ready.
	while (!Done)
	{
		Node = zbinHeapFindMin(&fzslCore.ActiveThreads);
		if (Node==NULL) break;

		// Is the task ready for processing
		Thread = Node->data;
		if (Thread->WakeTime <= Time)
		{
			Processed = TRUE;

			// Pop it off the priority queue
			zbinHeapDeleteMin(&fzslCore.ActiveThreads);
			Thread->LastTime = Time;

			// Execute the thread (depending on type)
			switch (Thread->ThreadEnv)
			{
			case zslThreadEnvLua:
				ExecState = zslBindState(Thread);
				zslChangeThreadState(Thread,zslThreadState_Running);
				zslLuaThreadExec(Thread,ExecState);
				zslUnBindState(Thread);
				break;
			case zslThreadEnvC:
				zslChangeThreadState(Thread,zslThreadState_Running);
				zslCThreadExec(Thread);
				break;
			default:
				// TODO - ERROR
				break;
			}


			// Check for return condition (sleep, suspend, done, ...)
			switch(Thread->ThreadState)
			{
			case zslThreadState_Running:
			case zslThreadState_Sleeping:
				// Reschedule
				zslScheduleThread((zslTID)Thread);
				break;
			default:
				// Unknown state
				// TODO
				break;
			}
		}
		else
		{
			// The top thread isn't ready, which implies that none of the following
			// ones are either. So we are done with the processing
			Done = TRUE;
		}
	}
	return Processed;
}
// --- SNIP END ---
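
The scheduling loop boils down to "take the thread with the smallest wake time, run it if it is due, stop at the first one that is not". Here is a self-contained sketch of that loop. Task, find_min and process_tasks are invented for the example, and a linear scan replaces the binary heap to keep it short; the "work" of a task is just a counter.

```c
#include <stddef.h>

typedef enum { TASK_SLEEPING, TASK_DONE } TaskState;

typedef struct Task {
    unsigned wake_time;  /* earliest time the task may run again */
    TaskState state;
    int runs_left;       /* toy workload: time slices before the task ends */
    unsigned period;     /* how long the task sleeps after each slice */
} Task;

/* Index of the unfinished task with the smallest wake_time, or -1.
 * (A linear scan stands in for the binary heap of the real code.) */
int find_min(const Task *tasks, size_t n)
{
    int best = -1;
    for (size_t i = 0; i < n; i++) {
        if (tasks[i].state == TASK_DONE)
            continue;
        if (best < 0 || tasks[i].wake_time < tasks[best].wake_time)
            best = (int)i;
    }
    return best;
}

/* Run every task whose wake time has passed. Returns slices executed. */
int process_tasks(Task *tasks, size_t n, unsigned now)
{
    int processed = 0;
    for (;;) {
        int i = find_min(tasks, n);
        /* If the earliest task is not due, none of the others are. */
        if (i < 0 || tasks[i].wake_time > now)
            break;
        processed++;
        if (--tasks[i].runs_left <= 0)
            tasks[i].state = TASK_DONE;
        else
            tasks[i].wake_time = now + tasks[i].period;  /* like sleep() */
    }
    return processed;
}
```

Calling process_tasks once per frame with the current time mirrors how zslProcessThreads returns whether anything was processed.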

6) Obviously, the code to show now is the zslLuaThreadExec
function, which handles the actual thread execution.

// --- SNIP START ---

// Some global variables to keep function state since we can't trust the
// local variables with setjmp and longjmp
ZSLThread*	gLuaThread;
lua_State*	gLuaState;
jmp_buf	gLuaTmpbuf;
int		gLuaRes;

void	zslLuaThreadExec(ZSLThread* Thread, lua_State *L)
{
	Uint8*	Stack;
	Uint8*	StackBase;
	zmemCopy(&gLuaTmpbuf,&Thread->LongJump,sizeof(jmp_buf));
	gLuaState = L;
	gLuaThread = Thread;

	// Get the current state so we can return on execution end or sleep
	gLuaRes = setjmp(gLuaThread->LongJump);
	if (gLuaRes==0)
	{
		// Check for first run
		if (gLuaThread->FistRun)
		{
			// Setup the stack
			Stack = &gLuaThread->Stack[gLuaThread->StackSize-1];
			StackBase = &gLuaThread->Stack[0];
			_asm
			{
				mov esp, Stack;
			}

			// Do the first time call
			gLuaThread->FistRun = FALSE;
			zslLuaThreadCall(gLuaState,gLuaThread->nArg,gLuaThread->nRes);

			// If we get here, it means the thread ended, or else we would have
			// longjumped to the beginning of the function
			zslChangeThreadState(gLuaThread,zslThreadState_Done);
			longjmp(gLuaThread->LongJump,1);
		}
		else
		{
			// Resume exec
			if (gLuaThread->ThreadState == zslThreadState_Running)
			{
				// Return to the sleep point
				longjmp(gLuaTmpbuf,1);
			}
			else
			{
				zslChangeThreadState(gLuaThread,zslThreadState_Done);
			}
		}
	}
}

void	zslLuaThreadCall(lua_State *L, int nArgs, int nResults)
{
  StkId func = L->top - (nArgs+1);  /* function to be called */
  luaD_call(L, func, nResults);
}
// --- SNIP END ---
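
On the remark about not trusting local variables: in standard C, a non-volatile automatic variable of the function that called setjmp has an indeterminate value after longjmp if it was modified in between. The code above works around this with globals; the sketch below shows the other standard option, a volatile local. It is a standalone example (worker and run_worker are made-up names) and does no stack switching at all.

```c
#include <setjmp.h>

static jmp_buf g_return_point;

/* Recurse a few frames deep, then jump straight back to run_worker. */
static void worker(int steps)
{
    if (steps == 0)
        longjmp(g_return_point, 1);
    worker(steps - 1);
}

/* Restarts the worker until max_attempts is reached and returns the
 * number of attempts made. */
int run_worker(int max_attempts)
{
    /* volatile: the counter is modified between setjmp and longjmp and
     * read afterwards, so it must not live only in a register */
    volatile int attempts = 0;

    if (setjmp(g_return_point) != 0) {
        /* we arrive here through longjmp from worker() */
        if (attempts >= max_attempts)
            return attempts;
    }
    attempts++;
    worker(3);
    return -1;  /* not reached: worker() always longjmps */
}
```

Globals, as used in zslLuaThreadExec, have the added advantage of not depending on the stack frame, which matters there because the stack pointer is changed by hand.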

7) Another piece of code worth showing is the one that actually puts a
thread to sleep. So here it is.

// --- SNIP START ---
int zslLua_Sleep (lua_State *L)
{
  ZSLThread*	Thread = (ZSLThread*)L->zslThread;
  Uint32		SleepTime;

  // Can't sleep if system thread
  if (Thread->ThreadState!=zslThreadState_Running)
  {
	  lua_error(L,"Thread is not in a valid state for sleeping");
  }

  // Get sleep time off stack
  SleepTime = (Uint32)(CZF(1000.0)*lua_tonumber(L,1));

  zslThreadSleep(Thread, SleepTime);

  return 0;
}

void			zslThreadSleep(zslTID Thread, Uint32 SleepTime)
{
	ZSLThread *Th = (ZSLThread*)Thread;
	jmp_buf	JumpBuffer;
	jmp_buf	TmpJumpBuffer;

	// Thread must be already running for it to sleep
	if (Th->ThreadState != zslThreadState_Running) return;

	// Set wake up
	Th->WakeTime = Th->LastTime+SleepTime;

	// Flag thread for sleep
	zslChangeThreadState(Thread, zslThreadState_Sleeping);

	// Flip back to scheduler and handle resume
	if (setjmp(JumpBuffer)==0)
	{
		// Copy the buffer so we can return (exchange the state we just
		// captured with the one we got at the beginning of the execution)
		zmemCopy(&TmpJumpBuffer,&Th->LongJump,sizeof(jmp_buf));
		zmemCopy(&Th->LongJump,&JumpBuffer,sizeof(jmp_buf));

		// Return to scheduler
		longjmp(TmpJumpBuffer,1);
	}
}
// --- SNIP END ---
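
For readers who want to experiment with the same sleep/resume flow without inline assembly, here is a sketch that swaps in a different technique: the ucontext functions (getcontext, makecontext, swapcontext). This is not what the code above does. It assumes a platform that still ships ucontext, such as Linux with glibc (newer POSIX editions dropped these functions). Coroutine, co_start, co_resume, co_sleep and co_body are names invented for the example.

```c
#include <ucontext.h>
#include <stddef.h>

enum { CO_STACK_SIZE = 64 * 1024 };

typedef struct Coroutine {
    ucontext_t ctx;     /* saved context of the coroutine itself */
    ucontext_t caller;  /* saved context of whoever resumed it */
    unsigned wake_time;
    int done;
    int progress;       /* toy state so the effect is observable */
    char stack[CO_STACK_SIZE];
} Coroutine;

static Coroutine *g_current;  /* plays the role of the piggyback pointer */

/* Suspend the running coroutine and return to the scheduler. */
void co_sleep(unsigned now, unsigned delay)
{
    Coroutine *co = g_current;
    co->wake_time = now + delay;
    swapcontext(&co->ctx, &co->caller);
}

/* Toy thread body: do a step, sleep, do a step, sleep, finish. */
static void co_body(void)
{
    Coroutine *co = g_current;
    co->progress = 1;
    co_sleep(0, 10);
    co->progress = 2;
    co_sleep(10, 10);
    co->progress = 3;
    co->done = 1;
    /* returning continues at uc_link, i.e. the last caller */
}

/* Prepare the coroutine on its own stack. Returns 0 on success. */
int co_start(Coroutine *co)
{
    if (getcontext(&co->ctx) != 0)
        return -1;
    co->ctx.uc_stack.ss_sp = co->stack;
    co->ctx.uc_stack.ss_size = sizeof co->stack;
    co->ctx.uc_link = &co->caller;
    co->wake_time = 0;
    co->done = 0;
    co->progress = 0;
    makecontext(&co->ctx, co_body, 0);
    return 0;
}

/* Run the coroutine until it sleeps again or finishes. */
void co_resume(Coroutine *co)
{
    if (co->done)
        return;
    g_current = co;
    swapcontext(&co->caller, &co->ctx);
}
```

Each co_resume call corresponds to one pass through zslLuaThreadExec, and co_sleep to zslThreadSleep: both record the wake time and hand control back to the scheduler, which can resume the thread later at the exact point where it went to sleep.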

8) And a final piece of code that is worth showing is how I handle garbage
collection for multiple threads and processes. Here is the function that
handles this:

// --- SNIP START ---
int	zslCleanUp(zslPID Process)
{
	ZSLThread* Thread;
	ZSLProcess* Proc;
	lua_State *L;
	int CleanCount;
	if (Process == NULL) Process = fzslCore.GlobalProcess;
	Proc = (ZSLProcess*)Process;

	// Perform garbage collection. First, we go through all threads and mark
	// all of them
	Thread = zheapListGetFirst(fzslCore.ThreadList);
	while (Thread)
	{
		if (Thread->OwnerProcess == Process && Thread->ThreadEnv==zslThreadEnvLua)
		{
			L = zslTempBindState(Thread);
			lua_collectgarbage_mark(L,1);
			zslTempUnBindState(Thread);
		}
		Thread = zheapListGetNext(fzslCore.ThreadList);
	}

	// Then we do the collection on the actual process
	Thread = &Proc->DummyThread;
	L = zslTempBindState(Thread);
	CleanCount = lua_collectgarbage_collect(L,1);
	zslTempUnBindState(Thread);

	return CleanCount;
}
// --- SNIP END ---

OK, this is it. I'm sure it's pretty cryptic to lots of people since it's
just straight-up code, but that's all I have time to do at the moment. And my
explanations probably aren't that good right now (that's what happens
when you post at 6am :) ). However, if you have any questions, feel free to
ask. I'll take some time to answer them.

Hoping I was of some help...
---
Sebastien St-Laurent
Software Engineer, Z-Axis ltd
sebby@z-axis.com