1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
-- Because we deal with pre-authed sessions and streams we can't be host-specific
module:set_global();
local filters = require "util.filters";
local throttle = require "util.throttle";
local timer = require "util.timer";
local ceil = math.ceil;
-- Per-session-type overrides read from the "limits" option; each entry is
-- expected to carry a `rate` (e.g. "3kb/s") and a `burst` (e.g. "2s") field.
local limits_cfg = module:get_option("limits", {});
-- Delay handed to timer.add_task before held-back data is retried.
local limits_resolution = module:get_option_number("limits_resolution", 1);
-- Fallbacks used when a configured rate or burst cannot be parsed.
local default_bytes_per_second = 3000;
local default_burst = 2;
-- Power-of-ten exponent for the first letter of a rate unit. Plain bytes must
-- map to 10^0 = 1; an exponent of 1 would make "100b/s" mean 1000 bytes/s.
local rate_units = { b = 0, k = 3, m = 6, g = 9, t = 12 } -- Plan for the future.
--- Convert a configured rate such as "3kb/s" into a number of bytes per second.
-- The value must be a whole number, an optional single space, a unit and a
-- trailing "/s". Only the first letter of the unit matters (case-insensitive);
-- it is looked up in rate_units to obtain a power-of-ten multiplier.
-- @param rate the configured value; expected to be a string, may be nil
-- @param sess_type session type name, used only in the log message
-- @return bytes per second; default_bytes_per_second (after logging an error)
--   when rate is missing, is not a string, or cannot be parsed
local function parse_rate(rate, sess_type)
	local quantity, exp;
	-- Only strings can be pattern-matched. A number, boolean or table coming
	-- from the config must take the logged-default path instead of raising.
	if type(rate) == "string" then
		local unit;
		quantity, unit = rate:match("^(%d+) ?([^/]+)/s$");
		exp = quantity and rate_units[unit:sub(1,1):lower()];
	end
	if not exp then
		module:log("error", "Error parsing rate for %s: %q, using default rate (%d bytes/s)", sess_type, rate, default_bytes_per_second);
		return default_bytes_per_second;
	end
	-- quantity is the captured digit string; convert it explicitly.
	return tonumber(quantity) * (10^exp);
end
--- Convert a configured burst value into a number of seconds.
-- Strings must look like "2s" or "2 s" (whole number, optional space, "s");
-- any other type is passed through tonumber().
-- @param burst the configured value (string, number or nil)
-- @param sess_type session type name, used only in the log message
-- @return the parsed number, or default_burst when burst is absent or
--   cannot be parsed (an error is logged in the latter case)
local function parse_burst(burst, sess_type)
	local n_burst;
	if type(burst) == "string" then
		-- Keep `burst` itself untouched so that a malformed string is still
		-- reported below, with its original text, instead of being dropped.
		local seconds = burst:match("^(%d+) ?s$");
		n_burst = seconds and tonumber(seconds);
	else
		n_burst = tonumber(burst);
	end
	if burst and not n_burst then
		module:log("error", "Unable to parse burst for %s: %q, using default burst interval (%ds)", sess_type, burst, default_burst);
	end
	return n_burst or default_burst;
end
-- Effective limits per session type, seeded with the built-in defaults and
-- then overridden from the config. Shape:
-- limits = { c2s = { bytes_per_second = X, burst_seconds = Y } }
local limits = {
	c2s = { bytes_per_second = 10 * 1024, burst_seconds = 2 },
	s2sin = { bytes_per_second = 30 * 1024, burst_seconds = 2 },
};
-- A configured session type replaces its whole entry; the rate is parsed
-- before the burst, so any parse errors are logged in that order.
for configured_type, configured in pairs(limits_cfg) do
	local rate = parse_rate(configured.rate, configured_type);
	local burst = parse_burst(configured.burst, configured_type);
	limits[configured_type] = { bytes_per_second = rate, burst_seconds = burst };
end
local default_filter_set = {};

