1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
local queue = require "util.queue";
local s_byte, s_sub = string.byte, string.sub;
-- Method table shared by every buffer object; the functions below are attached to it.
local dbuffer_methods = {};
-- Metatable applied to buffer objects in new(); method lookups are routed to dbuffer_methods.
local dynamic_buffer_mt = { __index = dbuffer_methods };
-- Append `data` to the end of the buffer.
-- Returns true when the data was stored. Returns nil when storing it would
-- take the buffer past `max_size` (if one is set), or when the chunk queue
-- still refuses the item after the existing chunks have been merged.
function dbuffer_methods:write(data)
	local limit = self.max_size;
	if limit and #data + self._length > limit then
		return nil;
	end
	local chunk_queue = self.items;
	if not chunk_queue:push(data) then
		-- The push was rejected (presumably the queue is at its chunk limit);
		-- merge the buffered chunks to free slots and try once more.
		self:collapse();
		if not chunk_queue:push(data) then
			return nil;
		end
	end
	self._length = self._length + #data;
	return true;
end
-- Read from the front chunk only, never crossing into the next chunk.
-- `requested_bytes` is optional; when omitted, the unread remainder of the
-- front chunk is returned.
-- Returns the data read and its length, or nothing if the queue is empty.
function dbuffer_methods:read_chunk(requested_bytes)
	local front = self.items:peek();
	if not front then return; end
	local offset = self.front_consumed;
	local available = #front - offset;
	local wanted = requested_bytes or available;
	if available <= wanted then
		-- The rest of the front chunk is consumed: drop it from the queue.
		self.front_consumed = 0;
		self._length = self._length - available;
		self.items:pop();
		local rest = front:sub(offset + 1, -1);
		assert(#rest == available);
		return rest, available;
	end
	-- Only part of the chunk is needed: remember how far into it we are.
	local stop = offset + wanted;
	self.front_consumed = stop;
	self._length = self._length - wanted;
	local piece = front:sub(offset + 1, stop);
	assert(#piece == wanted);
	return piece, wanted;
end
-- Read data from the front of the buffer.
-- With no argument, returns whatever remains of the front chunk (nil if the
-- buffer has no chunks). With `requested_bytes`, returns exactly that many
-- bytes as one string, or nil if fewer bytes are buffered.
function dbuffer_methods:read(requested_bytes)
	if requested_bytes and requested_bytes > self._length then
		return nil;
	end
	local first, first_len = self:read_chunk(requested_bytes);
	if not requested_bytes then
		return first;
	end
	if not first then
		return nil;
	end
	local remaining = requested_bytes - first_len;
	if remaining == 0 then
		-- The front chunk alone satisfied the request
		return first;
	end
	-- The request spans several chunks: gather the pieces and join them once
	local pieces = { first };
	while remaining > 0 do
		local piece, piece_len = self:read_chunk(remaining);
		remaining = remaining - piece_len;
		table.insert(pieces, piece);
	end
	return table.concat(pieces);
end
-- Drop bytes from the front of the buffer without returning them.
-- `requested_bytes` is the number of bytes to drop; when omitted (nil),
-- everything currently buffered is dropped.
-- Returns true on success, or nil if more bytes were requested than the
-- buffer holds (in which case nothing is discarded).
-- A count of zero or less discards nothing and reports success, so the
-- buffer's bookkeeping is never touched by a nonsensical count.
function dbuffer_methods:discard(requested_bytes)
	if requested_bytes == nil then
		requested_bytes = self._length;
	end
	if requested_bytes > self._length then
		return nil;
	end
	while requested_bytes > 0 do
		local chunk, read_bytes = self:read_chunk(requested_bytes);
		if not chunk then
			-- No chunk left although bytes are still owed: report failure
			-- instead of doing arithmetic on a nil length.
			return nil;
		end
		requested_bytes = requested_bytes - read_bytes;
	end
	return true;
end
-- Convert a string.sub-style index pair (negative values count back from
-- the end of the buffer, nil j means "to the end") into absolute positions
-- inside the front chunk, taking front_consumed into account. The front
-- chunk is collapsed first so that it holds every byte up to j, letting
-- :sub() and :byte() operate on that single string.
-- Returns nil when the normalized range is empty.
function dbuffer_methods:_prep_sub(i, j)
	local total = self._length;
	local first, last = i, j;
	if last == nil then
		last = -1;
	end
	if last < 0 then
		last = total + (last+1);
	end
	if first < 0 then
		first = total + (first+1);
	end
	-- Clamp both ends to the data actually available
	if first < 1 then
		first = 1;
	end
	if last > total then
		last = total;
	end
	if first > last then
		return nil;
	end
	self:collapse(last);
	local skipped = self.front_consumed;
	if skipped > 0 then
		return first + skipped, last + skipped;
	end
	return first, last;
end
-- Return the bytes between positions i and j of the buffered data, using
-- the same index conventions as string.sub. Nothing is consumed from the
-- buffer's length, though chunks may be merged internally.
-- An empty range yields "".
function dbuffer_methods:sub(i, j)
	local first, last = self:_prep_sub(i, j);
	if first then
		return s_sub(self.items:peek(), first, last);
	end
	return "";
end
-- Return the numeric byte values at positions i..j of the buffered data,
-- in the manner of string.byte: i defaults to 1 and j defaults to i.
-- Returns nothing when the range is empty.
function dbuffer_methods:byte(i, j)
	local first = i or 1;
	local last = j or first;
	first, last = self:_prep_sub(first, last);
	if first then
		return s_byte(self.items:peek(), first, last);
	end
end
-- Number of unread bytes currently held in the buffer.
function dbuffer_methods.length(self)
	return self._length;
end
-- Alias matching the :len() method that strings provide
dbuffer_methods.len = dbuffer_methods.length;
-- Registered as the __len metamethod for the # operator
dynamic_buffer_mt.__len = dbuffer_methods.length;
-- Merge chunks at the front of the queue until the front chunk holds at
-- least `bytes` unread bytes.
-- `bytes` is optional and defaults to the whole buffer; a value larger than
-- the buffered length is treated as the buffered length, since no more data
-- than that can ever be gathered.
-- Does nothing when the queue is empty or the front chunk is already large
-- enough. After a merge, front_consumed is reset to 0 because the
-- already-consumed prefix is dropped from the merged chunk.
function dbuffer_methods:collapse(bytes)
	local buffered = self._length;
	if not bytes or bytes > buffered then
		-- Clamp the target: asking for more than is buffered would otherwise
		-- pop every chunk and then fail on the missing next chunk.
		bytes = buffered;
	end
	local front_chunk = self.items:peek();
	if not front_chunk or #front_chunk - self.front_consumed >= bytes then
		return;
	end
	-- Start from the unread tail of the current front chunk
	local front_chunks = { front_chunk:sub(self.front_consumed+1) };
	local front_bytes = #front_chunks[1];
	while front_bytes < bytes do
		-- Drop the chunk already copied, then copy the one behind it. That
		-- chunk stays in the queue so replace() below can overwrite it.
		self.items:pop();
		local chunk = self.items:peek();
		front_bytes = front_bytes + #chunk;
		table.insert(front_chunks, chunk);
	end
	self.items:replace(table.concat(front_chunks));
	self.front_consumed = 0;
end
-- Create an empty buffer.
-- `max_size` is an optional cap on the total bytes held; a value of zero or
-- less is rejected and nil is returned.
-- `max_chunks` is the size passed to queue.new for the chunk queue, and
-- defaults to 32.
local function new(max_size, max_chunks)
	if max_size and max_size <= 0 then
		return nil;
	end
	local buffer = {
		front_consumed = 0;
		_length = 0;
		max_size = max_size;
		items = queue.new(max_chunks or 32);
	};
	return setmetatable(buffer, dynamic_buffer_mt);
end

-- Module exports
return { new = new };
|