Coroutines As Event Handlers

lua-users home
wiki

This is a very basic Supervisor (or event dispatcher), similar to ones found in Copas and ProgrammingInLua? (but not based on any of those). It was written partly in response to a question on the mailing list, and I've tried to comment it fully; there is an example of using it at the end. The question it attempts to respond to is:

So, how would one do something like...

result = block(self, event)
...where block() is a SWIG wrapped function that takes the script context (i.e. state) and an event as arguments, and blocks (yields) the script until the event occurs, after which the details are copied into a result structure (resumes)?

The response was, why not do it in Lua? The scaffolding below can handle hundreds of thousands of event dispatches per second, so it is not likely that writing it in C will help much; CPU time will be dominated by the cost of actually processing the requests.

local cocreate, resume, yield, costatus =
  coroutine.create, coroutine.resume, coroutine.yield, coroutine.status

local LOG = function(...)
  io.stderr:write(os.date"!%F %T ")
  io.stderr:write(...)
  io.stderr:write"\n"
end

-- This function expects to be a coroutine. When it is started,
-- it initializes itself, and then awaits further instructions.
-- Whenever it has nothing else to do it yields, unless it was
-- told to quit in which case it returns.

function supervisor(...)
  -- each event is a table of sets of coroutines;
  -- we assume we can handle the events in any order.
  local events = {}
  -- This table associates coroutines with names, mostly
  -- for debugging
  local name = {}
  -- to signal that we're done
  local done = false

  -- This function removes all references to a coro
  -- from all events, which we do when the coro dies.
  -- We could use weak tables to do the same thing.
  -- On the other hand, we might want to clean up other
  -- data associated with the coroutine. Anyway, this is easy:

  local function remove(coro)
    for _, coros in pairs(events) do
      coros[coro] = nil
    end
    name[coro] = nil
  end

  -- Convenience function to log tracebacks
  local function log_traceback(coro, msg, err)
    LOG("Coroutine ", name[coro], " ", msg, ":")
    local tb = debug.traceback(coro, err)
    for line in tb:gmatch"[^\n]+" do LOG("  ", line) end
  end

  -- The core routine, which handles the results of a
  -- coroutine.resume. First argument is the coro, rest
  -- are the return values from coro.resume(). The only
  -- thing we're expecting from the coro is a "waitfor"
  -- command, but other ones could be added...
  local function handle_resume(coro, ok, todo, ...)
    if costatus(coro) == "dead" then
      remove(coro) -- always get rid of it.
      -- log a traceback on error
      if not ok then log_traceback(coro, "failed with error", todo) end
    elseif todo ~= "waitfor" then
      -- todo should be "waitfor" and ... the event.
      -- If we had more verbs, we could use a case-table.
      log_traceback(coro, "unknown request "..tostring(todo),
                          "bad return value")
      remove(coro)
    else
      -- We don't care if the event doesn't exist (should we?)
      local q = events[...]
      if q == nil then q = {}; events[...] = q end
      -- We might want to tell the upper level that a new
      -- event needs to be recognized, though.
      -- if next(q) == nil then addevent((...)) end
      q[coro] = true
    end
  end 
      
  -- A table of actions; essentially a case statement
  local handler = {}

  -- do a clean exit (although actually there's no
  -- cleanup necessary, but there might be.)
  function handler.done()
    done = true
  end

  -- debugging: report on status
  function handler.status()
    -- invert the events table for nicer printing
    local n, e = {}, {}
    for evt, coros in pairs(events) do
      for coro in pairs(coros) do
        local who = name[coro]
        n[#n+1] = who
        e[who] = evt
      end
    end
    -- sort the names
    table.sort(n)
    -- and produce the report
    for _, who in ipairs(n) do
      LOG(who, " is waiting for ", tostring(e[who]))
    end
  end

  -- introduce a new actor (coroutine) into the system, and run
  -- it until it blocks
  function handler.introduce(who, func, ...)
    local coro = cocreate(func)
    name[coro] = who
    -- let it initialize itself
    return handle_resume(coro, resume(coro, ...))
  end

  -- send an event to whoever cares
  function handler.signal(what, ...)
    local q = events[what]
    if q and next(q) then
      for coro in pairs(q) do
        q[coro] = nil  -- handled
        handle_resume(coro, resume(coro, what, ...))
      end
      -- Maybe tell the top-level whether the event is
      -- still active?
      return next(q) ~= nil
    else
      -- No-one cares, sniff. "Log" the fact
      LOG("Event ", tostring(what), " dropped into the bit bucket")
    end
  end

  -- Set the __index meta for the handler table to avoid having
  -- to test for bad commands explicitly
  local function logargs(...)
    local t = {n = select("#", ...), ...}
    if t.n > 0 then
      for i = 1, t.n do
        local ti = t[i]
        t[i] = (type(ti) == "string" and "%q" or "%s")
               :format(tostring(ti))
      end
      LOG("..with arguments: ", table.concat(t, ", "))
    end
  end
  function handler:__index(what)
    LOG("Supervisor received unknown message ", what)
    return logargs
  end
  setmetatable(handler, handler)

  -- auxiliary function to handle a command, necessary to
  -- capture multiple returns from yield()
  local function handle(simonsays, ...)
    return handler[simonsays](...)
  end

  -- The main loop is a bit anti-climactic
  LOG"Starting up"
  local rv = handle(...)
  repeat 
    rv = handle(yield(rv))
  until done
  LOG"Shutting down"
end

------------------------------------------------------------------------------
--  Sample very simple processor
------------------------------------------------------------------------------

local function block(event) return yield("waitfor", event) end
local function process(n)
  for i = 1, n do
    local e, howmany = block"TICK"
    assert(e == "TICK" and howmany == i)
  end
end

------------------------------------------------------------------------------
-- Test driver in Lua
------------------------------------------------------------------------------

local super = cocreate(supervisor)

local function check(ok, rv)
  if not ok then
    LOG("supervisor crashed")
    local tb = debug.traceback(super, rv)
    for line in tb:gmatch"[^\n]+" do LOG("  ", line) end
    os.exit(1)
  end
  return rv
end

local function send(msg, ...) return check(resume(super, msg, ...)) end

local nprocess, nreps = tonumber(arg[1]) or 10, tonumber(arg[2] or 12)
for i = 1, tonumber(nprocess) do
  send("introduce", ("p%04i"):format(i), process, nreps)
end
local j = 1
while send("signal", "TICK", j) do
  j = j + 1
end
-- This should be empty
send"status"
send"done"
LOG("Endcount ", j)

Define Supervisor. [1] ? --DavidManura

This definition comes from a paper about Erlang, and is somewhat circular, but it sums it up pretty well: "a behaviour that implements a specific style of supervisor-worker programming which allows supervision trees to be constructed dynamically."

In other words, a Supervisor is responsible for the care and maintenance of a bunch of workers, organizing their workload, sending them events (as in this example) and handling errors.

There are also some nice examples in [Termite] (pdf document, readable if you like parentheses).

RecentChanges · preferences
edit · history
Last edited March 2, 2007 5:23 am GMT (diff)