--- "bytes/in" filter that enforces session.throttle on incoming data.
-- When the throttle accepts the chunk it is returned unchanged. Otherwise the
-- connection is paused, the tail of the chunk is held back and re-fed through
-- session.data() after limits_resolution, and only the head is returned now.
-- NOTE(review): the third value returned by throttle:poll() is treated as the
-- amount by which the chunk exceeds the allowance - confirm against util.throttle.
-- @param bytes the raw incoming data (string)
-- @param session the session the data belongs to
-- @return the portion of bytes that may be processed immediately
function default_filter_set.bytes_in(bytes, session)
	local limiter = session.throttle;
	if not limiter then
		return bytes;
	end
	local within_limit, _, excess = limiter:poll(#bytes, true);
	if within_limit then
		return bytes;
	end
	session.log("debug", "Session over rate limit (%d) with %d (by %d), pausing", limiter.max, #bytes, excess);
	excess = ceil(excess);
	-- Read no more data from the connection until there is no outstanding data
	session.conn:pause();
	local held_back = bytes:sub(-excess);
	local passed_on = bytes:sub(1, #bytes - excess);
	timer.add_task(limits_resolution, function ()
		-- The session may have been closed while the data was held back.
		if not session.conn then return; end
		if limiter:peek(#held_back) then
			session.log("debug", "Resuming paused session");
			session.conn:resume();
		end
		-- Handle what we can of the outstanding data
		session.data(held_back);
	end);
	return passed_on;
end
-- Maps a base session type (the part of session.type before the first "_")
-- to the table holding its filter functions. All three types currently share
-- default_filter_set.
local type_filters = {
c2s = default_filter_set;
s2sin = default_filter_set;
s2sout = default_filter_set;
};
--- Filter hook: applies the configured limit to a session.
-- The base session type (session.type up to the first "_") selects the entry
-- in `limits`. If the connection offers setlimit() that is used; otherwise a
-- throttle is attached to the session and the "bytes/in" filter is installed.
-- Sessions whose type has no entry in `limits` are left untouched.
-- @param session the session being set up
local function filter_hook(session)
	local session_type = session.type:match("^[^_]+");
	local filter_set, opts = type_filters[session_type], limits[session_type];
	if not opts then
		return;
	end
	if session.conn and session.conn.setlimit then
		session.conn:setlimit(opts.bytes_per_second);
		-- Currently no burst support
	elseif filter_set then
		-- `limits` can name a session type that has no filter set (the config
		-- accepts arbitrary keys); indexing a nil filter_set here would raise
		-- during session setup, so such sessions are simply not throttled.
		session.throttle = throttle.create(opts.bytes_per_second * opts.burst_seconds, opts.burst_seconds);
		filters.add_filter(session, "bytes/in", filter_set.bytes_in, 1000);
	end
end
-- Module load callback: register filter_hook with util.filters.
module.load = function ()
	filters.add_filter_hook(filter_hook);
end
-- Module unload callback: deregister filter_hook from util.filters.
module.unload = function ()
	filters.remove_filter_hook(filter_hook);
end
--- Lift the limit from a session.
-- If the connection offers setlimit() it is called with 0; otherwise the
-- "bytes/in" filter for the session's base type is removed and the session's
-- throttle is cleared.
-- NOTE(review): this is declared as a global rather than a local; confirm
-- nothing else relies on the name before narrowing its scope.
-- @param session the session to exempt
function unlimited(session)
	local base_type = session.type:match("^[^_]+");
	local conn = session.conn;
	if conn and conn.setlimit then
		conn:setlimit(0);
		-- Currently no burst support
		return;
	end
	filters.remove_filter(session, "bytes/in", type_filters[base_type].bytes_in);
	session.throttle = nil;
end
--- Per-host setup: exempt the entries of the "unlimited_jids" option.
-- Nothing is hooked when the set is empty. Otherwise three events are hooked:
-- a successful authentication is matched by "username@host", and established
-- outgoing/incoming s2s sessions are matched by the remote host name.
-- @param module the host-specific module object (shadows the global one)
function module.add_host(module)
	local exempt = module:get_option_inherited_set("unlimited_jids", {});
	if exempt:empty() then
		return;
	end

	local function on_authenticated(event)
		local session = event.session;
		local bare_jid = session.username .. "@" .. session.host;
		if exempt:contains(bare_jid) then
			unlimited(session);
		end
	end

	local function on_s2sout_established(event)
		local session = event.session;
		if exempt:contains(session.to_host) then
			unlimited(session);
		end
	end

	local function on_s2sin_established(event)
		local session = event.session;
		local remote_host = session.from_host;
		if remote_host and exempt:contains(remote_host) then
			unlimited(session);
		end
	end

	module:hook("authentication-success", on_authenticated);
	module:hook("s2sout-established", on_s2sout_established);
	module:hook("s2sin-established", on_s2sin_established);
end
|