/*
 * File:   StaticIntervalDetector.cc
 * Author: jgaebler
 *
 * Created on February 24, 2014, 2:28 PM
 */

#include "StaticIntervalDetector.h"
#include "fd/nfd/msg/ControlMessage.h"
#include "fd/nfd/msg/ControlMessageConfirm.h"
#include "Moversight.h"
#include "Dispatcher.h"
#include "ms/events/PeerJoinedEvent.h"
#include "ms/events/PeerLeftEvent.h"
#include "ms/events/LocalPeerUpdatedEvent.h"
#include "mt/events/PendingPeersEvent.h"
#include "fd/events/PeerReconnectedEvent.h"
#include "fd/nfd/timer/DetectionTimer.h"
#include "fd/events/PeerFailedEvent.h"

namespace ubeeme {
namespace moversight {

// NOTE(review): the following region appears to be corrupted. Everything
// between a '<' and a '>' seems to have been stripped (as an HTML tag
// stripper would do). Presumably missing:
//  - the tail of the DEBUG macro (presumably "<< msg << ...");
//  - whatever definitions followed it (constructor/destructor?);
//  - the header of the function that registers this detector with the
//    dispatcher (presumably initialise(), the counterpart of finalise()).
// The six subscribe calls below match the six handleEvent() overloads in
// this file, so their template arguments
// (dispatcher.subscribe<SomeEvent>(this)) were presumably stripped as well.
// The tokens are kept unchanged here. Restore this part from version
// control; it cannot be reconstructed from this file alone.
#undef DEBUG
// NOTE(review): DEBUG expands to an un-braced `if`, so
// `if (a) DEBUG(x); else ...` would bind the `else` to the macro's `if`.
// Consider wrapping the body in do { ... } while (0) when restoring it.
#define DEBUG(msg) if (module.isPrintDebugNFD()) MOV_DEBUG << "NFD@" << getLocalID() << " "<(this);
    dispatcher.subscribe(this);
    dispatcher.subscribe(this);
    dispatcher.subscribe(this);
    dispatcher.subscribe(this);
    dispatcher.subscribe(this);
    setRunning(false);
}

/**
 * @brief Runs operations to finalize the NFD service.
 *
 * Unsubscribes this detector from all dispatcher events and stops it if it
 * is still running.
 */
void StaticIntervalDetector::finalise() {
    dispatcher.unsubscribeAll(this);
    if (isRunning()) {
        stop();
    }//End if
}

/**
 * @brief Handles a detection timer timeout.
 *
 * Does nothing while the detector is not running. Otherwise the reaction
 * depends on the suspicion state of the peer referenced by the timer:
 *  - NO_SUSPECT: no message arrived from the peer since its timer was last
 *    (re)started. A ControlMessage probe is sent, the timer is reset (which
 *    also resets its retries) and the peer becomes HALF_SUSPECT.
 *  - HALF_SUSPECT: while the timer reports remaining retries, the peer is
 *    probed again and the timer restarted. Otherwise the peer becomes
 *    FULL_SUSPECT and a PeerFailedEvent is signalled. In that case the timer
 *    is not restarted here.
 *  - any other state (e.g. FULL_SUSPECT): only logged.
 *
 * NOTE(review): the retry counter is not decremented in this file;
 * presumably DetectionTimer does that itself. Verify in DetectionTimer.
 * @param timer The timer to handle.
 */
void StaticIntervalDetector::handleDetectionTimer(DetectionTimer * timer) {
    if (!isRunning()) return;
    PeerID monitoredPeerId = timer->getReference();
    PeerSuspicionState peerState = nfdStateTable.getEntry(monitoredPeerId);
    switch (peerState) {
        case NO_SUSPECT:
        {
            //first missed interval: probe the peer and half suspect it
            DEBUG("handleDetectionTimer - HALF_SUSPECT peer " << monitoredPeerId);
            sendControlMessage(monitoredPeerId);
            resetDetectionTimer(monitoredPeerId);
            nfdStateTable.updateEntry(monitoredPeerId, HALF_SUSPECT);
            break;
        }//End case
        case HALF_SUSPECT:
        {
            if (timer->getNumberOfRetries() > 0) {
                DEBUG("handleDetectionTimer - try to contact peer again");
                sendControlMessage(monitoredPeerId);
                //plain start (no reset), so the retries are not refilled
                startDetectionTimer(monitoredPeerId);
            }//End if
            else {
                DEBUG("handleDetectionTimer - peer does not response ... give up");
                DEBUG("handleDetectionTimer - FULL_SUSPECT peer " << monitoredPeerId);
                DEBUG("handleDetectionTimer - signal peer failed");
                nfdStateTable.updateEntry(monitoredPeerId, FULL_SUSPECT);
                dispatcher.signal(new PeerFailedEvent(monitoredPeerId));
            }//End else
            break;
        }//End case
        default:
        {
            DEBUG("handleDetectionTimer - unknown peer state");
            break;
        }//end default case
    }//End switch
}

/**
 * @brief Handles a received moversight message to analyze remote peer status.
 *
 * Every message counts as a sign of life of its sender: restartMonitorPeer()
 * resets the sender's detection timer and un-suspects it, if it is monitored.
 * A ControlMessage (type CM) is additionally answered with a
 * ControlMessageConfirm, whether or not the sender is monitored.
 * NULL messages and messages received while not running are ignored.
 * @param pdu The message to analyze.
 */
void StaticIntervalDetector::handleMessage(const MoversightMessage * pdu) {
    if (!isRunning()) return;
    if (pdu != NULL) {
        PeerID senderPeerId = pdu->getSourceID();
        restartMonitorPeer(senderPeerId);
        //to a ControlMessage we have to response
        if (pdu->getType() == CM) {
            sendControlMessageConfirm(senderPeerId);
        }//End if
    }//End if
}

/**
 * @brief A slot for receiving the peer joined event.
 *
 * If the joined peer is a target peer it is monitored from now on. A local
 * slave first drops all monitored peers, presumably because a slave only
 * monitors its master (see startMonitorPeersAsSlave()).
 * @param e The event to handle.
 */
void StaticIntervalDetector::handleEvent(const PeerJoinedEvent & e) {
    if (isTargetPeer(e.getPeerID())) {
        if (!isLocalPeerMaster()) {
            stopMonitorAllPeers();
        }//End if
        DEBUG("peerJoined - monitor peer ID " << e.getPeerID());
        startMonitorPeer(e.getPeerID());
        if (module.isPrintDebugNFD()) {
            DEBUG("peerJoined - resulting suspicion table");
            nfdStateTable.printTable();
        }//End if
    }//End if
}

/**
 * @brief A slot for receiving the peer left event.
 *
 * Stops monitoring the peer that left.
 * @param e The event to handle.
 */
void StaticIntervalDetector::handleEvent(const PeerLeftEvent & e) {
    stopMonitorPeer(e.getPeer().getPeerID());
}

/**
 * @brief A slot for receiving the peer pending event.
 *
 * Stops monitoring every peer listed in the event.
 * @param e The event to handle.
 */
void StaticIntervalDetector::handleEvent(const PendingPeersEvent & e) {
    PeerIDList peers = e.getPeerIDList();
    for (size_t i = 0; i < peers.size(); i++) {
        stopMonitorPeer(peers.get(i));
    }
}

/**
 * @brief A slot for receiving the peer reconnected event.
 * @param e The event to handle.
*/ void StaticIntervalDetector::handleEvent(const PeerReconnectedEvent & e) { if (isTargetPeer(e.getPeerID())) { DEBUG("peerReconnected - monitor peer ID " << e.getPeerID()); startMonitorPeer(e.getPeerID()); }//End if } /** * @brief A slot for receiving the local peer updated event. * @param e The event to handle */ void StaticIntervalDetector::handleEvent(const LocalPeerUpdatedEvent & /* e */) { //we are running ?? if (isRunning()) { if (getLocalState() == JOINED) { if (isLocalPeerMaster()) { PeerIDList list1 = nfdStateTable.getStoredPeerIDs(); PeerIDList list2 = dispatcher.getMembershipService().getClusterAndMasterPeerIDList(getLocalPeer()); list1.sort(); list2.sort(); //nothing changed? if (list1 == list2) { return; }//End if }//End if //slave else { if (nfdStateTable.size() == 1) { return; }//End if }//End else stop(); start(); } else if (getLocalState() == LEAVING) { stop(); }//end } //not running else { //we have joined the group, so we can start the detection of the //foreign peers if (getLocalState() == JOINED) { start(); }//End if Joined return; }//End else - not running } /** * @brief Handles the join group done event by starting the detector. * @param e The event to handle. */ void StaticIntervalDetector::handleEvent(const JoinGroupDoneEvent & e){ if(!isRunning()){ start(); }//End if } /** * @brief Starts for a given peer ID a stored detection timer. If no timer stored, an exception is thrown. * @param pId The peer ID of peer corresponding to the timer to start. * @throws TimerNotFoundException If no detection timer stored within the detection timer queue for the given peer ID. 
*/ void StaticIntervalDetector::startDetectionTimer(const PeerID & pId) { DetectionTimer * timer = findDetectionTimer(pId); if (timer != NULL) { { GenericTime time = GenericTime::currentTime() + DETECTION_TIMEOUT; DEBUG("startDetectionTimer - timeout for peer " << pId << ": " << time.toString()); } timer->setTimeout(DETECTION_TIMEOUT); timer->start(); }//end if } /** * @brief Resets for a given peer ID a stored detection timer. If no timer * stored, an exception is thrown. The number of retires per timer will * also be reset to the start value. * @param pId The peer ID of peer corresponding to the timer to reset (stop + start). * @throws TimerNotFoundException If no detection timer stored within the detection timer queue for the given peer ID. */ void StaticIntervalDetector::resetDetectionTimer(const PeerID & pId) { DEBUG("reset detection timer for peer " << pId); stopDetectionTimer(pId); //reset timer retries findDetectionTimer(pId)->resetNumberOfRetries(); startDetectionTimer(pId); } /** * @brief Stops for a given peer ID a stored detection timer. * @param pId The peer ID of peer corresponding to the timer to stop. */ void StaticIntervalDetector::stopDetectionTimer(const PeerID & pId) { try { DetectionTimer * timer = findDetectionTimer(pId); timer->stop(); }//End try catch (TimerNotFoundException & /*e*/) { } } /** * @brief Stop and deletes the detection timer for the given peer from * the detection timer queue. * @param pId The peer ID of the peer corresponding to the timer */ void StaticIntervalDetector::stopAndDeleteDetectionTimer(const PeerID & pId) { DEBUG("stopAndDeleteDetectionTimer - remove detection timer for peer " << pId); try { DetectionTimer * timer = findDetectionTimer(pId); timer->stop(); detectionTimerQueue.remove(pId); delete timer; }//End try catch (TimerNotFoundException & /*e*/) { } } /** * @brief Finds the corresponding detection timer for the given peer ID. * @param pId The peer ID of the peer to find the corresponding * detection timer. 
* @throws TimerNotFoundException If no detection timer stored within * the detection timer queue for the given peer ID. * @return The desired timer. */ DetectionTimer * StaticIntervalDetector::findDetectionTimer(const PeerID & pId) { DetectionTimer * timer = detectionTimerQueue.find(pId); if (timer != NULL) { return timer; }//End if throw TimerNotFoundException("findDetectionTimer - no detection timer stored within the detection timer queue for the given peer ID"); } /** * @brief Creates and enqueues a detection timer for a given peer. * A new timer is created and stored within the detection timer queue, * if no timer for the given peer ID already stored within the queue. * If a timer already stored for the given peer ID, an additional * timer is not created and the method returns without do anything. * @param pId The peer ID of the peer corresponding to the created * and stored timer. */ void StaticIntervalDetector::createAndEnqueueDetectionTimer(const PeerID & pId) { try { DetectionTimer * timer = findDetectionTimer(pId); //if already a timer stored for this peer??? if (timer != NULL) { return; //do nothing }//End if } catch (TimerNotFoundException & /*e*/) { } DetectionTimer * timer = new DetectionTimer(*this); timer->setReference(pId); detectionTimerQueue.add(timer); } /** * @brief This stops the monitoring off all peers, register within * the nfd. * * This method stops and removes all detection timer. */ void StaticIntervalDetector::stopMonitorAllPeers() { PeerIDList monitoredPeerIDList = nfdStateTable.getStoredPeerIDs(); for (size_t i = 0; i < monitoredPeerIDList.size(); i++) { stopMonitorPeer(monitoredPeerIDList.get(i)); }//End for } /** * @brief Stops to monitor a peer. * * The method ends to monitor the peer, identified by the given peer ID, * by: *
    *
  • remove the peer id form the nfd state table
  • *
  • stop and remove the detection timer.
  • *
* @param pId The id of the peer that monitoring is stopped. */ void StaticIntervalDetector::stopMonitorPeer(const PeerID & pId) { if (nfdStateTable.contains(pId)) { nfdStateTable.removeEntry(pId); stopAndDeleteDetectionTimer(pId); } } /** * @brief Starts monitoring a peer. * * This method starts to monitor the peer, identified by the given * peer ID, by: *
    *
  • create an nfd state table entry for the peer
  • *
  • create, enqueue and start a detection timer for this peer.
  • *
* @param pId The id of the peer to monitor. */ void StaticIntervalDetector::startMonitorPeer(const PeerID & pId) { //already monitoring that peer? return. if (nfdStateTable.contains(pId)) return; //add an entry to the monitor table nfdStateTable.addEntry(pId, NO_SUSPECT); ///create and start the timer createAndEnqueueDetectionTimer(pId); startDetectionTimer(pId); } /** * @brief Starts to monitor peers as a master peer by monitor the local * cluster and the remaining master peers. */ void StaticIntervalDetector::startMonitorPeersAsMaster() { PeerIDList cmpList = dispatcher.getMembershipService().getClusterAndMasterPeerIDList(getLocalPeer()); //add peers for (size_t i = 0; i < cmpList.size(); i++) { PeerID pId = cmpList.get(i); DEBUG("startMonitorPeersAsMaster - monitor peer ID " << pId); startMonitorPeer(pId); }//End for } /** * @brief Starts the monitoring as a slave by monitor the master peer. */ void StaticIntervalDetector::startMonitorPeersAsSlave() { //begin monitor new master DEBUG("startMonitorPeersAsSlave - monitor master " << getLocalPeer().getMasterID()); startMonitorPeer(getLocalPeer().getMasterID()); } /** * @brief Unsuspects the given peer. * * This method sets the suspicion state peer of the given peer to NO_SUSPECT, if the peer was former half or full suspected. * @param peerId The id of the peer to unsuspect * @param suspectionState The former suspection state. */ void StaticIntervalDetector::unsuspectPeer(const PeerID & peerId, const PeerSuspicionState suspectionState) { if (suspectionState == HALF_SUSPECT || suspectionState == FULL_SUSPECT) { nfdStateTable.updateEntry(peerId, NO_SUSPECT); DEBUG("unsuspectPeer - UNSUSPECT peer " << peerId); //the master suspects this peer? 
if (getLocalPeer().isMaster() && suspectionState == FULL_SUSPECT) { //welcome back DEBUG("unsuspectPeer - announce peer reconnect"); //dispatcher.signal( new PeerReconnectedEvent( peerId, dispatcher.getLocalAddressFromPeer( peerId))); // sendPeerReconnectAnnounceMessage(peerId); std::cerr << getLocalID()<<": unsuspectPeer - announce peer reconnect for peer " << peerId << " not finish implemented yet. \n"; }//End if MASTER && peerState == FULL_SUSPECT }//End if FULL_SUSPECT || HALF_SUSPECT } /** * @brief Restarts to monitor the peer, identified by the given id. * * The method restarts to monitor the peer with the given id by * restarting the detection timer. The peer state is set to NO_SUSPECT. * @param pId The id of the peer to monitor. */ void StaticIntervalDetector::restartMonitorPeer(const PeerID & pId) { //monitored peer? if (nfdStateTable.contains(pId)) { PeerSuspicionState peerState = nfdStateTable.getEntry(pId); resetDetectionTimer(pId); unsuspectPeer(pId, peerState); }//End if } } }