[Date Prev][Date Next][Thread Prev][Thread Next]
[Date Index]
[Thread Index]
- Subject: Re: Lua, threading & sockets
- From: William Ahern <william@...>
- Date: Fri, 19 Sep 2014 15:42:51 -0700
On Fri, Sep 19, 2014 at 11:12:48PM +0200, chris 0 wrote:
> Hi,
>
> I'm looking for a way in which to spawn a pre-emptive thread (e.g. pthread)
> for each TCP connection, in both lua and luajit.
>
> The problem being I can't seem to find any libraries that are able to do
> this.
>
> I've tried combining lua lanes with lua sockets, which fails because of the
> 'deep' data
> from lua sockets. I was working on trying to handle deep data using
> luaG_newdeepuserdata(L, idfunc) in lua sockets meth_accept function, but
> i'm currently
> getting nothing really but segfaults which I can't seem to diagnose why:
<snip>
> I've looked at luv - based on libuv, however this doesn't appear to support
> threading.
>
> If anyones got any suggestions, I'd be most appreciative!
>
My cqueues project supports this. See
http://25thandclement.com/~william/projects/cqueues.html
Extensive API documentation can be found at
http://25thandclement.com/~william/projects/cqueues.pdf
Github mirror is at
https://github.com/wahern/cqueues
The following code is an example I just threw together, starting from the
script examples/thread.count included with the project source code.
It's an echo server: when you connect to the listening port it will spawn a
new thread which runs an I/O loop to echo back anything typed or execute any
known commands.
There might be bugs. But I've quickly tested it using LuaJIT 2.0.1, Lua
5.2.1, and Lua 5.3.0 (alpha) on OS X, and Lua 5.2.3 on Linux. It should work
identically on all those versions and all supported platforms. However, it
won't work on PUC Lua 5.1 without patches to implement yielding across
methamethods.
#!/usr/bin/env lua5.2
local PORT = ... or 8000
local cqueues = require"cqueues"
local auxlib = require"cqueues.auxlib"
local socket = require"cqueues.socket"
local thread = require"cqueues.thread"
local errno = require"cqueues.errno"
local signal = require"cqueues.signal"
local assert = auxlib.assert
--
-- Slave function to run in separate thread. Note that when executed the
-- slave function is running in a new Lua environment, so we have to
-- duplicate some of the premable above.
--
-- We pass in the client socket using the built-in descriptor passing
-- methods, :sendfd and :recvfd, as the thread library does not yet know how
-- to copy user data across threads. The first argument to each thread-init
-- function is one end of a socket pair, followed by any arguments passed by
-- the caller. We can use the socket pair to pass descriptors back and forth
-- between the threads.
--
-- Note that we must run a new event loop even if we're not going to be very
-- asynchronous because the socket routines expect to be able to yield.
-- There's preliminary support for running socket methods outside of a
-- scheduled coroutine, but it's not yet well tested. The event loop is
-- using epoll on Linux, kqueue on BSD, and Ports on Solaris.
--
local function slave(com)
local cqueues = require"cqueues"
local auxlib = require"cqueues.auxlib"
local assert = auxlib.assert --> translates any errno #s to strings
local fibers = cqueues.new() --> slave loop
local ok, why = fibers:wrap(function ()
local _, cli = assert(com:recvfd())
local af, ip, port = assert(cli:peername())
local who = string.format("%s:%d", ip, port)
io.stderr:write("hello ", who, "\n")
-- Put into text code so we convert CRLF to LF on input and
-- LF to CRLF on output. And set output as line-buffered so
-- we don't need to manually flush.
cli:setmode("t", "tl")
-- run new connection asychronously
fibers:wrap(function ()
cli:write("# hello ", who, "\n")
cli:write('# send "quit" or "exit" to disconnect\n')
cli:write('# send "throw [...]" to simulate error\n')
for ln in cli:lines"*l" do
-- run command or echo text
if ln == "quit" or ln == "exit" then
cli:write("# goodbye\n")
cli:shutdown"rw"
io.stderr:write("goodbye ", who, "\n")
elseif ln:match("^throw ") then
local what = ln:match("^throw (.*)")
error(what)
else
io.stderr:write(who, ' said "', ln, '"\n')
cli:write('# you said "', ln, '"\n')
end
end
end)
end):loop()
if not ok then
error(why, 0)
end
end
local fibers = cqueues.new() -- master loop
--
-- Spawn a new thread and pass the client socket.
--
local function spawn(cli)
--
-- Run in a new coroutine so we can wait asynchronously for thread
-- to return. If we were doing this is in production code we'd
-- recycle threads rather than letting them exit.
--
fibers:wrap(function ()
local af, ip, port = assert(cli:peername())
local who = string.format("%s:%d", ip, port)
io.stderr:write("spawning thread for ", who, "\n")
local thr, com = assert(thread.start(slave))
assert(com:sendfd("new connection", cli))
cli:close() -- no longer needed in our thread
-- wait on thread and print any errors
local ok, why = thr:join()
-- convert system error to string because we're not checking
-- for particular values
if type(why) == "number" then
why = errno.strerror(why)
end
if not ok then
io.stderr:write("failed to join thread: ", why, "\n")
elseif why ~= nil then
io.stderr:write("error in thread: ", why, "\n")
else
io.stderr:write("finished thread for ", who, "\n")
end
end)
end
--
-- Pull new connections from listening port.
--
fibers:wrap(function ()
local port = socket.listen("0.0.0.0", PORT)
for cli in port:clients() do
spawn(cli)
end
end)
--
-- Handle signals gracefully.
--
fibers:wrap(function ()
local TERM = signal.SIGTERM
local INT = signal.SIGINT
local HUP = signal.SIGHUP
-- NOTE: Delivered signals cannot be caught by Linux signalfd or
-- Solaris sigtimedwait. Works without blocking on *BSD and OS X.
signal.block(TERM, INT, HUP)
local signo = assert(assert(signal.listen(TERM, INT, HUP)):wait())
io.stderr:write("exiting on signal ", signal[signo], "\n")
os.exit(0)
end)
assert(fibers:loop()) --> our master event loop