Hi,

*Warning* This is probably a boring tale of my experiments, but I thought
it could help others so ... :)
If you just want to see how to use lanes + sockets, skip to the bottom.

In my never-ending quest to make http://mailcatch.com handle more
incoming mail with less CPU usage, I have tested many ways to handle
a multitude of sockets that all send data to the same storage.

At first I used Lua coroutines with the Loop scheduler mechanism, but it
was quickly eating up CPU. Then I tried the copas scheduler; it was a bit
faster but still not enough for the growing volume of incoming mail. I
tried to write my own scheduler, with as few checks as possible to make
it faster, but not much was gained there.

Then I gave up and tried a forking daemon. It was much better and
could handle more mail, but accessing a common storage became a problem.
In the previous versions I simply used some tables, but with a forking
daemon that no longer works, since each process gets its own copy of
them. I tried memcached and even a RamFS mounted somewhere in the
filesystem. The latter option worked pretty well but forced me to use
file locks, and under pressure it sometimes failed.

So for the N'th time I've tried my hand at lua threading and this time,
it works!

For that I have used Lua Lanes, a neat threading library indeed. The
program spawns a few tens or hundreds of worker threads at boot, and they
all wait on a linda key. Another thread is responsible for the storage,
which can thus go back inside the program's memory (this removes the
need to serialize data, increasing speed even more).
The main thread is a classic infinite socket accept loop.

But luasocket's accept() function returns a socket object, which
lanes can't pass through linda objects to worker threads. So I modified
the luasocket source with two very small changes (I would be so happy
should they be included upstream):
* a new acceptfd() method that returns the file descriptor of the
  accepted socket instead of a socket object
* the tcp() constructor takes an optional parameter, a file descriptor,
  and constructs a client socket directly from it
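
A note on using the two calls together, as a minimal sketch. Judging from
the attached patch, acceptfd() returns nil plus an error string on failure,
and tcp() falls back to creating a fresh master socket when the descriptor
is missing or below 1, so a bad value would not raise an error by itself.
The helper name client_from_fd is made up for illustration, and the socket
module is passed in as a parameter only to keep the snippet free of
dependencies:

```lua
-- Hypothetical helper (not part of the patch): rebuild a client socket from
-- a descriptor received from another thread, refusing values that the
-- patched socket.tcp() would silently turn into a new master socket.
local function client_from_fd(socket, fd)
	if type(fd) ~= "number" or fd < 1 then
		return nil, "invalid descriptor: " .. tostring(fd)
	end
	-- with the patch applied, this yields a tcp{client} object wrapping fd
	return socket.tcp(fd)
end
```

Between acceptfd() and tcp(fd) no Lua object owns the descriptor, so a
descriptor that is never wrapped and closed stays open until the process
exits.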

I have attached the patch and a Very-Quick-Dirty-Simple-Example (which
just spawns a new thread for each incoming connection; this is less
optimal, as it requires a new Lua state plus loading of all required
libraries for each connection).
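
For comparison, the pre-spawned pool described earlier (workers waiting on
a linda key, one storage lane, one accept loop) could look roughly like the
sketch below. It assumes the same Lanes calls as the attached example
(lanes.gen, lanes.linda, linda:send, and linda:receive returning the value
first) and a luasocket built with the patch; newer Lanes releases may differ,
so check the documentation of your version. The pool size, the key names
"fd" and "store", and the function names are all made up for illustration,
and the linda is passed as a lane argument rather than as an upvalue.

```lua
local NUM_WORKERS = 50	-- hypothetical pool size

-- Handle one client: read a line, forward it to the storage lane, reply.
local function serve_client(sock, linda)
	local line, err = sock:receive("*l")
	if line then
		linda:send("store", { line = line })
		sock:send("line: " .. line .. "\n")
	end
	sock:close()
	return line, err
end

-- Worker lane body: wait for descriptors posted under the "fd" key.
local function worker(linda)
	require 'socket'
	while true do
		local fd = linda:receive(3, "fd")	-- nil when the wait times out
		if fd then
			serve_client(socket.tcp(fd), linda)
		end
	end
end

-- Storage lane body: the only lane touching the table, so no locking.
local function storage(linda)
	local mails = {}
	while true do
		local item = linda:receive(3, "store")
		if item then
			mails[#mails + 1] = item.line
		end
	end
end

-- Main thread: spawn the lanes once, then hand each accepted fd to the pool.
local function run(host, port)
	require 'lanes'		-- loaded here so the functions above have no dependencies
	require 'socket'
	local linda = lanes.linda()
	local handles = { lanes.gen("*", storage)(linda) }
	for _ = 1, NUM_WORKERS do
		handles[#handles + 1] = lanes.gen("*", worker)(linda)
	end
	local server = assert(socket.bind(host, port))
	while true do
		local fd, err = server:acceptfd()
		if fd then
			linda:send("fd", fd)
		else
			print("accept failed: " .. tostring(err))
		end
	end
end

-- run("0.0.0.0", 2525)	-- uncomment to start the server
```

Each worker pays the cost of creating a Lua state and loading luasocket
once at startup instead of once per connection, which is the overhead the
attached example does not avoid.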

Well, I've bothered you all too long. If you like my patch, I'm happy.
If you include it in luasocket, I'm even happier.
If you have a better solution, I'm EVEN happier :)
--- luasocket-2.0.2/src/tcp.c	2007-10-15 06:21:05.000000000 +0200
+++ luasocket-2.0.2.new/src/tcp.c	2009-08-24 23:58:47.000000000 +0200
@@ -30,6 +30,7 @@
 static int meth_shutdown(lua_State *L);
 static int meth_receive(lua_State *L);
 static int meth_accept(lua_State *L);
+static int meth_acceptfd(lua_State *L);
 static int meth_close(lua_State *L);
 static int meth_setoption(lua_State *L);
 static int meth_settimeout(lua_State *L);
@@ -42,6 +43,7 @@
     {"__gc",        meth_close},
     {"__tostring",  auxiliar_tostring},
     {"accept",      meth_accept},
+    {"acceptfd",      meth_acceptfd},
     {"bind",        meth_bind},
     {"close",       meth_close},
     {"connect",     meth_connect},
@@ -186,6 +188,27 @@
 }
 
 /*-------------------------------------------------------------------------*\
+* Waits for a client attempting connection to the server object and
+* returns its file descriptor instead of a client object
+\*-------------------------------------------------------------------------*/
+static int meth_acceptfd(lua_State *L)
+{
+    p_tcp server = (p_tcp) auxiliar_checkclass(L, "tcp{server}", 1);
+    p_timeout tm = timeout_markstart(&server->tm);
+    t_socket sock;
+    int err = socket_accept(&server->sock, &sock, NULL, NULL, tm);
+    /* if successful, push the client socket's file descriptor */
+    if (err == IO_DONE) {
+        lua_pushnumber(L, sock);
+        return 1;
+    } else {
+        lua_pushnil(L);
+        lua_pushstring(L, socket_strerror(err));
+        return 2;
+    }
+}
+
+/*-------------------------------------------------------------------------*\
 * Binds an object to an address 
 \*-------------------------------------------------------------------------*/
 static int meth_bind(lua_State *L)
@@ -316,12 +339,19 @@
 static int global_create(lua_State *L)
 {
     t_socket sock;
-    const char *err = inet_trycreate(&sock, SOCK_STREAM);
+    const char *err = NULL;
+    int fd = luaL_optnumber(L, 1, -1);
+    if (fd < 1)
+        err = inet_trycreate(&sock, SOCK_STREAM);
+    else
+        sock = fd;
     /* try to allocate a system socket */
     if (!err) { 
         /* allocate tcp object */
         p_tcp tcp = (p_tcp) lua_newuserdata(L, sizeof(t_tcp));
-        /* set its type as master object */
+        if (fd >= 1)
+            auxiliar_setclass(L, "tcp{client}", -1);
+        else
         auxiliar_setclass(L, "tcp{master}", -1);
         /* initialize remaining structure fields */
         socket_setnonblocking(&sock);
require 'lanes'
require 'socket'

local linda = lanes.linda()

function handler(fd)
	require'socket'

	local sock = socket.tcp(fd)
	local line = sock:receive("*l")
	linda:send("order", {line=line})
	sock:send("line: "..tostring(line).." \n")
	sock:close()
	return "done"
end

function main_thread()
	while true do
		local order = linda:receive(3, "order")
		if order then
			print("order", order.line)
		end
	end
end

function run()
	local s = socket.bind("0.0.0.0", 2525)
	local threads = {}

	lanes.gen("*", main_thread)()

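	while true do
		-- acceptfd() returns nil plus an error message on failure
		local fd, err = s:acceptfd()
		if fd then
			threads[lanes.gen("*", handler)(fd)] = fd
		else
			print("accept failed:", err)
		end

		print("threads:")
		for h in pairs(threads) do
			print(" * ", h, " :=: ", h.status)

			-- join(0) polls without blocking; forget lanes that
			-- have finished or raised an error
			local v, jerr = h:join(0)
			if not v and jerr then
				print(jerr)
				threads[h] = nil
			elseif v == "done" then
				threads[h] = nil
			end
		end
	end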
end

run()