aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--luaevent/CHANGELOG9
-rw-r--r--luaevent/COROUTINE_MANAGEMENT39
-rw-r--r--luaevent/include/luaevent.h8
-rw-r--r--luaevent/luaevent.lua84
-rw-r--r--luaevent/src/luaevent.c179
-rw-r--r--luaevent/test/test.lua34
-rw-r--r--luaevent/test/testClient.lua38
7 files changed, 218 insertions, 173 deletions
diff --git a/luaevent/CHANGELOG b/luaevent/CHANGELOG
index c3622c8..f2f964d 100644
--- a/luaevent/CHANGELOG
+++ b/luaevent/CHANGELOG
@@ -1,4 +1,13 @@
======
+0.1.2 - Revision 15 - 2007-08-18
++ Setup system to use new coro management as described in COROUTINE_MANAGEMENT
+ The callbacks are called from the event_loop 'thread' rather than that which they are
+ created in. This will prevent the self-resume problem as well as dead-thread problems.
+- Recognized issues to fix in next release:
+ * Socket/event closing needs to be cleaned
+ * luaevent.lua needs refactoring
+ * luaevent.[ch] need to be cleaned up
+======
0.1.1 - Revision 14 - 2007-06-13
+ Fixed event-handling code to cancel events on nothing being returned
+ Added socket/object cleanup.
diff --git a/luaevent/COROUTINE_MANAGEMENT b/luaevent/COROUTINE_MANAGEMENT
new file mode 100644
index 0000000..1af0cb4
--- /dev/null
+++ b/luaevent/COROUTINE_MANAGEMENT
@@ -0,0 +1,39 @@
+Due to the issue w/ self-resuming threads and crashing out threads,
+a management system needs to be in place.
+
+Example thread system:
+
+MAIN
+EVENT_LOOP --------running---
+WAITING ON READ
+WAITING ON WRITE
+WAITING ON CONNECT
+
+
+Since main and the other 'waiting' threads are yielded, it is unsafe to call things arbitrarily on them
+or resume them from themselves...
+However the EVENT_LOOP one is running and thus can execute the callbacks (which can resume the threads)
+Each of the 'waiting' events are attached to an event and contain a pointer, this pointer can be setup to point
+to a per event_base item which will be updated w/ the lua_State of whatever calls EVENT_LOOP...
+this will guarantee that the thread will be resumed from the currently running EVENT_LOOP
+
+
+Other system that's more complicated and less likely:
+
+MAIN
+EVENT_LOOP a -----running---
+
+WAITING ON READ a
+WAITING ON WRITE a
+
+EVENT_LOOP b ----yielded
+WAITING ON READ b
+
+
+Since there can only be one event_loop running per event_base, you do not have to worry about
+cross-pollination of the different waits...
+
+NOTES:
+If the event_loop thread goes away... then the waiting coroutines will have no way to get back...
+though in this case, they are dead in the water anyways.. until a new event_loop starts...
+in which case the lua_State references has been updated... \ No newline at end of file
diff --git a/luaevent/include/luaevent.h b/luaevent/include/luaevent.h
index 8b97be7..2878632 100644
--- a/luaevent/include/luaevent.h
+++ b/luaevent/include/luaevent.h
@@ -9,10 +9,14 @@
#include <event.h>
typedef struct {
+ struct event_base* base;
+ lua_State* loop_L;
+} le_base;
+
+typedef struct {
struct event ev;
- lua_State* L;
+ le_base* base;
int callbackRef;
- int objectRef; /* TEMP */
} le_callback;
int luaopen_luaevent(lua_State* L);
diff --git a/luaevent/luaevent.lua b/luaevent/luaevent.lua
index c6bd015..ec9bc15 100644
--- a/luaevent/luaevent.lua
+++ b/luaevent/luaevent.lua
@@ -7,9 +7,37 @@ require("luaevent.core")
local EV_READ = luaevent.core.EV_READ
local EV_WRITE = luaevent.core.EV_WRITE
-local fair = false
+local fair = false -- Not recommended for most cases...
+local base = luaevent.core.new()
+local sockMap = setmetatable({}, {'__mode', 'kv'})
+local function addevent(sock, ...)
+ local item = base:addevent(sock, ...)
+ if not item then print("FAILED TO SETUP ITEM") return item end
+ local fd = sock:getfd()
+ sockMap[item] = fd
+ print("SETUP ITEM FOR: ", fd)
+ if not hookedObjectMt then
+ hookedObjectMt = true
+ --[[
+ local mt = debug.getmetatable(item)
+ local oldGC = mt.__gc
+ mt.__gc = function(...)
+ print("RELEASING ITEM FOR: ", sockMap[(...)])
+ return oldGC(...)
+ end]]
+ end
+ return item
+end
+-- Weak keys.. the keys are the client sockets
+local clientTable = setmetatable({}, {'__mode', 'kv'})
-local hookedObjectMt = false
+local function getWrapper()
+ local running = coroutine.running()
+ return function(...)
+ if coroutine.running() == running then return end
+ return select(2, coroutine.resume(running, ...))
+ end
+end
function send(sock, data, start, stop)
local s, err
@@ -21,9 +49,11 @@ function send(sock, data, start, stop)
-- Add extra coro swap for fairness
-- CURRENTLY DISABLED FOR TESTING......
if fair and math.random(100) > 90 then
+ if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
coroutine.yield(EV_WRITE)
end
if s or err ~= "timeout" then return s, err, sent end
+ if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
coroutine.yield(EV_WRITE)
until false
end
@@ -33,6 +63,7 @@ function receive(sock, pattern, part)
repeat
s, err, part = sock:receive(pattern, part)
if s or err ~= "timeout" then return s, err, part end
+ if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end
coroutine.yield(EV_READ)
until false
end
@@ -45,6 +76,7 @@ function receivePartial(client, pattern)
s, err, part = client:receive(pattern)
if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
err ~= "timeout" then return s, err, part end
+ if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end
coroutine.yield(EV_READ)
until false
end
@@ -52,6 +84,7 @@ function connect(sock, ...)
sock:settimeout(0)
local ret, err = sock:connect(...)
if ret or err ~= "timeout" then return ret, err end
+ if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
coroutine.yield(EV_WRITE)
ret, err = sock:connect(...)
if err == "already connected" then
@@ -66,74 +99,59 @@ local function clientCoroutine(sock, handler)
-- Figure out what to do ......
return handler(sock)
end
-
+local function handleClient(co, client, handler)
+ local ok, res, event = coroutine.resume(co, client, handler)
+end
local function serverCoroutine(sock, callback)
+ local listenItem = addevent(sock, EV_READ, getWrapper())
repeat
local event = coroutine.yield(EV_READ)
-- Get new socket
local client = sock:accept()
if client then
- --cl[#cl + 1] = client
client:settimeout(0)
- local coFunc = coroutine.wrap(clientCoroutine)
- luaevent.core.addevent(client, coFunc, client, callback)
+ local co = coroutine.create(clientCoroutine)
+ handleClient(co, client, callback)
end
until false
end
-
-local oldAddEvent = luaevent.core.addevent
-luaevent.core.addevent = function(...)
- local item = oldAddEvent(...)
- if not item then print("FAILED TO SETUP ITEM") return item end
- print("SETUP ITEM FOR: ", debug.getmetatable(item).getfd(item))
- if not hookedObjectMt then
- hookedObjectMt = true
- local mt = debug.getmetatable(item)
- local oldGC = mt.__gc
- mt.__gc = function(...)
- print("RELEASING ITEM FOR: ", mt.getfd(...))
- return oldGC(...)
- end
- end
- return item
-end
-
function addserver(sock, callback)
- local coFunc = coroutine.wrap(serverCoroutine)
- luaevent.core.addevent(sock, coFunc, sock, callback)
+ local coro = coroutine.create(serverCoroutine)
+ assert(coroutine.resume(coro, sock, callback))
end
-function addthread(sock, func, ...)
- local coFunc = coroutine.wrap(func)
- luaevent.core.addevent(sock, coFunc, ...)
+function addthread(func, ...)
+ return coroutine.resume(coroutine.create(func), ...)
end
local _skt_mt = {__index = {
connect = function(self, ...)
return connect(self.socket, ...)
end,
send = function (self, data)
- return send (self.socket, data)
+ return send(self.socket, data)
end,
receive = function (self, pattern)
if (self.timeout==0) then
return receivePartial(self.socket, pattern)
end
- return receive (self.socket, pattern)
+ return receive(self.socket, pattern)
end,
flush = function (self)
- return flush (self.socket)
+ return flush(self.socket)
end,
settimeout = function (self,time)
self.timeout=time
return
end,
+
close = function(self)
+ clientTable[self.socket]:close()
self.socket:close()
end
}}
function wrap(sock)
return setmetatable({socket = sock}, _skt_mt)
end
-loop = luaevent.core.loop \ No newline at end of file
+loop = function(...) base:loop(...) end \ No newline at end of file
diff --git a/luaevent/src/luaevent.c b/luaevent/src/luaevent.c
index e5e33cf..38f19ce 100644
--- a/luaevent/src/luaevent.c
+++ b/luaevent/src/luaevent.c
@@ -1,111 +1,87 @@
/* LuaEvent - Copyright (C) 2007 Thomas Harning <harningt@gmail.com>
* Licensed as LGPL - See doc/COPYING for details */
-
-#include "luaevent.h"
+ #include "luaevent.h"
#include <lua.h>
#include <lauxlib.h>
+#include <assert.h>
#define EVENT_BASE_MT "EVENT_BASE_MT"
#define EVENT_CALLBACK_ARG_MT "EVENT_CALLBACK_ARG_MT"
-#define EVENT_BASE_LOCATION 1
+#define MAIN_THREAD_LOCATION 1
-static void setEventBase(lua_State* L, struct event_base* base) {
- struct event_base** pbase = lua_newuserdata(L, sizeof(base));
- *pbase = base;
- luaL_getmetatable(L, EVENT_BASE_MT);
- lua_setmetatable(L, -2);
- lua_rawseti(L, LUA_ENVIRONINDEX, EVENT_BASE_LOCATION);
+void setMainThread(lua_State* L) {
+ lua_pushthread(L);
+ lua_rawseti(L, LUA_ENVIRONINDEX, MAIN_THREAD_LOCATION);
}
-struct event_base* getEventBase(lua_State* L) {
- struct event_base* base;
- lua_rawgeti(L, LUA_ENVIRONINDEX, EVENT_BASE_LOCATION);
- base = *(struct event_base**)lua_topointer(L, -1);
+lua_State* getMainThread(lua_State* L) {
+ lua_State* g_L;
+ lua_rawgeti(L, LUA_ENVIRONINDEX, MAIN_THREAD_LOCATION);
+ g_L = lua_tothread(L, -1);
lua_pop(L, 1);
- return base;
+ return g_L;
}
-static void freeCallbackArgs(le_callback* arg) {
- if(arg->L) {
- lua_State* L = arg->L;
- arg->L = NULL;
- event_del(&arg->ev);
- luaL_unref(L, LUA_REGISTRYINDEX, arg->callbackRef);
- luaL_unref(L, LUA_REGISTRYINDEX, arg->objectRef);
- }
+int luaevent_newbase(lua_State* L) {
+ le_base *base = (le_base*)lua_newuserdata(L, sizeof(le_base));
+ base->loop_L = NULL; /* No running loop */
+ base->base = event_init();
+ luaL_getmetatable(L, EVENT_BASE_MT);
+ lua_setmetatable(L, -2);
+ return 1;
}
-static int call_callback_function(lua_State* L, int argCount) {
- int ret;
- if(lua_pcall(L, argCount, 1, 0) || !(lua_isnil(L, -1) || lua_isnumber(L, -1))) {
- printf("ERROR IN INIT: %s\n", lua_tostring(L, -1));
- lua_pop(L, 1);
- return -1;
- }
- /* Lua_isnil returns 1 if the value is nil... */
- ret = lua_tointeger(L, -1) | -lua_isnil(L, -1);
- lua_pop(L, 1);
- if(ret < 0) { /* Done, no need to setup event */
- return -1;
- }
- if(ret != EV_READ && ret != EV_WRITE) {
- printf("BAD RET_VAL IN INIT: %i\n", ret);
+void freeCallbackArgs(le_callback* arg, lua_State* L) {
+ if(arg->base) {
+ arg->base = NULL;
+ event_del(&arg->ev);
+ luaL_unref(L, LUA_REGISTRYINDEX, arg->callbackRef);
}
- return ret;
}
-
-static void luaevent_callback(int fd, short event, void* p);
-
-static void setup_event(le_callback* arg, int fd, short event, int resetEvent) {
- /* Setup event... */
- if(resetEvent) event_del(&arg->ev);
- event_set(&arg->ev, fd, event| EV_PERSIST, luaevent_callback, arg);
- if(!resetEvent) event_base_set(getEventBase(arg->L), &arg->ev);
- event_add(&arg->ev, NULL);
-}
-
/* le_callback is allocated at the beginning of the coroutine in which it
is used, no need to manually de-allocate */
/* Index for coroutine is fd as integer for *nix, as lightuserdata for Win */
static void luaevent_callback(int fd, short event, void* p) {
le_callback* arg = p;
- lua_State* L = arg->L;
+ lua_State* L;
int ret;
+ assert(arg && arg->base && arg->base->loop_L);
+ L = arg->base->loop_L;
lua_rawgeti(L, LUA_REGISTRYINDEX, arg->callbackRef);
lua_pushinteger(L, event);
-
- if(-1 == (ret = call_callback_function(L, 1))) {
- freeCallbackArgs(arg);
- return;
+ lua_call(L, 1, 1);
+ ret = lua_tointeger(L, -1);
+ lua_pop(L, 1);
+ if(ret == -1) {
+ freeCallbackArgs(arg, L);
+ } else {
+ struct event *ev = &arg->ev;
+ int newEvent = ret;
+ if(newEvent != event) { // Need to hook up new event...
+ event_del(ev);
+ event_set(ev, fd, EV_PERSIST | newEvent, luaevent_callback, arg);
+ event_add(ev, NULL);
+ }
}
-
- if(event != ret)
- setup_event(arg, fd, ret, 1);
}
static int luaevent_base_gc(lua_State* L) {
- struct event_base** pbase = luaL_checkudata(L, 1, EVENT_BASE_MT);
- if(*pbase) {
- event_base_free(*pbase);
- *pbase = NULL;
+ le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT);
+ if(base->base) {
+ event_base_free(base->base);
+ base->base = NULL;
}
return 0;
}
static int luaevent_cb_gc(lua_State* L) {
le_callback* arg = luaL_checkudata(L, 1, EVENT_CALLBACK_ARG_MT);
- freeCallbackArgs(arg);
+ freeCallbackArgs(arg, L);
return 0;
}
-static int luaevent_cb_getfd(lua_State* L) {
- le_callback* arg = luaL_checkudata(L, 1, EVENT_CALLBACK_ARG_MT);
- lua_pushinteger(L, arg->ev.ev_fd);
- return 1;
-}
-
-static int getSocketFd(lua_State* L, int idx) {
+int getSocketFd(lua_State* L, int idx) {
int fd;
luaL_checktype(L, idx, LUA_TUSERDATA);
lua_getfield(L, idx, "getfd");
@@ -118,52 +94,48 @@ static int getSocketFd(lua_State* L, int idx) {
return fd;
}
-static void push_new_callback(lua_State* L, int callbackRef, int fd, short event) {
- le_callback* arg = lua_newuserdata(L, sizeof(*arg));
+/* sock, event, callback */
+static int luaevent_addevent(lua_State* L) {
+ int fd, event, callbackRef;
+ le_callback* arg;
+ le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT);
+ fd = getSocketFd(L, 2);
+ event = luaL_checkinteger(L, 3);
+ luaL_checktype(L, 4, LUA_TFUNCTION);
+ lua_pushvalue(L, 4);
+ callbackRef = luaL_ref(L, LUA_REGISTRYINDEX);
+ arg = lua_newuserdata(L, sizeof(*arg));
luaL_getmetatable(L, EVENT_CALLBACK_ARG_MT);
lua_setmetatable(L, -2);
- arg->L = L;
+ arg->base = base;
arg->callbackRef = callbackRef;
- lua_pushvalue(L, -1);
- arg->objectRef = luaL_ref(L, LUA_REGISTRYINDEX);
- setup_event(arg, fd, event, 0);
-}
-/* Expected to be called at the beginning of the coro that uses it..
-Value must be kept until coro is complete....
-*/
-/* sock, callback */
-static int luaevent_addevent(lua_State* L) {
- int fd, callbackRef;
- int top, ret;
- fd = getSocketFd(L, 1);
- luaL_checktype(L, 2, LUA_TFUNCTION);
- top = lua_gettop(L);
- /* Preserve the callback function */
- lua_pushvalue(L, 2);
- callbackRef = luaL_ref(L, LUA_REGISTRYINDEX);
- /* Call the callback with all arguments after it to get the loop primed.. */
- if(-1 == (ret = call_callback_function(L, top - 2))) {
- luaL_unref(L, LUA_REGISTRYINDEX, callbackRef);
- return 0;
- }
-
- push_new_callback(L, callbackRef, fd, ret);
+ /* Setup event... */
+ event_set(&arg->ev, fd, event | EV_PERSIST, luaevent_callback, arg);
+ event_base_set(base->base, &arg->ev);
+ event_add(&arg->ev, NULL);
return 1;
}
static int luaevent_loop(lua_State* L) {
- int ret = event_base_loop(getEventBase(L), 0);
+ le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT);
+ base->loop_L = L;
+ int ret = event_base_loop(base->base, 0);
lua_pushinteger(L, ret);
return 1;
}
-static luaL_Reg funcs[] = {
+static luaL_Reg base_funcs[] = {
{ "addevent", luaevent_addevent },
{ "loop", luaevent_loop },
{ NULL, NULL }
};
+static luaL_Reg funcs[] = {
+ { "new", luaevent_newbase },
+ { NULL, NULL }
+};
+
typedef struct {
const char* name;
int value;
@@ -191,18 +163,21 @@ int luaopen_luaevent_core(lua_State* L) {
lua_replace(L, LUA_ENVIRONINDEX);
/* Setup metatable */
luaL_newmetatable(L, EVENT_BASE_MT);
+ lua_newtable(L);
+ luaL_register(L, NULL, base_funcs);
+ lua_setfield(L, -2, "__index");
lua_pushcfunction(L, luaevent_base_gc);
lua_setfield(L, -2, "__gc");
lua_pop(L, 1);
luaL_newmetatable(L, EVENT_CALLBACK_ARG_MT);
lua_pushcfunction(L, luaevent_cb_gc);
lua_setfield(L, -2, "__gc");
- lua_pushcfunction(L, luaevent_cb_getfd);
- lua_setfield(L, -2, "getfd");
+ lua_newtable(L);
+ lua_pushcfunction(L, luaevent_cb_gc);
+ lua_setfield(L, -2, "close");
+ lua_setfield(L, -2, "__index");
lua_pop(L, 1);
- setEventBase(L, event_init());
-
luaL_register(L, "luaevent.core", funcs);
setNamedIntegers(L, consts);
return 1;
diff --git a/luaevent/test/test.lua b/luaevent/test/test.lua
index fd9919d..412857e 100644
--- a/luaevent/test/test.lua
+++ b/luaevent/test/test.lua
@@ -5,20 +5,30 @@
require"luaevent"
require"socket"
-local function echoHandler(skt)
- while true do
- local data,ret = luaevent.receive(skt, 10)
- if data == "quit" or ret == 'closed' or not data then
- break
- end
- --collectgarbage()
- if not luaevent.send(skt, data) then return end
- end
- if skt then skt:close() end
+local oldPrint = print
+print = function(...)
+ oldPrint("SRV", ...)
end
+local function echoHandler(skt)
+ while true do
+ local data,ret = luaevent.receive(skt, 10)
+ --print("GOT: ", data, ret)
+ if data == "quit" or ret == 'closed' then
+ break
+ end
+ luaevent.send(skt, data)
+ collectgarbage()
+ end
+ skt:close()
+ --print("DONE")
+end
local server = assert(socket.bind("localhost", 20000))
server:settimeout(0)
-
+local coro = coroutine.create
+coroutine.create = function(...)
+ local ret = coro(...)
+ return ret
+end
luaevent.addserver(server, echoHandler)
-luaevent.loop() \ No newline at end of file
+luaevent.loop()
diff --git a/luaevent/test/testClient.lua b/luaevent/test/testClient.lua
index 8d14887..fbbcf87 100644
--- a/luaevent/test/testClient.lua
+++ b/luaevent/test/testClient.lua
@@ -1,29 +1,19 @@
require"luaevent"
require"socket"
-local function setupHook(thread)
- if not thread then debug.sethook(function(event) print("TRACE >: ", debug.getinfo(2, 'n').name) end, 'c')
- else debug.sethook(thread, function(event) print("TRACE ", thread,">: ", debug.getinfo(2, 'n').name) end, 'c') end
+local oldPrint = print
+print = function(...)
+ oldPrint("CLT", ...)
end
-local count = 100
-local function func(sock)
+
+local function func()
+ print("ACTIVATED")
+ local sock = socket.tcp()
+ --sock:
sock = luaevent.wrap(sock)
- assert(sock:connect("localhost", 20000))
- for i = 1, 2 do
- local maxZ = 10
- for z = 1, maxZ do
- assert(sock:send("Greet me "))
- end
- assert(sock:receive(10 * maxZ))
- end
- if skt then skt:close() end
- count = count - 1
- if count > 0 then
- --local sock = assert(socket.tcp())
- --luaevent.addthread(sock, func, sock)
- end
+ print(assert(sock:connect("localhost", 20000)))
+ for i = 1, 100 do assert(sock:send("Greet me ")) assert(sock:receive(10)) collectgarbage() end
end
-for i = 1, 500 do
- local sock = assert(socket.tcp())
- luaevent.addthread(sock, func, sock)
-end
-luaevent.loop()
+
+luaevent.addthread(func)
+
+luaevent.loop() \ No newline at end of file