1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
--[[
LuaEvent - Copyright (C) 2007 Thomas Harning <harningt@gmail.com>
Licensed as LGPL - See doc/COPYING for details.
]]
module("luaevent", package.seeall)
require("luaevent.core")
local EV_READ = luaevent.core.EV_READ
local EV_WRITE = luaevent.core.EV_WRITE
local fair = false -- Not recommended for most cases...
local base = luaevent.core.new()
local sockMap = setmetatable({}, {'__mode', 'kv'})
local function addevent(sock, ...)
local item = base:addevent(sock, ...)
if not item then print("FAILED TO SETUP ITEM") return item end
local fd = sock:getfd()
sockMap[item] = fd
print("SETUP ITEM FOR: ", fd)
if not hookedObjectMt then
hookedObjectMt = true
--[[
local mt = debug.getmetatable(item)
local oldGC = mt.__gc
mt.__gc = function(...)
print("RELEASING ITEM FOR: ", sockMap[(...)])
return oldGC(...)
end]]
end
return item
end
-- Weak keys.. the keys are the client sockets
local clientTable = setmetatable({}, {'__mode', 'kv'})
local function getWrapper()
local running = coroutine.running()
return function(...)
if coroutine.running() == running then return end
return select(2, coroutine.resume(running, ...))
end
end
function send(sock, data, start, stop)
local s, err
local from = start or 1
local sent = 0
repeat
from = from + sent
s, err, sent = sock:send(data, from, stop)
-- Add extra coro swap for fairness
-- CURRENTLY DISABLED FOR TESTING......
if fair and math.random(100) > 90 then
if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
coroutine.yield(EV_WRITE)
end
if s or err ~= "timeout" then return s, err, sent end
if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
coroutine.yield(EV_WRITE)
until false
end
function receive(sock, pattern, part)
local s, err
pattern = pattern or '*l'
repeat
s, err, part = sock:receive(pattern, part)
if s or err ~= "timeout" then return s, err, part end
if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end
coroutine.yield(EV_READ)
until false
end
-- same as above but with special treatment when reading chunks,
-- unblocks on any data received.
function receivePartial(client, pattern)
local s, err, part
pattern = pattern or "*l"
repeat
s, err, part = client:receive(pattern)
if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
err ~= "timeout" then return s, err, part end
if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end
coroutine.yield(EV_READ)
until false
end
function connect(sock, ...)
sock:settimeout(0)
local ret, err = sock:connect(...)
if ret or err ~= "timeout" then return ret, err end
if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
coroutine.yield(EV_WRITE)
ret, err = sock:connect(...)
if err == "already connected" then
return 1
end
return ret, err
end
-- Deprecated..
function flush(sock)
end
local function clientCoroutine(sock, handler)
-- Figure out what to do ......
return handler(sock)
end
local function handleClient(co, client, handler)
local ok, res, event = coroutine.resume(co, client, handler)
end
local function serverCoroutine(sock, callback)
local listenItem = addevent(sock, EV_READ, getWrapper())
repeat
local event = coroutine.yield(EV_READ)
-- Get new socket
local client = sock:accept()
if client then
client:settimeout(0)
local co = coroutine.create(clientCoroutine)
handleClient(co, client, callback)
end
until false
end
function addserver(sock, callback)
local coro = coroutine.create(serverCoroutine)
assert(coroutine.resume(coro, sock, callback))
end
function addthread(func, ...)
return coroutine.resume(coroutine.create(func), ...)
end
local _skt_mt = {__index = {
connect = function(self, ...)
return connect(self.socket, ...)
end,
send = function (self, data)
return send(self.socket, data)
end,
receive = function (self, pattern)
if (self.timeout==0) then
return receivePartial(self.socket, pattern)
end
return receive(self.socket, pattern)
end,
flush = function (self)
return flush(self.socket)
end,
settimeout = function (self,time)
self.timeout=time
return
end,
close = function(self)
clientTable[self.socket]:close()
self.socket:close()
end
}}
function wrap(sock)
return setmetatable({socket = sock}, _skt_mt)
end
loop = function(...) base:loop(...) end
|