lua-users home
lua-l archive

[Date Prev][Date Next][Thread Prev][Thread Next] [Date Index] [Thread Index]


On Fri, Sep 19, 2014 at 11:12:48PM +0200, chris 0 wrote:
> Hi,
> 
> I'm looking for a way in which to spawn a pre-emptive thread (e.g. pthread)
> for each TCP connection, in both lua and luajit.
> 
> The problem being I can't seem to find any libraries that are able to do
> this.
> 
> I've tried combining lua lanes with lua sockets, which fails because of the
> 'deep' data
> from lua sockets.  I was working on trying to handle deep data using
> luaG_newdeepuserdata(L, idfunc) in lua sockets meth_accept function, but
> i'm currently
> getting nothing really but segfaults which I can't seem to diagnose why:
<snip>
> I've looked at luv - based on libuv, however this doesn't appear to support
> threading.
> 
> If anyones got any suggestions, I'd be most appreciative!
> 

My cqueues project supports this. See

	http://25thandclement.com/~william/projects/cqueues.html

Extensive API documentation can be found at

	http://25thandclement.com/~william/projects/cqueues.pdf

Github mirror is at

	https://github.com/wahern/cqueues

The following code is an example I just threw together, starting from the
script examples/thread.count included with the project source code.

It's an echo server: when you connect to the listening port it will spawn a
new thread which runs an I/O loop to echo back anything typed or execute any
known commands.

There might be bugs. But I've quickly tested it using LuaJIT 2.0.1, Lua
5.2.1, and Lua 5.3.0 (alpha) on OS X, and Lua 5.2.3 on Linux. It should work
identically on all those versions and all supported platforms. However, it
won't work on PUC Lua 5.1 without patches to implement yielding across
metamethods.


#!/usr/bin/env lua5.2

-- Port to listen on: the script's first argument when one is given,
-- otherwise 8000. Passed unchanged to socket.listen further down.
local PORT = ... or 8000

local cqueues = require"cqueues"
local auxlib = require"cqueues.auxlib"
local socket = require"cqueues.socket"
local thread = require"cqueues.thread"
local errno = require"cqueues.errno"
local signal = require"cqueues.signal"
local assert = auxlib.assert

--
-- Slave function to run in separate thread. Note that when executed the
-- slave function is running in a new Lua environment, so we have to
-- duplicate some of the preamble above.
--
-- We pass in the client socket using the built-in descriptor passing
-- methods, :sendfd and :recvfd, as the thread library does not yet know how
-- to copy user data across threads. The first argument to each thread-init
-- function is one end of a socket pair, followed by any arguments passed by
-- the caller. We can use the socket pair to pass descriptors back and forth
-- between the threads.
--
-- Note that we must run a new event loop even if we're not going to be very
-- asynchronous because the socket routines expect to be able to yield.
-- There's preliminary support for running socket methods outside of a
-- scheduled coroutine, but it's not yet well tested. The event loop is
-- using epoll on Linux, kqueue on BSD, and Ports on Solaris.
--
local function slave(com)
	-- Thread entry point: serves exactly one client connection.
	--
	-- com: our end of the socket pair shared with the spawning thread; the
	--      client descriptor arrives over it via :recvfd.
	--
	-- Returns nothing. If the event loop stops with an error, that error is
	-- re-raised with level 0 so the message is passed along undecorated.
	local cqueues = require"cqueues"
	local auxlib = require"cqueues.auxlib"
	local assert = auxlib.assert --> translates any errno #s to strings
	local fibers = cqueues.new() --> slave loop

	local ok, why = fibers:wrap(function ()
		local _, cli = assert(com:recvfd())
		local _, ip, port = assert(cli:peername())
		local who = string.format("%s:%d", ip, port)

		io.stderr:write("hello ", who, "\n")

		-- Put into text mode so we convert CRLF to LF on input and
		-- LF to CRLF on output. And set output as line-buffered so
		-- we don't need to manually flush.
		cli:setmode("t", "tl")

		-- run new connection asynchronously
		fibers:wrap(function ()
			cli:write("# hello ", who, "\n")
			cli:write('# send "quit" or "exit" to disconnect\n')
			cli:write('# send "throw [...]" to simulate error\n')

			for ln in cli:lines"*l" do
				-- nil unless the line is a "throw ..." command
				local what = ln:match("^throw (.*)")

				-- run command or echo text
				if ln == "quit" or ln == "exit" then
					cli:write("# goodbye\n")
					cli:shutdown"rw"
					io.stderr:write("goodbye ", who, "\n")

					-- Stop reading right away: lines the client
					-- pipelined after the command must not be
					-- processed or echoed to a socket we have
					-- just shut down.
					break
				elseif what then
					error(what)
				else
					io.stderr:write(who, ' said "', ln, '"\n')
					cli:write('# you said "', ln, '"\n')
				end
			end

			-- Release the descriptor explicitly instead of leaving it
			-- to whenever the socket object gets collected.
			cli:close()
		end)
	end):loop()

	if not ok then
		error(why, 0)
	end
