libassa
3.5.0
|
00001 // -*- c++ -*- 00002 //------------------------------------------------------------------------------ 00003 // Reactor.cpp 00004 //------------------------------------------------------------------------------ 00005 // Copyright (C) 1997-2002,2005-2007 Vladislav Grinchenko 00006 // 00007 // This library is free software; you can redistribute it and/or 00008 // modify it under the terms of the GNU Library General Public 00009 // License as published by the Free Software Foundation; either 00010 // version 2 of the License, or (at your option) any later version. 00011 //----------------------------------------------------------------------------- 00012 // Created: 05/25/1999 00013 //----------------------------------------------------------------------------- 00014 #include <iostream> 00015 #include <sstream> 00016 #include <string> 00017 00018 #include "assa/Reactor.h" 00019 #include "assa/Logger.h" 00020 00021 using namespace ASSA; 00022 00023 Reactor:: 00024 Reactor () : 00025 m_fd_setsize (1024), 00026 m_maxfd_plus1 (0), 00027 m_active (true) 00028 { 00029 trace_with_mask("Reactor::Reactor",REACTTRACE); 00030 00034 #if defined(WIN32) 00035 m_fd_setsize = FD_SETSIZE; 00036 00037 #else // POSIX 00038 struct rlimit rlim; 00039 rlim.rlim_max = 0; 00040 00041 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) { 00042 m_fd_setsize = rlim.rlim_cur; 00043 } 00044 #endif 00045 00048 #if defined (WIN32) 00049 WSADATA data; 00050 WSAStartup (MAKEWORD (2, 2), &data); 00051 #endif 00052 } 00053 00054 Reactor:: 00055 ~Reactor() 00056 { 00057 trace_with_mask("Reactor::~Reactor",REACTTRACE); 00058 00059 m_readSet.clear (); 00060 m_writeSet.clear (); 00061 m_exceptSet.clear (); 00062 deactivate (); 00063 } 00064 00065 TimerId 00066 Reactor:: 00067 registerTimerHandler (EventHandler* eh_, 00068 const TimeVal& timeout_, 00069 const std::string& name_) 00070 { 00071 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE); 00072 Assure_return (eh_); 00073 00074 TimeVal now (TimeVal::gettimeofday()); 00075 TimeVal t (now + timeout_); 00076 00077 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n", 00078 timeout_.sec(),timeout_.msec())); 00079 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() )); 00080 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() )); 00081 00082 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_); 00083 00084 DL((REACT,"---Modified Timer Queue----\n")); 00085 m_tqueue.dump(); 00086 DL((REACT,"---------------------------\n")); 00087 00088 return (tid); 00089 } 00090 00091 bool 00092 Reactor:: 00093 registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_) 00094 { 00095 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE); 00096 00097 std::ostringstream msg; 00098 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_)); 00099 00100 if (isReadEvent (et_)) 00101 { 00102 if (!m_waitSet.m_rset.setFd (fd_)) 00103 { 00104 DL((ASSAERR,"readset: fd %d out of range\n", fd_)); 00105 return (false); 00106 } 00107 m_readSet[fd_] = eh_; 00108 msg << "READ_EVENT"; 00109 } 00110 00111 if (isWriteEvent (et_)) 00112 { 00113 if (!m_waitSet.m_wset.setFd (fd_)) 00114 { 00115 DL((ASSAERR,"writeset: fd %d out of range\n", fd_)); 00116 return (false); 00117 } 00118 m_writeSet[fd_] = eh_; 00119 msg << " WRITE_EVENT"; 00120 } 00121 00122 if (isExceptEvent (et_)) 00123 { 00124 if (!m_waitSet.m_eset.setFd (fd_)) 00125 { 00126 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_)); 00127 return (false); 00128 } 00129 m_exceptSet[fd_] = eh_; 00130 msg << " EXCEPT_EVENT"; 00131 } 00132 msg << std::ends; 00133 00134 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 00135 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () )); 00136 00137 #if !defined (WIN32) 00138 if (m_maxfd_plus1 < fd_+1) { 00139 m_maxfd_plus1 = fd_+1; 00140 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00141 } 00142 #endif 00143 00144 DL((REACT,"Modified waitSet:\n")); 00145 m_waitSet.dump (); 00146 00147 return (true); 00148 } 00149 00150 bool 00151 Reactor:: 00152 removeTimerHandler (TimerId tid_) 00153 { 00154 trace_with_mask("Reactor::removeTimer",REACTTRACE); 00155 bool ret; 00156 00157 if ((ret = m_tqueue.remove (tid_))) { 00158 DL((REACT,"---Modified Timer Queue----\n")); 00159 m_tqueue.dump(); 00160 DL((REACT,"---------------------------\n")); 00161 } 00162 else { 00163 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ )); 00164 } 00165 return (ret); 00166 } 00167 00171 bool 00172 Reactor:: 00173 removeHandler (EventHandler* eh_, EventType event_) 00174 { 00175 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE); 00176 00177 bool ret = false; 00178 handler_t fd; 00179 Fd2Eh_Map_Iter iter; 00180 00181 if (eh_ == NULL) { 00182 return false; 00183 } 00184 00185 if (isTimeoutEvent (event_)) { 00186 ret = m_tqueue.remove (eh_); 00187 ret = true; 00188 } 00189 00190 if (isReadEvent (event_)) { 00191 iter = m_readSet.begin (); 00192 while (iter != m_readSet.end ()) { 00193 if ((*iter).second == eh_) { 00194 fd = (*iter).first; 00195 m_readSet.erase (iter); 00196 m_waitSet.m_rset.clear (fd); 00197 ret = true; 00198 break; 00199 } 00200 iter++; 00201 } 00202 } 00203 00204 if (isWriteEvent (event_)) { 00205 iter = m_writeSet.begin (); 00206 while (iter != m_writeSet.end ()) { 00207 if ((*iter).second == eh_) { 00208 fd = (*iter).first; 00209 m_writeSet.erase (iter); 00210 m_waitSet.m_wset.clear (fd); 00211 ret = true; 00212 break; 00213 } 00214 iter++; 00215 } 00216 } 00217 00218 if (isExceptEvent (event_)) { 00219 iter = m_exceptSet.begin (); 00220 while (iter != m_exceptSet.end ()) { 00221 if ((*iter).second == eh_) { 00222 fd = (*iter).first; 00223 m_exceptSet.erase (iter); 00224 m_waitSet.m_eset.clear (fd); 00225 ret = true; 00226 break; 00227 } 00228 iter++; 00229 } 00230 } 00231 00232 if (ret == true) { 00233 DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_)); 00234 eh_->handle_close (fd); 00235 } 00236 00237 adjust_maxfdp1 (fd); 00238 00239 DL((REACT,"Modifies waitSet:\n")); 00240 m_waitSet.dump (); 00241 00242 return (ret); 00243 } 00244 00245 bool 00246 Reactor:: 00247 removeIOHandler (handler_t fd_) 00248 { 00249 trace_with_mask("Reactor::removeIOHandler",REACTTRACE); 00250 00251 bool ret = false; 00252 EventHandler* ehp = NULL; 00253 Fd2Eh_Map_Iter iter; 00254 00255 Assure_return (ASSA::is_valid_handler (fd_)); 00256 00257 DL((REACT,"Removing handler for fd=%d\n",fd_)); 00258 00263 if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 00264 { 00265 ehp = (*iter).second; 00266 m_readSet.erase (iter); 00267 m_waitSet.m_rset.clear (fd_); 00268 m_readySet.m_rset.clear (fd_); 00269 if (m_readSet.size () > 0) { 00270 iter = m_readSet.end (); 00271 iter--; 00272 } 00273 ret = true; 00274 } 00275 00276 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 00277 { 00278 ehp = (*iter).second; 00279 m_writeSet.erase (iter); 00280 m_waitSet.m_wset.clear (fd_); 00281 m_readySet.m_wset.clear (fd_); 00282 if (m_writeSet.size () > 0) { 00283 iter = m_writeSet.end (); 00284 iter--; 00285 } 00286 ret = true; 00287 } 00288 00289 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 00290 { 00291 ehp = (*iter).second; 00292 m_exceptSet.erase (iter); 00293 m_waitSet.m_eset.clear (fd_); 00294 m_readySet.m_eset.clear (fd_); 00295 if (m_exceptSet.size () > 0) { 00296 iter = m_exceptSet.end (); 00297 iter--; 00298 } 00299 ret = true; 00300 } 00301 00302 if (ret == true && ehp != NULL) { 00303 DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp)); 00304 ehp->handle_close (fd_); 00305 } 00306 00307 adjust_maxfdp1 (fd_); 00308 00309 DL((REACT,"Modifies waitSet:\n")); 00310 m_waitSet.dump (); 00311 00312 return (ret); 00313 } 00314 00315 bool 00316 Reactor:: 00317 checkFDs (void) 00318 { 00319 trace_with_mask("Reactor::checkFDs",REACTTRACE); 00320 00321 bool num_removed = false; 00322 FdSet mask; 00323 timeval poll = { 0, 0 }; 00324 00325 for (handler_t fd = 0; fd < m_fd_setsize; fd++) { 00326 if ( m_readSet[fd] != NULL ) { 00327 mask.setFd (fd); 00328 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) { 00329 removeIOHandler (fd); 00330 num_removed = true; 00331 DL((REACT,"Detected BAD FD: %d\n", fd )); 00332 } 00333 mask.clear (fd); 00334 } 00335 } 00336 return (num_removed); 00337 } 00338 00339 bool 00340 Reactor:: 00341 handleError (void) 00342 { 00343 trace_with_mask("Reactor::handleError",REACTTRACE); 00344 00347 if ( !m_active ) { 00348 DL((REACT,"Received cmd to stop Reactor\n")); 00349 return (false); 00350 } 00351 00352 /*--- 00353 TODO: If select(2) returns before time expires, with 00354 a descriptor ready or with EINTR, timeval is not 00355 going to be updated with number of seconds remaining. 00356 This is true for all systems except Linux, which will 00357 do so. Therefore, to restart correctly in case of 00358 EINTR, we ought to take time measurement before and 00359 after select, and try to select() for remaining time. 00360 00361 For now, we restart with the initial timing value. 00362 ---*/ 00363 /*--- 00364 BSD kernel never restarts select(2). SVR4 will restart if 00365 the SA_RESTART flag is specified when the signal handler 00366 for the signal delivered is installed. This means taht for 00367 portability, we must handle signal interrupts. 00368 ---*/ 00369 00370 if ( errno == EINTR ) { 00371 EL((REACT,"EINTR: interrupted select(2)\n")); 00372 /* 00373 If I was sitting in select(2) and received SIGTERM, 00374 the signal handler would have set m_active to 'false', 00375 and this function would have returned 'false' as above. 00376 For any other non-critical signals (USR1,...), 00377 we retry select. 00378 */ 00379 return (true); 00380 } 00381 /* 00382 EBADF - bad file number. One of the file descriptors does 00383 not reference an open file to open(), close(), ioctl(). 00384 This can happen if user closed fd and forgot to remove 00385 handler from Reactor. 00386 */ 00387 if ( errno == EBADF ) { 00388 DL((REACT,"EBADF: bad file descriptor\n")); 00389 return (checkFDs ()); 00390 } 00391 /* 00392 Any other error from select 00393 */ 00394 #if defined (WIN32) 00395 DL ((REACT,"select(3) error = %d\n", WSAGetLastError())); 00396 #else 00397 EL((ASSAERR,"select(3) error\n")); 00398 #endif 00399 return (false); 00400 } 00401 00402 int 00403 Reactor:: 00404 isAnyReady (void) 00405 { 00406 trace_with_mask("Reactor::isAnyReady",REACTTRACE); 00407 00408 int n = m_readySet.m_rset.numSet () + 00409 m_readySet.m_wset.numSet () + 00410 m_readySet.m_eset.numSet (); 00411 00412 if ( n > 0 ) { 00413 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n)); 00414 m_readySet.dump (); 00415 } 00416 return (n); 00417 } 00418 00419 void 00420 Reactor:: 00421 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_) 00422 { 00423 trace_with_mask("Reactor::calculateTimeout",REACTTRACE); 00424 00425 TimeVal now; 00426 TimeVal tv; 00427 00428 if (m_tqueue.isEmpty () ) { 00429 howlong_ = maxwait_; 00430 goto done; 00431 } 00432 now = TimeVal::gettimeofday (); 00433 tv = m_tqueue.top (); 00434 00435 if (tv < now) { 00436 /*--- 00437 It took too long to get here (fraction of a millisecond), 00438 and top timer had already expired. In this case, 00439 perform non-blocking select in order to drain the timer queue. 00440 ---*/ 00441 *howlong_ = 0; 00442 } 00443 else { 00444 DL((REACT,"--------- Timer Queue ----------\n")); 00445 m_tqueue.dump(); 00446 DL((REACT,"--------------------------------\n")); 00447 00448 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) { 00449 *howlong_ = tv - now; 00450 } 00451 else { 00452 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now; 00453 } 00454 } 00455 00456 done: 00457 if (howlong_ != NULL) { 00458 DL((REACT,"delay (%f)\n", double (*howlong_) )); 00459 } 00460 else { 00461 DL((REACT,"delay (forever)\n")); 00462 } 00463 } 00464 00468 void 00469 Reactor:: 00470 waitForEvents (void) 00471 { 00472 while ( m_active ) { 00473 waitForEvents ((TimeVal*) NULL); 00474 } 00475 } 00476 00493 void 00494 Reactor:: 00495 waitForEvents (TimeVal* tv_) 00496 { 00497 trace_with_mask("Reactor::waitForEvents",REACTTRACE); 00498 00499 TimerCountdown traceTime (tv_); 00500 DL((REACT,"======================================\n")); 00501 00502 /*--- Expire all stale Timers ---*/ 00503 m_tqueue.expire (TimeVal::gettimeofday ()); 00504 00505 /* Test to see if Reactor has been deactivated as a result 00506 * of processing done by any TimerHandlers. 00507 */ 00508 if (!m_active) { 00509 return; 00510 } 00511 00512 int nReady; 00513 TimeVal delay; 00514 TimeVal* dlp = &delay; 00515 00516 /*--- 00517 In case if not all data have been processed by the EventHandler, 00518 and EventHandler stated so in its callback's return value 00519 to dispatcher (), it will be called again. This way 00520 underlying file/socket stream can efficiently utilize its 00521 buffering mechaninsm. 00522 ---*/ 00523 if ((nReady = isAnyReady ())) { 00524 DL((REACT,"isAnyReady returned: %d\n",nReady)); 00525 dispatch (nReady); 00526 return; 00527 } 00528 00529 DL((REACT,"=== m_waitSet ===\n")); 00530 m_waitSet.dump (); 00531 00532 do { 00533 m_readySet.reset (); 00534 DL ((REACT,"m_readySet after reset():\n")); 00535 m_readySet.dump (); 00536 00537 m_readySet = m_waitSet; 00538 DL ((REACT,"m_readySet after assign:\n")); 00539 m_readySet.dump (); 00540 00541 calculateTimeout (dlp, tv_); 00542 00543 nReady = ::select (m_maxfd_plus1, 00544 &m_readySet.m_rset, 00545 &m_readySet.m_wset, 00546 &m_readySet.m_eset, 00547 dlp); 00548 DL((REACT,"::select() returned: %d\n",nReady)); 00549 00550 m_readySet.sync (); 00551 DL ((REACT,"m_readySet after select:\n")); 00552 m_readySet.dump (); 00553 00554 } 00555 while (nReady < 0 && handleError ()); 00556 00557 dispatch (nReady); 00558 } 00559 00566 void 00567 Reactor:: 00568 dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_) 00569 { 00570 trace_with_mask("Reactor::dispatchHandler",REACTTRACE); 00571 00572 int ret = 0; 00573 handler_t fd; 00574 EventHandler* ehp = NULL; 00575 std::string eh_id; 00576 00577 Fd2Eh_Map_Iter iter = fdSet_.begin (); 00578 00579 while (iter != fdSet_.end ()) 00580 { 00581 fd = (*iter).first; 00582 ehp = (*iter).second; 00583 00584 if (mask_.isSet (fd) && ehp != NULL) 00585 { 00586 eh_id = ehp->get_id (); 00587 DL((REACT,"Data detected from \"%s\"(fd=%d)\n", 00588 eh_id.c_str (), fd)); 00589 00590 ret = (ehp->*callback_) (fd); /* Fire up a callback */ 00591 00592 if (ret == -1) { 00593 removeIOHandler (fd); 00594 } 00595 else if (ret > 0) { 00596 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n", 00597 ret, fd, eh_id.c_str ())); 00598 //return; <-- would starve other connections 00599 } 00600 else { 00601 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 00602 eh_id.c_str (), fd)); 00603 mask_.clear (fd); 00604 } 00611 iter = fdSet_.begin (); 00612 } 00613 else { 00614 iter++; 00615 } 00616 } 00617 } 00618 00624 bool 00625 Reactor:: 00626 dispatch (int ready_) 00627 { 00628 trace_with_mask("Reactor::dispatch", REACTTRACE); 00629 00630 m_tqueue.expire (TimeVal::gettimeofday ()); 00631 00632 if ( ready_ < 0 ) 00633 { 00634 #if !defined (WIN32) 00635 EL((ASSAERR,"::select(3) error\n")); 00636 #endif 00637 return (false); 00638 } 00639 if ( ready_ == 0 ) { 00640 return (true); 00641 } 00642 00643 DL((REACT,"Dispatching %d FDs.\n",ready_)); 00644 DL((REACT,"m_readySet:\n")); 00645 m_readySet.dump (); 00646 00647 /*--- Writes first ---*/ 00648 dispatchHandler (m_readySet.m_wset, 00649 m_writeSet, 00650 &EventHandler::handle_write); 00651 00652 /*--- Exceptions next ---*/ 00653 dispatchHandler (m_readySet.m_eset, 00654 m_exceptSet, 00655 &EventHandler::handle_except); 00656 00657 /*--- Finally, the Reads ---*/ 00658 dispatchHandler (m_readySet.m_rset, 00659 m_readSet, 00660 &EventHandler::handle_read); 00661 00662 return (true); 00663 } 00664 00665 void 00666 Reactor:: 00667 stopReactor (void) 00668 { 00669 trace_with_mask("Reactor::stopReactor", REACTTRACE); 00670 00671 m_active = false; 00672 00673 Fd2Eh_Map_Iter iter; 00674 EventHandler* ehp; 00675 00676 while (m_readSet.size () > 0) { 00677 iter = m_readSet.begin (); 00678 ehp = (*iter).second; 00679 removeHandler (ehp); 00680 } 00681 00682 while (m_writeSet.size () > 0) { 00683 iter = m_writeSet.begin (); 00684 ehp = (*iter).second; 00685 removeHandler (ehp); 00686 } 00687 00688 while (m_exceptSet.size () > 0) { 00689 iter = m_exceptSet.begin (); 00690 ehp = (*iter).second; 00691 removeHandler (ehp); 00692 } 00693 } 00694 00699 void 00700 Reactor:: 00701 adjust_maxfdp1 (handler_t fd_) 00702 { 00703 #if !defined (WIN32) /* POSIX */ 00704 00705 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE); 00706 00707 if (m_maxfd_plus1 == fd_ + 1) 00708 { 00709 m_maxfd_plus1 = m_waitSet.max_fd () + 1; 00710 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00711 } 00712 #endif 00713 }