diff options
-rw-r--r-- | core/modulemanager.lua | 8 | ||||
-rw-r--r-- | net/dns.lua | 17 | ||||
-rw-r--r-- | net/httpserver.lua | 8 | ||||
-rw-r--r-- | net/server_event.lua | 2 | ||||
-rw-r--r-- | net/server_select.lua | 31 | ||||
-rw-r--r-- | plugins/mod_compression.lua | 13 | ||||
-rw-r--r-- | plugins/mod_console.lua | 4 | ||||
-rw-r--r-- | plugins/mod_proxy65.lua | 43 | ||||
-rwxr-xr-x | prosodyctl | 22 | ||||
-rw-r--r-- | util/dependencies.lua | 2 | ||||
-rw-r--r-- | util/pluginloader.lua | 38 |
11 files changed, 140 insertions, 48 deletions
diff --git a/core/modulemanager.lua b/core/modulemanager.lua index d1f7d413..65a38ccb 100644 --- a/core/modulemanager.lua +++ b/core/modulemanager.lua @@ -158,6 +158,7 @@ function load(host, module_name, config) log("error", "Error initializing module '%s' on '%s': %s", module_name, host, err or "nil"); end if success then + hosts[host].events.fire_event("module-loaded", { module = module_name, host = host }); return true; else -- load failed, unloading unload(api_instance.host, module_name); @@ -174,7 +175,7 @@ function is_loaded(host, name) end function unload(host, name, ...) - local mod = get_module(host, name); + local mod = get_module(host, name); if not mod then return nil, "module-not-loaded"; end if module_has_method(mod, "unload") then @@ -200,6 +201,7 @@ function unload(host, name, ...) end hooks:remove(host, name); modulemap[host][name] = nil; + hosts[host].events.fire_event("module-unloaded", { module = name, host = host }); return true; end @@ -280,7 +282,7 @@ function module_has_method(module, method) end function call_module_method(module, method, ...) - if module_has_method(module, method) then + if module_has_method(module, method) then local f = module.module[method]; return pcall(f, ...); else @@ -289,7 +291,7 @@ function call_module_method(module, method, ...) end ----- API functions exposed to modules ----------- --- Must all be in api.* +-- Must all be in api.* -- Returns the name of the current module function api:get_name() diff --git a/net/dns.lua b/net/dns.lua index c50e893c..08bc53a9 100644 --- a/net/dns.lua +++ b/net/dns.lua @@ -726,7 +726,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive local packet = sock:receive(); if packet then response = self:decode(packet); - if response and self.active[response.header.id] + if response and self.active[response.header.id] and self.active[response.header.id][response.question.raw] then --print('received response'); --self.print(response); @@ -745,7 +745,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive if not next(self.active) then self:closeall(); end -- was the query on the wanted list? - local q = response.question; + local q = response.question[1]; local cos = get(self.wanted, q.class, q.type, q.name); if cos then for co in pairs(cos) do @@ -768,21 +768,18 @@ function resolver:feed(sock, packet) self.time = socket.gettime(); local response = self:decode(packet); - if response then + if response and self.active[response.header.id] + and self.active[response.header.id][response.question.raw] then --print('received response'); --self.print(response); - for i,section in pairs({ 'answer', 'authority', 'additional' }) do - for j,rr in pairs(response[section]) do - self:remember(rr, response.question[1].type); - end + for j,rr in pairs(response.answer) do + self:remember(rr, response.question[1].type); end -- retire the query local queries = self.active[response.header.id]; - if queries[response.question.raw] then - queries[response.question.raw] = nil; - end + queries[response.question.raw] = nil; if not next(queries) then self.active[response.header.id] = nil; end if not next(self.active) then self:closeall(); end diff --git a/net/httpserver.lua b/net/httpserver.lua index 51dca166..654025ba 100644 --- a/net/httpserver.lua +++ b/net/httpserver.lua @@ -23,6 +23,9 @@ local urlencode = function (s) return s and (s:gsub("%W", function (c) return st local log = require "util.logger".init("httpserver"); +-- TODO: Should we read this from /etc/mime.types if it exists? (startup time...?) +local mime_map = { html = "text/html", txt = "plain/text; charset=utf-8", js = "text/javascript" }; + local http_servers = {}; module "httpserver" @@ -65,6 +68,9 @@ local function send_response(request, response) resp = { "HTTP/1.0 200 OK\r\n" }; t_insert(resp, "Connection: close\r\n"); + t_insert(resp, "Content-Type: "); + t_insert(resp, mime_map[request.url.path:match("%.(%w+)")] or "application/octet-stream"); + t_insert(resp, "\r\n"); t_insert(resp, "Content-Length: "); t_insert(resp, #response); t_insert(resp, "\r\n\r\n"); @@ -210,7 +216,7 @@ end function new_request(handler) return { handler = handler, conn = handler.socket, write = function (...) return handler:write(...); end, state = "request", - server = http_servers[handler.serverport()], + server = http_servers[handler:serverport()], send = send_response, destroy = destroy_request, id = tostring{}:match("%x+$") diff --git a/net/server_event.lua b/net/server_event.lua index 450bd341..99e972d7 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -572,6 +572,7 @@ do interface.eventread = nil
return -1
end
+ interface.onincoming( interface, buffer, err ) -- send new data to listener
if err and ( err ~= "timeout" and err ~= "wantread" ) then
if "wantwrite" == err then -- need to read on write event
if not interface.eventwrite then -- register new write event if needed
@@ -592,7 +593,6 @@ do return -1
end
end
- interface.onincoming( interface, buffer, err ) -- send new data to listener
return EV_READ, cfg.READ_TIMEOUT
end
end
diff --git a/net/server_select.lua b/net/server_select.lua index ab2efcb0..f278edd5 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -434,30 +434,37 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport handler.bufferlen = function( self, readlen, sendlen )
maxsendlen = sendlen or maxsendlen
maxreadlen = readlen or maxreadlen
- return maxreadlen, maxsendlen
+ return bufferlen, maxreadlen, maxsendlen
end
- handler.lock = function( self, switch )
+ handler.lock_read = function (self, switch)
if switch == true then
- handler.write = idfalse
- local tmp = _sendlistlen
- _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
- _writetimes[ handler ] = nil
- if _sendlistlen ~= tmp then
- nosend = true
- end
- tmp = _readlistlen
+ local tmp = _readlistlen
_readlistlen = removesocket( _readlist, socket, _readlistlen )
_readtimes[ handler ] = nil
if _readlistlen ~= tmp then
noread = true
end
elseif switch == false then
- handler.write = write
if noread then
noread = false
_readlistlen = addsocket(_readlist, socket, _readlistlen)
_readtimes[ handler ] = _currenttime
end
+ end
+ return noread
+ end
+ handler.lock = function( self, switch )
+ handler.lock_read (switch)
+ if switch == true then
+ handler.write = idfalse
+ local tmp = _sendlistlen
+ _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
+ _writetimes[ handler ] = nil
+ if _sendlistlen ~= tmp then
+ nosend = true
+ end
+ elseif switch == false then
+ handler.write = write
if nosend then
nosend = false
write( "" )
@@ -467,7 +474,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport end
local _readbuffer = function( ) -- this function reads data
local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern"
- if not err or ( err == "timeout" or err == "wantread" ) then -- received something
+ if not err or (err == "wantread" or err == "timeout") or string_len(part) > 0 then -- received something
local buffer = buffer or part or ""
local len = string_len( buffer )
if len > maxreadlen then
diff --git a/plugins/mod_compression.lua b/plugins/mod_compression.lua index 802e1822..8fdf9dcc 100644 --- a/plugins/mod_compression.lua +++ b/plugins/mod_compression.lua @@ -44,7 +44,7 @@ module:hook("s2s-stream-features", end ); --- S2Sout handling aka the client perspective in the S2S connection +-- Hook to activate compression if remote server supports it. module:hook_stanza(xmlns_stream, "features", function (session, stanza) if not session.compressed then @@ -135,15 +135,14 @@ local function setup_decompression(session, inflate_stream) end; end --- TODO Support compression on S2S level too. module:add_handler({"s2sout_unauthed", "s2sout"}, "compressed", xmlns_compression_protocol, function(session ,stanza) session.log("debug", "Activating compression...") -- create deflate and inflate streams - deflate_stream = get_deflate_stream(session); + local deflate_stream = get_deflate_stream(session); if not deflate_stream then return end - inflate_stream = get_inflate_stream(session); + local inflate_stream = get_inflate_stream(session); if not inflate_stream then return end -- setup compression for session.w @@ -161,7 +160,7 @@ module:add_handler({"s2sout_unauthed", "s2sout"}, "compressed", xmlns_compressio local default_stream_attr = {xmlns = "jabber:server", ["xmlns:stream"] = "http://etherx.jabber.org/streams", ["xmlns:db"] = 'jabber:server:dialback', version = "1.0", to = session.to_host, from = session.from_host}; session.sends2s("<?xml version='1.0'?>"); - session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag()); + session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag()); session.compressed = true; end ); @@ -181,10 +180,10 @@ module:add_handler({"c2s_unauthed", "c2s", "s2sin_unauthed", "s2sin"}, "compress session.log("info", method.." compression selected."); -- create deflate and inflate streams - deflate_stream = get_deflate_stream(session); + local deflate_stream = get_deflate_stream(session); if not deflate_stream then return end - inflate_stream = get_inflate_stream(session); + local inflate_stream = get_inflate_stream(session); if not inflate_stream then return end (session.sends2s or session.send)(st.stanza("compressed", {xmlns=xmlns_compression_protocol})); diff --git a/plugins/mod_console.lua b/plugins/mod_console.lua index 9962b3f6..5e6b6846 100644 --- a/plugins/mod_console.lua +++ b/plugins/mod_console.lua @@ -478,7 +478,7 @@ function def_env.s2s:show(match_jid) for remotehost, session in pairs(host_session.s2sout) do if (not match_jid) or remotehost:match(match_jid) or host:match(match_jid) then count_out = count_out + 1; - print(" "..host.." -> "..remotehost..(session.secure and " (encrypted)" or "")); + print(" "..host.." -> "..remotehost..(session.secure and " (encrypted)" or "")..(session.compressed and " (compressed)" or "")); if session.sendq then print(" There are "..#session.sendq.." queued outgoing stanzas for this connection"); end @@ -515,7 +515,7 @@ function def_env.s2s:show(match_jid) -- Pft! is what I say to list comprehensions or (session.hosts and #array.collect(keys(session.hosts)):filter(subhost_filter)>0)) then count_in = count_in + 1; - print(" "..host.." <- "..(session.from_host or "(unknown)")..(session.secure and " (encrypted)" or "")); + print(" "..host.." <- "..(session.from_host or "(unknown)")..(session.secure and " (encrypted)" or "")..(session.compressed and " (compressed)" or "")); if session.type == "s2sin_unauthed" then print(" Connection not yet authenticated"); end diff --git a/plugins/mod_proxy65.lua b/plugins/mod_proxy65.lua index 504cfc0c..af618c01 100644 --- a/plugins/mod_proxy65.lua +++ b/plugins/mod_proxy65.lua @@ -55,8 +55,12 @@ function connlistener.onincoming(conn, data) if session.setup then if session.sha ~= nil and transfers[session.sha] ~= nil then local sha = session.sha; - if transfers[sha].activated == true and transfers[sha].initiator == conn and transfers[sha].target ~= nil then - transfers[sha].target:write(data); + if transfers[sha].activated == true and transfers[sha].target ~= nil then + if transfers[sha].initiator == conn then + transfers[sha].target:write(data); + else + transfers[sha].initiator:write(data); + end return; end end @@ -67,7 +71,7 @@ function connlistener.onincoming(conn, data) data:sub(4):byte() == 0x03 and -- ATYP must be 3 data:sub(5):byte() == 40 and -- SHA1 HASH length must be 40 (0x28) data:sub(-2):byte() == 0x00 and -- PORT must be 0, size 2 byte - data:sub(-1):byte() == 0x00 + data:sub(-1):byte() == 0x00 then local sha = data:sub(6, 45); -- second param is not count! it's the ending index (included!) if transfers[sha] == nil then @@ -80,10 +84,13 @@ function connlistener.onincoming(conn, data) transfers[sha].initiator = conn; session.sha = sha; module:log("debug", "initiator connected ... "); + throttle_sending(conn, transfers[sha].target); + throttle_sending(transfers[sha].target, conn); end conn:write(string.char(5, 0, 0, 3, sha:len()) .. sha .. string.char(0, 0)); -- VER, REP, RSV, ATYP, BND.ADDR (sha), BND.PORT (2 Byte) + conn:lock_read(true) else - log:module("warn", "Neither data transfer nor initial connect of a participator of a transfer.") + module:log("warn", "Neither data transfer nor initial connect of a participator of a transfer.") conn.close(); end else @@ -237,6 +244,8 @@ function handle_to_domain(origin, stanza) elseif(transfers[sha] ~= nil and transfers[sha].initiator ~= nil and transfers[sha].target ~= nil) then origin.send(reply); transfers[sha].activated = true; + transfers[sha].target:lock_read(false); + transfers[sha].initiator:lock_read(false); end else module:log("error", "activation failed: sid: %s, initiator: %s, target: %s", tostring(sid), tostring(from), tostring(to)); @@ -247,9 +256,31 @@ function handle_to_domain(origin, stanza) end if not connlisteners.register(module.host .. ':proxy65', connlistener) then - error("mod_proxy65: Could not establish a connection listener. Check your configuration please."); - error(" one possible cause for this would be that two proxy65 components share the same port."); + module:log("error", "mod_proxy65: Could not establish a connection listener. Check your configuration please."); + module:log("error", "Possibly two proxy65 components are configured to share the same port."); end connlisteners.start(module.host .. ':proxy65'); component = componentmanager.register_component(host, handle_to_domain); +local sender_lock_threshold = 4096; +function throttle_sending(sender, receiver) + sender:pattern(sender_lock_threshold); + local sender_locked; + local _sendbuffer = receiver.sendbuffer; + function receiver.sendbuffer() + _sendbuffer(); + if sender_locked and receiver.bufferlen() < sender_lock_threshold then + sender:lock_read(false); -- Unlock now + sender_locked = nil; + end + end + + local _readbuffer = sender.readbuffer; + function sender.readbuffer() + _readbuffer(); + if not sender_locked and receiver.bufferlen() >= sender_lock_threshold then + sender_locked = true; + sender:lock_read(true); + end + end +end @@ -462,6 +462,28 @@ function commands.unregister(arg) return 1; end +function commands.addplugin(arg) + local url = arg[1]; + if url:match("^http://") then + local http = require "socket.http"; + show_message("Fetching..."); + local code, err = http.request(url); + if not code then + show_message("Failed: "..err); + return 1; + end + if url:match("%.lua$") then + local ok, err = datamanager.store(url:match("/mod_([^/]+)$"), "*", "plugins", {code}); + if not ok then + show_message("Failed to save to data store: "..err); + return 1; + end + end + show_message("Saved. Don't forget to load the module using the config file or admin console!"); + else + show_message("Sorry, I don't understand how to fetch plugins from there."); + end +end --------------------- diff --git a/util/dependencies.lua b/util/dependencies.lua index b1d02921..cb022644 100644 --- a/util/dependencies.lua +++ b/util/dependencies.lua @@ -57,7 +57,7 @@ local lfs, err = softreq "lfs" if not lfs then missingdep("luafilesystem", { ["luarocks"] = "luarocks install luafilesystem"; - ["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-luafilesystem0"; + ["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-filesystem0"; ["Source"] = "http://www.keplerproject.org/luafilesystem/"; }); fatal = true; diff --git a/util/pluginloader.lua b/util/pluginloader.lua index 696af34f..cffc4dfc 100644 --- a/util/pluginloader.lua +++ b/util/pluginloader.lua @@ -9,11 +9,19 @@ local plugin_dir = CFG_PLUGINDIR or "./plugins/"; -local io_open = io.open; -local loadstring = loadstring; +local io_open, os_time = io.open, os.time; +local loadstring, pairs = loadstring, pairs; + +local datamanager = require "util.datamanager"; module "pluginloader" +local function load_from_datastore(name) + local content = datamanager.load(name, "*", "plugins"); + if not content or not content[1] then return nil, "Resource not found"; end + return content[1], name; +end + local function load_file(name) local file, err = io_open(plugin_dir..name); if not file then return file, err; end @@ -22,16 +30,36 @@ local function load_file(name) return content, name; end -function load_resource(plugin, resource) +function load_resource(plugin, resource, loader) if not resource then resource = "mod_"..plugin..".lua"; end - local content, err = load_file(plugin.."/"..resource); - if not content then content, err = load_file(resource); end + loader = loader or load_file; + + local content, err = loader(plugin.."/"..resource); + if not content then content, err = loader(resource); end -- TODO add support for packed plugins + + if not content and loader == load_file then + return load_resource(plugin, resource, load_from_datastore); + end + return content, err; end +function store_resource(plugin, resource, content, metadata) + if not resource then + resource = "mod_"..plugin..".lua"; + end + local store = { content }; + if metadata then + for k,v in pairs(metadata) do + store[k] = v; + end + end + datamanager.store(plugin.."/"..resource, "*", "plugins", store); +end + function load_code(plugin, resource) local content, err = load_resource(plugin, resource); if not content then return content, err; end |