[Date Prev][Date Next][Thread Prev][Thread Next]
[Date Index]
[Thread Index]
- Subject: Re: How to wait for both sockets and timers in a coroutine dispatcher.
- From: Sean Conner <sean@...>
- Date: Thu, 5 Jan 2012 20:56:16 -0500
It was thus said that the Great pansz once stated:
> Hello,
>
> I've be using coroutine and write a dispatcher to a server program
> written by lua. The basic idea (psudo code) is like this:
>
> repeat
> for i=#threads,1,-1 do resume thread[i] and check the result
> end
> if all threads are yield by socket timeout, then do socket.select
> for all socket
> end
> until #threads == 0
>
> the thread itself will handle the global table, which stores all
> pending socket for socket.select()
>
> Now I want to add timer feature, so that threads can suspend for a
> specific time and then resume.
>
> The problem is that timer threads are not yielded by a socket timeout,
> so socket.select does not work for timer. When I do socket.select and
> any timer expires, select() still blocks.
>
> Is it possible that the select() works both for socket and timer? or
> is there any good practise to dispatch both threads yield by socket
> timeout and those who yield by timer like sleep()?
>
> Thanks.
I've done that. Below is the main code from my program. I have three
queues I maintain, a run queue (g_trun), a wait queue (g_twait, for those
coroutines waiting on socket IO) and a sleep queue (g_tsleep). Before
calling epoll_wait() (this could be adapted for select() or poll(), but it
would require quite a bit of change, as epoll() allows one to associate an
arbitrary block of memory to a file descriptor), I check the queues and
select an appropriate timeout.
For each ready descriptor that epoll_wait() returns, I call a routine (via
the function pointer in the epoll structure) that does the socket I/O and
determines if the conditions have been satisfied to move the coroutine to
the run queue (say, a coroutines asked for 1024 bytes of data, we only got
799 bytes, that coroutine will still remain on the wait queue), then the
coroutine will be moved to the run queue (the socket IO is done in C---none
of it is done in Lua).
I then run through the sleep queue, moving the coroutines to the run queue
if enough time has passed.
Then, and only then, I pull the next coroutine off the run queue and run
it. It will run until it either calls a socket IO routine (which will then
put the coroutine on the wait queue and yield) or the sleep routine
(similar, but put on the sleep queue). Then we head back around for another
go.
There are no calls to the underlying operating system sleep() or
nanosleep().
-spc (Hope this helps some)
static void mainloop(void)
{
struct epoll_event *list;
time_t thissecond;
list = malloc(sizeof(struct epoll_event) * c_pollqueue);
thissecond = time(NULL);
memset(list,0,sizeof(struct epoll_event) * c_pollqueue);
while(1)
{
PollNode node;
SocketNode thread;
int events;
int timeout;
time_t now;
check_signals();
/*--------------------------------------------------
; if there are runnable threads, return immediately,
; else if there are sleeping threads, return in a second,
; else wait indefinitely.
;------------------------------------------------------*/
if (!ListEmpty(&g_trun))
timeout = 0;
else if (!ListEmpty(&g_tsleep))
timeout = 1;
else
timeout = -1;
events = epoll_wait(g_queue,list,c_pollqueue,timeout);
if (events > 0) /* if there are any events */
{
for (int i = 0 ; i < events ; i++)
{
node = list[i].data.ptr;
(*node->fn)(&list[i]);
}
}
/*---------------------------------------------------------
; there may be some threads sleeping for a period of time.
; schedule them here.
;----------------------------------------------------------*/
now = time(NULL);
if (now > thissecond)
{
thissecond = now;
while(!ListEmpty(&g_tsleep))
{
thread = (SocketNode)ListGetHead(&g_tsleep);
assert(thread->state == STATE_SLEEP);
if (lua_status(thread->L) != LUA_YIELD)
{
syslog(LOG_DEBUG,"waking thread %5lu in state %d",(unsigned long)thread->count,lua_status(thread->L));
assert(lua_status(thread->L) == LUA_YIELD);
}
if (thread->sleep > thissecond) break;
thread_schedule(thread);
}
}
/*--------------------------------------------------
; an epoll event may have caused some threads to become
; active. Pull the first thread (node) off the list. If
; the resulting node is valid, it's a runnable thread,
; so run it.
;------------------------------------------------------*/
thread = (SocketNode)ListGetHead(&g_trun);
if (NodeValid(&thread->node.node))
thread_run(thread);
}
}