root/nntp/connection.cc

Revision 81c6e32180ba6f1018c28ae75b7281fd38121812, 21.3 KB (checked in by Antti-Juhani Kaijanaho <antti-juhani@…>, 21 months ago)

Exception-proof destructors.

Hopefully addresses #78.

Signed-off-by: Antti-Juhani Kaijanaho <antti-juhani@…>

  • Property mode set to 100644
Line 
1/*  This file is part of Alue, the multiprotocol Internet discussion daemon
2
3    Copyright © 2009, 2010 Antti-Juhani Kaijanaho
4
5    Alue is free software: you can redistribute it and/or modify it
6    under the terms of the GNU General Public License as published by
7    the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    Alue is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with Alue.  If not, see <http://www.gnu.org/licenses/>.
17
18 */
19
20#include "connection.hh"
21#include "lexutils.hh"
22
23#include "../db/user.hh"
24#include "../logger/logline.hh"
25#include "../tls/session_impl.hh"
26
27#include <boost/version.hpp>
28#include <boost/date_time.hpp>
29#include <map>
30#include <sstream>
31
32#if BOOST_VERSION < 103600
33include "../buffers_iterator.hpp"
34#endif
35
36#include "../assert.hh"
37
38namespace nntp
39{
40        connection::connection(std::string port_name,
41                               bool initial_ssl,
42                               boost::asio::ip::tcp::acceptor &acc,
43                               tls::init(tls_init),
44                               server::conn_cb srv_cb)
45                : port_name(port_name)
46                , initial_ssl(initial_ssl)
47                , current_article(0)
48                , identified(false)
49                , authenticated(false)
50                , startup_timestamp
51                  (boost::posix_time::second_clock::universal_time())
52                , client_activity_timestamp
53                  (boost::posix_time::second_clock::universal_time())
54                , num_commands_served(0)
55                , num_500_responses(0)
56                , capabilities_used(false)
57                , srv_cb(srv_cb)
58                , ios(acc.get_io_service())
59                , peer(ios)
60                , speer(peer, tls_init, tls::server)
61                , tls_active(false)
62        {
63                acc.async_accept(peer, ep,
64                                 boost::bind(&connection::accept, this, _1));
65        }
66
67        connection::cmds_type *connection::cmds;
68
69        const connection::overview_fmt_type connection::overview_fmt[] = {
70                { "Subject", false, },
71                { "From", false, },
72                { "Date", false, },
73                { "Message-ID", false, },
74                { "References", false, },
75                { "Bytes", false, },
76                { "Lines", false, },
77                { "Xref", true }
78        };
79        const size_t connection::overview_fmt_count =
80                sizeof overview_fmt / sizeof *overview_fmt;
81
82        void connection::abort(boost::system::error_code ec)
83        {
84                logger::logline ll;
85                ll << loghead
86                   << "failure (" 
87                   << ec.message()
88                   << "), aborting connection";
89                ll.close();
90                kill();
91        }
92
93        void connection::kill()
94        {
95                stats();
96                speer.kill();
97                peer.cancel();
98                srv_cb.terminal(shared_from_this());
99                // reset these pointers to break pointer loops
100                terminate.reset();
101                dispatch.reset();
102                starttls.reset();
103        }
104
105        void connection::terminator::done(ptr, boost::system::error_code)
106        {
107                cp->kill();
108        }
109
110        void connection::terminator::sent(ptr lifeline,
111                                          boost::system::error_code ec)
112        {
113                if (!cp->tls_active)
114                {
115                        done(lifeline, ec);
116                        return;
117                }
118               
119                logger::logline ll;
120                ll << cp->loghead << "shutting down TLS";
121                ll.close();
122
123                cp->speer.async_shutdown
124                        (boost::bind(&connection::terminator::done,
125                                     this, lifeline, _1));
126        }
127
128        void connection::accept(boost::system::error_code ec)
129        {
130                if (ec) { abort(ec); return; }
131
132                terminate.reset(new terminator(shared_from_this()));
133                dispatch.reset(new dispatcher(shared_from_this()));
134                starttls.reset(new tls_starter(shared_from_this()));
135
136                startup_timestamp =
137                        boost::posix_time::second_clock::universal_time();
138
139                std::ostringstream ss;
140                ss << "nntpd for " << ep << " ";
141                loghead = ss.str();
142               
143                logger::logline ll;
144                ll << loghead << "starts";
145                ll.close();
146                logger::logger.flush();
147
148                srv_cb.active(shared_from_this());
149
150                if (initial_ssl)
151                {
152                        ll.open();
153                        ll << loghead << "starting TLS negotiation";
154                        ll.close();
155
156                        tls_active = true;
157                        speer.async_handshake(bind(&connection::greet,
158                                                   this, _1));
159                }
160                else
161                {
162                        greet(ec);
163                }
164        }
165
166        void connection::greet(boost::system::error_code ec)
167        {
168                using boost::asio::async_write;
169
170                if (ec) { abort(ec); return; }
171
172                std::string msg;
173
174                send_line("200 posting requires authentication");
175                flush_writebuf(dispatch);
176        }
177
178        void connection::flush_writebuf(continuation::ptr cont)
179        {
180                using boost::asio::async_write;
181                using boost::bind;
182                using boost::asio::buffer;
183
184                if (!cont)
185                {
186                        logger::logline ll;
187                        ll << loghead << "has a null continuation";
188                        return;
189                }
190
191                // to work around a weird bug in the TLS wrapper...
192                if (writebuf.size() == 0)
193                {
194                        ios.post(bind(&continuation::sent,
195                                      cont.get(), cont,
196                                      boost::system::error_code()));
197                        return;
198                }
199
200                if (srv_cb.do_log_protocol_details())
201                {
202                        logger::logline ll;
203                        ll << loghead << "==> ";
204                        for (size_t i = 0; i < writebuf.size(); i++)
205                        {
206                                if (writebuf[i] == '\r') continue;
207                                if (writebuf[i] == '\n')
208                                {
209                                        if (i + 1 < writebuf.size())
210                                        {
211                                                ll.nl();
212                                                ll << loghead << "==> ";
213                                        }
214                                }
215                                else
216                                        ll << writebuf[i];
217                        }
218                        ll.close();
219                }
220
221                async_write(speer, buffer(writebuf.data(), writebuf.size()),
222                            bind(&continuation::sent,
223                                 cont.get(), cont, _1));
224        }
225
226        void connection::dispatcher::received(ptr, boost::system::error_code ec,
227                                              std::size_t received_bytes)
228        {
229                typedef boost::asio::buffers_iterator
230                        <boost::asio::streambuf::const_buffers_type> sbit;
231                using boost::bind;
232
233                if (ec) { cp->abort(ec); return; }
234
235                cp->client_activity_timestamp =
236                        boost::posix_time::second_clock::universal_time();
237
238                cp->readbuf.commit(received_bytes);
239
240                // if no CRLF yet, read more
241                boost::asio::streambuf::const_buffers_type sbuf =
242                        cp->readbuf.data();
243                sbit beginit = sbit::begin(sbuf);
244                sbit endit = sbit::end(sbuf);
245                sbit crlf = sbit::begin(sbuf);
246                for (/**/; crlf != endit; crlf++)
247                {
248                        if (crlf == beginit) continue;
249                        if (crlf[-1] != '\r' || crlf[0] != '\n') continue;
250                        break;
251                }
252                if (crlf == endit)
253                {
254                        std::string line;
255                        for (sbit it = beginit; it != crlf; it++)
256                        {
257                                line += *it;
258                        }
259                        cp->speer.async_read_some
260                                (cp->readbuf.prepare(1024),
261                                 bind(&connection::dispatcher::received,
262                                      this, shared_from_this(),
263                                      _1, _2));
264
265                        return;
266                }
267               
268                assert(crlf != beginit);
269                assert(crlf != endit);
270                assert(crlf[-1] == '\r');
271                assert(crlf[ 0] == '\n');
272                crlf--;
273
274                std::string line;
275                for (sbit it = beginit; it != crlf; it++)
276                {
277                        assert(it != endit);
278                        line += *it;
279                }
280                cp->readbuf.consume(line.length() + 2); // 2 is for the CRLF
281
282                if (cp->srv_cb.do_log_protocol_details())
283                {
284                        std::string li = line;
285                        util::strip_crlf(li);
286                        logger::logline ll;
287                        ll << cp->loghead << "<== " << li;
288                        ll.close();
289                }
290
291
292                if (line.length() != 0 && line[line.length()-1] == '\r')
293                        line.erase(line.length()-1);
294
295                std::string cmd = split(line);
296                boost::to_lower(cmd);
297
298                std::map<std::string, command *>::const_iterator it =
299                        cp->cmds->find(cmd);
300
301                cp->num_commands_served += 1;
302
303                if (it == cp->cmds->end()) {
304                        logger::logline ll;
305                        ll << cp->loghead << "received unknown command " << cmd;
306                        ll.close();
307                        cp->num_500_responses += 1;
308                        cp->send_line("500 unrecognized command");
309                        cp->flush_writebuf(shared_from_this());
310                        return;
311                }
312
313                std::vector<std::string> args;
314                while (!line.empty()) {
315                        std::string arg = split(line);
316                        if (!arg.empty()) args.push_back(arg);
317                }
318                continuation::ptr cont;
319                try
320                {
321                        cont = it->second->perform(cp->shared_from_this(),
322                                                   args.data(), args.size());
323                }
324                catch (std::exception &e)
325                {
326                        logger::logline ll;
327                        ll << cp->loghead
328                           << "exception caught: "
329                           << e.what()
330                           << " (" << typeid(e).name() << ")";
331                        ll.close();
332                        cp->writebuf.clear();
333                        cp->send_line("403 server error, please try again");
334                        cont = shared_from_this();
335                }
336                cp->flush_writebuf(cont);
337        }
338
339        void connection::dispatcher::sent(ptr, boost::system::error_code ec)
340        {
341                using boost::asio::async_read_until;
342                using boost::bind;
343
344                if (ec) { cp->abort(ec); return; }
345
346                cp->writebuf.clear();  /* we are only called after a
347                                        * complete writebuf flush */
348
349
350                if (cp->readbuf.size() == 0)
351                {
352                        cp->speer.async_read_some
353                                (cp->readbuf.prepare(8),
354                                 bind(&connection::dispatcher::received,
355                                      this, shared_from_this(),
356                                      _1, _2));
357                }
358                else
359                        received(shared_from_this(), ec, 0);
360        }
361
362        void connection::reader::received(ptr lifeline,
363                                          size_t scanned,
364                                          boost::system::error_code ec,
365                                          std::size_t received_bytes)
366        {
367                using boost::bind;
368                typedef boost::asio::buffers_iterator
369                        <boost::asio::streambuf::const_buffers_type> sbit;
370
371                if (ec) { cp->abort(ec); return; }
372
373                cp->readbuf.commit(received_bytes);
374
375                cp->client_activity_timestamp =
376                        boost::posix_time::second_clock::universal_time();
377
378                bool done = false;
379                boost::asio::streambuf::const_buffers_type sbuf =
380                        cp->readbuf.data();
381                sbit it = sbit::begin(sbuf) + scanned;
382                sbit endit = sbit::end(sbuf);
383                for (/**/; it != endit; it++, scanned++)
384                {
385                        if (*it != '.') continue;
386                        if (0 < scanned && scanned < 2) continue;
387                        if (it[-2] != '\r' || it[-1] != '\n') continue;
388                        sbit it1 = it+1;
389                        if (it1 == endit) continue;
390                        sbit it2 = it1+1;
391                        if (it2 == endit) continue;
392                        if (*it1 != '\r' || *it2 != '\n') continue;
393                        // we now have "." CRLF at the beginning of line
394                        done = true;
395                        break;
396                }
397                if (!done)
398                {
399                        /* The above loop may end in a state where
400                         * it's past the '.' but hasn't seen enough to
401                         * decide that it's starting the
402                         * end-of-message mark.  */
403                        /**/ if (scanned >= 2 && it[-2] == '.') scanned -= 2;
404                        else if (scanned >= 1 && it[-1] == '.') scanned -= 1;
405                       
406                        cp->speer.async_read_some
407                                (cp->readbuf.prepare(64),
408                                 bind(&connection::reader::received,
409                                      this, lifeline, scanned, _1, _2));
410                        return;
411                }
412
413                continuation::ptr cont;
414
415                std::string data;
416                bool cr = false;
417                it = sbit::begin(sbuf);
418                for (size_t i = 0; i < scanned; i++, it++)
419                {
420                        bool oldcr = cr;
421                        cr = false;
422                        char c = *it;
423                        switch (c)
424                        {
425                        default:
426                                break;
427                        case '\r':
428                                if (oldcr) goto fail;
429                                cr = true;
430                                break;
431                        case '\n':
432                                if (!oldcr) goto fail;
433                                break;
434                        case '\0':
435                                goto fail;
436                        }
437                        data.push_back(c);
438                }
439                cp->readbuf.consume(scanned+3); // the 3 is the final "." CRLF
440
441                if (cp->srv_cb.do_log_protocol_details())
442                {
443                        logger::logline ll;
444                        ll << cp->loghead << "<== ";
445                        for (size_t i = 0; i < data.size(); i++)
446                        {
447                                if (data[i] == '\r') continue;
448                                if (data[i] == '\n')
449                                {
450                                        if (i + 1 < data.size())
451                                        {
452                                                ll.nl();
453                                                ll << cp->loghead << "<== ";
454                                        }
455                                }
456                                else
457                                        ll << data[i];
458                        }
459                        ll.close();
460                }
461
462
463                cont = (*h)(data);
464                cp->flush_writebuf(cont);
465                return;
466        fail:
467                logger::logline ll;
468                ll << cp->loghead << "received invalid multi-line data";
469                ll.close();
470                cp->srv_cb.terminal(cp->shared_from_this());
471        }
472
473        void connection::reader::sent(ptr lifeline,
474                                      boost::system::error_code ec)
475        {
476                using boost::asio::async_read_until;
477                using boost::bind;
478
479                if (ec) { cp->abort(ec); return; }
480
481                cp->writebuf.clear();  /* we are only called after a
482                                        * complete writebuf flush */
483
484
485                cp->speer.async_read_some
486                        (cp->readbuf.prepare(64),
487                         bind(&connection::reader::received,
488                              this, lifeline, 0, _1, _2));
489        }
490
491        void connection::tls_starter::sent(ptr, boost::system::error_code ec)
492        {
493                using boost::asio::async_read_until;
494                using boost::bind;
495
496                if (ec) { cp->abort(ec); return; }
497
498                cp->writebuf.clear();  /* we are only called after a
499                                        * complete writebuf flush */
500
501                logger::logline ll;
502                ll << cp->loghead << "starting TLS negotiation";
503                ll.close();
504
505                cp->current_group.reset();
506                cp->current_article = 0;
507                cp->tls_active = true;
508                cp->speer.async_handshake(bind(&connection::dispatcher::sent,
509                                               (dispatcher*)cp->dispatch.get(),
510                                               cp->dispatch,
511                                               _1));
512        }
513
514        void connection::failure_delayer::waited(ptr, boost::system::error_code)
515        {
516                cp->send_line(line);
517                cp->flush_writebuf(cp->dispatch);
518        }
519
520        void connection::failure_delayer::sent(ptr lifeline,
521                                               boost::system::error_code ec)
522        {
523                if (ec) { cp->abort(ec); return; }
524
525                cp->writebuf.clear();  /* we are only called after a
526                                        * complete writebuf flush */
527
528                timer.expires_from_now(boost::posix_time::seconds(5));
529                timer.async_wait(boost::bind
530                                 (&connection::failure_delayer::waited,
531                                  this, lifeline, _1));
532        }
533
534        connection::command::~command() {}
535
536        void connection::stats()
537        {
538                using namespace boost::posix_time;
539                ptime now = second_clock::universal_time();
540                time_duration idle = now - client_activity_timestamp;
541
542                logger::logline ll;
543                ll << loghead
544                   << (tls_active ? "TLS; " : "")
545                   << (capabilities_used ? "CAPABILITIES; " : "")
546                   << "age " 
547                   << now - startup_timestamp
548                   << ", no client activity in "
549                   << idle
550                   << "; "
551                   << num_commands_served
552                   << " commands served; "
553                   << num_500_responses
554                   << " unknown commands; "
555                   << (identified ? "identified; " : "not identified; ")
556                   << (authenticated ? "authenticated" : "not authenticated");
557        }
558
559        void connection::tick(bool log)
560        {
561                if (log) stats();
562                using namespace boost::posix_time;
563                ptime now = second_clock::universal_time();
564                time_duration idle = now - client_activity_timestamp;
565                if (idle > hours(2) && dispatch)
566                {
567                        logger::logline ll;
568                        ll << loghead
569                           << "dropping idle connection";
570                        ll.close();
571                        terminate.reset();
572                        dispatch.reset();
573                        starttls.reset();
574                        kill();
575                }
576        }
577
578        connection::~connection()
579        {
580                try
581                {
582                        logger::logline ll;
583                        ll << loghead << "dying";
584                        ll.close();
585                } catch (std::exception)
586                {}
587        }
588
589};
590
Note: See TracBrowser for help on using the browser.