Quassel IRC  Pre-Release
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros
compressor.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * Copyright (C) 2005-2015 by the Quassel Project *
3  * devel@quassel-irc.org *
4  * *
5  * This program is free software; you can redistribute it and/or modify *
6  * it under the terms of the GNU General Public License as published by *
7  * the Free Software Foundation; either version 2 of the License, or *
8  * (at your option) version 3. *
9  * *
10  * This program is distributed in the hope that it will be useful, *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13  * GNU General Public License for more details. *
14  * *
15  * You should have received a copy of the GNU General Public License *
16  * along with this program; if not, write to the *
17  * Free Software Foundation, Inc., *
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
19  ***************************************************************************/
20 
21 #include "compressor.h"
22 
23 #include <QTcpSocket>
24 #include <QTimer>
25 
26 #ifdef HAVE_ZLIB
27 # include <zlib.h>
28 #else
29 # define MINIZ_HEADER_FILE_ONLY
30 # include "../../3rdparty/miniz/miniz.c"
31 #endif
32 
33 const int maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
34 const int ioBufferSize = 64 * 1024; // chunk size for inflate/deflate; should not be too large as we preallocate that space!
35 
36 Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
37  : QObject(parent),
38  _socket(socket),
39  _level(level),
40  _inflater(0),
41  _deflater(0)
42 {
43  connect(socket, SIGNAL(readyRead()), SLOT(readData()));
44 
45  bool ok = true;
46  if (level != NoCompression)
47  ok = initStreams();
48 
49  if (!ok) {
50  // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
51  QTimer::singleShot(0, this, SIGNAL(error()));
52  return;
53  }
54 
55  // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
56  // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
57  if (socket->bytesAvailable())
58  QTimer::singleShot(0, this, SLOT(readData()));
59 }
60 
61 
63 {
64  // release resources allocated by zlib
65  if (_inflater) {
66  inflateEnd(_inflater);
67  delete _inflater;
68  }
69  if (_deflater) {
70  deflateEnd(_deflater);
71  delete _deflater;
72  }
73 }
74 
75 
77 {
78  int zlevel;
79  switch(compressionLevel()) {
80  case BestCompression:
81  zlevel = 9;
82  break;
83  case BestSpeed:
84  zlevel = 1;
85  break;
86  default:
87  zlevel = Z_DEFAULT_COMPRESSION;
88  }
89 
90  _inflater = new z_stream;
91  memset(_inflater, 0, sizeof(z_stream));
92  if (Z_OK != inflateInit(_inflater)) {
93  qWarning() << "Could not initialize the inflate stream!";
94  return false;
95  }
96 
97  _deflater = new z_stream;
98  memset(_deflater, 0, sizeof(z_stream));
99  if (Z_OK != deflateInit(_deflater, zlevel)) {
100  qWarning() << "Could not intialize the deflate stream!";
101  return false;
102  }
103 
104  _inputBuffer.reserve(ioBufferSize); // pre-allocate space
105  _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
106 
107  qDebug() << "Enabling compression...";
108 
109  return true;
110 }
111 
112 
113 
115 {
116  return _readBuffer.size();
117 }
118 
119 
120 qint64 Compressor::read(char *data, qint64 maxSize)
121 {
122  if (maxSize <= 0)
123  maxSize = _readBuffer.size();
124 
125  qint64 n = qMin(maxSize, (qint64)_readBuffer.size());
126  memcpy(data, _readBuffer.constData(), n);
127 
128  // TODO: don't copy for every read
129  if (n == _readBuffer.size())
130  _readBuffer.clear();
131  else
132  _readBuffer = _readBuffer.mid(n);
133 
134  // If there's still data left in the socket buffer, make sure to schedule a read
135  if (_socket->bytesAvailable())
136  QTimer::singleShot(0, this, SLOT(readData()));
137 
138  return n;
139 }
140 
141 
142 // The usual usage pattern is to write a blocksize first, followed by the actual data.
143 // By setting NoFlush, one can indicate that the write buffer should not immediately be
144 // written, which should make things a bit more efficient.
145 qint64 Compressor::write(const char *data, qint64 count, WriteBufferHint flush)
146 {
147  int pos = _writeBuffer.size();
148  _writeBuffer.resize(pos + count);
149  memcpy(_writeBuffer.data() + pos, data, count);
150 
151  if (flush != NoFlush)
152  writeData();
153 
154  return count;
155 }
156 
157 
159 {
160  // don't try to read more data if we're already closing
161  if (_socket->state() != QAbstractSocket::ConnectedState)
162  return;
163 
164  if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
165  return;
166 
167  if (compressionLevel() == NoCompression) {
168  _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
169  emit readyRead();
170  return;
171  }
172 
173  // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
174  // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
175  // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
176  // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
177  // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
178 
179  while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
180  _readBuffer.resize(_readBuffer.size() + ioBufferSize);
181  _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
182 
183  _inflater->next_in = reinterpret_cast<unsigned char *>(_inputBuffer.data());
184  _inflater->avail_in = _inputBuffer.size();
185  _inflater->next_out = reinterpret_cast<unsigned char *>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
186  _inflater->avail_out = ioBufferSize;
187 
188  const unsigned char *orig_out = _inflater->next_out; // so we see if we have actually produced any output
189 
190  int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible
191 
192  // adjust input and output buffers
193  _readBuffer.resize(_inflater->next_out - reinterpret_cast<unsigned char *>(_readBuffer.data()));
194  if (_inflater->avail_in > 0)
195  memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
196  _inputBuffer.resize(_inflater->avail_in);
197 
198  if (_inflater->next_out != orig_out)
199  emit readyRead();
200 
201  switch(status) {
202  case Z_NEED_DICT:
203  case Z_DATA_ERROR:
204  case Z_MEM_ERROR:
205  case Z_STREAM_ERROR:
206  qWarning() << "Error while decompressing stream:" << status;
207  emit error(StreamError);
208  return;
209  case Z_BUF_ERROR:
210  // means that we need more input to continue, so this is not an actual error
211  return;
212  case Z_STREAM_END:
213  qWarning() << "Reached end of zlib stream!"; // this should really never happen
214  return;
215  default:
216  // just try to get more out of the stream
217  break;
218  }
219  }
220  //qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
221 }
222 
223 
225 {
226  if (compressionLevel() == NoCompression) {
227  _socket->write(_writeBuffer);
228  _writeBuffer.clear();
229  return;
230  }
231 
232  _deflater->next_in = reinterpret_cast<unsigned char *>(_writeBuffer.data());
233  _deflater->avail_in = _writeBuffer.size();
234 
235  int status;
236  do {
237  _deflater->next_out = reinterpret_cast<unsigned char *>(_outputBuffer.data());
238  _deflater->avail_out = ioBufferSize;
239  status = deflate(_deflater, Z_PARTIAL_FLUSH);
240  if (status != Z_OK && status != Z_BUF_ERROR) {
241  qWarning() << "Error while compressing stream:" << status;
242  emit error(StreamError);
243  return;
244  }
245 
246  if (_deflater->avail_out == static_cast<unsigned int>(ioBufferSize))
247  continue; // nothing to write here
248 
249  if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
250  qWarning() << "Error while writing to socket:" << _socket->errorString();
251  emit error(DeviceError);
252  return;
253  }
254  } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here!
255 
256  if (_deflater->avail_in > 0) {
257  qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
258  emit error(StreamError);
259  }
260 
261  _writeBuffer.resize(0);
262 
263  //qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
264 }
265 
266 
268 {
269  if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
270  _socket->flush();
271 
272  // FIXME: missing impl for enabled compression; but then we're not using this method yet
273 }