aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/modulemanager.lua8
-rw-r--r--net/dns.lua17
-rw-r--r--net/httpserver.lua8
-rw-r--r--net/server_event.lua2
-rw-r--r--net/server_select.lua31
-rw-r--r--plugins/mod_compression.lua13
-rw-r--r--plugins/mod_console.lua4
-rw-r--r--plugins/mod_proxy65.lua43
-rwxr-xr-xprosodyctl22
-rw-r--r--util/dependencies.lua2
-rw-r--r--util/pluginloader.lua38
11 files changed, 140 insertions, 48 deletions
diff --git a/core/modulemanager.lua b/core/modulemanager.lua
index d1f7d413..65a38ccb 100644
--- a/core/modulemanager.lua
+++ b/core/modulemanager.lua
@@ -158,6 +158,7 @@ function load(host, module_name, config)
log("error", "Error initializing module '%s' on '%s': %s", module_name, host, err or "nil");
end
if success then
+ hosts[host].events.fire_event("module-loaded", { module = module_name, host = host });
return true;
else -- load failed, unloading
unload(api_instance.host, module_name);
@@ -174,7 +175,7 @@ function is_loaded(host, name)
end
function unload(host, name, ...)
- local mod = get_module(host, name);
+ local mod = get_module(host, name);
if not mod then return nil, "module-not-loaded"; end
if module_has_method(mod, "unload") then
@@ -200,6 +201,7 @@ function unload(host, name, ...)
end
hooks:remove(host, name);
modulemap[host][name] = nil;
+ hosts[host].events.fire_event("module-unloaded", { module = name, host = host });
return true;
end
@@ -280,7 +282,7 @@ function module_has_method(module, method)
end
function call_module_method(module, method, ...)
- if module_has_method(module, method) then
+ if module_has_method(module, method) then
local f = module.module[method];
return pcall(f, ...);
else
@@ -289,7 +291,7 @@ function call_module_method(module, method, ...)
end
----- API functions exposed to modules -----------
--- Must all be in api.*
+-- Must all be in api.*
-- Returns the name of the current module
function api:get_name()
diff --git a/net/dns.lua b/net/dns.lua
index c50e893c..08bc53a9 100644
--- a/net/dns.lua
+++ b/net/dns.lua
@@ -726,7 +726,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
local packet = sock:receive();
if packet then
response = self:decode(packet);
- if response and self.active[response.header.id]
+ if response and self.active[response.header.id]
and self.active[response.header.id][response.question.raw] then
--print('received response');
--self.print(response);
@@ -745,7 +745,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
if not next(self.active) then self:closeall(); end
-- was the query on the wanted list?
- local q = response.question;
+ local q = response.question[1];
local cos = get(self.wanted, q.class, q.type, q.name);
if cos then
for co in pairs(cos) do
@@ -768,21 +768,18 @@ function resolver:feed(sock, packet)
self.time = socket.gettime();
local response = self:decode(packet);
- if response then
+ if response and self.active[response.header.id]
+ and self.active[response.header.id][response.question.raw] then
--print('received response');
--self.print(response);
- for i,section in pairs({ 'answer', 'authority', 'additional' }) do
- for j,rr in pairs(response[section]) do
- self:remember(rr, response.question[1].type);
- end
+ for j,rr in pairs(response.answer) do
+ self:remember(rr, response.question[1].type);
end
-- retire the query
local queries = self.active[response.header.id];
- if queries[response.question.raw] then
- queries[response.question.raw] = nil;
- end
+ queries[response.question.raw] = nil;
if not next(queries) then self.active[response.header.id] = nil; end
if not next(self.active) then self:closeall(); end
diff --git a/net/httpserver.lua b/net/httpserver.lua
index 51dca166..654025ba 100644
--- a/net/httpserver.lua
+++ b/net/httpserver.lua
@@ -23,6 +23,9 @@ local urlencode = function (s) return s and (s:gsub("%W", function (c) return st
local log = require "util.logger".init("httpserver");
+-- TODO: Should we read this from /etc/mime.types if it exists? (startup time...?)
+local mime_map = { html = "text/html", txt = "plain/text; charset=utf-8", js = "text/javascript" };
+
local http_servers = {};
module "httpserver"
@@ -65,6 +68,9 @@ local function send_response(request, response)
resp = { "HTTP/1.0 200 OK\r\n" };
t_insert(resp, "Connection: close\r\n");
+ t_insert(resp, "Content-Type: ");
+ t_insert(resp, mime_map[request.url.path:match("%.(%w+)")] or "application/octet-stream");
+ t_insert(resp, "\r\n");
t_insert(resp, "Content-Length: ");
t_insert(resp, #response);
t_insert(resp, "\r\n\r\n");
@@ -210,7 +216,7 @@ end
function new_request(handler)
return { handler = handler, conn = handler.socket,
write = function (...) return handler:write(...); end, state = "request",
- server = http_servers[handler.serverport()],
+ server = http_servers[handler:serverport()],
send = send_response,
destroy = destroy_request,
id = tostring{}:match("%x+$")
diff --git a/net/server_event.lua b/net/server_event.lua
index 450bd341..99e972d7 100644
--- a/net/server_event.lua
+++ b/net/server_event.lua
@@ -572,6 +572,7 @@ do
interface.eventread = nil
return -1
end
+ interface.onincoming( interface, buffer, err ) -- send new data to listener
if err and ( err ~= "timeout" and err ~= "wantread" ) then
if "wantwrite" == err then -- need to read on write event
if not interface.eventwrite then -- register new write event if needed
@@ -592,7 +593,6 @@ do
return -1
end
end
- interface.onincoming( interface, buffer, err ) -- send new data to listener
return EV_READ, cfg.READ_TIMEOUT
end
end
diff --git a/net/server_select.lua b/net/server_select.lua
index ab2efcb0..f278edd5 100644
--- a/net/server_select.lua
+++ b/net/server_select.lua
@@ -434,30 +434,37 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
handler.bufferlen = function( self, readlen, sendlen )
maxsendlen = sendlen or maxsendlen
maxreadlen = readlen or maxreadlen
- return maxreadlen, maxsendlen
+ return bufferlen, maxreadlen, maxsendlen
end
- handler.lock = function( self, switch )
+ handler.lock_read = function (self, switch)
if switch == true then
- handler.write = idfalse
- local tmp = _sendlistlen
- _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
- _writetimes[ handler ] = nil
- if _sendlistlen ~= tmp then
- nosend = true
- end
- tmp = _readlistlen
+ local tmp = _readlistlen
_readlistlen = removesocket( _readlist, socket, _readlistlen )
_readtimes[ handler ] = nil
if _readlistlen ~= tmp then
noread = true
end
elseif switch == false then
- handler.write = write
if noread then
noread = false
_readlistlen = addsocket(_readlist, socket, _readlistlen)
_readtimes[ handler ] = _currenttime
end
+ end
+ return noread
+ end
+ handler.lock = function( self, switch )
+ handler.lock_read (switch)
+ if switch == true then
+ handler.write = idfalse
+ local tmp = _sendlistlen
+ _sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
+ _writetimes[ handler ] = nil
+ if _sendlistlen ~= tmp then
+ nosend = true
+ end
+ elseif switch == false then
+ handler.write = write
if nosend then
nosend = false
write( "" )
@@ -467,7 +474,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
end
local _readbuffer = function( ) -- this function reads data
local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern"
- if not err or ( err == "timeout" or err == "wantread" ) then -- received something
+ if not err or (err == "wantread" or err == "timeout") or string_len(part) > 0 then -- received something
local buffer = buffer or part or ""
local len = string_len( buffer )
if len > maxreadlen then
diff --git a/plugins/mod_compression.lua b/plugins/mod_compression.lua
index 802e1822..8fdf9dcc 100644
--- a/plugins/mod_compression.lua
+++ b/plugins/mod_compression.lua
@@ -44,7 +44,7 @@ module:hook("s2s-stream-features",
end
);
--- S2Sout handling aka the client perspective in the S2S connection
+-- Hook to activate compression if remote server supports it.
module:hook_stanza(xmlns_stream, "features",
function (session, stanza)
if not session.compressed then
@@ -135,15 +135,14 @@ local function setup_decompression(session, inflate_stream)
end;
end
--- TODO Support compression on S2S level too.
module:add_handler({"s2sout_unauthed", "s2sout"}, "compressed", xmlns_compression_protocol,
function(session ,stanza)
session.log("debug", "Activating compression...")
-- create deflate and inflate streams
- deflate_stream = get_deflate_stream(session);
+ local deflate_stream = get_deflate_stream(session);
if not deflate_stream then return end
- inflate_stream = get_inflate_stream(session);
+ local inflate_stream = get_inflate_stream(session);
if not inflate_stream then return end
-- setup compression for session.w
@@ -161,7 +160,7 @@ module:add_handler({"s2sout_unauthed", "s2sout"}, "compressed", xmlns_compressio
local default_stream_attr = {xmlns = "jabber:server", ["xmlns:stream"] = "http://etherx.jabber.org/streams",
["xmlns:db"] = 'jabber:server:dialback', version = "1.0", to = session.to_host, from = session.from_host};
session.sends2s("<?xml version='1.0'?>");
- session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
+ session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
session.compressed = true;
end
);
@@ -181,10 +180,10 @@ module:add_handler({"c2s_unauthed", "c2s", "s2sin_unauthed", "s2sin"}, "compress
session.log("info", method.." compression selected.");
-- create deflate and inflate streams
- deflate_stream = get_deflate_stream(session);
+ local deflate_stream = get_deflate_stream(session);
if not deflate_stream then return end
- inflate_stream = get_inflate_stream(session);
+ local inflate_stream = get_inflate_stream(session);
if not inflate_stream then return end
(session.sends2s or session.send)(st.stanza("compressed", {xmlns=xmlns_compression_protocol}));
diff --git a/plugins/mod_console.lua b/plugins/mod_console.lua
index 9962b3f6..5e6b6846 100644
--- a/plugins/mod_console.lua
+++ b/plugins/mod_console.lua
@@ -478,7 +478,7 @@ function def_env.s2s:show(match_jid)
for remotehost, session in pairs(host_session.s2sout) do
if (not match_jid) or remotehost:match(match_jid) or host:match(match_jid) then
count_out = count_out + 1;
- print(" "..host.." -> "..remotehost..(session.secure and " (encrypted)" or ""));
+ print(" "..host.." -> "..remotehost..(session.secure and " (encrypted)" or "")..(session.compressed and " (compressed)" or ""));
if session.sendq then
print(" There are "..#session.sendq.." queued outgoing stanzas for this connection");
end
@@ -515,7 +515,7 @@ function def_env.s2s:show(match_jid)
-- Pft! is what I say to list comprehensions
or (session.hosts and #array.collect(keys(session.hosts)):filter(subhost_filter)>0)) then
count_in = count_in + 1;
- print(" "..host.." <- "..(session.from_host or "(unknown)")..(session.secure and " (encrypted)" or ""));
+ print(" "..host.." <- "..(session.from_host or "(unknown)")..(session.secure and " (encrypted)" or "")..(session.compressed and " (compressed)" or ""));
if session.type == "s2sin_unauthed" then
print(" Connection not yet authenticated");
end
diff --git a/plugins/mod_proxy65.lua b/plugins/mod_proxy65.lua
index 504cfc0c..af618c01 100644
--- a/plugins/mod_proxy65.lua
+++ b/plugins/mod_proxy65.lua
@@ -55,8 +55,12 @@ function connlistener.onincoming(conn, data)
if session.setup then
if session.sha ~= nil and transfers[session.sha] ~= nil then
local sha = session.sha;
- if transfers[sha].activated == true and transfers[sha].initiator == conn and transfers[sha].target ~= nil then
- transfers[sha].target:write(data);
+ if transfers[sha].activated == true and transfers[sha].target ~= nil then
+ if transfers[sha].initiator == conn then
+ transfers[sha].target:write(data);
+ else
+ transfers[sha].initiator:write(data);
+ end
return;
end
end
@@ -67,7 +71,7 @@ function connlistener.onincoming(conn, data)
data:sub(4):byte() == 0x03 and -- ATYP must be 3
data:sub(5):byte() == 40 and -- SHA1 HASH length must be 40 (0x28)
data:sub(-2):byte() == 0x00 and -- PORT must be 0, size 2 byte
- data:sub(-1):byte() == 0x00
+ data:sub(-1):byte() == 0x00
then
local sha = data:sub(6, 45); -- second param is not count! it's the ending index (included!)
if transfers[sha] == nil then
@@ -80,10 +84,13 @@ function connlistener.onincoming(conn, data)
transfers[sha].initiator = conn;
session.sha = sha;
module:log("debug", "initiator connected ... ");
+ throttle_sending(conn, transfers[sha].target);
+ throttle_sending(transfers[sha].target, conn);
end
conn:write(string.char(5, 0, 0, 3, sha:len()) .. sha .. string.char(0, 0)); -- VER, REP, RSV, ATYP, BND.ADDR (sha), BND.PORT (2 Byte)
+ conn:lock_read(true)
else
- log:module("warn", "Neither data transfer nor initial connect of a participator of a transfer.")
+ module:log("warn", "Neither data transfer nor initial connect of a participator of a transfer.")
conn.close();
end
else
@@ -237,6 +244,8 @@ function handle_to_domain(origin, stanza)
elseif(transfers[sha] ~= nil and transfers[sha].initiator ~= nil and transfers[sha].target ~= nil) then
origin.send(reply);
transfers[sha].activated = true;
+ transfers[sha].target:lock_read(false);
+ transfers[sha].initiator:lock_read(false);
end
else
module:log("error", "activation failed: sid: %s, initiator: %s, target: %s", tostring(sid), tostring(from), tostring(to));
@@ -247,9 +256,31 @@ function handle_to_domain(origin, stanza)
end
if not connlisteners.register(module.host .. ':proxy65', connlistener) then
- error("mod_proxy65: Could not establish a connection listener. Check your configuration please.");
- error(" one possible cause for this would be that two proxy65 components share the same port.");
+ module:log("error", "mod_proxy65: Could not establish a connection listener. Check your configuration please.");
+ module:log("error", "Possibly two proxy65 components are configured to share the same port.");
end
connlisteners.start(module.host .. ':proxy65');
component = componentmanager.register_component(host, handle_to_domain);
+local sender_lock_threshold = 4096;
+function throttle_sending(sender, receiver)
+ sender:pattern(sender_lock_threshold);
+ local sender_locked;
+ local _sendbuffer = receiver.sendbuffer;
+ function receiver.sendbuffer()
+ _sendbuffer();
+ if sender_locked and receiver.bufferlen() < sender_lock_threshold then
+ sender:lock_read(false); -- Unlock now
+ sender_locked = nil;
+ end
+ end
+
+ local _readbuffer = sender.readbuffer;
+ function sender.readbuffer()
+ _readbuffer();
+ if not sender_locked and receiver.bufferlen() >= sender_lock_threshold then
+ sender_locked = true;
+ sender:lock_read(true);
+ end
+ end
+end
diff --git a/prosodyctl b/prosodyctl
index 522ebde9..8f79046f 100755
--- a/prosodyctl
+++ b/prosodyctl
@@ -462,6 +462,28 @@ function commands.unregister(arg)
return 1;
end
+function commands.addplugin(arg)
+ local url = arg[1];
+ if url:match("^http://") then
+ local http = require "socket.http";
+ show_message("Fetching...");
+ local code, err = http.request(url);
+ if not code then
+ show_message("Failed: "..err);
+ return 1;
+ end
+ if url:match("%.lua$") then
+ local ok, err = datamanager.store(url:match("/mod_([^/]+)$"), "*", "plugins", {code});
+ if not ok then
+ show_message("Failed to save to data store: "..err);
+ return 1;
+ end
+ end
+ show_message("Saved. Don't forget to load the module using the config file or admin console!");
+ else
+ show_message("Sorry, I don't understand how to fetch plugins from there.");
+ end
+end
---------------------
diff --git a/util/dependencies.lua b/util/dependencies.lua
index b1d02921..cb022644 100644
--- a/util/dependencies.lua
+++ b/util/dependencies.lua
@@ -57,7 +57,7 @@ local lfs, err = softreq "lfs"
if not lfs then
missingdep("luafilesystem", {
["luarocks"] = "luarocks install luafilesystem";
- ["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-luafilesystem0";
+ ["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-filesystem0";
["Source"] = "http://www.keplerproject.org/luafilesystem/";
});
fatal = true;
diff --git a/util/pluginloader.lua b/util/pluginloader.lua
index 696af34f..cffc4dfc 100644
--- a/util/pluginloader.lua
+++ b/util/pluginloader.lua
@@ -9,11 +9,19 @@
local plugin_dir = CFG_PLUGINDIR or "./plugins/";
-local io_open = io.open;
-local loadstring = loadstring;
+local io_open, os_time = io.open, os.time;
+local loadstring, pairs = loadstring, pairs;
+
+local datamanager = require "util.datamanager";
module "pluginloader"
+local function load_from_datastore(name)
+ local content = datamanager.load(name, "*", "plugins");
+ if not content or not content[1] then return nil, "Resource not found"; end
+ return content[1], name;
+end
+
local function load_file(name)
local file, err = io_open(plugin_dir..name);
if not file then return file, err; end
@@ -22,16 +30,36 @@ local function load_file(name)
return content, name;
end
-function load_resource(plugin, resource)
+function load_resource(plugin, resource, loader)
if not resource then
resource = "mod_"..plugin..".lua";
end
- local content, err = load_file(plugin.."/"..resource);
- if not content then content, err = load_file(resource); end
+ loader = loader or load_file;
+
+ local content, err = loader(plugin.."/"..resource);
+ if not content then content, err = loader(resource); end
-- TODO add support for packed plugins
+
+ if not content and loader == load_file then
+ return load_resource(plugin, resource, load_from_datastore);
+ end
+
return content, err;
end
+function store_resource(plugin, resource, content, metadata)
+ if not resource then
+ resource = "mod_"..plugin..".lua";
+ end
+ local store = { content };
+ if metadata then
+ for k,v in pairs(metadata) do
+ store[k] = v;
+ end
+ end
+ datamanager.store(plugin.."/"..resource, "*", "plugins", store);
+end
+
function load_code(plugin, resource)
local content, err = load_resource(plugin, resource);
if not content then return content, err; end