end


-- Controller for the master thread; the coroutines attached to it with
-- fibers:wrap only run once fibers:loop() is called at the end of the script.
local fibers = cqueues.new() -- master loop

--
-- Spawn a new thread and pass the client socket.
--
local function spawn(cli)
	--
	-- Body of the coroutine that owns this hand-off. It runs on the
	-- master loop so that waiting for the thread to finish does not
	-- stall other connections. Production code would keep a pool of
	-- threads and reuse them instead of letting each one exit.
	--
	local function hand_off()
		local _, addr, portno = assert(cli:peername())
		local peer = string.format("%s:%d", addr, portno)

		io.stderr:write("spawning thread for ", peer, "\n")

		local worker, channel = assert(thread.start(slave))

		-- ship the descriptor to the new thread, then drop our copy
		assert(channel:sendfd("new connection", cli))
		cli:close()

		-- block this coroutine until the thread terminates
		local joined, reason = worker:join()

		-- numeric reasons are system error codes; we only print them,
		-- so turn them into text up front
		if type(reason) == "number" then
			reason = errno.strerror(reason)
		end

		if joined and reason == nil then
			io.stderr:write("finished thread for ", peer, "\n")
		elseif joined then
			io.stderr:write("error in thread: ", reason, "\n")
		else
			io.stderr:write("failed to join thread: ", reason, "\n")
		end
	end

	fibers:wrap(hand_off)
end

--
-- Pull new connections from listening port.
--
fibers:wrap(function ()
	-- Check the result of socket.listen. If it fails and returns nil, the
	-- next line would otherwise die with an unhelpful "attempt to index a
	-- nil value". The file-level assert is auxlib.assert, which presumably
	-- also turns a numeric error code into a readable message (see the
	-- "translates any errno #s" note in slave) -- TODO confirm the exact
	-- failure return values of socket.listen.
	local srv = assert(socket.listen("0.0.0.0", PORT))

	-- Hand every accepted client to its own thread.
	for cli in srv:clients() do
		spawn(cli)
	end
end)

--
-- Handle signals gracefully.
--
fibers:wrap(function ()
	local sigterm, sigint, sighup = signal.SIGTERM, signal.SIGINT, signal.SIGHUP

	-- NOTE: The signals are blocked before listening because a signal
	-- that has already been delivered cannot be caught by Linux signalfd
	-- or Solaris sigtimedwait. *BSD and OS X work without blocking.
	signal.block(sigterm, sigint, sighup)

	-- Create the listener, then suspend this coroutine until one of the
	-- three signals shows up.
	local listener = assert(signal.listen(sigterm, sigint, sighup))
	local caught = assert(listener:wait())

	-- signal[caught] looks up the printable entry for the signal number
	io.stderr:write("exiting on signal ", signal[caught], "\n")
	os.exit(0)
end)

-- Run every coroutine registered on the master controller. assert here is
-- auxlib.assert (see the preamble), so a false result from loop() is raised
-- as an error and ends the script.
assert(fibers:loop()) --> our master event loop